


**Author:** Steffen Schober



## Acknowledgment



This notebook is based on the Dask tutorial.



## Prepare Data



Prepare the data. Make sure that `03_prep.py` is in the same directory as this notebook.



In [30]:
%run 03_prep.py -d flights

In [31]:
%run 03_prep.py -d accounts

## Setup



In [32]:
from dask.distributed import Client

client = Client(n_workers=4)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39903 instead

Cluster type: distributed.LocalCluster (connection method: Cluster object)
Dashboard: http://127.0.0.1:39903/status
Scheduler: tcp://127.0.0.1:43557 (started just now)
Workers: 4, Total threads: 16, Total memory: 31.27 GiB
Status: running, Using processes: True

| Worker comm | Dashboard | Nanny | Threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:43197 | http://127.0.0.1:44361/status | tcp://127.0.0.1:35849 | 4 | 7.82 GiB | /tmp/dask-scratch-space/worker-1nh2lz06 |
| tcp://127.0.0.1:46155 | http://127.0.0.1:34373/status | tcp://127.0.0.1:42135 | 4 | 7.82 GiB | /tmp/dask-scratch-space/worker-g032q0fl |
| tcp://127.0.0.1:46095 | http://127.0.0.1:45005/status | tcp://127.0.0.1:41733 | 4 | 7.82 GiB | /tmp/dask-scratch-space/worker-mz0kcor_ |
| tcp://127.0.0.1:34911 | http://127.0.0.1:39243/status | tcp://127.0.0.1:41049 | 4 | 7.82 GiB | /tmp/dask-scratch-space/worker-zkuyept0 |


You can open the dashboard in your web browser; the link is also available here:



In [33]:
print(client.cluster.dashboard_link)

http://127.0.0.1:39903/status


Explore the dashboard; it shows a lot of information about the cluster and the computations running on it.
Under `Info` you find the TCP address of the scheduler, which you can pass to `Client` to connect to this cluster, for example from another notebook or process.
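A minimal sketch of connecting by address, assuming a small throw-away `LocalCluster` (one in-process worker, TCP protocol, random dashboard port) instead of the four-worker cluster above; `client2` is a hypothetical name. With the notebook's cluster you would pass the `tcp://127.0.0.1:43557` address shown above instead.

```python
from dask.distributed import Client, LocalCluster

# Assumption: a tiny threads-only cluster as a stand-in for the one started above
cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False,
                       protocol="tcp://", dashboard_address=":0")
address = cluster.scheduler_address   # the scheduler's TCP endpoint, as listed under "Info"
print(address)

# A second client needs nothing but the address string
client2 = Client(address)
client2.wait_for_workers(1)
n_workers = len(client2.scheduler_info()["workers"])
total = client2.submit(sum, [1, 2, 3]).result()  # runs on the cluster's worker
print(n_workers, total)

client2.close()
cluster.close()
```

Any number of clients can share one scheduler this way; closing `client2` does not shut the cluster down, which is why `cluster.close()` is called separately.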



## First Example - Loading CSV file



In [34]:
import os
import dask
filename = os.path.join('data', 'accounts.*.csv')
filename

'data/accounts.*.csv'

In [35]:
import dask.dataframe as dd
df = dd.read_csv(filename)
df.head()

| | id | names | amount |
|---|---|---|---|
| 0 | 281 | Ursula | -157 |
| 1 | 274 | Sarah | 1214 |
| 2 | 198 | George | 1522 |
| 3 | 489 | Bob | 3560 |
| 4 | 42 | Bob | 108 |


In [36]:
# load and count number of rows
len(df)

3000000

## Flights Data Set



In [37]:
# lazily load all flight CSV files (nothing is computed yet)
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df

| npartitions=10 | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | CRSElapsedTime | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | datetime64[ns] | int64 | float64 | int64 | float64 | int64 | string | int64 | string | float64 | float64 | float64 | float64 | float64 | string | string | float64 | float64 | float64 | bool | int64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


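Notice that the representation of the dataframe object contains no data: Dask has only read the start of the first file to infer the column names and dtypes.
We enforce the dtypes of three columns because the values in their first rows are missing or not representative of the rest of the data (for example, `TailNum` is empty in the first rows, as `df.head()` below shows), so type inference would fail. You can check this by omitting `dtype` in `read_csv()`.
Declaring `Cancelled` as `bool` also ensures that `~df.Cancelled`, used below, is a logical negation rather than a bitwise one.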



In [38]:
df.dtypes

Date                  datetime64[ns]
DayOfWeek                      int64
DepTime                      float64
CRSDepTime                     int64
ArrTime                      float64
CRSArrTime                     int64
UniqueCarrier        string[pyarrow]
FlightNum                      int64
TailNum              string[pyarrow]
ActualElapsedTime            float64
CRSElapsedTime               float64
AirTime                      float64
ArrDelay                     float64
DepDelay                     float64
Origin               string[pyarrow]
Dest                 string[pyarrow]
Distance                     float64
TaxiIn                       float64
TaxiOut                      float64
Cancelled                       bool
Diverted                       int64
dtype: object

In [39]:
df.head()

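| | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1990-01-01 | 1 | 1621.0 | 1540 | 1747.0 | 1701 | US | 33 | | 86.0 | ... | | 46.0 | 41.0 | EWR | PIT | 319.0 | | | False | 0 |
| 1 | 1990-01-02 | 2 | 1547.0 | 1540 | 1700.0 | 1701 | US | 33 | | 73.0 | ... | | -1.0 | 7.0 | EWR | PIT | 319.0 | | | False | 0 |
| 2 | 1990-01-03 | 3 | 1546.0 | 1540 | 1710.0 | 1701 | US | 33 | | 84.0 | ... | | 9.0 | 6.0 | EWR | PIT | 319.0 | | | False | 0 |
| 3 | 1990-01-04 | 4 | 1542.0 | 1540 | 1710.0 | 1701 | US | 33 | | 88.0 | ... | | 9.0 | 2.0 | EWR | PIT | 319.0 | | | False | 0 |
| 4 | 1990-01-05 | 5 | 1549.0 | 1540 | 1706.0 | 1701 | US | 33 | | 77.0 | ... | | 5.0 | 9.0 | EWR | PIT | 319.0 | | | False | 0 |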


Unlike `pandas.read_csv`, which reads the entire file before inferring datatypes,
`dask.dataframe.read_csv` only reads a sample from the beginning of the file (or of the first file when using a glob).
These inferred datatypes are then enforced when reading all partitions.



### Some Analysis



We compute the maximum of the `DepDelay` column. With plain pandas, we would loop over the files, compute the maximum of each file, and then take the maximum over all of these individual maximums:

    import glob
    import os
    import pandas as pd

    filenames = sorted(glob.glob(os.path.join('data', 'nycflights', '*.csv')))
    maxes = []
    for fn in filenames:
        df = pd.read_csv(fn)
        maxes.append(df.DepDelay.max())

    final_max = max(maxes)

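We could wrap `pd.read_csv` with `dask.delayed` so that the files are read in parallel.
Either way, we still have to think about loops, intermediate results (one per file) and the final reduction (the max of the intermediate maxes).
If all the data fit into a single file `filename` that pandas could load, we would simply write:

    import pandas as pd
    df = pd.read_csv(filename, dtype={'TailNum': str, 'CRSElapsedTime': float, 'Cancelled': bool})
    df.DepDelay.max()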
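The `dask.delayed` variant mentioned above could look roughly like this. The three small CSV files and their delay values are hypothetical stand-ins for `data/nycflights/*.csv`; the structure (one lazy read per file, one per-file max, one final reduction) is the point.

```python
import os
import tempfile

import dask
import pandas as pd

# Hypothetical stand-in for data/nycflights/*.csv
tmpdir = tempfile.mkdtemp()
filenames = []
for i, delays in enumerate([[5, 41, -2], [12, 7, 0], [3, 99, 8]]):
    fn = os.path.join(tmpdir, f"{1990 + i}.csv")
    pd.DataFrame({"DepDelay": delays}).to_csv(fn, index=False)
    filenames.append(fn)

# Wrapping pd.read_csv makes each read a lazy task; attribute access and
# method calls on the Delayed results stay lazy as well
dfs = [dask.delayed(pd.read_csv)(fn) for fn in filenames]
maxes = [df.DepDelay.max() for df in dfs]   # one intermediate result per file
final_max = dask.delayed(max)(maxes)        # the final reduction

print(final_max.compute())
```

The loop and the explicit reduction are still there; `dask.dataframe` hides exactly this bookkeeping.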

`dask.dataframe` lets us write pandas-like code that operates on larger-than-memory datasets in parallel.
Here we compute the max of `DepDelay`:



In [40]:
%time df.DepDelay.max().compute()

CPU times: user 198 ms, sys: 60 ms, total: 258 ms
Wall time: 1.52 s


1435.0

Let's visualize the graph:



In [41]:
# notice the parallelism
df.DepDelay.max().visualize()

(Output: the task graph, rendered as an interactive Cytoscape widget.)

## Exercises



Try to answer the following questions:

1.  How many rows are in our dataset?
2.  In total, how many non-cancelled flights were taken?
3.  In total, how many non-cancelled flights were taken from each airport?
4.  What day of the week has the worst average departure delay?

Hint for the third question:
use `groupby` with the aggregate function `count`.
See [https://pandas.pydata.org/pandas-docs/stable/groupby.html](https://pandas.pydata.org/pandas-docs/stable/groupby.html).



In [42]:
shape1 = df.shape[0].compute()
shape2 = len(df)

print("rows:", shape1)
print("rows:", shape2)

rows: 2611892
rows: 2611892


In [43]:
# ~ inverts the boolean mask, keeping only the non-cancelled flights
non_cancelled = len(df[~df.Cancelled])

print("non-cancelled:", non_cancelled)

# Alternative: sum the inverted boolean column (True counts as 1)
total_non_cancelled_flights = (~df.Cancelled).sum().compute()

print("non-cancelled (sum):", total_non_cancelled_flights)

non-cancelled: 2540961
non-cancelled (sum): 2540961


In [44]:
# Filter out the cancelled flights
non_cancelled_flights = df[~df.Cancelled]

# Group by 'Origin' and count the number of non-cancelled flights from each airport
flight_counts_by_origin = non_cancelled_flights.groupby('Origin').size()

# Compute the result
flight_counts_by_origin = flight_counts_by_origin.compute()

# Print the result
print("Total number of non-cancelled flights from each airport:")
print(flight_counts_by_origin)

Total number of non-cancelled flights from each airport:
Origin
EWR    1139451
JFK     427243
LGA     974267
dtype: int64


In [45]:
# Group by 'DayOfWeek' and calculate the average departure delay
average_delay_by_day = df.groupby('DayOfWeek')['DepDelay'].mean()

# Find the day of the week with the worst average departure delay
worst_delay_day = average_delay_by_day.idxmax().compute()

# Print the result
print("Day of the week with the worst average departure delay:", worst_delay_day)

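Day of the week with the worst average departure delay: 5

`DayOfWeek` runs from 1 (Monday) to 7 (Sunday): the first row of `df.head()` (1990-01-01, a Monday) has `DayOfWeek` 1. Day 5 is therefore Friday.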


## Sharing Intermediate Results



When computing all of the above, we sometimes did the same operation more than once.
For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared, and only computed once.
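To make the hashing idea concrete, here is a small sketch with `dask.base.tokenize`, which Dask uses to derive deterministic keys from arguments. The toy DataFrame and the `"mean"`/`"std"` labels are illustrative assumptions; the keys Dask builds internally look different.

```python
import pandas as pd
from dask.base import tokenize

pdf = pd.DataFrame({"DepDelay": [5.0, 12.0, -3.0]})

# Equal arguments (even a copy of the same data) give the same token ...
t1 = tokenize("mean", pdf, "DepDelay")
t2 = tokenize("mean", pdf.copy(), "DepDelay")
# ... while a different argument gives a different one
t3 = tokenize("std", pdf, "DepDelay")

print(t1 == t2, t1 == t3)
```

Identical tokens mean identical task keys, which is what lets the scheduler recognise two requests for the same intermediate result.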

For example, let's compute the mean and standard deviation of the departure delay of all non-cancelled flights.
Since Dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.

If we compute them with two separate calls to `compute`, intermediate results are not shared: reading the CSV files and filtering out cancelled flights happen twice.



In [46]:
non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

In [47]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

CPU times: user 422 ms, sys: 92.6 ms, total: 515 ms
Wall time: 2.65 s


Now let's pass both to a single `dask.compute` call.



In [48]:
%%time
mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

CPU times: user 234 ms, sys: 58.7 ms, total: 292 ms
Wall time: 1.38 s


The task graphs for both results are merged when calling `dask.compute`, so shared operations (reading the files, filtering) are done only once instead of twice. Here this roughly halves the wall time (1.38 s instead of 2.65 s).
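The effect can be made visible with a call counter. This sketch uses `dask.delayed` instead of a dataframe; `load`, `mean` and `value_range` are hypothetical functions, with `load` standing in for reading and filtering the CSV files, and the synchronous scheduler is chosen only to keep the counting simple.

```python
import dask

calls = {"load": 0}

@dask.delayed
def load():
    # Hypothetical expensive step; counts how often it actually runs
    calls["load"] += 1
    return [float(x) for x in range(10)]

@dask.delayed
def mean(xs):
    return sum(xs) / len(xs)

@dask.delayed
def value_range(xs):
    return max(xs) - min(xs)

data = load()                       # one task, used by both results
m, r = mean(data), value_range(data)

# Two separate compute calls: load() runs once per call
m.compute(scheduler="sync")
r.compute(scheduler="sync")
separate_calls = calls["load"]

# One dask.compute call: the graphs are merged and load() runs once
calls["load"] = 0
m_res, r_res = dask.compute(m, r, scheduler="sync")
shared_calls = calls["load"]

print(separate_calls, shared_calls, m_res, r_res)
```

Both results depend on the same `data` key, so after merging the graphs the scheduler only needs to produce it once.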

