## Exercising PBP - PyPAM Based Processing

PBP repo: https://github.com/mbari-org/pypam-based-processing

In short, the steps in this notebook are:

- Clone PBP to support the HMB (hybrid millidecade band) generation
- Install dependencies, including PyPAM
- Prepare a workspace for downloaded and generated files
- Generate HMB for a single day
- Generate HMB for multiple days in parallel using Dask

In [1]:
!pwd

/home/jovyan/mbari


## Code preparations

### PBP clone

In [3]:
## If not already cloned:
# !git clone https://github.com/mbari-org/pypam-based-processing.git
# %cd pypam-based-processing/


In [2]:
## Otherwise (already cloned):
%cd pypam-based-processing/
# !git pull

/home/jovyan/mbari/pypam-based-processing
remote: Enumerating objects: 1, done.[K
remote: Counting objects: 100% (1/1), done.[K
remote: Total 1 (delta 1), reused 1 (delta 1), pack-reused 0[K
Unpacking objects: 100% (1/1), 110 bytes | 110.00 KiB/s, done.
From https://github.com/mbari-org/pypam-based-processing
   880b6ad..19c008d  2023-10-18/change_logging -> origin/2023-10-18/change_logging
Already up to date.


In [3]:
# !git checkout main
# !git pull origin main

In [4]:
# !git status
!git log -1

commit 53a55c57093cfeb0fd7bcea91527fc2079252650 (HEAD -> main, origin/main, origin/HEAD)
Author: Carlos Rueda <carueda@mbari.org>
Date:   Thu Oct 19 10:26:26 2023 -0700

    refactor logging handling
    
    and use unique logger names, which helps in environments where multiple runs of the same day may be performed


### Install requirements

In [None]:
!pip install -r requirements.txt --no-cache-dir
!pip install --no-cache-dir git+https://github.com/lifewatch/pypam.git

### Basic tests

In [6]:
## PBP unit tests (to verify basic functionality)
!python -m pytest 

platform linux -- Python 3.10.6, pytest-7.4.0, pluggy-1.3.0
rootdir: /home/jovyan/mbari/pypam-based-processing
plugins: anyio-3.6.1, syrupy-4.0.6, cov-4.1.0
collected 9 items

tests/test_file_helper.py .                                              [ 11%]
tests/test_json_support.py ...                                           [ 44%]
tests/test_metadata.py ...                                               [ 77%]
tests/test_misc.py ..                                                    [100%]

--------------------------- snapshot report summary ----------------------------
9 snapshots passed.


## Workspace preparations

In [7]:
## Create workspace:
!mkdir -p NB_SPACE/JSON/2022
!mkdir -p NB_SPACE/DOWNLOADS
!mkdir -p NB_SPACE/OUTPUT
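If shell escapes are not available (for example, when running these steps as a plain Python script), the same workspace can be created with `pathlib`. A minimal sketch; `create_workspace` is a hypothetical helper, and `parents=True, exist_ok=True` is what mirrors `mkdir -p`:

```python
from pathlib import Path


def create_workspace(base: str = "NB_SPACE", year: str = "2022") -> list[Path]:
    """Create the notebook workspace directories, like `mkdir -p` (hypothetical helper)."""
    dirs = [
        Path(base) / "JSON" / year,  # daily JSON metadata files
        Path(base) / "DOWNLOADS",    # downloaded files
        Path(base) / "OUTPUT",       # generated NetCDF and log files
    ]
    for d in dirs:
        # parents=True creates intermediate directories; exist_ok=True makes reruns safe
        d.mkdir(parents=True, exist_ok=True)
    return dirs
```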


In [8]:
## Manually upload the JSON files for the desired days (NB_SPACE/JSON/2022/)

## If `aws` is available:
#!aws s3 sync --no-sign-request s3://pacific-sound-metadata/mb05/2022/ NB_SPACE/JSON/2022/
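If the `aws` CLI is not installed, a rough equivalent of the `aws s3 sync` command above can be written with boto3, using the same unsigned (anonymous) client configuration this notebook uses later. This is a sketch under assumptions: `download_prefix` and `make_unsigned_client` are hypothetical helpers, and unlike `aws s3 sync` the function only skips files that already exist locally; it does not compare sizes or timestamps.

```python
import os


def download_prefix(s3_client, bucket: str, prefix: str, dest_dir: str) -> list[str]:
    """Download all objects under s3://bucket/prefix into dest_dir, skipping existing files."""
    os.makedirs(dest_dir, exist_ok=True)
    downloaded = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            name = key[len(prefix):]
            if not name or name.endswith("/"):
                continue  # skip the prefix itself and "directory" placeholder keys
            local_path = os.path.join(dest_dir, name)
            if os.path.exists(local_path):
                continue  # already present locally
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            s3_client.download_file(bucket, key, local_path)
            downloaded.append(local_path)
    return downloaded


def make_unsigned_client():
    """Anonymous S3 client (requires boto3), configured as in process_date."""
    import boto3
    from botocore import UNSIGNED
    from botocore.client import Config
    return boto3.client("s3", config=Config(signature_version=UNSIGNED))


# Usage (performs network access):
# download_prefix(make_unsigned_client(), "pacific-sound-metadata", "mb05/2022/", "NB_SPACE/JSON/2022")
```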


## Imports

In [9]:
import logging
import os
import sys
## Make the PBP `src` package (in the repo root) importable:
sys.path = ['.'] + sys.path
from src.process_helper import ProcessHelper
from src.file_helper import FileHelper
from src.logging_helper import create_logger

import xarray as xr
import numpy as np
import dask
import time

## NOTE: these are needed only temporarily, while we get the necessary files in place
import boto3
from botocore import UNSIGNED
from botocore.client import Config


## A function to process a given day

In [10]:
def process_date(date: str):
    """
    Main function to generate the HMB product for a given day.
    
    It makes use of supporting elements in PBP in terms of logging,
    file handling, and PyPAM based HMB generation.

    :param date: Date to process in YYYYMMDD format.
    :return: the generated xarray dataset, or None if no segments were processed.
    """
    
    ## The definitions below are consistent with our workspace preparations.

    output_dir             = 'NB_SPACE/OUTPUT'
    output_prefix          = 'MB05_'

    log_filename = f"{output_dir}/{output_prefix}{date}.log"

    logger = create_logger(
        log_filename_and_level=(log_filename, logging.INFO),
        console_level=None,  # logging.INFO,
    )

    ## Note: we use S3 URIs and boto3 as the general mechanism to get our files from AWS.
    ## In this notebook, we have already downloaded the necessary files for the demonstration.
    ## The settings below let us keep using the original S3 URIs without
    ## triggering any new downloads.
    s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    ## Note: in our general cloud-based setup, downloaded files are removed after being used.
    ## The following lets us keep the downloaded files:
    os.environ["ASSUME_DOWNLOADED_FILES"] = "yes"
    os.environ["REMOVE_DOWNLOADED_FILES"] = "no"

    file_helper = FileHelper(
        logger=logger,
        json_base_dir     = 'NB_SPACE/JSON',
        s3_client         = s3_client,
        download_dir      = 'NB_SPACE/DOWNLOADS',
    )

    process_helper = ProcessHelper(
        logger=logger,
        file_helper=file_helper,
        output_dir             = output_dir,
        output_prefix          = output_prefix,
        global_attrs_uri       = 'metadata/mb05/globalAttributes_MB05.yaml',
        variable_attrs_uri     = 'metadata/mb05/variableAttributes_MB05.yaml',
        voltage_multiplier     = 1,
        sensitivity_flat_value = 176,
        subset_to              = (10, 24_000),
        # max_segments=1, ## NOTE: this one only used while testing the notebook! -- TO BE REMOVED
    )

    ## For reporting purposes below (we know this will be the generated NetCDF path)
    nc_filename = f"{output_dir}/{output_prefix}{date}.nc"

    ## now, get the HMB result:
    print(f'::: Started processing {date=}    {log_filename=}')
    result = process_helper.process_day(date)
    print(f':::   Ended processing {date=} =>  {nc_filename=}')

    if result is None:
        print(f'::: UNEXPECTED: no segments were processed for {date=}')
        return None
    return result.dataset
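`process_date` expects the date as a `YYYYMMDD` string and writes `<output_dir>/<output_prefix><date>.log` and `.nc`. A small hypothetical helper like the one below (not part of PBP) could validate the date string up front and compute the expected paths, so that an input such as `'2022-08-12'` fails immediately rather than partway through the setup. It only mirrors the naming used in `process_date`.

```python
from datetime import datetime


def expected_outputs(date: str, output_dir: str = "NB_SPACE/OUTPUT",
                     output_prefix: str = "MB05_") -> tuple[str, str]:
    """Validate a YYYYMMDD date and return the (log, NetCDF) paths process_date uses."""
    # strptime raises ValueError for malformed or impossible dates (e.g. 20220231)
    parsed = datetime.strptime(date, "%Y%m%d")
    # strptime also accepts non-padded fields like '2022812', so require a round trip
    if parsed.strftime("%Y%m%d") != date:
        raise ValueError(f"date must be exactly YYYYMMDD, got {date!r}")
    base = f"{output_dir}/{output_prefix}{date}"
    return f"{base}.log", f"{base}.nc"
```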

## Generating the HMB products

### Processing a day

In [11]:
start_time = time.time()
generated_dataset = process_date('20220812')
elapsed_time = time.time() - start_time
print(f'===> date completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)')
generated_dataset

::: Started processing date='20220812'    log_filename='NB_SPACE/OUTPUT/MB05_20220812.log'
:::   Ended processing date='20220812' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220812.nc'
===> date completed. Elapsed time: 229.6 seconds (3.8 mins)


### Prepare process_date for parallel execution

Let's use Dask to dispatch multiple instances of `process_date` in parallel.

In [12]:
print(f'CPU info: {os.cpu_count()=}  {len(os.sched_getaffinity(0))=}')

CPU info: os.cpu_count()=80  len(os.sched_getaffinity(0))=80
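On Linux, `os.sched_getaffinity(0)` reports the CPUs this process is actually allowed to run on, which can be fewer than `os.cpu_count()` when CPU affinity is restricted (for example with `taskset` or cpusets); it is not available on every platform (macOS, for instance). A minimal sketch of picking a worker count with a fallback; both helper names are hypothetical:

```python
import os


def available_cpus() -> int:
    """Number of CPUs usable by this process, with a portable fallback."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: respects CPU affinity restrictions
        return len(os.sched_getaffinity(0))
    # Other platforms: total logical CPUs; os.cpu_count() may return None
    return os.cpu_count() or 1


def workers_for(n_tasks: int) -> int:
    """No point in running more workers than there are dates to process."""
    return max(1, min(n_tasks, available_cpus()))
```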


In [13]:
def process_multiple_dates(dates: list[str]) -> list[xr.Dataset]:
    """
    Generates HMB for multiple days in parallel using Dask
    (with dask.delayed's default threaded scheduler).
    Returns the resulting HMB datasets, in the same order as `dates`.
    """
    
    @dask.delayed
    def delayed_process_date(date: str):
        return process_date(date)

    ## To display the total elapsed time at the end of the processing:
    start_time = time.time()

    ## This will be called when all dates have completed:
    def aggregate(*datasets) -> list[xr.Dataset]:
        elapsed_time = time.time() - start_time
        print(f'===> All dates completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)')
        return list(datasets)


    ## Prepare the processes:
    delayed_processes = [delayed_process_date(date) for date in dates]
    aggregation = dask.delayed(aggregate)(*delayed_processes)

    ## And launch them:
    return aggregation.compute()
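For comparison, the same fan-out/fan-in pattern can be sketched with the standard library's `concurrent.futures`, swapped in here in place of Dask. Like `dask.delayed` with its default threaded scheduler, it runs the work in threads of a single process, so it only helps when the heavy work releases the GIL (as much NumPy code and file I/O do). `process_multiple_dates_threads` and its `process_fn` and `max_workers` parameters are hypothetical; `max_workers` bounds how many days run at once.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def process_multiple_dates_threads(dates: list[str],
                                   process_fn: Callable[[str], T],
                                   max_workers: Optional[int] = None) -> list[T]:
    """Run process_fn for each date in a thread pool; results keep the input order."""
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() submits all dates up front and yields results in input order
        results = list(pool.map(process_fn, dates))
    elapsed_time = time.time() - start_time
    print(f"===> All dates completed. Elapsed time: {elapsed_time:.1f} seconds")
    return results
```

In this notebook it would be called as `process_multiple_dates_threads(dates, process_date, max_workers=5)`.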


### Processing multiple days

In [14]:
## The dates for this demo:
dates = ['20220812', '20220813', '20220814', '20220815', '20220816']

generated_datasets = process_multiple_dates(dates)

print(f'Generated datasets: {len(generated_datasets)}')


::: Started processing date='20220815'    log_filename='NB_SPACE/OUTPUT/MB05_20220815.log'
::: Started processing date='20220814'    log_filename='NB_SPACE/OUTPUT/MB05_20220814.log'
::: Started processing date='20220816'    log_filename='NB_SPACE/OUTPUT/MB05_20220816.log'
::: Started processing date='20220812'    log_filename='NB_SPACE/OUTPUT/MB05_20220812.log'
::: Started processing date='20220813'    log_filename='NB_SPACE/OUTPUT/MB05_20220813.log'
:::   Ended processing date='20220812' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220812.nc'
:::   Ended processing date='20220814' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220814.nc'
:::   Ended processing date='20220815' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220815.nc'
:::   Ended processing date='20220816' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220816.nc'
:::   Ended processing date='20220813' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220813.nc'
===> All dates completed. Elapsed time: 269.7 seconds (4.5 mins)
Generated datasets: 5
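Since each day writes its own NetCDF file, a rerun could skip the days whose output already exists. A minimal sketch, assuming the `<output_dir>/<output_prefix><date>.nc` naming used in `process_date`; `pending_dates` is a hypothetical helper:

```python
import os


def pending_dates(dates: list[str], output_dir: str = "NB_SPACE/OUTPUT",
                  output_prefix: str = "MB05_") -> list[str]:
    """Return the dates whose NetCDF output file does not exist yet."""
    return [d for d in dates
            if not os.path.exists(os.path.join(output_dir, f"{output_prefix}{d}.nc"))]
```

Note that this only checks for existence: a file left behind by an interrupted run would still be treated as done.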


### NOTE: A quick test with 10 days to check scalability

We are not rerunning the cell below, but in an earlier run the 10 days took about 400 seconds in total, i.e., about 40 seconds per day on average:
```
===> All dates completed. Elapsed time: 402.5 seconds (6.7 mins)
Generated datasets: 10
```
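The per-day figure is the total wall-clock time divided by the number of days. Comparing it with the single-day run above (229.6 seconds) gives a rough speedup estimate over processing the days one after another, under the assumption that every day costs about as much as 20220812 did. A small sketch of that arithmetic (`throughput` is a hypothetical helper):

```python
def throughput(elapsed_s: float, n_days: int, single_day_s: float) -> tuple[float, float]:
    """Return (seconds per day, estimated speedup vs. sequential processing)."""
    per_day = elapsed_s / n_days
    # sequential estimate assumes every day costs as much as the single-day run
    speedup = (single_day_s * n_days) / elapsed_s
    return per_day, speedup


for n_days, elapsed in [(5, 269.7), (10, 402.5)]:
    per_day, speedup = throughput(elapsed, n_days, single_day_s=229.6)
    print(f"{n_days:2d} days: {per_day:.1f} s/day, ~{speedup:.1f}x speedup")
```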

In [None]:
## 10 days:  20220812 .. 20220821
datetimes = np.arange('2022-08-12', '2022-08-22', dtype='datetime64[D]')
dates = [str(d).replace('-', '') for d in datetimes]

generated_datasets = process_multiple_dates(dates)

print(f'Generated datasets: {len(generated_datasets)}')
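The `np.arange` call above treats the end date as exclusive, which is why `'2022-08-22'` yields days up to 20220821. An equivalent using only the standard library's `datetime`, with the same exclusive end (`yyyymmdd_range` is a hypothetical name):

```python
from datetime import date, timedelta


def yyyymmdd_range(start: str, stop: str) -> list[str]:
    """Dates from start (inclusive) to stop (exclusive), both given as YYYY-MM-DD."""
    first = date.fromisoformat(start)
    end = date.fromisoformat(stop)
    n_days = (end - first).days
    return [(first + timedelta(days=i)).strftime("%Y%m%d") for i in range(n_days)]


dates = yyyymmdd_range("2022-08-12", "2022-08-22")  # 10 days: 20220812 .. 20220821
```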
