# Process raw data to HMD - VLIZ workflow

This notebook will guide you through all the steps of using [pbp](https://github.com/mbari-org/pbp) to process raw acoustic data into calibrated HMD 1-minute resolution bands. 

## Setup

We'll start installing the required dependencies 

In [None]:
import sys

In [None]:
!{sys.executable} -m pip uninstall mbari-pbp -y

In [None]:
!{sys.executable} -m pip install git+https://github.com/lifewatch/pbp.git@rtsys_metadata
!{sys.executable} -m pip install cblind

In [None]:
# Import package modules
import xarray as xr
import dask
import pandas as pd
import time
import json
import pathlib
import yaml
from urllib.parse import urlparse
import os
import pyhydrophone as pyhy
import configparser
import cblind
import matplotlib.pyplot as plt
import pypam
import re


In [None]:
from pbp.meta_gen.gen_soundtrap import SoundTrapMetadataGenerator
from pbp.meta_gen.gen_resea import ReseaMetadataGenerator
from pbp.logging_helper import create_logger_info, create_logger

from pbp.process_helper import ProcessHelper
from pbp.file_helper import FileHelper

We will first indicate where is the json file containing all the metadata for the deployment we want to process

In [None]:
# Location for generated files:
path_to_config_file = '/home/jovyan/work/shared/data/AXOLOTL_workshop/metadata/deployments/bpns-Grafton_2022-10-28_EA-SDA14_2003001.json'

pbp uses yaml files to store metadata on the generated netCDF files - this should be adapted to each institute

In [None]:
global_attrs_uri = "/home/jovyan/work/shared/data/AXOLOTL_workshop/metadata/pbp_yaml/globalAttributes.yaml"
variable_attrs_uri = "/home/jovyan/work/shared/data/AXOLOTL_workshop/metadata/pbp_yaml/variableAttributes.yaml"

We'll load the config file information, and the corresponding instrument config file (for this it's necessary to keep the folder structure the same as in this workshop)

In [None]:
config_file = pathlib.Path(path_to_config_file)
f = open(path_to_config_file, 'r')
deployment_config = json.load(f)

instrument_file = config_file.parent.parent.joinpath('receivers', deployment_config['RECORDER_ID'] + '.json')
f_i = open(instrument_file, 'r')
instrument_config = json.load(f_i)

From the config file, set up the paths to the raw data and the output

In [None]:
# A prefix for the name of generate files:
deployment_name = deployment_config['DEPLOYMENT_NAME']

In [None]:
# Audio data input specifications
wav_uri = deployment_config['FOLDER_PATH']
# file storage location for the input audio data
wav_path = pathlib.Path(urlparse(wav_uri).path)
output_dir = config_file.parent.parent.parent.joinpath('HMD', deployment_name)

In [None]:
json_base_dir = wav_path.parent.joinpath('metadata', 'json')  # location to store generated data in JSON format
xml_dir = wav_path  # file storage location for the input audio data

serial_number = instrument_config['recorder']['serial_number']
start_date = pd.to_datetime(deployment_config['AUDIO_START_DEPLOYMENT_DATETIME']).to_pydatetime()  # start date
end_date = pd.to_datetime(deployment_config['AUDIO_END_DEPLOYMENT_DATETIME']).to_pydatetime()

print(f'We will analyze the period from {start_date} to {end_date}')

We'll read some information from the config file and populate the corresponding fields on the pbp yaml files

In [None]:
# Populate deployment-specific yaml attributes
deployment_attrs_uri = output_dir.joinpath(f'globalAttributes_{deployment_name}.yml')

