## Exercising PBP - PyPAM Based Processing

**NOTE**: WIP

PBP repo: https://github.com/mbari-org/pypam-based-processing

In short, the main steps in this notebook are:

- Clone PBP to support the HMB generation
- Install dependencies, including PyPAM
- Prepare the working space for downloaded and generated files
- Generate HMB for a single day
- Generate HMB for multiple days in parallel using Dask

## Basic checks

In [3]:
!pwd

/home/jovyan/mbari


In [4]:
!python --version

Python 3.10.6


## Code preparations

### PBP clone

In [5]:
## If PBP has not been cloned already:
# !git clone https://github.com/mbari-org/pypam-based-processing.git
# %cd pypam-based-processing/

## Else:
%cd pypam-based-processing/
# !git pull

## In any case, we want to be located in the pypam-based-processing folder.
!pwd

/home/jovyan/mbari/pypam-based-processing
/home/jovyan/mbari/pypam-based-processing
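
The clone-or-update choice made above by commenting lines in and out could also be made programmatically. The following is only a sketch: `git_command_for` is a hypothetical helper (not part of PBP) that assumes a working copy can be recognized by its `.git` directory.

```python
from pathlib import Path

# URL taken from the clone command in the cell above.
PBP_URL = "https://github.com/mbari-org/pypam-based-processing.git"


def git_command_for(repo_dir: str, url: str = PBP_URL) -> list[str]:
    """Return the git command that would bring `repo_dir` up to date.

    Hypothetical helper: pull if a clone already exists, clone otherwise.
    """
    if (Path(repo_dir) / ".git").is_dir():
        # Existing working copy: update it in place.
        return ["git", "-C", repo_dir, "pull"]
    # No working copy yet: clone into repo_dir.
    return ["git", "clone", url, repo_dir]
```

The returned list could then be passed to `subprocess.run(..., check=True)`, run from the parent folder of the clone.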


### Install requirements

In [6]:
!pip install -r requirements.txt --no-cache-dir
!pip install --no-cache-dir git+https://github.com/lifewatch/pypam.git

Collecting git+https://github.com/lifewatch/pypam.git
  Cloning https://github.com/lifewatch/pypam.git to /tmp/pip-req-build-2o6meuwg
  Running command git clone --filter=blob:none --quiet https://github.com/lifewatch/pypam.git /tmp/pip-req-build-2o6meuwg
  Resolved https://github.com/lifewatch/pypam.git to commit 7c6423aeea77b9b1d8b4299c31cb1fe485d0b731
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Collecting cryptography<42.0.0,>=41.0.2 (from lifewatch-pypam==0.2.1)
  Downloading cryptography-41.0.4-cp37-abi3-manylinux_2_28_x86_64.whl.metadata (5.2 kB)
Downloading cryptography-41.0.4-cp37-abi3-manylinux_2_28_x86_64.whl (4.4 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.4/4.4 MB 58.7 MB/s eta 0:00:00
Installing collected packages: cryptography
  Attempting uninstall: cryptography
    Found existing in

In [9]:
!python -m pytest 

platform linux -- Python 3.10.6, pytest-7.4.0, pluggy-1.3.0
rootdir: /home/jovyan/mbari/pypam-based-processing
plugins: anyio-3.6.1, syrupy-4.0.6, cov-4.1.0
collected 8 items / 1 error

__________________ ERROR collecting tests/test_file_helper.py __________________
/opt/conda/lib/python3.10/site-packages/_pytest/runner.py:341: in from_call
    result: Optional[TResult] = func()
/opt/conda/lib/python3.10/site-packages/_pytest/runner.py:372: in <lambda>
    call = CallInfo.from_call(lambda: list(collector.collect()), "collect")
/opt/conda/lib/python3.10/site-packages/_pytest/python.py:531: in collect
    self._inject_setup_module_fixture()
/opt/conda/lib/python3.10/site-packages/_pytest/python.py:545: in _inject_setup_module_fixture
    self.obj, ("

## Workspace preparations

In [13]:
## Our input files for the demo were copied and located here:
!ls -l ~/shared/readonly/data/mbari/pypam-based-processing/NB_SPACE/
!echo "HOME=$HOME"

total 8
drwxr-x--- 2 root users 4096 Oct 20 12:17 DOWNLOADS
drwxr-x--- 3 root users   18 Oct 20 12:11 JSON
drwxr-x--- 3 root users 4096 Oct 20 12:17 OUTPUT
HOME=/home/jovyan


In [7]:
## so, a convenient definition we will use to instruct PBP where to get the input files from:
NB_SPACE = '/home/jovyan/shared/readonly/data/mbari/pypam-based-processing/NB_SPACE'
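
Before handing `NB_SPACE` to PBP, it may be worth confirming that the three subfolders listed above are actually present. A minimal sketch, where `missing_subdirs` is a hypothetical helper and the expected names are simply those shown in the `ls` output:

```python
from pathlib import Path


def missing_subdirs(base: str, expected=("DOWNLOADS", "JSON", "OUTPUT")) -> list[str]:
    """Return the names in `expected` that are not directories under `base`."""
    base_path = Path(base)
    return [name for name in expected if not (base_path / name).is_dir()]
```

With the listing shown above, `missing_subdirs(NB_SPACE)` would be expected to return an empty list.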

## Imports

In [14]:
import logging
import os
import sys
import xarray as xr
import numpy as np
import dask
import time

In [1]:
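## `sys` is imported here as well so that this cell also works when run on its own
## (running it before the imports cell raises `NameError: name 'sys' is not defined`).
import sys
sys.path = ['.'] + sys.path
from src.process_helper import ProcessHelper
from src.file_helper import FileHelper
from src.logging_helper import create_logger
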

In [None]:


## NOTE: The needed files are already downloaded for this demo,
## but we still use the associated S3 URIs.
import boto3
from botocore import UNSIGNED
from botocore.client import Config


## A function to process a given day

In [22]:
def process_date(date: str):
    """
    Main function to generate the HMB product for a given day.

    It makes use of supporting elements in PBP in terms of logging,
    file handling, and PyPAM based HMB generation.

    :param date: Date to process in YYYYMMDD format.
    :return: the generated xarray dataset, or None if no segments were processed.
    """

    output_dir             = 'NB_OUTPUT'
    output_prefix          = 'MB05_'

    log_filename = f"{output_dir}/{output_prefix}{date}.log"

    logger = create_logger(
        log_filename_and_level=(log_filename, logging.INFO),
        console_level=None,  # logging.INFO,
    )

    ## Note: we use S3 URIs and boto3 as the general mechanism to get our files from AWS.
    ## In this notebook, we have already downloaded the necessary files for the demonstration.
    ## The settings below allow us to keep using the original S3 URIs without
    ## triggering any new downloads.
    s3_client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    ## In our general cloud-based setup, downloaded files are removed after being used.
    ## The following allows us to keep the downloaded files:
    os.environ["ASSUME_DOWNLOADED_FILES"] = "yes"
    os.environ["REMOVE_DOWNLOADED_FILES"] = "no"

    file_helper = FileHelper(
        logger=logger,
        json_base_dir     = f'{NB_SPACE}/JSON',
        s3_client         = s3_client,
        download_dir      = f'{NB_SPACE}/DOWNLOADS',
    )

    process_helper = ProcessHelper(
        logger=logger,
        file_helper=file_helper,
        output_dir             = output_dir,
        output_prefix          = output_prefix,
        global_attrs_uri       = 'metadata/mb05/globalAttributes_MB05.yaml',
        variable_attrs_uri     = 'metadata/mb05/variableAttributes_MB05.yaml',
        voltage_multiplier     = 1,
        sensitivity_flat_value = 176,
        subset_to              = (10, 24_000),
    )

    ## For reporting purposes below (we know this will be the generated NetCDF path)
    nc_filename = f"{output_dir}/{output_prefix}{date}.nc"

    ## now, get the HMB result:
    print(f'::: Started processing {date=}    {log_filename=}')
    result = process_helper.process_day(date)
    print(f':::   Ended processing {date=} =>  {nc_filename=}')

    if result is not None:
        return result.dataset
    else:
        print(f'::: UNEXPECTED: no segments were processed for {date=}')
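
Since `process_date` expects its argument in YYYYMMDD format, a malformed string is best caught before any processing starts. The following is a minimal sketch of such a check; `is_valid_yyyymmdd` is a hypothetical helper, not part of PBP.

```python
from datetime import datetime


def is_valid_yyyymmdd(date: str) -> bool:
    """Return True if `date` is an existing calendar date written as YYYYMMDD."""
    # Require exactly eight digits, so strings like '2022-08-12' are rejected early.
    if len(date) != 8 or not date.isdigit():
        return False
    try:
        # strptime raises ValueError for impossible dates such as February 30.
        datetime.strptime(date, "%Y%m%d")
    except ValueError:
        return False
    return True
```

A caller could, for example, skip or report any date for which this returns `False` instead of passing it to `process_date`.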

## Generating the HMB products

### Processing a day

In [None]:
start_time = time.time()

generated_dataset = process_date('20220812')

elapsed_time = time.time() - start_time
print(f'===> date completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)')
generated_dataset

### Prepare process_date for parallel execution

Let's use Dask to dispatch multiple instances of `process_date` in parallel.

In [13]:
print(f'CPU info: {os.cpu_count()=}  {len(os.sched_getaffinity(0))=}')

CPU info: os.cpu_count()=80  len(os.sched_getaffinity(0))=80
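
The two numbers printed above answer slightly different questions: `os.cpu_count()` reports the CPUs in the machine, while `os.sched_getaffinity(0)` gives the set of CPUs the current process is allowed to run on (and is not available on every platform). A small sketch of how one might derive a worker count from them; both function names are hypothetical:

```python
import os


def usable_cpu_count() -> int:
    """CPUs available to this process, falling back to the machine-wide count."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: respects CPU affinity restrictions placed on the process.
        return len(os.sched_getaffinity(0))
    # Other platforms: cpu_count() may return None, so default to 1.
    return os.cpu_count() or 1


def suggested_workers(n_tasks: int) -> int:
    """Never more workers than tasks or usable CPUs, and at least one."""
    return max(1, min(n_tasks, usable_cpu_count()))
```

On the machine used here, five dates would give `suggested_workers(5) == 5`, well below the 80 CPUs reported. Such a value could be passed to Dask's local schedulers through the `num_workers` keyword of `compute()`, though this notebook relies on the defaults.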


In [15]:
def process_multiple_dates(dates: list[str]) -> list[xr.Dataset]:
    """
    Generates HMB for multiple days in parallel using Dask.
    Returns the resulting HMB datasets.
    """

    @dask.delayed
    def delayed_process_date(date: str):
        return process_date(date)

    ## To display the total elapsed time at the end of the processing:
    start_time = time.time()

    ## This will be called by Dask when all dates have completed processing:
    def aggregate(*datasets) -> list[xr.Dataset]:
        elapsed_time = time.time() - start_time
        print(f'===> All {len(datasets)} dates completed. Elapsed time: {elapsed_time:.1f} seconds ({elapsed_time/60:.1f} mins)')
        ## `datasets` arrives as a tuple; convert it to match the declared return type.
        return list(datasets)


    ## Prepare the processes:
    delayed_processes = [delayed_process_date(date) for date in dates]
    aggregation = dask.delayed(aggregate)(*delayed_processes)

    ## And launch them:
    return aggregation.compute()
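
Because `process_date` returns `None` when no segments were processed for a day, the list returned by `process_multiple_dates` may contain `None` entries. A minimal sketch for separating those cases follows; `split_results` is a hypothetical helper, and it assumes the results come back in the same order as the input dates (which should hold here, since the delayed calls are passed to `aggregate` positionally).

```python
def split_results(dates, datasets):
    """Pair each date with its result and list the dates that produced None."""
    if len(dates) != len(datasets):
        raise ValueError("dates and datasets must have the same length")
    # Dates that yielded a dataset, keyed by date string.
    generated = {d: ds for d, ds in zip(dates, datasets) if ds is not None}
    # Dates for which no dataset was produced.
    missing = [d for d, ds in zip(dates, datasets) if ds is None]
    return generated, missing
```

For example, `generated, missing = split_results(dates, process_multiple_dates(dates))` would leave in `missing` the days that need a closer look at their log files.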


### Processing multiple days

In [11]:
## The dates for this demo:
dates = ['20220812', '20220813', '20220814', '20220815', '20220816']

generated_datasets = process_multiple_dates(dates)

print(f'Generated datasets: {len(generated_datasets)}')


::: Started processing date='20220815'    log_filename='NB_SPACE/OUTPUT/MB05_20220815.log'
::: Started processing date='20220812'    log_filename='NB_SPACE/OUTPUT/MB05_20220812.log'
::: Started processing date='20220816'    log_filename='NB_SPACE/OUTPUT/MB05_20220816.log'
::: Started processing date='20220813'    log_filename='NB_SPACE/OUTPUT/MB05_20220813.log'
::: Started processing date='20220814'    log_filename='NB_SPACE/OUTPUT/MB05_20220814.log'
:::   Ended processing date='20220814' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220814.nc'
:::   Ended processing date='20220812' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220812.nc'
:::   Ended processing date='20220815' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220815.nc'
:::   Ended processing date='20220813' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220813.nc'
:::   Ended processing date='20220816' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220816.nc'
===> All dates completed. Elapsed time: 271.3 seconds (4.5 mins)
Generated datasets: 5


### NOTE: A "quick" test with 10 days to see how performance scales

We do not repeat the exercise here, but an earlier run took about 40 seconds per day:
```
===> All dates completed. Elapsed time: 402.5 seconds (6.7 mins)
Generated datasets: 10
```
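
The per-day figure is simply the total elapsed time divided by the number of days. As a worked check, using only the two elapsed times reported in this notebook (271.3 s for 5 days and 402.5 s for 10 days):

```python
def seconds_per_day(total_seconds: float, n_days: int) -> float:
    """Average wall-clock seconds per processed day."""
    return total_seconds / n_days


# Number of days -> total elapsed seconds, as reported in this notebook.
runs = {5: 271.3, 10: 402.5}

for n_days, total in runs.items():
    print(f"{n_days:>2} days: {seconds_per_day(total, n_days):.2f} s per day")
```

This gives roughly 54 seconds per day for the five-day run and roughly 40 seconds per day for the ten-day run, so the average cost per day dropped as more days were dispatched in parallel.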

In [14]:
## 10 days:  20220812 .. 20220821
datetimes = np.arange('2022-08-12', '2022-08-22', dtype='datetime64[D]')
dates = [str(d).replace('-', '') for d in datetimes]

generated_datasets = process_multiple_dates(dates)

print(f'Generated datasets: {len(generated_datasets)}')


::: Started processing date='20220819'    log_filename='NB_SPACE/OUTPUT/MB05_20220819.log'
::: Started processing date='20220813'    log_filename='NB_SPACE/OUTPUT/MB05_20220813.log'
::: Started processing date='20220818'    log_filename='NB_SPACE/OUTPUT/MB05_20220818.log'
::: Started processing date='20220812'    log_filename='NB_SPACE/OUTPUT/MB05_20220812.log'
::: Started processing date='20220821'    log_filename='NB_SPACE/OUTPUT/MB05_20220821.log'
::: Started processing date='20220814'    log_filename='NB_SPACE/OUTPUT/MB05_20220814.log'
::: Started processing date='20220817'    log_filename='NB_SPACE/OUTPUT/MB05_20220817.log'
::: Started processing date='20220816'    log_filename='NB_SPACE/OUTPUT/MB05_20220816.log'
::: Started processing date='20220815'    log_filename='NB_SPACE/OUTPUT/MB05_20220815.log'
::: Started processing date='20220820'    log_filename='NB_SPACE/OUTPUT/MB05_20220820.log'
:::   Ended processing date='20220821' =>  nc_filename='NB_SPACE/OUTPUT/MB05_20220821.nc'
