<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_horizontal.svg"
     width="60%"
     alt="Dask logo" />

# Time for a Test Drive!

You've spent some time walking around the Dascar lot, hearing about all the awesome features and specs...

That's enough talk. Let's jump into this racecar and see what it can do...

We'll test drive:

1. Dask DataFrames for faster & scalable pandas
2. Dask Arrays for faster & scalable NumPy
3. Dask-ML for faster & scalable scikit-learn
4. Coiled for cluster spin-up

![Race car](images/race-car.png)

In [1]:
!pip install "dask[complete]" dask-ml



## Dask DataFrames

The pandas car...with the Dask engine!

In [2]:
# !pip install dask
import dask.dataframe as dd

In [3]:
%run ../prep_data.py -d flights

data_dir='/Users/samia/Downloads/PyDataLondon2022/Friday_TS2_1330/data'
- Downloading NYC Flights dataset... done
- Extracting flight data... done
** Created flights dataset! in 19.85s**


In [4]:
import os

files = os.path.join('../data', 'nycflights', '*.csv')
files

'../data/nycflights/*.csv'

In [5]:
df = dd.read_csv(files,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={"TailNum": str,
                        "CRSElapsedTime": float,
                        "Cancelled": bool})

In [6]:
df.head()

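|  | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1990-01-01 | 1 | 1621.0 | 1540 | 1747.0 | 1701 | US | 33 |  | 86.0 | ... |  | 46.0 | 41.0 | EWR | PIT | 319.0 |  |  | False | 0 |
| 1 | 1990-01-02 | 2 | 1547.0 | 1540 | 1700.0 | 1701 | US | 33 |  | 73.0 | ... |  | -1.0 | 7.0 | EWR | PIT | 319.0 |  |  | False | 0 |
| 2 | 1990-01-03 | 3 | 1546.0 | 1540 | 1710.0 | 1701 | US | 33 |  | 84.0 | ... |  | 9.0 | 6.0 | EWR | PIT | 319.0 |  |  | False | 0 |
| 3 | 1990-01-04 | 4 | 1542.0 | 1540 | 1710.0 | 1701 | US | 33 |  | 88.0 | ... |  | 9.0 | 2.0 | EWR | PIT | 319.0 |  |  | False | 0 |
| 4 | 1990-01-05 | 5 | 1549.0 | 1540 | 1706.0 | 1701 | US | 33 |  | 77.0 | ... |  | 5.0 | 9.0 | EWR | PIT | 319.0 |  |  | False | 0 |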


In [7]:
%%time
df.groupby("Origin")["DepDelay"].mean().compute()

CPU times: user 5.46 s, sys: 779 ms, total: 6.24 s
Wall time: 2.61 s


Origin
EWR    10.295469
JFK    10.351299
LGA     7.431142
Name: DepDelay, dtype: float64

### A slight difference with pandas
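Notice the `.compute()` call: it is necessary because Dask uses **lazy evaluation**. Operations such as `df.groupby("Origin")["DepDelay"].mean()` only build a task graph describing the work; the actual computation does not run until you call `.compute()`, which executes the graph and returns a regular pandas object.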

If you haven't heard about lazy evaluation before, check out [the Beginner's Guide to Distributed Computing](https://towardsdatascience.com/the-beginners-guide-to-distributed-computing-6d6833796318).

In [8]:
df

| npartitions=10 | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | CRSElapsedTime | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  | datetime64[ns] | int64 | float64 | int64 | float64 | int64 | object | int64 | object | float64 | float64 | float64 | float64 | float64 | object | object | float64 | float64 | float64 | bool | int64 |
|  | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


## Dask Arrays

The NumPy car...with Dask engine superpowers!

In [9]:
import dask.array as da

In [10]:
array = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

In [11]:
array

|  | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |
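The numbers in that summary follow from the shape, the chunk size, and the 8-byte `float64` dtype. A quick sketch that recomputes them (it recreates the same `array` so the snippet stands alone; creating it is lazy, so no 762 MiB is allocated):

```python
import math
import dask.array as da

array = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# 10 blocks along each axis, so 10 x 10 = 100 chunks in total
print(array.numblocks)             # (10, 10)
print(math.prod(array.numblocks))  # 100

# Each float64 takes 8 bytes; 1 MiB = 2**20 bytes
total_mib = array.nbytes / 2**20                          # 10_000 * 10_000 * 8 bytes
chunk_mib = 1_000 * 1_000 * array.dtype.itemsize / 2**20  # one 1_000 x 1_000 chunk
print(round(total_mib, 2), round(chunk_mib, 2))           # 762.94 7.63
```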


In [12]:
array[:10,:5]

|  | Array | Chunk |
|---|---|---|
| Bytes | 400 B | 400 B |
| Shape | (10, 5) | (10, 5) |
| Count | 101 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [13]:
array[:10,:5].compute()

array([[0.03033067, 0.11362239, 0.19616836, 0.64284839, 0.16897578],
       [0.84903037, 0.43905098, 0.65457323, 0.65168446, 0.39127317],
       [0.00159672, 0.9149072 , 0.80971459, 0.26396348, 0.6163673 ],
       [0.04506115, 0.09604897, 0.99482738, 0.51226322, 0.97215484],
       [0.68345574, 0.08160307, 0.5678611 , 0.33031469, 0.85237942],
       [0.10911614, 0.79416483, 0.77907233, 0.64757093, 0.83714992],
       [0.53317414, 0.29623487, 0.10063719, 0.36918281, 0.24015463],
       [0.57412448, 0.02365471, 0.35031677, 0.91510247, 0.92118979],
       [0.7923882 , 0.25029837, 0.46154954, 0.62797051, 0.67783049],
       [0.30137745, 0.32843627, 0.10339478, 0.3927776 , 0.92809416]])

In [14]:
%%time
array.sum(axis=1).compute()

CPU times: user 790 ms, sys: 82.5 ms, total: 872 ms
Wall time: 186 ms


array([5009.14898836, 4973.15042682, 5011.34447765, ..., 5042.35303707,
       4959.78284663, 4998.92410136])

## Dask-ML

The scikit-learn car with... you guessed it: Dask rocket fuel!

In [18]:
from dask_ml.linear_model import LogisticRegression
from dask_ml.datasets import make_classification

In [19]:
X, y = make_classification(n_samples=1_000, chunks=50)

In [20]:
X

|  | Array | Chunk |
|---|---|---|
| Bytes | 156.25 kiB | 7.81 kiB |
| Shape | (1000, 20) | (50, 20) |
| Count | 20 Tasks | 20 Chunks |
| Type | float64 | numpy.ndarray |


In [21]:
y

|  | Array | Chunk |
|---|---|---|
| Bytes | 7.81 kiB | 400 B |
| Shape | (1000,) | (50,) |
| Count | 241 Tasks | 20 Chunks |
| Type | int64 | numpy.ndarray |


In [22]:
lr = LogisticRegression()

In [None]:
%%time
lr.fit(X, y)

In [None]:
%%time
predictions = lr.predict(X).compute()

In [None]:
lr.score(X, y)

# Digging Deeper

Dask's lower-level APIs give you even more flexibility and control over what to parallelize, and how, in your custom Python code.

## Parallelize Python Code with `dask.delayed`

In [None]:
from time import sleep

def inc(x):
    """Increments x by one"""
    sleep(1)
    return x + 1

def add(x=0, y=0, z=0):
    """Adds x and y and z"""
    sleep(1)
    return x + y + z

In [None]:
%%time

x = inc(1) # takes 1 second
y = inc(2) # takes 1 second
z = add(x, y) # takes 1 second

In [None]:
z

In [None]:
from dask import delayed

In [None]:
%%time

a = delayed(inc)(1)
b = delayed(inc)(2)
c = delayed(add)(a, b)

In [None]:
c

In [None]:
a.visualize()

In [None]:
b.visualize()

In [None]:
c.visualize()

In [None]:
%%time
c.compute()

In [None]:
d = delayed(inc)(3)

In [None]:
c = delayed(add)(a, b, d)

In [None]:
c.visualize()

In [None]:
%%time
c.compute()
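Wrapping each call in `delayed(...)` works, but `delayed` can also be used as a decorator, and `dask.compute` can evaluate several lazy results in one pass. Below is a standalone sketch of both ideas: it redefines `inc` and `add` with a shorter 0.1 s sleep (an arbitrary choice so it runs quickly) and uses Dask's local threaded scheduler rather than a cluster. The loop shows the common pattern of collecting lazy results in a list and combining them with `delayed(sum)`.

```python
import time
import dask
from dask import delayed

@delayed
def inc(x):
    """Increment x by one; the sleep stands in for real work (shortened to 0.1 s)."""
    time.sleep(0.1)
    return x + 1

@delayed
def add(x=0, y=0, z=0):
    """Add up to three numbers."""
    time.sleep(0.1)
    return x + y + z

# Calling the decorated functions builds a task graph instead of running them
a, b, d = inc(1), inc(2), inc(3)
total = add(a, b, d)

# The same pattern scales to loops: collect lazy results, then combine them
incremented = [inc(i) for i in range(8)]
loop_total = delayed(sum)(incremented)

start = time.perf_counter()
# One dask.compute call runs both graphs; independent inc() calls can run in parallel threads
total_value, loop_value = dask.compute(total, loop_total, scheduler="threads")
elapsed = time.perf_counter() - start

print(total_value)  # 9
print(loop_value)   # 36
print(f"{elapsed:.2f}s")
```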

Task graphs can get...complicated:

<img src="https://raw.githubusercontent.com/coiled/pydata-global-dask/master/images/grid_search_schedule.gif"
     width="95%"
     alt="Grid search schedule" />

## Dask Cluster on Coiled

To launch your own Coiled clusters:
1. Create an account at [cloud.coiled.io](https://cloud.coiled.io)
2. Open a terminal
3. Create a new conda env and activate it
4. Run `conda install -c conda-forge coiled-runtime`
5. Run `coiled login`

You'll then be asked to log in to the Coiled web interface. Normally, you'd navigate to https://cloud.coiled.io/profile, where you can create and manage API tokens, but this requires setting up some cloud credentials. To bypass that for this tutorial, we'll use a test account that's already set up.

```
Please login to https://cloud.coiled.io/profile to get your token
Token:
```

Copy the following token and press Enter:

`aea6c94125e64d8f839e9c7719537ca4-c48ca9434221c4d39b65b9266901c3956065a6cd`

This token will be destroyed immediately after this tutorial. To continue using Coiled after the tutorial, connect your Coiled account to your AWS/GCP cloud by following the steps [here](https://docs.coiled.io/user_guide/backends.html).

In [1]:
import coiled

In [2]:
coiled.create_software_environment(
    account="pydata-london",
    conda="../binder/environment.yml",
    name="dask-tutorial",
)

Found existing software environment build, returning


In [4]:
# create a unique identifier for your cluster
import random
your_name = "INSERT-YOUR-NAME-HERE"  # replace with your own name
unique_id = your_name + str(random.randint(100, 200))

# spin up the cluster
cluster = coiled.Cluster(
    name=f"dask-tutorial-{unique_id}",
    n_workers=16,
    worker_memory="16GiB",
    software="pydata-london/dask-tutorial",
    scheduler_options={"idle_timeout": "2 hours"},  # default is 20min
    shutdown_on_close=False,
)


ClusterCreationError: Cluster status is error (reason: Scheduler Stopped -> Instance failed: AWS failed to create requested instance.
VcpuLimitExceeded - You have requested more vCPU capacity than your current vCPU limit of 512 allows for the instance bucket that the specified instance type belongs to. Please visit http://aws.amazon.com/contact-us/ec2-request to request an adjustment to this limit.) (cluster_id: 35321)

In [None]:
from distributed import Client

client = Client(cluster)

In [None]:
import dask.dataframe as dd

In [None]:
df = dd.read_csv(
    "s3://nyc-tlc/csv_backup/yellow_tripdata_2019-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
        "store_and_fwd_flag": "category",
        "PULocationID": "UInt16",
        "DOLocationID": "UInt16",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
)

In [None]:
df

In [None]:
%%time
df.groupby("passenger_count").tip_amount.mean().compute()

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/dask-tutorial/lib/python3.9/site-packages/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/rpelgrim/mambaforge/envs/dask-tutorial/lib/python3.9/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/dask-tutorial/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/rpelgrim/mambaforge/envs/dask-tutorial/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/rpelgrim/mambaf