# Orcasound Pegasus Workflow

The [Ocean Observatories Initiative (OOI)](https://oceanobservatories.org/), through a network of sensors, supports critical research in ocean science and marine life. [Orcasound](https://www.orcasound.net/) is a community-driven project that leverages hydrophone sensors deployed in **three locations** in the state of **Washington** (San Juan Island, Bush Point, and Port Townsend, as shown in the figure below) in order to study Orca whales in the Pacific Northwest region.

Throughout the course of this project, code to process and analyze the hydrophone data has been developed, and machine learning models have been trained to automatically identify the whistles of the Orcas. All of the code is available publicly on GitHub, and the hydrophone data are free to access, stored in an **AWS S3** bucket. In this paper, we have developed an Orcasound workflow using Pegasus. This version of the pipeline is based on the [Orcasound GitHub actions](https://github.com/orcasound/orca-action-workflow) workflow, and incorporates inference components of the [OrcaHello AI](https://github.com/orcasound/aifororcas-livesystem) notification system.

The workflow processes the hydrophone data of one or more sensors in batches for each timestamp, and converts them to a WAV format. Using the WAV output it creates spectrogram images that are stored in the final output location. Furthermore, using the pretrained Orcasound model, the workflow scans the WAV files to identify potential sounds produced by the orcas. These predictions are merged into a JSON file for each sensor, and if data from more than one sensor are being processed, the workflow will create a final merged JSON output for all. In our experiments, we used data from a single hydrophone sensor over the span of a day.
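
The batching described above can be sketched as follows. The workflow code later in this notebook computes the number of batches with a ceiling division over `max_files` and splits the file list with `np.array_split`; the pure-Python version below mirrors that splitting rule for illustration only. The helper name `split_into_batches` and the sample segment names are assumptions, not part of the repository.

```python
def split_into_batches(files, max_files):
    """Split `files` into ceil(len(files) / max_files) nearly equal batches."""
    if not files:
        return []
    # Ceiling division without floats, as in the workflow code: -(-n // m)
    num_of_splits = -(-len(files) // max_files)
    # Like np.array_split: the first `extra` batches get one more item
    base, extra = divmod(len(files), num_of_splits)
    batches, start = [], 0
    for i in range(num_of_splits):
        size = base + (1 if i < extra else 0)
        batches.append(files[start:start + size])
        start += size
    return batches


# Hypothetical segment names for one sensor and timestamp
segments = ["live{}.ts".format(i) for i in range(450)]
batches = split_into_batches(segments, max_files=200)
print([len(b) for b in batches])
```

Because the split is balanced, 450 segments with `max_files = 200` yield three batches of 150 files rather than two full batches and a small remainder.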

<br>
<img src="images/orca_sound_sensors.png" style="width: 400px;"/>
<br>

**Machine Learning steps in the workflow:**
<br>
<img src="images/ml_steps2.png" style="width: 500px;"/>
<br>

## Containers
All tools required to execute the jobs are included in two containers available on Docker Hub:
<br>[Orcasound Container](https://hub.docker.com/r/papajim/orcasound-processing), a Python image defined in `Docker/Orca_Dockerfile` that provides the basic tools:
* awscli
* matplotlib
* scipy
* m3u8

<br>[Orcasound ML Processing Container](https://hub.docker.com/r/papajim/orcasound-ml-processing), a Python image defined in `Docker/Orca_ML_Dockerfile` that includes the tools listed above plus these machine learning libraries:
* scikit-learn
* torch
* torchvision
* pytorchtools
* torch-summary
* librosa


## Accessing the Input Data
The hydrophone recordings are free to access and reside in a public **AWS S3** bucket, so no additional setup is required to read them.
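
As a minimal sketch of what this means in practice, each object key in the bucket maps to a plain HTTPS URL, following the same URL pattern that the replica catalog code in this notebook uses. The helper name `s3_key_to_url` and the example key are hypothetical; real keys come from the S3 catalog file.

```python
S3_BUCKET = "streaming-orcasound-net"
S3_REGION = "us-west-2"


def s3_key_to_url(key):
    """Build the public HTTPS URL for an object key in the Orcasound bucket."""
    return "https://{}.s3.{}.amazonaws.com/{}".format(S3_BUCKET, S3_REGION, key)


# Hypothetical key, assuming a <sensor>/hls/<timestamp>/<segment> layout
example_key = "rpi_bush_point/hls/1628578818/live123.ts"
print(s3_key_to_url(example_key))
```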


## Workflow
The workflow processes and analyzes the hydrophone data and uses trained machine learning models to automatically identify the whistles of the Orcas.

![Pegasus Orcasound Workflow graph](images/orcasound-workflow.png)

<br>The jobs in the workflow are described in the table below.

| Job Label                 | Description                                      |
| --------------------------|--------------------------------------------------|
| convert2wav               | converts the input hydrophone data to WAV format |
| convert2spectrogram       | converts the WAV output to spectrogram images    |
| inference                 | identifies the sound using a pretrained ML model |
| merge predictions         | merges the predictions from all sensors          |


## 1. Create the Orcasound Workflow

By now, you have a good idea of how the Pegasus Workflow API works.
We now create the workflow for the Orcasound prediction, based on the workflow graph above.

The first step is to select the sensors and enter the range of dates that will be used
to extract the hydrophone data from the public S3 bucket.

In [None]:
# update the sensor names in list format, choices from : ["rpi_bush_point","rpi_port_townsend","rpi_orcasound_lab"]
SENSORS = ['rpi_bush_point']

# update the start and end dates regarding the data to be extracted from sensors
START_DATE = '2021-08-10'
# default end date is set as +1 day
END_DATE = None

**Note:** The date range that can be extracted from the dataset is limited to a maximum of 7 days.
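
The date handling can be sketched as below: a missing end date defaults to one day after the start date, as in the code later in this notebook. The 7-day check is only an assumption about how the limit in the note could be enforced (the notebook itself does not validate it), and `resolve_window` and `MAX_DAYS` are hypothetical names.

```python
from datetime import datetime, timedelta

MAX_DAYS = 7  # assumed limit, taken from the note above


def resolve_window(start, end=None, max_days=MAX_DAYS):
    """Parse 'YYYY-MM-DD' strings and return (start_dt, end_dt)."""
    start_dt = datetime.strptime(start, "%Y-%m-%d")
    if end:
        end_dt = datetime.strptime(end, "%Y-%m-%d")
    else:
        # Default end date: one day after the start date
        end_dt = start_dt + timedelta(days=1)
    if end_dt <= start_dt:
        raise ValueError("end date must be after start date")
    if end_dt - start_dt > timedelta(days=max_days):
        raise ValueError("date range exceeds {} days".format(max_days))
    return start_dt, end_dt


start_dt, end_dt = resolve_window("2021-08-10")
print(start_dt.date(), end_dt.date())
```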

In [None]:
import os
import sys
import logging
import tarfile
import requests
import numpy as np
import pandas as pd
from pathlib import Path
from argparse import ArgumentParser
from datetime import datetime
from datetime import timedelta

# --- Import Pegasus API -----------------------------------------------------------
from Pegasus.api import *
logging.basicConfig(level=logging.DEBUG)

# --- Main workflow class ----------------------------------------------------------
class OrcasoundWorkflow():
    wf = None
    sc = None
    tc = None
    rc = None
    props = None

    dagfile = None
    wf_dir = None
    shared_scratch_dir = None
    local_storage_dir = None
    wf_name = "orcasound"
    
    #data details in order to pull data from public s3 bucket
    s3_cache = None
    s3_files = None
    s3_bucket = "streaming-orcasound-net"
    s3_cache_location = ".s3_cache"
    s3_cache_file = ".s3_cache/streaming-orcasound-net-7-days.csv"
    s3_cache_xz = "streaming-orcasound-net-7-days.tar.xz"
    #data catalog url, used to filter out the data from the main dataset stored in S3 bucket
    s3_cache_xz_url = "https://workflow.isi.edu/Panorama/Data/Orcasound/streaming-orcasound-net-7-days.tar.xz"
    
    # --- Init ---------------------------------------------------------------------
    def __init__(self, sensors, start_date, end_date, max_files, dagfile="workflow.yml"):
        self.dagfile = dagfile
        self.wf_dir = str(Path(".").resolve())
        self.shared_scratch_dir = os.path.join(self.wf_dir, "scratch")
        self.local_storage_dir = os.path.join(self.wf_dir, "output")
        self.sensors = sensors
        self.max_files = max_files
        self.start_date = int(start_date.timestamp())
        self.end_date = int(end_date.timestamp())

    
    # --- Write files in directory -------------------------------------------------
    def write(self):
        if self.sc is not None:
            self.sc.write()
        self.props.write()
        self.rc.write()
        self.tc.write()
        
        try:
            self.wf.write()
        except PegasusClientError as e:
            print(e)


    # --- Plan and Submit the workflow ----------------------------------------------
    def plan_submit(self):
        try:
            self.wf.plan(submit=True)
        except PegasusClientError as e:
            print(e)
            
            
    # --- Get status of the workflow -----------------------------------------------
    def status(self):
        try:
            self.wf.status(long=True)
        except PegasusClientError as e:
            print(e)
            
    # --- Get statistics of the workflow -----------------------------------------------
    def statistics(self):
        try:
            self.wf.statistics()
        except PegasusClientError as e:
            print(e)
            

    # --- Configuration (Pegasus Properties) ---------------------------------------
    def create_pegasus_properties(self):
        self.props = Properties()
        self.props["pegasus.transfer.threads"] = "16"
        return


    # --- Site Catalog -------------------------------------------------------------
    def create_sites_catalog(self, exec_site_name="condorpool"):
        self.sc = SiteCatalog()

        local = (Site("local")
                    .add_directories(
                        Directory(Directory.SHARED_SCRATCH, self.shared_scratch_dir)
                            .add_file_servers(FileServer("file://" + self.shared_scratch_dir, Operation.ALL)),
                        Directory(Directory.LOCAL_STORAGE, self.local_storage_dir)
                            .add_file_servers(FileServer("file://" + self.local_storage_dir, Operation.ALL))
                    )
                )

        exec_site = (Site(exec_site_name)
                        .add_condor_profile(universe="vanilla")
                        .add_pegasus_profile(
                            style="condor"
                        )
                    )
        self.sc.add_sites(local, exec_site)
        

    # --- Transformation Catalog (Executables and Containers) ----------------------
    def create_transformation_catalog(self, exec_site_name="condorpool"):
        self.tc = TransformationCatalog()
        
        orcasound_container = Container("orcasound_container",
            container_type = Container.SINGULARITY,
            image="docker://papajim/orcasound-processing:latest",
            image_site="docker_hub"
        )
        
        orcasound_ml_container = Container("orcasound_ml_container",
            container_type = Container.SINGULARITY,
            image="docker://papajim/orcasound-ml-processing:latest",
            image_site="docker_hub"
        )

        # Add the orcasound processing
        mkdir = Transformation("mkdir", site="local", pfn="/bin/mkdir", is_stageable=False)
        # converts the input hydrophone data to WAV format
        convert2wav = Transformation("convert2wav", site=exec_site_name, pfn=os.path.join(self.wf_dir, "bin/convert2wav.py"), is_stageable=True, container=orcasound_container)
        # converts the WAV output to spectrogram images
        convert2spectrogram = Transformation("convert2spectrogram", site=exec_site_name, pfn=os.path.join(self.wf_dir, "bin/convert2spectrogram.py"), is_stageable=True, container=orcasound_container)
        # identifies the sound using a pretrained ML model
        inference = Transformation("inference", site=exec_site_name, pfn=os.path.join(self.wf_dir, "bin/inference.py"), is_stageable=True, container=orcasound_ml_container)
        # merges the predictions from all sensors
        merge = Transformation("merge", site=exec_site_name, pfn=os.path.join(self.wf_dir, "bin/merge.py"), is_stageable=True, container=orcasound_container)

        
        self.tc.add_containers(orcasound_container, orcasound_ml_container)
        self.tc.add_transformations(convert2wav, convert2spectrogram, inference, merge, mkdir)

    
    # --- Fetch s3 catalog ---------------------------------------------------------
    def fetch_s3_catalog(self):
        print("Downloading S3 cache...")
        data = requests.get(self.s3_cache_xz_url)
        if data.status_code != 200:
            raise ConnectionError("Download for {} failed with error code: {}".format(self.s3_cache_xz_url, data.status_code))

        with open(self.s3_cache_xz, "wb") as f:
            f.write(data.content)

        print("Unpacking S3 cache...")
        with tarfile.open(self.s3_cache_xz) as f:
            f.extractall('.')

        os.remove(self.s3_cache_xz)
        print("S3 cache fetched successfully...")


    # --- Check s3 catalog for files -----------------------------------------------
    def check_s3_cache(self):
        s3_files = self.s3_cache[self.s3_cache["Sensor"].isin(self.sensors) & (self.s3_cache["Timestamp"] >= self.start_date) & (self.s3_cache["Timestamp"] <= self.end_date)]

        if s3_files.empty:
            print("No files found for sensors between {} and {}".format(self.start_date, self.end_date))
            exit()

        for sensor in self.sensors:
            if s3_files[s3_files["Sensor"] == sensor].empty:
                print("No files found for sensor {} between {} and {}".format(sensor, self.start_date, self.end_date))
        
        self.s3_files = s3_files


    # --- Read s3 catalog files ----------------------------------------------------
    def read_s3_cache(self):
        if not os.path.isfile(self.s3_cache_file):
            self.fetch_s3_catalog()
        
        print("Reading S3 cache...")
        self.s3_cache = pd.read_csv(self.s3_cache_file)
        self.check_s3_cache()

    
    # --- Replica Catalog ----------------------------------------------------------
    def create_replica_catalog(self):
        self.rc = ReplicaCatalog()

        self.read_s3_cache()
        if (self.s3_files is None):
            exit()

        # Add s3 files as deep lfns
        for f in self.s3_files["Key"]:
            self.rc.add_replica("AmazonS3", f, "https://streaming-orcasound-net.s3.us-west-2.amazonaws.com/{}".format(f))

        # Add inference dependencies
        self.rc.add_replica("local", "model.py", os.path.join(self.wf_dir, "bin/model.py"))
        self.rc.add_replica("local", "dataloader.py", os.path.join(self.wf_dir, "bin/dataloader.py"))
        self.rc.add_replica("local", "params.py", os.path.join(self.wf_dir, "bin/params.py"))
        self.rc.add_replica("local", "model.pkl", os.path.join(self.wf_dir, "input/model.pkl"))
     

    # --- Create Workflow ----------------------------------------------------------
    def create_workflow(self):
        self.wf = Workflow(self.wf_name, infer_dependencies=True)
        
        model_py = File("model.py")
        dataloader_py = File("dataloader.py")
        params_py = File("params.py")
        model_file = File("model.pkl")

        # Create a job for each Sensor and Timestamp
        predictions_files = []
        # Creating a linear pipeline of convert2wav-->convert2spectrogram-->predict
        # regarding each sensor and timestamp
        for sensor in self.sensors:
            predictions_sensor_files = []
            for ts in self.s3_files[self.s3_files["Sensor"] == sensor]["Timestamp"].unique():
                predictions_sensor_ts_files = []
                sensor_ts_files = self.s3_files[(self.s3_files["Sensor"] == sensor) & (self.s3_files["Timestamp"] == ts) & (self.s3_files["Filename"] != "live.m3u8")]
                sensor_ts_files_len = len(sensor_ts_files.index)
                # -2 if m3u8 in the list else -1
                sensor_ts_files = sensor_ts_files[sensor_ts_files["Filename"] != "live{}.ts".format(sensor_ts_files_len-1)]
                sensor_ts_files_len -= 1

                num_of_splits = -(-sensor_ts_files_len//self.max_files)

                mkdir_job = (Job("mkdir", _id="scratch_mkdir_{0}_{1}".format(sensor, ts), node_label="scratch_mkdir_{0}_{1}".format(sensor, ts))
                                .add_args("-p ${0}/png/{1}/{2} ${0}/wav/{1}/{2}".format("_PEGASUS_INITIAL_DIR", sensor, ts))
                                .add_profiles(Namespace.SELECTOR, key="execution.site", value="local")
                )
                self.wf.add_jobs(mkdir_job)

                counter = 1
                for job_files in np.array_split(sensor_ts_files, num_of_splits):
                    input_files = job_files["Key"]
                    wav_files = []
                    png_files = []
                    for f in job_files["Filename"]:
                        wav_files.append("wav/{0}/{1}/{2}".format(sensor, ts, f.replace(".ts", ".wav")))
                        png_files.append("png/{0}/{1}/{2}".format(sensor, ts, f.replace(".ts", ".png")))
                
                    convert2wav_job = (Job("convert2wav", _id="wav_{0}_{1}_{2}".format(sensor, ts, counter), node_label="wav_{0}_{1}_{2}".format(sensor, ts, counter))
                                        .add_args("-i {0}/hls/{1} -o wav/{0}/{1}".format(sensor, ts))
                                        .add_inputs(*input_files, bypass_staging=True)
                                        .add_outputs(*wav_files, stage_out=False, register_replica=False)
                                        .add_pegasus_profiles(label="{0}_{1}_{2}".format(sensor, ts, counter))
                                    )
                    
                    convert2spectrogram_job = (Job("convert2spectrogram", _id="png_{0}_{1}_{2}".format(sensor, ts, counter), node_label="spectrogram_{0}_{1}_{2}".format(sensor, ts, counter))
                                        .add_args("-i wav/{0}/{1} -o png/{0}/{1}".format(sensor, ts))
                                        .add_inputs(*wav_files)
                                        .add_outputs(*png_files, stage_out=True, register_replica=False)
                                        .add_pegasus_profiles(label="{0}_{1}_{2}".format(sensor, ts, counter))
                                    )
                    
                    predictions = File("predictions_{0}_{1}_{2}.json".format(sensor, ts, counter))
                    predictions_sensor_ts_files.append(predictions)
                    inference_job = (Job("inference", _id="predict_{0}_{1}_{2}".format(sensor, ts, counter), node_label="inference_{0}_{1}_{2}".format(sensor, ts, counter))
                                        .add_args("-i wav/{0}/{1} -s {0} -t {1} -m {3} -o predictions_{0}_{1}_{2}.json".format(sensor, ts, counter, model_file.lfn))
                                        .add_inputs(model_file, model_py, dataloader_py, params_py, *wav_files)
                                        .add_outputs(predictions, stage_out=False, register_replica=False)
                                        .add_pegasus_profiles(label="{0}_{1}_{2}".format(sensor, ts, counter))
                                    )
                    

                    # Increase counter
                    counter += 1

                    # Share files to jobs
                    self.wf.add_jobs(convert2wav_job, convert2spectrogram_job, inference_job)
                    self.wf.add_dependency(mkdir_job, children=[convert2wav_job, convert2spectrogram_job])

                #merge predictions for sensor timestamps
                merged_predictions = File("predictions_{0}_{1}.json".format(sensor, ts))
                predictions_sensor_files.append(merged_predictions)
                merge_job_ts = (Job("merge", _id="merge_{0}_{1}".format(sensor, ts), node_label="merge_{0}_{1}".format(sensor, ts))
                                    .add_args("-i {0} -o {1}".format(" ".join([x.lfn for x in predictions_sensor_ts_files]), merged_predictions.lfn))
                                    .add_inputs(*predictions_sensor_ts_files)
                                    .add_outputs(merged_predictions, stage_out=True, register_replica=False)
                                    .add_pegasus_profiles(label="{0}_{1}".format(sensor, ts))
                                )

                self.wf.add_jobs(merge_job_ts)

            #merge predictions for sensor if more than 1 files
            if len(predictions_sensor_files) > 1:
                merged_predictions = File("predictions_{0}.json".format(sensor))
                predictions_files.append(merged_predictions)
                merge_job_sensor = (Job("merge", _id="merge_{0}".format(sensor), node_label="merge_{0}".format(sensor))
                                        .add_args("-i {0} -o {1}".format(" ".join([x.lfn for x in predictions_sensor_files]), merged_predictions.lfn))
                                        .add_inputs(*predictions_sensor_files)
                                        .add_outputs(merged_predictions, stage_out=True, register_replica=False)
                                        .add_pegasus_profiles(label="{0}".format(sensor))
                                    )

                self.wf.add_jobs(merge_job_sensor)

        #merge predictions for all sensors if more than 1 files
        if len(predictions_files) > 1:
            merged_predictions = File("predictions_all.json")
            merge_job_all = (Job("merge", _id="merge_all", node_label="merge_all")
                                    .add_args("-i {0} -o {1}".format(" ".join([x.lfn for x in predictions_files]), merged_predictions.lfn))
                                    .add_inputs(*predictions_files)
                                    .add_outputs(merged_predictions, stage_out=True, register_replica=False)
                            )

            self.wf.add_jobs(merge_job_all)
            

# --- Generate the workflow and its catalogs ---------------------------------------
start_date = datetime.strptime(START_DATE, '%Y-%m-%d')
if END_DATE:
    end_date = datetime.strptime(END_DATE, '%Y-%m-%d')
else:
    end_date = start_date + timedelta(days=1)
max_files = 200
dagfile = 'workflow.yml'

workflow = OrcasoundWorkflow(sensors=SENSORS, 
                             start_date=start_date, 
                             end_date=end_date, 
                             max_files=max_files, 
                             dagfile=dagfile)

print("Creating execution sites...")
workflow.create_sites_catalog("condorpool")

print("Creating workflow properties...")
workflow.create_pegasus_properties()

print("Creating transformation catalog...")
workflow.create_transformation_catalog("condorpool")

print("Creating replica catalog...")
workflow.create_replica_catalog()

print("Creating orcasound workflow dag...")
workflow.create_workflow()

workflow.write()
print("Orcasound Workflow has been generated!")

## 2. Plan and Submit the Workflow

We will now plan and submit the workflow for execution. By default, jobs run on the **condorpool** site, i.e., the selected ACCESS resource.

In [None]:
workflow.plan_submit()

After the workflow has been successfully planned and submitted, you can use the Python `Workflow` object to monitor its status. The output shows the number of jobs in each state, including whether jobs are idle or running.

In [None]:
workflow.status()

## 3. Launch Pilot Jobs on ACCESS Resources

At this point you should have some idle jobs in the queue. They are idle because there are no resources yet to execute on. Resources can be brought in with the HTCondor Annex tool, by sending pilot jobs (also called glideins) to the ACCESS resource providers. These pilots have the following properties:

* A pilot can run multiple user jobs. It stays active until no more user jobs are available or until its end of life has been reached, whichever comes first.
* A pilot is partitionable. Job slots are created dynamically based on the resource requirements of the user jobs, which means multiple user jobs can fit on a compute node at the same time.
* A pilot will only run jobs for the user who started it.

The process of starting pilots is described in the [ACCESS Pegasus Documentation](https://xsedetoaccess.ccs.uky.edu/confluence/redirect/ACCESS+Pegasus.html)

## 4. Statistics

What to do next depends on whether the workflow finished successfully. If the workflow failed, you can use `workflow.wf.analyze()` to get help finding out what went wrong. If the workflow finished successfully, we can pull out some statistics from the provenance database:

In [None]:
workflow.statistics()