# Exercising PBP/PyPAM on NRS11 data

The main steps in this notebook are:

- Check that the PBP package is installed
- Set up the working directories for downloaded and generated files
- Generate HMB for a single day
- Generate HMB for multiple days in parallel using Dask

## Preparations

In [12]:
# Basic check that the PBP package is in place:
# (This package brings in all associated dependencies)
import pbp
print(f"PBP version   : {pbp.get_pbp_version()}")
print(f"PyPAM version : {pbp.get_pypam_version()}")

PBP version   : 1.0.10
PyPAM version : 0.3.0
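The printed versions can also be checked programmatically, for example to stop early when an older PBP is installed. Below is a minimal sketch that assumes plain dotted numeric version strings like the ones printed above. The `MIN_PBP_VERSION` value and the `version_tuple`/`meets_minimum` helpers are hypothetical and are not part of PBP:

```python
# Hypothetical minimum version for this notebook (an assumption, not a PBP requirement).
MIN_PBP_VERSION = "1.0.10"


def version_tuple(version: str) -> tuple[int, ...]:
    """Convert a dotted numeric version such as '1.0.10' into (1, 0, 10).

    Assumes purely numeric components; pre-release tags like '1.0.0rc1'
    are not handled by this sketch.
    """
    return tuple(int(part) for part in version.split("."))


def meets_minimum(version: str, minimum: str = MIN_PBP_VERSION) -> bool:
    """Return True if `version` is at least `minimum`, comparing numerically."""
    return version_tuple(version) >= version_tuple(minimum)


# Numeric comparison avoids the string-ordering pitfall where '1.0.9' > '1.0.10'.
print(meets_minimum("1.0.10"))  # True
print(meets_minimum("1.0.9"))   # False
```

In the notebook, `meets_minimum(pbp.get_pbp_version())` could then guard the remaining cells.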


## Code imports

In [13]:
from pbp.process_helper import ProcessHelper
from pbp.file_helper import FileHelper
from pbp.logging_helper import create_logger
from pbp import get_pbp_version

from google.cloud.storage import Client as GsClient

import xarray as xr
import dask
import pandas as pd
import time
import sys


## PBP parameters

### Inputs

In [14]:
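# TODO: update this when we incorporate the JSON generation
# Directory with the per-day JSON files that list the audio files to process:
json_base_dir        = 'NRS11/noaa-passive-bioacoustic_nrs_11_2019-2021'

# Attribute definitions for the generated netCDF files:
global_attrs_uri     = 'NRS11/INPUT/globalAttributes_NRS11.yaml'
variable_attrs_uri   = 'NRS11/INPUT/variableAttributes_NRS11.yaml'

# Scaling applied to the loaded audio signal, and hydrophone sensitivity file:
voltage_multiplier   = 2.5
sensitivity_uri      = 'NRS11/INPUT/NRS11_H5R6_sensitivity_hms5kHz.nc'
# Frequency range (Hz) to which the resulting spectra are subset:
subset_to            = (10, 2_000)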
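Processing a batch of days can take a while, so it may be worth confirming up front that the local input files exist. The following is a minimal sketch under the assumption that only local paths need checking. Remote URIs such as `gs://` or `s3://` are skipped because verifying them requires network access. `missing_local_inputs` is a hypothetical helper and is not part of PBP:

```python
from pathlib import Path
from urllib.parse import urlparse


def missing_local_inputs(uris: list[str]) -> list[str]:
    """Return the local paths among `uris` that do not exist.

    Remote URIs (anything with a scheme such as gs:// or s3://) are skipped,
    since checking them would require cloud credentials or network access.
    """
    missing = []
    for uri in uris:
        if urlparse(uri).scheme:  # e.g. 'gs' or 's3'; plain relative paths have none
            continue
        if not Path(uri).exists():
            missing.append(uri)
    return missing


# In the notebook, with the variables defined above:
# missing_local_inputs([global_attrs_uri, variable_attrs_uri, sensitivity_uri])
```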

### Outputs

In [15]:
# Downloaded files are stored here while being processed:
download_dir         = 'NRS11/DOWNLOADS'

# Location for generated files:
output_dir           = 'NRS11/OUTPUT'
# A prefix for the names of generated files:
output_prefix        = 'NRS11_'
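The cells above only define the working directory names. A minimal sketch for creating them if needed is shown below. It uses `os.makedirs` with `exist_ok=True`, so re-running the cell is harmless. This sketch makes no assumption about whether PBP creates these directories on its own, and `prepare_dirs` is a hypothetical helper:

```python
import os


def prepare_dirs(*dirs: str) -> None:
    """Create each directory (and any missing parents); existing ones are left untouched."""
    for d in dirs:
        os.makedirs(d, exist_ok=True)


# In the notebook, with the variables defined above:
# prepare_dirs(download_dir, output_dir)
```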


# Supporting functions

PBP includes these two main modules that we will be using below:

- `FileHelper`: Facilitates input file reading. It supports reading local files as well as from GCP (`gs://` URIs) and AWS (`s3://` URIs).
- `ProcessHelper`: The main processing module.
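The way `FileHelper` dispatches between local, GCP, and AWS inputs is internal to PBP. To give a rough idea of how such inputs can be told apart by URI scheme, here is a hypothetical `storage_kind` helper built on the standard library's `urllib.parse`. It is an illustration only and is not PBP's implementation. The bucket names in the usage lines are made up:

```python
from urllib.parse import urlparse


def storage_kind(uri: str) -> str:
    """Classify an input location by its URI scheme (illustrative only)."""
    scheme = urlparse(uri).scheme
    if scheme == "gs":
        return "gcp"
    if scheme == "s3":
        return "aws"
    if scheme in ("", "file"):
        return "local"
    raise ValueError(f"Unsupported URI scheme: {scheme!r} in {uri!r}")


for uri in ["gs://some-bucket/audio.wav",
            "s3://some-bucket/audio.wav",
            "NRS11/INPUT/globalAttributes_NRS11.yaml"]:
    print(uri, "->", storage_kind(uri))
```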

We first define a function that generates the HMB product for a given date.

Based on that function, we then define a second function that dispatches multiple dates in parallel.


## A function to process a given day

Using those PBP modules, we define a function that processes a given day:

In [16]:
def process_date(date: str, gen_netcdf: bool = True):
    """
    Main function to generate the HMB product for a given day.

    It makes use of supporting elements in PBP in terms of logging,
    file handling, and PyPAM based HMB generation.

    :param date: Date to process, in YYYYMMDD format.

    :param gen_netcdf:  Allows caller to skip the `.nc` creation here
    and instead save the datasets after all days have been generated
    (see parallel execution below).

    :return: the generated xarray dataset, or None if no segments were processed.
    """

    log_filename = f"{output_dir}/{output_prefix}{date}.log"

    log = create_logger(
        log_filename_and_level=(log_filename, "INFO"),
        console_level=None,
    )

    # we are only downloading publicly accessible datasets:
    gs_client = GsClient.create_anonymous_client()

    file_helper = FileHelper(
        log=log,
        json_base_dir=json_base_dir,
        gs_client=gs_client,
        download_dir=download_dir,
    )

    process_helper = ProcessHelper(
        log=log,
        file_helper=file_helper,
        output_dir=output_dir,
        output_prefix=output_prefix,
        gen_netcdf=gen_netcdf,
        global_attrs_uri=global_attrs_uri,
        variable_attrs_uri=variable_attrs_uri,
        voltage_multiplier=voltage_multiplier,
        sensitivity_uri=sensitivity_uri,
        subset_to=subset_to,
    )

    ## now, get the HMB result:
    print(f'::: Started processing {date=}')
    result = process_helper.process_day(date)

    if gen_netcdf:
        nc_filename = f"{output_dir}/{output_prefix}{date}.nc"
        print(f':::   Ended processing {date=} =>  {nc_filename=}')
    else:
        print(f':::   Ended processing {date=} => (dataset generated in memory)')

    if result is None:
        print(f'::: UNEXPECTED: no segments were processed for {date=}')
        return None

    return result.dataset
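`process_date` builds its log and netCDF file names from `output_dir`, `output_prefix`, and the date. The sketch below centralizes that naming and rejects malformed dates before any processing starts. `output_paths` is a hypothetical helper whose naming simply mirrors the f-strings above:

```python
from datetime import datetime


def output_paths(output_dir: str, output_prefix: str, date: str) -> tuple[str, str]:
    """Return (log_path, nc_path) for a YYYYMMDD date, mirroring process_date's naming.

    Raises ValueError if `date` is not a valid calendar date in YYYYMMDD form.
    """
    # strptime alone is lenient with single-digit fields, so check the shape first:
    if len(date) != 8 or not date.isdigit():
        raise ValueError(f"Expected YYYYMMDD, got {date!r}")
    datetime.strptime(date, "%Y%m%d")  # also rejects impossible dates such as 20200230
    base = f"{output_dir}/{output_prefix}{date}"
    return f"{base}.log", f"{base}.nc"


print(output_paths("NRS11/OUTPUT", "NRS11_", "20200101"))
# ('NRS11/OUTPUT/NRS11_20200101.log', 'NRS11/OUTPUT/NRS11_20200101.nc')
```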

## A function to process multiple days

We use [Dask](https://examples.dask.org/delayed.html) to dispatch, in parallel, multiple instances of the `process_date` function defined above.

In [17]:
def process_multiple_dates(dates: list[str], gen_netcdf: bool = False) -> list[xr.Dataset]:
    """
    Generates HMB for multiple days in parallel using Dask.
    Returns the resulting HMB datasets.

    :param dates: The dates to process, each in YYYYMMDD format.

    :param gen_netcdf:  Allows caller to skip the `.nc` creation here
    and instead save the datasets after all days have been generated.

    :return: the list of generated datasets.
    """

    @dask.delayed
    def delayed_process_date(date: str):
        return process_date(date, gen_netcdf=gen_netcdf)

    ## To display the total elapsed time at the end of the processing:
    start_time = time.time()

    ## This will be called by Dask when all dates have completed processing:
    def aggregate(*datasets) -> list[xr.Dataset]:
        elapsed_time = time.time() - start_time
        print(f'===> All {len(datasets)} dates completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)')
        # `datasets` arrives as a tuple; convert it to match the declared return type:
        return list(datasets)


    ## Prepare the processes:
    delayed_processes = [delayed_process_date(date) for date in dates]
    aggregation = dask.delayed(aggregate)(*delayed_processes)

    ## And launch them:
    return aggregation.compute()
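For comparison, the same fan-out-then-aggregate pattern can be written with the standard library's `concurrent.futures`. This is a swapped-in technique and is not what the notebook uses. It is shown here only to make the structure of the Dask version explicit. `fake_process_date` is a hypothetical stand-in for `process_date`. As with `dask.delayed`'s default threaded scheduler, the tasks run in threads:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_process_date(date: str) -> str:
    """Hypothetical stand-in for process_date: returns a label instead of a dataset."""
    time.sleep(0.01)  # simulate work
    return f"dataset-for-{date}"


def process_multiple_dates_threads(dates: list[str], max_workers: int = 4) -> list[str]:
    """Run fake_process_date for all dates in a thread pool, preserving input order."""
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of `dates`, even though
        # the tasks may finish in a different order.
        results = list(pool.map(fake_process_date, dates))
    elapsed = time.time() - start_time
    print(f"===> All {len(results)} dates completed in {elapsed:.2f} seconds")
    return results


print(process_multiple_dates_threads(["20200101", "20200102", "20200103"]))
```

Both versions return results in input order. The saving loop later relies on this ordering when it pairs `dates` with the generated datasets via `zip`.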


# Generating the HMB products

## Processing a single day

We are mostly interested in processing multiple dates, but a single date can be processed by calling `process_date` directly:

In [18]:
## Just uncomment the following line:
# process_date('20200101')

## Processing multiple days

We use the `process_multiple_dates` function defined above to launch the generation of multiple HMB datasets in parallel.

**NOTE**:
- The JSON files included in the current PBP image only cover January 1–31, 2020.
- Such JSON files can alternatively be located in external buckets.
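Because the bundled JSON files only cover January 1–31, 2020, it can help to split the requested dates into covered and uncovered sets before launching. Below is a minimal sketch. The coverage bounds come from the note above, and `split_by_coverage` is a hypothetical helper:

```python
# Coverage of the JSON files bundled with the current PBP image, per the note above.
COVERED_START = "20200101"
COVERED_END = "20200131"


def split_by_coverage(dates: list[str],
                      start: str = COVERED_START,
                      end: str = COVERED_END) -> tuple[list[str], list[str]]:
    """Split YYYYMMDD dates into (covered, uncovered).

    Zero-padded YYYYMMDD strings sort chronologically, so plain string
    comparison is enough here.
    """
    covered = [d for d in dates if start <= d <= end]
    uncovered = [d for d in dates if not (start <= d <= end)]
    return covered, uncovered


print(split_by_coverage(["20191231", "20200115", "20200201"]))
# (['20200115'], ['20191231', '20200201'])
```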

In [19]:
## Here, we set `dates` as the list of 'YYYYMMDD' dates we want to process:
## Let's use pandas to help generate the list:
date_range = pd.date_range(start='2020-01-01', end='2020-01-10')
dates = date_range.strftime('%Y%m%d').tolist()
dates

['20200101',
 '20200102',
 '20200103',
 '20200104',
 '20200105',
 '20200106',
 '20200107',
 '20200108',
 '20200109',
 '20200110']
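If pandas is not available, the same inclusive list of dates can be produced with the standard library's `datetime`. The `date_strings` helper below is a hypothetical equivalent of the `pd.date_range(...).strftime('%Y%m%d')` call above. Like `pd.date_range` with its default settings, it includes both endpoints:

```python
from datetime import date, timedelta


def date_strings(start: date, end: date) -> list[str]:
    """Return every day from start to end (both inclusive) as 'YYYYMMDD' strings."""
    n_days = (end - start).days + 1
    return [(start + timedelta(days=i)).strftime("%Y%m%d") for i in range(n_days)]


dates_stdlib = date_strings(date(2020, 1, 1), date(2020, 1, 10))
print(len(dates_stdlib), dates_stdlib[0], dates_stdlib[-1])  # 10 20200101 20200110
```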

In [20]:
## Now, launch the generation:

print(f"Launching HMB generation for {len(dates)} {dates=}")

## NOTE: due to issues observed when concurrently saving the resulting netCDF files,
## this flag allows postponing the saving until all datasets have been generated:
gen_netcdf = False   # True for each process to save its generated file.

## Get all HMB datasets:
generated_datasets = process_multiple_dates(dates, gen_netcdf=gen_netcdf)

print(f'Generated datasets: {len(generated_datasets)}\n')

if gen_netcdf:
    print('DONE. Datasets should have been saved already.')
else:
    # Do the file saving here:
    print('Saving generated datasets...')
    for date, ds in zip(dates, generated_datasets):
        nc_filename = f'{output_dir}/{output_prefix}{date}.nc'
        if ds is None:
            # process_date returns None when no segments were processed for the day:
            print(f'  - Skipping {nc_filename=} (no dataset generated)')
            continue
        print(f'  - Saving {nc_filename=}')
        try:
            ds.to_netcdf(nc_filename,
                         engine="h5netcdf",
                         # Do not write a _FillValue attribute for these variables:
                         encoding={
                            "effort": {"_FillValue": None},
                            "frequency": {"_FillValue": None},
                            "sensitivity": {"_FillValue": None},
                         },
            )
        except Exception as e:  # pylint: disable=broad-exception-caught
            print(f"         Unable to save {nc_filename}: {e}")


Launching HMB generation for 10 dates=['20200101', '20200102', '20200103', '20200104', '20200105', '20200106', '20200107', '20200108', '20200109', '20200110']
::: Started processing date='20200104'
::: Started processing date='20200107'
::: Started processing date='20200103'
::: Started processing date='20200108'
::: Started processing date='20200106'
::: Started processing date='20200110'
::: Started processing date='20200109'
:::   Ended processing date='20200109' => (dataset generated in memory)
:::   Ended processing date='20200104' => (dataset generated in memory)
:::   Ended processing date='20200108' => (dataset generated in memory)
:::   Ended processing date='20200103' => (dataset generated in memory)
:::   Ended processing date='20200106' => (dataset generated in memory)


AttributeError: 'SoundStatus' object has no attribute 'sound_file'

:::   Ended processing date='20200110' => (dataset generated in memory)
:::   Ended processing date='20200107' => (dataset generated in memory)


In [21]:
!ls -l NRS11/OUTPUT

total 84
-rw-r--r-- 1 jovyan users 8585 Jul 18 21:14 NRS11_20200101.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200102.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200103.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200104.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200105.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200106.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200107.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200108.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200109.log
-rw-r--r-- 1 jovyan users 4699 Jul 18 21:14 NRS11_20200110.log
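The listing above contains only `.log` files, so no `.nc` outputs were written in this run. The generation cell apparently stopped at the `AttributeError` before reaching the saving step. Below is a small sketch for spotting which expected netCDF files are absent. `missing_nc_files` is a hypothetical helper that assumes the `{output_prefix}{date}.nc` naming used above:

```python
from pathlib import Path


def missing_nc_files(output_dir: str, output_prefix: str, dates: list[str]) -> list[str]:
    """Return the dates whose '{output_prefix}{date}.nc' file is not in output_dir."""
    out = Path(output_dir)
    return [d for d in dates if not (out / f"{output_prefix}{d}.nc").is_file()]


# In the notebook: missing_nc_files(output_dir, output_prefix, dates)
```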