yaml_config = yaml.safe_load(open(global_attrs_uri))
yaml_config['creator_name'] = deployment_config['AUTHOR_NAME']
yaml_config['creator_email'] = deployment_config['AUTHOR_EMAIL']
lon = deployment_config['location']['DEP_LON_DEG']
lat = deployment_config['location']['DEP_LAT_DEG']
yaml_config['geospatial_bounds'] = f'POINT ({lon} {lat})'
yaml_config['comment'] = deployment_config['DEPLOYMENT_COMMENTS']
yaml_config['platform'] = deployment_config['location']['MOORING_TYPE']
yaml_config['instrument'] = deployment_config['RECORDER_ID']

And we'll create the output directory and copy there the config file used 

In [None]:
if not output_dir.exists():
    print('Creating a new directory...', output_dir)
    os.mkdir(output_dir)

with open(deployment_attrs_uri, 'w') as file:
    yaml.dump(yaml_config, file)

In [None]:
voltage_multiplier = 1.0
subset_to = (10, 24000)

## Instruments

In [None]:
update_sens = True 
# this only applies for RESEA instruments!

In [None]:
if 'SOUNDTRAP' in deployment_config['RECORDER_ID']:
    hydrophone = pyhy.SoundTrap(model=instrument_config['recorder']['model'].replace('_', ' '),
                                serial_number=int(instrument_config['recorder']['serial_number']),
                                name=deployment_config['RECORDER_ID'],
                                gain_type='High')

    print('SoundTrap settings to:')
    print('sensitivity: ', hydrophone.sensitivity)
    print('Vpp: ', hydrophone.Vpp)
    print('preamp_gain: ', hydrophone.preamp_gain)
    print('gain_type: ', 'High')

    wav_prefix = [f'{serial_number}.']  # prefix for the audio files
    seconds_per_file = 300

elif 'RESEA' in deployment_config['RECORDER_ID']:
    txt_metadata_file = wav_path.parent.joinpath('metadata', 'config.txt')
    config_rec = configparser.ConfigParser()
    config_rec.read(txt_metadata_file)

    byte_per_sample = int(config_rec['Recorder']['format'].split('_')[1]) / 8
    sampling_freq = int(config_rec['Recorder']['lp_frequency'].replace('K', '000'))
    seconds_per_file = int(config_rec['Recorder']['file_max_size']) * 1E6 / (sampling_freq * byte_per_sample)

    hydrophone = pyhy.RTSys(model=instrument_config['recorder']['model'],
                            sensitivity=config_rec['Hydrophones']['cha_sh'],
                            serial_number=int(instrument_config['recorder']['serial_number'].split('_')[-1]),
                            name=deployment_config['RECORDER_ID'],
                            Vpp=5,
                            preamp_gain=0,
                            mode=config_rec['Recorder']['record_mode'])
    wav_prefix = ['channelA_', 'channelAC_']  # prefix for the audio files , 'channelA_', 'channelC_'
    update_sens = input('Should the RESEA sensitivity be updated with a wav file? '
                        'Write no if you do not want any metadata update. '
                        'Give the file you wish to use or press enter to use by default the first file.')
    if update_sens:
        wav_file = list(wav_path.glob('*/*.wav'))[0]
        print(f'Using wav file {wav_file} for sensitivity adjustment...')
        hydrophone = hydrophone.update_metadata(wav_file)
    elif not update_sens:
        pass
    else:
        raise ValueError(f'The value {update_sens} is not accepted as an answer to update the RESEA metadata')

    print('RESEA settings to:')
    print('sensitivity: ', hydrophone.sensitivity)
    print('Vpp: ', hydrophone.Vpp)
    print('preamp_gain: ', hydrophone.preamp_gain)
    print('mode: ', config_rec['Recorder']['record_mode'])

## Start the metadata generation (daily json files pointing to the each minute)

In [None]:
log = create_logger_info(deployment_config['DEPLOYMENT_NAME'])

In [None]:
if 'SOUNDTRAP' in deployment_config['RECORDER_ID']:
    meta_gen = SoundTrapMetadataGenerator(
        log=log,
        uri=wav_uri,
        json_base_dir=str(json_base_dir),
        xml_dir=str(xml_dir),
        start=start_date.date(),
        end=end_date.date(),
        prefixes=wav_prefix,
        seconds_per_file=seconds_per_file)
