# Dask: a flexible library for parallel computing in Python.

Dask is composed of two parts:

- Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
- “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.
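To make the "dynamic task scheduling" part concrete, here is a minimal sketch of a task graph in the spirit of Dask's classic graph format, where a dict maps keys either to plain values or to `(function, *args)` tuples whose arguments may name other keys. The `get` function is a hypothetical, single-threaded evaluator written only for illustration; Dask's real schedulers also run independent tasks in parallel and manage memory.

```python
from operator import add, mul

# A toy task graph: each key maps to a value or to a tuple (callable, *args).
# String arguments that are keys of the graph are treated as dependencies.
graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),  # z = x + y
    "w": (mul, "z", 10),   # w = z * 10
}


def get(graph, key, cache=None):
    """Evaluate `key` by recursively computing the keys it depends on.

    Hypothetical sketch: single-threaded, string keys only, no parallelism.
    """
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Resolve dependencies first, then apply the function.
        values = [
            get(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        result = func(*values)
    else:
        result = task
    cache[key] = result
    return result


print(get(graph, "w"))  # 30
```

Nothing is computed until `get` is asked for a key, which mirrors how Dask collections stay lazy until `.compute()` is called.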

Many classes of problems do not fit on a single computer or compute node.
Some need more memory than one machine provides to process very large datasets; others simply benefit from being spread across multiple compute nodes.
One tool that scales from laptops to large clusters is `dask` (https://docs.dask.org/en/stable/), which can be installed via `conda` or `pip`.

In [28]:
import numpy as np
import dask.dataframe as dd
import dask.array as da

In [29]:
data = np.random.normal(size=100000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a

| | Array | Chunk |
|---|---|---|
| Bytes | 781.25 kiB | 78.12 kiB |
| Shape | (200, 500) | (100, 100) |
| Dask graph | 10 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |

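The numbers in this summary follow directly from the shape, the chunk size, and 8-byte `float64` elements. A quick sketch of that arithmetic in plain NumPy (this is not a Dask API, just the bookkeeping Dask reports):

```python
import math

import numpy as np

shape = (200, 500)
chunks = (100, 100)
itemsize = np.dtype("float64").itemsize  # 8 bytes per element

# Number of blocks along each axis (a final partial block counts as one block).
blocks_per_axis = [math.ceil(n / c) for n, c in zip(shape, chunks)]
n_chunks = math.prod(blocks_per_axis)

total_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize

print(blocks_per_axis, n_chunks)               # [2, 5] 10
print(total_bytes / 1024, chunk_bytes / 1024)  # 781.25 78.125

# The same blocks, cut out of a NumPy array by hand.
data = np.zeros(shape)
blocks = [
    data[i:i + chunks[0], j:j + chunks[1]]
    for i in range(0, shape[0], chunks[0])
    for j in range(0, shape[1], chunks[1])
]
print(len(blocks))  # 10
```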

In [30]:
a[:50, 200]

| | Array | Chunk |
|---|---|---|
| Bytes | 400 B | 400 B |
| Shape | (50,) | (50,) |
| Dask graph | 1 chunks in 2 graph layers | |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |

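Slicing is lazy: `a[:50, 200]` adds a slicing layer on top of the `from_array` layer (hence "2 graph layers"), and because rows 0-49 and column 200 all fall inside a single 100 × 100 block, the result needs only one chunk. The `blocks_touched` helper below is a hypothetical sketch of that block lookup for regular chunk sizes, not Dask's internal code.

```python
def blocks_touched(index_range, chunk_size):
    """Return the block indices along one axis overlapped by [start, stop).

    Hypothetical helper; assumes all chunks along the axis have the same size.
    """
    start, stop = index_range
    return list(range(start // chunk_size, (stop - 1) // chunk_size + 1))


# a[:50, 200] with chunks=(100, 100): rows 0..49 and the single column 200
row_blocks = blocks_touched((0, 50), 100)     # [0]
col_blocks = blocks_touched((200, 201), 100)  # [2]
print(row_blocks, col_blocks, len(row_blocks) * len(col_blocks))  # [0] [2] 1
```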

In [31]:
a[:50, 100].compute()

array([-1.43243322e+00, -1.16493758e+00, -1.87967293e+00,  1.47886369e-01,
        1.28971173e+00, -5.20397767e-01,  7.38853770e-01, -6.29040623e-01,
        1.08457857e+00, -1.65313786e+00,  4.96115440e-02,  1.14667021e+00,
       -1.84882693e-01, -4.90448809e-01, -1.98556060e-01, -4.59505020e-01,
       -1.23344705e+00, -4.75947121e-01, -1.08411314e+00,  1.10891156e+00,
       -3.80255131e-01, -2.05722915e-01,  1.76842336e+00,  2.14955832e-01,
        4.06571959e-01,  1.07101227e+00,  1.70421791e+00, -2.57276521e+00,
        2.11179670e+00,  9.94971691e-04,  6.76553884e-02,  1.02370386e+00,
       -1.28329307e+00,  2.35121424e-01, -1.23902863e-01,  7.26158694e-01,
        1.21912210e-01,  1.06211882e+00, -5.90280767e-01,  8.41471468e-01,
       -4.44307566e-01, -2.65149637e-01,  1.28920793e+00,  1.83402709e+00,
        1.66868151e+00,  3.36507699e-01, -1.54065640e-01,  1.04694489e+00,
        4.46484496e-01, -9.05282945e-01])

In [32]:
a.mean().compute()

0.00214943349359884
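`a.mean()` can be computed chunk by chunk: each block reports its sum and element count, and the partial results are combined at the end. The sketch below reproduces that idea in NumPy on an array with the same shape and chunking; Dask's own implementation combines partials in a tree and handles details this sketch skips.

```python
import numpy as np

rng = np.random.default_rng(0)
data = rng.normal(size=(200, 500))
cr, cc = 100, 100  # chunk size, as in chunks=(100, 100)

# Map step: each chunk reports (sum, count) independently; these could run in parallel.
partials = []
for i in range(0, data.shape[0], cr):
    for j in range(0, data.shape[1], cc):
        block = data[i:i + cr, j:j + cc]
        partials.append((block.sum(), block.size))

# Reduce step: combine the partial results into a single mean.
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
chunked_mean = total / count

print(np.isclose(chunked_mean, data.mean()))  # True
```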

## Dask Distributed with Slurm

In [None]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

In [None]:
# Describe one worker job: 1 core and 20 GB of memory, submitted to the
# 'admintest' Slurm partition (site-specific)
cluster = SLURMCluster(queue='admintest', cores=1, memory="20GB")

# Scale up the cluster to 12 workers (one Slurm job each here)
cluster.scale(12)

# Initialize the "client" so that the script is connected to the Cluster
client = Client(cluster)
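`SLURMCluster(...)` describes a single worker job, and `cluster.scale(12)` asks Slurm for 12 of them. The exact batch script dask-jobqueue submits can be inspected with `print(cluster.job_script())`. The `sbatch_header` function below is a hypothetical helper that only approximates the resource request using standard `sbatch` options; the generated script will differ in detail.

```python
def sbatch_header(partition, cores, memory, job_name="dask-worker"):
    """Build #SBATCH lines roughly like those one worker job would request.

    Hypothetical helper for illustration only; the real script generated by
    dask-jobqueue differs in detail (e.g. how memory is rounded).
    """
    return [
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --partition={partition}",
        "#SBATCH --ntasks=1",
        f"#SBATCH --cpus-per-task={cores}",
        f"#SBATCH --mem={memory}",
    ]


n_workers, cores, memory_gb = 12, 1, 20
for line in sbatch_header("admintest", cores, f"{memory_gb}G"):
    print(line)

# scale(12) requests 12 such jobs in total.
print(f"total: {n_workers * cores} cores, {n_workers * memory_gb} GB")  # total: 12 cores, 240 GB
```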


In [None]:
client

In [None]:
data = np.random.normal(size=200000000).reshape(40000, 5000)
a = da.from_array(data, chunks=(2000, 1000))
a

In [None]:
a.std().compute()
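Note that `np.random.normal(size=200000000)` first builds the whole 1.6 GB array in the notebook's own memory before `da.from_array` splits it; `da.random.normal(size=(40000, 5000), chunks=(2000, 1000))` would instead generate each chunk on the workers. A standard deviation also needs a little more than per-chunk sums. One standard approach, sketched below in NumPy on a scaled-down array with the same 20 × 5 chunk grid, merges per-chunk (count, mean, M2) summaries with the pairwise update of Chan et al.; it illustrates the technique and is not necessarily how Dask implements `std`.

```python
import numpy as np


def chunk_stats(block):
    """Per-chunk summary: count, mean, and sum of squared deviations (M2)."""
    n = block.size
    mean = block.mean()
    m2 = ((block - mean) ** 2).sum()
    return n, mean, m2


def merge(a, b):
    """Combine two summaries with the pairwise (Chan et al.) update."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    return n, mean, m2


rng = np.random.default_rng(1)
data = rng.normal(size=(4000, 500))  # scaled-down stand-in for the 40000 x 5000 array
cr, cc = 200, 100                    # 20 x 5 chunks, like chunks=(2000, 1000) above

summaries = [
    chunk_stats(data[i:i + cr, j:j + cc])
    for i in range(0, data.shape[0], cr)
    for j in range(0, data.shape[1], cc)
]

total = summaries[0]
for s in summaries[1:]:
    total = merge(total, s)

n, _, m2 = total
std = np.sqrt(m2 / n)  # population std (ddof=0), the default for .std()
print(np.isclose(std, data.std()))  # True
```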

## Example 3: NYC Taxi Data

- Trip records for NYC yellow cabs and high-volume ride-share services (Uber, Lyft) for 2022
- Very large datasets, too big to work with all at once on a single computer
- Let's use `dask` to explore some facets of the data


In [None]:
import glob
# Monthly 2022 Parquet files (paths are specific to the workshop environment)
yellow_cab = glob.glob('/home/tl397/ycrc/workshops/taxi/yellow_tripdata_2022-*parquet')
ride_share = glob.glob('/home/tl397/ycrc/workshops/taxi/fhvhv_tripdata_2022-*parquet')


In [None]:
yc = dd.read_parquet(yellow_cab)
rs = dd.read_parquet(ride_share) 


In [None]:
yc = yc[(yc.fare_amount > 0)]
rs = rs[(rs.base_passenger_fare > 0)]
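The fare filter matters because the next step divides tips by fares: a zero fare produces `nan` (for 0/0) or `inf` (for a positive tip), and a negative fare produces a negative ratio. A small NumPy sketch with made-up values, not taken from the taxi data, shows what the mask avoids:

```python
import numpy as np

# Hypothetical toy values, not real trips.
tips = np.array([2.0, 0.0, 1.5, 3.0])
fares = np.array([10.0, 0.0, -5.0, 12.0])

with np.errstate(divide="ignore", invalid="ignore"):
    raw = tips / fares  # 0.2, nan (0/0), -0.3, 0.25

keep = fares > 0        # same idea as yc[yc.fare_amount > 0]
ratio = tips[keep] / fares[keep]
print(ratio)            # keeps only the 0.2 and 0.25 ratios
```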


### Question: Do people tip cabs or Ubers/Lyfts better?

In [None]:
h_yc, bins = da.histogram(np.divide(yc.tip_amount, yc.fare_amount), bins=200, range=[0.01, 2])
h_rs, bins = da.histogram(np.divide(rs.tips, rs.base_passenger_fare), bins=200, range=[0.01, 2])
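Because `range=[0.01, 2]` fixes the 200 bin edges up front, each partition can be histogrammed independently and the counts simply summed, which is what lets the histogram run in parallel (`da.histogram` requires either explicit edges or both a bin count and `range` for this reason). A NumPy sketch with synthetic, hypothetical ratios split into four partitions:

```python
import numpy as np

rng = np.random.default_rng(2)
# Hypothetical tip/fare ratios split into four "partitions".
partitions = [rng.exponential(scale=0.2, size=1000) for _ in range(4)]

# Fixed edges shared by every partition: 200 bins on [0.01, 2], as above.
edges = np.linspace(0.01, 2, 201)

# Each partition is histogrammed independently, then the counts are summed.
per_partition = [np.histogram(p, bins=edges)[0] for p in partitions]
merged = np.sum(per_partition, axis=0)

# Same result as histogramming all the data at once.
direct, _ = np.histogram(np.concatenate(partitions), bins=edges)
print(np.array_equal(merged, direct))  # True
```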


In [None]:
import matplotlib.pyplot as plt

plt.subplots(1, 1)
plt.stairs(h_yc, bins, label="yellow cab")
plt.stairs(h_rs, bins, label="uber/lyft")

plt.yscale('log');
plt.ylabel('Rides');
plt.xlabel('Tip as a fraction of fare');
plt.legend();

### Mean tip percentage

In [None]:
print(f"Yellow Cab: {100*yc.tip_amount.divide(yc.fare_amount).mean().compute():.2f}%")

In [None]:
print(f"Ride-share: {100*rs.tips.divide(rs.base_passenger_fare).mean().compute():.2f}%")
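Both cells compute the mean of per-trip ratios, so every trip counts equally regardless of its fare. Weighting by fare instead (total tips divided by total fares, e.g. `yc.tip_amount.sum() / yc.fare_amount.sum()`) can give a noticeably different number. A toy sketch with hypothetical values:

```python
import numpy as np

# Hypothetical toy trips, not real data.
tips = np.array([0.0, 1.0, 5.0])
fares = np.array([5.0, 10.0, 20.0])

mean_of_ratios = np.mean(tips / fares)    # per-trip average, as in the cells above
ratio_of_sums = tips.sum() / fares.sum()  # tip dollars per fare dollar overall

print(f"{100 * mean_of_ratios:.2f}%")  # 11.67%
print(f"{100 * ratio_of_sums:.2f}%")   # 17.14%
```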

## Key Takeaways
1. Dask is able to orchestrate lots of parallel workers, either locally or across the cluster
2. It's easier to not tip when it's on an app?
