# Parallel Processing of Feature Detection with `dask`

This notebook demonstrates how to run *tobac* feature detection in parallel, using the `dask` library to distribute the work.

## Imports and Dask Cluster Setup

In [1]:
%matplotlib inline

In [2]:
import tobac
import dask.bag as db
import xarray as xr
import s3fs

There are many ways to initialize a dask cluster. This example runs two workers, each with a single thread, on the local machine.

In [3]:
from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=1)
client

Client
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

LocalCluster
Dashboard: http://127.0.0.1:8787/status
Workers: 2
Total threads: 2
Total memory: 36.00 GiB
Status: running
Using processes: True

Scheduler
Comm: tcp://127.0.0.1:53906
Workers: 2
Dashboard: http://127.0.0.1:8787/status
Total threads: 2
Started: Just now
Total memory: 36.00 GiB

Worker
Comm: tcp://127.0.0.1:53913
Total threads: 1
Dashboard: http://127.0.0.1:53915/status
Memory: 18.00 GiB
Nanny: tcp://127.0.0.1:53909
Local directory: /var/folders/cw/ddjy53ds5h51szdq1630n50r0000gp/T/dask-scratch-space/worker-l3nv659y

Worker
Comm: tcp://127.0.0.1:53914
Total threads: 1
Dashboard: http://127.0.0.1:53916/status
Memory: 18.00 GiB
Nanny: tcp://127.0.0.1:53911
Local directory: /var/folders/cw/ddjy53ds5h51szdq1630n50r0000gp/T/dask-scratch-space/worker-_rhve5vk
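
As one alternative to the cell above, the following sketch creates the `LocalCluster` explicitly and uses context managers so that the workers are shut down when the block exits. The `processes=False` and `dashboard_address=None` settings are assumptions chosen to keep the sketch lightweight; they are not what this notebook uses.

```python
from dask.distributed import Client, LocalCluster

# Assumption: processes=False keeps the workers as threads inside this
# process, and dashboard_address=None disables the diagnostics dashboard.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
) as cluster:
    with Client(cluster) as client:
        # Block until both workers have registered with the scheduler.
        client.wait_for_workers(2)
        n_workers = len(client.scheduler_info()["workers"])
        # Submit a trivial task to confirm that the cluster executes work.
        total = client.submit(sum, [1, 2, 3]).result()

print(n_workers, total)
```

Creating the cluster object separately makes it easier to swap in a different cluster type later while leaving the rest of the workflow unchanged.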


## Read in Data

Here, we use the NOAA Global Mosaic of Geostationary Satellite Imagery (GMGSI), hosted on AWS S3, as our input data source.

In [4]:
fs = s3fs.S3FileSystem(anon=True)
aws_urls = [
    "s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/00/GLOBCOMPLIR_nc.2024010100",
    "s3://noaa-gmgsi-pds/GMGSI_LW/2024/01/01/01/GLOBCOMPLIR_nc.2024010101",
]

# Open each remote file and read it into an xarray Dataset.
all_ds = []
for aws_url in aws_urls:
    file_obj = fs.open(aws_url)
    all_ds.append(xr.open_dataset(file_obj, engine="h5netcdf"))

We loaded two files, which we now concatenate along the time dimension with xarray.

In [5]:
combined_ds = xr.concat(all_ds, dim="time")

In [6]:
combined_ds
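
For readers without access to the S3 files, the concatenation step can be tried on synthetic data. This is a minimal sketch: the helper `make_frame`, the dimension names `yc` and `xc`, and the fill values are hypothetical stand-ins, not properties of the GMGSI files.

```python
import numpy as np
import pandas as pd
import xarray as xr


def make_frame(timestamp, fill_value):
    """Build a tiny single-time-step dataset (hypothetical stand-in for one file)."""
    return xr.Dataset(
        {"data": (("time", "yc", "xc"), np.full((1, 3, 4), fill_value))},
        coords={"time": [pd.Timestamp(timestamp)]},
    )


frames = [
    make_frame("2024-01-01 00:00", 200.0),
    make_frame("2024-01-01 01:00", 210.0),
]

# Stack the single-time-step datasets along the existing "time" dimension.
combined = xr.concat(frames, dim="time")
print(combined["data"].shape)
```

Because each input already carries a length-1 `time` dimension, `xr.concat` extends that dimension rather than creating a new one.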

## *tobac* Feature Detection

The feature detection parameters below are only examples.

In [7]:
parameters_features = {}
parameters_features["position_threshold"] = "weighted_diff"
parameters_features["sigma_threshold"] = 0.5
parameters_features["n_min_threshold"] = 4
parameters_features["target"] = "minimum"
parameters_features["threshold"] = [180, 170]
parameters_features["PBC_flag"] = "hdim_2"

Future versions of *tobac* (1.6 and later) will support xarray natively in feature detection and segmentation, but current versions rely on Iris for gridded data. We therefore need to adjust some metadata so that the data is compatible with Iris.

In [8]:
# Set metadata for Iris compatibility: units for the data and a long_name for the time coordinate
combined_ds["data"].attrs["units"] = "kelvin"
combined_ds["data"]["time"].attrs["long_name"] = "time"

Now, we will use a *dask bag* to parallelize our feature detection over time. 

In [9]:
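# Build one bag item per time step. The second index acts on the first two
# dimensions, so each item keeps its single time step and the first 500 rows.
b = db.from_sequence(
    [
        combined_ds["data"][x : x + 1][0:500, 0:500]
        for x in range(len(combined_ds["time"]))
    ],
    # All items in one partition run as a single task; increase this value
    # to spread the items across workers.
    npartitions=1,
)
# Run feature detection on every item; 4000 is passed positionally as dxy.
out_feature_dfs = db.map(
    lambda x: tobac.feature_detection_multithreshold(
        x, 4000, **parameters_features
    ),
    b,
).compute()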

Dask warns that embedding the in-memory data in the task graph may cause some slowdown. It suggests loading the data with Dask directly, or using futures or delayed objects to embed the data into the graph without repetition. See https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
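
The suggestion about delayed objects can be sketched as follows. This is an illustration under stated assumptions, not the approach used in this notebook: `detect_stub` is a hypothetical stand-in for the per-frame detection call, and the data is a synthetic NumPy array rather than the satellite imagery.

```python
import dask
import numpy as np


def detect_stub(frame, threshold):
    """Hypothetical stand-in for per-frame detection: count pixels below a threshold."""
    return int((frame < threshold).sum())


# Synthetic stack of two "time steps" (assumption: values in a kelvin-like range).
rng = np.random.default_rng(0)
frames = rng.uniform(150.0, 300.0, size=(2, 50, 50))

# Wrap the whole stack once so the graph holds a single copy of the data.
stack = dask.delayed(frames)

# One delayed task per time step; indexing a delayed object is itself lazy.
tasks = [
    dask.delayed(detect_stub)(stack[i], 180.0) for i in range(frames.shape[0])
]

# Nothing runs until compute is called; the results come back as a tuple.
counts = dask.compute(*tasks)
print(counts)
```

Wrapping the array in a single delayed object means each task refers to the same graph entry instead of carrying its own copy of the data.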


## Combining parallel-detected features into one coherent DataFrame

In [10]:
tobac.utils.general.combine_feature_dataframes(out_feature_dfs)

| | frame | idx | hdim_1 | hdim_2 | num | threshold_value | feature | time | timestr | lat | lon |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1 | 0.975325 | 60.718132 | 26 | 180 | 1 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.694528 | -175.628134 |
| 1 | 0 | 2 | 0.670018 | 79.074818 | 16 | 180 | 2 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.701065 | -174.306289 |
| 2 | 0 | 5 | 0.358115 | 1492.979778 | 15 | 180 | 3 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.707742 | -72.492469 |
| 3 | 0 | 6 | 0.482579 | 1531.520215 | 26 | 180 | 4 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.705077 | -69.717214 |
| 4 | 0 | 7 | 3.409896 | 2113.185770 | 285 | 180 | 5 | 2024-01-01 00:00:00 | 2024-01-01 00:00:00 | 72.642292 | -27.832080 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1565 | 1 | 1283 | 498.180906 | 30.005031 | 16 | 170 | 1566 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.262049 | -177.839756 |
| 1566 | 1 | 1284 | 497.817803 | 715.879216 | 7 | 170 | 1567 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.275803 | -128.450668 |
| 1567 | 1 | 1285 | 498.158232 | 3295.186171 | 63 | 170 | 1568 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.262908 | 57.282537 |
| 1568 | 1 | 1288 | 498.454404 | 3793.095530 | 5 | 170 | 1569 | 2024-01-01 01:00:00 | 2024-01-01 01:00:00 | 58.251682 | 93.136465 |
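
To illustrate why a combination step is useful, the sketch below merges two small per-frame tables with plain pandas. It assumes that each independent detection call numbers its own features from 1 and its own frames from 0. The function `combine_sketch` is hypothetical and is not how *tobac* implements `combine_feature_dataframes`; it only shows the idea of renumbering after concatenation.

```python
import pandas as pd


def combine_sketch(feature_dfs):
    """Hypothetical illustration: concatenate per-frame tables and renumber them."""
    combined = pd.concat(feature_dfs, ignore_index=True)
    # Order rows by time so that numbering follows the time axis.
    combined = combined.sort_values("time", kind="stable").reset_index(drop=True)
    # Give every feature a unique, consecutive number starting at 1.
    combined["feature"] = range(1, len(combined) + 1)
    # Derive the frame number from the rank of each unique time stamp.
    combined["frame"] = pd.factorize(combined["time"], sort=True)[0]
    return combined


# Two toy per-frame results, each numbered independently.
df0 = pd.DataFrame(
    {"frame": [0, 0], "feature": [1, 2], "time": pd.to_datetime(["2024-01-01 00:00"] * 2)}
)
df1 = pd.DataFrame(
    {"frame": [0], "feature": [1], "time": pd.to_datetime(["2024-01-01 01:00"])}
)

combined = combine_sketch([df0, df1])
print(combined)
```

After combining, the feature numbers are unique across both time steps and the frame column increases with time, which matches the pattern visible in the table above.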