if 'RESEA' in deployment_config['RECORDER_ID']:
    meta_gen = ReseaMetadataGenerator(
        log=log,
        uri=wav_uri,
        json_base_dir=str(json_base_dir),
        start=start_date.date(),
        end=end_date.date(),
        prefixes=wav_prefix,
        seconds_per_file=seconds_per_file)


meta_gen.run()

## Process 

First we will define a couple of handy functions to process the data

In [None]:
def process_date(date: str, gen_netcdf: bool = True):
    """
    Main function to generate the HMB product for a given day.

    It makes use of supporting elements in PBP in terms of logging,
    file handling, and PyPAM based HMB generation.

    :param date: Date to process, in YYYYMMDD format.

    :param gen_netcdf:  Allows caller to skip the `.nc` creation here
    and instead save the datasets after all days have been generated
    (see parallel execution below).

    :return: the generated xarray dataset.
    """

    log_filename = f"{output_dir}/{deployment_name}{date}.log"

    log = create_logger(
        log_filename_and_level=(log_filename, "INFO"),
        console_level=None,
    )

    file_helper = FileHelper(
        log=log,
        json_base_dir=json_base_dir,
        gs_client=None,
        download_dir=None,
    )

    process_helper = ProcessHelper(
        log=log,
        file_helper=file_helper,
        output_dir=str(output_dir),
        output_prefix=deployment_name + '_',
        global_attrs_uri=str(deployment_attrs_uri),
        variable_attrs_uri=variable_attrs_uri,
        voltage_multiplier=voltage_multiplier,
        sensitivity_uri=None,
        sensitivity_flat_value=-float(hydrophone.sensitivity),
        subset_to=subset_to,
    )

    # now, get the HMB result:
    print(f"::: Started processing {date=}")
    result = process_helper.process_day(date)

    if gen_netcdf:
        nc_filename = f"{output_dir}/{deployment_name}{date}.nc"
        print(f":::   Ended processing {date=} =>  {nc_filename=}")
    else:
        print(f":::   Ended processing {date=} => (dataset generated in memory)")

    if result is not None:
        return result.dataset
    else:
        print(f"::: UNEXPECTED: no segments were processed for {date=}")


In [None]:
def process_multiple_dates(
        dates: list[str], gen_netcdf: bool = False
) -> list[xr.Dataset]:
    """
    Generates HMB for multiple days in parallel using Dask.
    Returns the resulting HMB datasets.

    :param dates: The dates to process, each in YYYYMMDD format.

    :param gen_netcdf:  Allows caller to skip the `.nc` creation here
    and instead save the datasets after all days have been generated.

    :return: the list of generated datasets.
    """

    @dask.delayed
    def delayed_process_date(date: str):
        return process_date(date, gen_netcdf=True)

    # To display total elapsed time at the end the processing:
    start_time = time.time()

    # This will be called by Dask when all dates have completed processing:
    def aggregate(*datasets):  # -> list[xr.Dataset]:
        elapsed_time = time.time() - start_time
        print(
            f"===> All {len(datasets)} dates completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time / 60:.1f} mins)"
        )
        return datasets

    # Prepare the processes:
    delayed_processes = [delayed_process_date(date) for date in dates]
    aggregation = dask.delayed(aggregate)(*delayed_processes)

    # And launch them:
    return aggregation.compute()

In [None]:
# Define all the dates to process considering start and end date
date_range = pd.date_range(start=start_date, end=end_date, freq='1D')
dates = date_range.strftime("%Y%m%d").tolist()

In [None]:
# Now, launch the generation:
print(f"Launching HMD generation for {len(dates)} {dates=}")

# Get all HMB datasets:
generated_datasets = process_multiple_dates(dates, gen_netcdf=True)
print(f"Generated datasets: {len(generated_datasets)}\n")

