# `Dask`-aided Parallel Run of SuPy 

In [1]:
import supy as sp
import pandas as pd
import numpy as np
from dask import compute, delayed
import dask.multiprocessing
import dask.threaded
%matplotlib inline

In [2]:
# record the SuPy version this notebook was executed with
sp.__version__

'0.5.9'

In [3]:
# load the sample dataset shipped with SuPy; the call returns three objects,
# used below as the initial model state (df_init) and the forcing data (df_forcing)
# NOTE(review): ser_cfg is not used in the cells shown here
ser_cfg, df_init, df_forcing = sp.load_SampleData()

In [4]:
%%time
# serial baseline: one plain SuPy run with df_init as loaded, using only the
# forcing rows selected by the '2012 01' label slice
df_output,df_final=sp.run_supy(df_forcing.loc['2012 01'],df_init)

CPU times: user 2.59 s, sys: 2.14 s, total: 4.72 s
Wall time: 4.83 s


In [None]:
# show only the entries of the final-state result whose labels contain 'state_id'
df_final.filter(like='state_id')

### Create multiple grids

In [5]:
# replicate the initial state of grid 1 into 64 identical rows; ignore_index
# relabels them 0..63, and the resulting index is then named 'grid'
df_init_multi = pd.concat(
    [df_init.loc[[1]] for _ in range(64)],
    axis=0,
    ignore_index=True,
).rename_axis('grid')

In [6]:
# grid-level wrapper around sp.run_supy, used as the unit of work for dask
def run_grid(grid):
    """Run SuPy for one grid of `df_init_multi`.

    Reads the notebook-level `df_forcing` and `df_init_multi`: the forcing is
    restricted to the '2012 01' label slice and the initial state to the single
    row labelled `grid` (kept as a one-row frame via the list indexer).

    Parameters
    ----------
    grid
        Index label of the row in `df_init_multi` to simulate.

    Returns
    -------
    tuple
        The two objects returned by `sp.run_supy`, in the same order
        (output, final state).
    """
    forcing_sel = df_forcing.loc['2012 01']
    init_sel = df_init_multi.loc[[grid]]
    output, state_final = sp.run_supy(forcing_sel, init_sel)
    return output, state_final

In [7]:
%%time
# time the wrapper on a single grid (label 1) as a per-grid cost reference
xx=run_grid(1)

CPU times: user 2.62 s, sys: 2.19 s, total: 4.81 s
Wall time: 5.01 s


### Parallel run

In [8]:
# start a dask.distributed client with default settings; the summary displayed
# in the next cell reports a scheduler on 127.0.0.1 with 4 workers
# NOTE(review): this import sits mid-notebook; consider moving it to the import cell
from dask.distributed import Client
client = Client()

In [9]:
# display the client summary (scheduler address, dashboard link, workers)
client

0,1
Client  Scheduler: tcp://127.0.0.1:61453  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 17.18 GB


In [None]:
%%time
# build one lazy run_grid task per grid (0..3), then execute them with the
# 'distributed' scheduler (presumably the `client` started above);
# optimize_graph=False asks dask.compute to skip its graph-optimisation step
values = [delayed(run_grid)(grid) for grid in np.arange(4)]
results = compute(*values, scheduler='distributed',optimize_graph=False)

In [None]:
%%time
# same four run_grid tasks, executed with dask's thread-based scheduler
values = [delayed(run_grid)(grid) for grid in np.arange(4)]
results = compute(*values, scheduler='threads')

In [None]:
%%time
# same four run_grid tasks, executed with dask's process-based scheduler
values = [delayed(run_grid)(grid) for grid in np.arange(4)]
results = compute(*values, scheduler='processes')

In [None]:
%%time
# same four run_grid tasks, this time with graph optimisation switched off
# NOTE(review): 'multiprocessing' appears to be an alias of 'processes' in dask,
# so this likely differs from the 'processes' run only by optimize_graph=False — confirm
values = [delayed(run_grid)(grid) for grid in np.arange(4)]
results = compute(*values, scheduler='multiprocessing',optimize_graph=False)

In [None]:
%%time
# serial reference: plain Python loop over run_grid, no dask involved
# NOTE(review): this loops over 8 grids while the parallel cells run 4, so the
# wall time is not directly comparable — halve it or switch to np.arange(4)
values_ser = [run_grid(grid) for grid in np.arange(8)]

In [None]:
%%time
# variant without the run_grid wrapper: sp.run_supy itself is wrapped in delayed
# and the forcing slice is passed in as an ordinary argument (grids 5..8)
# disabled alternative: wrap the forcing slice in delayed() as well, so that it
# would enter the task graph as a lazy value rather than a concrete frame
# df_forcing_d=delayed(df_forcing.loc['2012 01'])
df_forcing_d=df_forcing.loc['2012 01']
values = [delayed(sp.run_supy)(df_forcing_d,df_init_multi.loc[[grid]]) for grid in np.arange(5,9)]
results = compute(*values, scheduler='distributed')

## Using `Future`s

In [None]:
# re-display the client summary before submitting futures to it
client

In [10]:
%%time
# submit one run_grid call per grid (0..3) to the cluster; client.map hands back
# Future objects rather than results
# NOTE(review): the timing here presumably covers submission only — fetch the
# actual outputs with client.gather(res_func) and time that to compare — confirm
res_func=client.map(run_grid,np.arange(0,4))

CPU times: user 5.36 s, sys: 1.06 s, total: 6.42 s
Wall time: 6.45 s


In [None]:
# inspect the objects returned by client.map (their status, not the model output);
# the results themselves are never retrieved in the cells shown here
res_func