In [None]:
ds_10min = pypam.utils.join_all_ds_output_deployment(
    output_dir,
    data_vars=['psd'],
    datetime_coord='time',
    load=True,
    parallel=False,
    freq_band=None,
    freq_coord='frequency',
    engine='netcdf4'
)
ds = ds_10min.resample(time='1h').median()

## Clustering

In [None]:
!{sys.executable} -m pip install umap-learn

In [None]:
import umap
import pandas as pd
import seaborn as sns

In [None]:
df = ds_10min['psd'].to_pandas()
frequency_columns = df.columns

In [None]:
df['embedding'] = df[frequency_columns].values.tolist()
df['month'] = df.index.month 
df['hour'] = df.index.hour

In [None]:
df = df.dropna()

umap_box = umap.UMAP(n_components=2, n_neighbors=20, min_dist=0.05)
umap_box.fit(df[frequency_columns].values)
embedding = umap_box.transform(df[frequency_columns])



In [None]:
ax = sns.scatterplot(x=embedding[:, 0], y=embedding[:, 1],
                     s=8, alpha=0.9, hue=df['hour'],
                     legend=True)
ax.set_xlabel('UMAP x')
ax.set_ylabel('UMAP y')
ax.set_facecolor('white')
plt.show()


## Plot the results

In [None]:
cmap = plt.get_cmap('cb.solstice')
ds_10min = pypam.utils.join_all_ds_output_deployment(output_dir,
                                               data_vars=['psd'],
                                               datetime_coord='time',
                                               load=True,
                                               parallel=False,
                                               freq_band=None,
                                               freq_coord='frequency',
                                               time_resample='10min'
                                           )

ds = ds_10min.resample(time='1h').median()

In [None]:
import datetime
cmap = plt.get_cmap('cb.solstice')

plots_folder = output_dir.parent.parent.joinpath('plots', config_file.name.replace('.json', ''))
if not plots_folder.exists():
    os.mkdir(plots_folder)

start_plot = pd.to_datetime(datetime.datetime(2022, 10, 30, 13, 0, 0))
end_plot = pd.to_datetime(datetime.datetime(2022, 10, 30, 15, 0, 0))
ds = ds.where((ds.time > start_plot)  & (ds.time < end_plot), drop=True) 
# Plot LTSA
fig, ax = plt.subplots(figsize=(15, 7))
pypam.plots.plot_2d(ds=ds['psd'], x='time', y='frequency', cmap=cmap, vmin=40, vmax=110,
                    cbar_label=r'%s [$%s$]' % (re.sub('_', ' ', ds['psd'].standard_name).title(), ds['psd'].units),
                    xlabel='Time', ylabel='Frequency [Hz]', title='Long Term Spectrogram', ylog=True, ax=ax)
ax.set_ylim(bottom=50)


In [None]:
# Plot SPD
percentiles = [1, 10, 50, 90, 99]
min_psd = 35  # in db
max_psd = 135  # in db
h = 1  # in db
ds_spd = pypam.utils.compute_spd(ds, data_var='psd', percentiles=percentiles, min_val=min_psd,
                                 max_val=max_psd, h=h)

pypam.plots.plot_spd(ds_spd, log=True, save_path=plots_folder.joinpath('SPD.png'))

In [None]:
# Plot the SPL of each band for quality check
selected_bands = [[10, 1000], [1000, 10000], [10, float(ds.frequency.max().values)]]

fig, ax = plt.subplots(figsize=(15, 7))
for band in selected_bands:
    ds_freq = ds.where((ds.frequency >= band[0]) & (ds.frequency <= band[1]), drop=True)
    spl = ds_freq.mean(dim='frequency')
    spl.psd.plot(x='time', ax=ax, label=str(band))
plt.xlabel('UTC time 10 min aggregation')
plt.legend()
plt.show()