### Dask DataFrames

Similar to the design of arrays, dask dataframes are partitioned pandas dataframes. Computations run pandas operators on the chunks and then aggregate the per-chunk results.

<img src="https://upload.wikimedia.org/wikipedia/commons/8/81/Dask-dataframe.svg" width="256" title="    https://upload.wikimedia.org/wikipedia/commons/8/81/Dask-dataframe.svg" />

For the most part, dask has tried to implement all of pandas, but there are some inefficient operations that it does not support.
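
The chunk-and-aggregate design described above can be sketched in plain pandas, without dask. This is only an illustration of the idea, not dask's implementation; the toy column `delay`, the helper `chunked_mean`, and the chunk size of three rows are all made up for the example.

```python
import pandas as pd

# Toy data standing in for one large table (hypothetical values).
full = pd.DataFrame({"delay": [5.0, -2.0, 41.0, 0.0, 7.0, 9.0, 12.0]})

# "Partition" the table into chunks of at most three rows each.
chunks = [full.iloc[i:i + 3] for i in range(0, len(full), 3)]

def chunked_mean(parts, column):
    """Mean of `column`, computed from per-chunk partial sums and counts."""
    partial_sums = [part[column].sum() for part in parts]      # one pandas op per chunk
    partial_counts = [part[column].count() for part in parts]
    return sum(partial_sums) / sum(partial_counts)              # combine the partials

mean_from_chunks = chunked_mean(chunks, "delay")
```

The mean cannot be computed by averaging per-chunk means when chunks differ in size, which is why the sketch carries sums and counts separately and divides only at the end.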

Let's load up a dataframe and see what we get.  This is the NYC flight data used in the dask tutorial.

In [1]:
import dask.dataframe as dd
df = dd.read_csv('./data/nycflight/*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=10,,,,,,,,,,,,,,,,,,,,,
,datetime64[ns],int64,float64,int64,float64,int64,object,int64,object,float64,float64,float64,float64,float64,object,object,float64,float64,float64,bool,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [2]:
df.tail()

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
269176,1999-12-27,1,1645.0,1645,1830.0,1901,UA,1753,N516UA,225.0,...,205.0,-31.0,0.0,LGA,DEN,1619.0,7.0,13.0,False,0
269177,1999-12-28,2,1726.0,1645,1928.0,1901,UA,1753,N504UA,242.0,...,214.0,27.0,41.0,LGA,DEN,1619.0,5.0,23.0,False,0
269178,1999-12-29,3,1646.0,1645,1846.0,1901,UA,1753,N592UA,240.0,...,220.0,-15.0,1.0,LGA,DEN,1619.0,5.0,15.0,False,0
269179,1999-12-30,4,1651.0,1645,1908.0,1901,UA,1753,N575UA,257.0,...,233.0,7.0,6.0,LGA,DEN,1619.0,5.0,19.0,False,0
269180,1999-12-31,5,1642.0,1645,1851.0,1901,UA,1753,N539UA,249.0,...,232.0,-10.0,-3.0,LGA,DEN,1619.0,6.0,11.0,False,0


In [3]:
df.head()

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,False,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,False,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,False,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,False,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,False,0


The dataframe has 10 chunks (npartitions) that correspond to the ten files that were read.  Dataframes have two key properties:
  * they are tabular (two-dimensional) data
  * each column has a datatype defined by a _schema_
  
The programmer is encouraged to think of this as a "spreadsheet or SQL table".  It is reasonable to call the data __structured__.
  * this is in contrast to semi-structured data, which has tags and hierarchy but does not enforce types. Examples are XML and JSON.
  * in database parlance, this is a flat data model.

### Data Slicing and Aggregation

The most basic operations in a database and in dask are to <code>select</code> rows and <code>project</code> columns.

#### Selecting rows

Find all flights flown by a specific plane, identified by <code>TailNum</code>

In [4]:
df2 = df[df.TailNum=='N516UA'].compute()
df2

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
2295,1995-01-26,4,1822.0,1815,2147.0,2129,UA,71,N516UA,385.0,...,359.0,18.0,7.0,EWR,SFO,2565.0,7.0,19.0,False,0
2304,1995-01-04,3,1757.0,1715,2103.0,2029,UA,73,N516UA,366.0,...,337.0,34.0,42.0,EWR,SFO,2565.0,6.0,23.0,False,0
2349,1995-01-18,3,1049.0,1050,1348.0,1404,UA,75,N516UA,359.0,...,330.0,-16.0,-1.0,EWR,SFO,2565.0,4.0,25.0,False,0
2351,1995-01-20,5,1119.0,1050,1416.0,1404,UA,75,N516UA,357.0,...,337.0,12.0,29.0,EWR,SFO,2565.0,10.0,10.0,False,0
2369,1995-01-07,6,700.0,700,1041.0,1019,UA,79,N516UA,401.0,...,368.0,22.0,0.0,EWR,SFO,2565.0,7.0,26.0,False,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
248678,1999-12-16,4,701.0,700,1019.0,1024,UA,853,N516UA,378.0,...,337.0,-5.0,1.0,EWR,SFO,2565.0,22.0,19.0,False,0
248680,1999-12-18,6,659.0,700,1033.0,1024,UA,853,N516UA,394.0,...,363.0,9.0,-1.0,EWR,SFO,2565.0,19.0,12.0,False,0
269006,1999-12-05,7,820.0,820,1024.0,1043,UA,329,N516UA,244.0,...,214.0,-19.0,0.0,LGA,DEN,1619.0,14.0,16.0,False,0
269094,1999-12-01,3,616.0,620,814.0,832,UA,875,N516UA,238.0,...,215.0,-18.0,-4.0,LGA,DEN,1619.0,7.0,16.0,False,0


#### Projecting Columns

Build a dataframe that describes the plane ('TailNum') and route ('FlightNum')

In [5]:
#routes = df[df.TailNum=='N516UA'][['FlightNum','TailNum']].compute()
routes = df[['FlightNum','TailNum']].compute()

routes

,FlightNum,TailNum
0,33,
1,33,
2,33,
3,33,
4,33,
...,...,...
269176,1753,N516UA
269177,1753,N504UA
269178,1753,N592UA
269179,1753,N575UA


### Aggregating Data

A common data science inquiry is to query an aggregate (mean, min, max, sum, etc.) in a group.  This is done with a <code>groupby</code> query. The pattern is to construct a grouping and then aggregate over the grouping.

__Query:__ How many different flights were flown by each plane?

In [6]:
#routes.groupby('TailNum').FlightNum.count()
routes.groupby('TailNum')['FlightNum'].count()

TailNum
'144DA        1
112          10
A367NW       39
EI-BWD      638
EI-CAL       91
          ...  
NEIDLA      125
NN7324       74
NOZ1AA       27
NXXXXX        4
UNKNOW    33663
Name: FlightNum, Length: 3712, dtype: int64

I have intentionally mixed syntax. Dataframes in both R and Python use two forms of syntax interchangeably.
  * <code>dataframe.columnName</code>
  * <code>dataframe['columnName']</code>

Many functions only accept the bracketed indexing of columns.
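
One reason to default to brackets is that attribute access silently does something else when a column name collides with a dataframe method, and it cannot express names that are not valid Python identifiers. A small pandas sketch, using a made-up table whose column names were chosen to trigger both cases:

```python
import pandas as pd

# Hypothetical columns chosen to break attribute access.
toy = pd.DataFrame({"count": [1, 2, 3], "Flight Num": [33, 71, 73]})

bracket_count = toy["count"]        # the column named 'count'
attribute_count = toy.count         # NOT the column: this is the DataFrame.count method
bracket_spaced = toy["Flight Num"]  # a name with a space is reachable only via brackets
```

For well-behaved names such as `TailNum` the two forms return the same column, which is why they can be mixed freely in the queries here.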
    
__Query:__ How many times was each flight flown?

In [7]:
routes.groupby('FlightNum')['TailNum'].count()

FlightNum
1       3215
2          0
3       3744
4          0
5       3218
        ... 
9607       1
9608       1
9777       2
9851       0
9899       2
Name: TailNum, Length: 2405, dtype: int64
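
The zeros in the output above (flights 2 and 4, for example) are consistent with how `count()` works: it tallies only non-missing values, and the early records have no `TailNum`. It also counts rows, not distinct values. The sketch below contrasts `count()`, `size()`, and `nunique()` on a made-up miniature of the `routes` table; the values are hypothetical.

```python
import pandas as pd

# Hypothetical miniature of the `routes` table; None marks a missing TailNum.
toy = pd.DataFrame({
    "FlightNum": [33, 33, 33, 71, 71],
    "TailNum": [None, "N516UA", "N516UA", "N504UA", None],
})

by_flight = toy.groupby("FlightNum")["TailNum"]
non_null_rows = by_flight.count()      # rows with a TailNum present
all_rows = by_flight.size()            # every row, missing values included
distinct_planes = by_flight.nunique()  # distinct non-missing TailNum values
```

If the question is how many different planes flew a route, `nunique()` is the closer match; `count()` answers how many recorded flights had a known plane.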

__Query:__ What is the largest number of routes flown by a single plane?

In [8]:
routes.groupby('TailNum')['FlightNum'].count().max()

33663

__Query:__ What is the maximum number of planes to fly a single route?

In [9]:
routes.groupby('FlightNum')['TailNum'].count().max()

6343

But, these aggregates are not really the questions we want answered. More natural questions are awkward.
  * What plane flew the most routes?
  * What route was flown by the most planes?

In [10]:
routes.groupby('FlightNum').TailNum.count().idxmax()

305

In [11]:
routes.groupby('TailNum').FlightNum.count().idxmax()

'UNKNOW'

This reveals problems with the data.  So, let's look for an actual plane.

In [12]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().idxmax()

'N413DA'

and ask for how many flights it has flown

In [13]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().max()

4844

In [14]:
type(routes)

pandas.core.frame.DataFrame

and verify that this is the right answer.

In [15]:
routecount = routes.groupby('TailNum').FlightNum.count()
routecount.get('N413DA')

4844

In [16]:
type(routecount)

pandas.core.series.Series

We have uncovered what I think is the most annoying part of dask and pandas dataframes. Aggregate functions return series, which are not dataframes. They have different methods.  I would have preferred to have written:

<code>routecount[routecount.TailNum=='N413DA'].compute()</code>

But, that's dataframe syntax not series syntax.  Aggregates assume that the output is small and return pandas series.
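
For reference, here is a sketch of the series-side equivalents and of one way back to dataframe syntax. It uses a made-up miniature of `routes`; the column name `n_flights` is an arbitrary choice for the example.

```python
import pandas as pd

# Hypothetical miniature of `routes`.
toy = pd.DataFrame({
    "TailNum": ["N413DA", "N413DA", "N516UA"],
    "FlightNum": [305, 306, 71],
})

counts = toy.groupby("TailNum")["FlightNum"].count()   # a Series indexed by TailNum

by_label = counts["N413DA"]                 # Series lookup goes through the index label
by_mask = counts[counts.index == "N413DA"]  # boolean mask on the index, still a Series

# reset_index turns the Series into a DataFrame with TailNum as a regular column,
# after which the familiar dataframe filter works.
counts_df = counts.reset_index(name="n_flights")
filtered = counts_df[counts_df.TailNum == "N413DA"]
```

The group key becomes the index of the resulting series, which is why `TailNum` is not available as a column until `reset_index` is called.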

In [17]:
type(routecount)

pandas.core.series.Series

### Indexes

All dataframes have a __default index__.  In this case, the index was generated when we loaded the data and is the row number in the pandas dataframe. Surprisingly, the index is not unique.  The same index value appears in each pandas dataframe.

In [18]:
print(df.index, "\n")
print("Number of rows in the database\n", len(df))
maxindex = df.index.nunique().compute()
print("Number of unique values in the index\n", maxindex)

# find all entries with index value 22000
df.loc[22000].compute()

Dask Index Structure:
npartitions=10
    int64
      ...
    ...  
      ...
      ...
dtype: int64
Dask Name: read-csv, 20 tasks 

Number of rows in the database
 2611892
Number of unique values in the index
 271539


,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
22000,1990-01-17,3,1849.0,1850,1946.0,1945,CO,828,,57.0,...,,1.0,-1.0,EWR,BDL,116.0,,,False,0
22000,1991-02-28,4,2042.0,2040,2154.0,2156,US,458,,72.0,...,,-2.0,2.0,LGA,BUF,292.0,,,False,0
22000,1992-02-23,7,1302.0,1305,1419.0,1416,US,132,,77.0,...,,3.0,-3.0,EWR,BUF,282.0,,,False,0
22000,1993-02-12,5,2258.0,2029,5.0,2210,US,117,,67.0,...,,115.0,149.0,LGA,CLE,418.0,,,False,0
22000,1994-02-25,5,712.0,700,806.0,753,US,419,,54.0,...,,13.0,12.0,LGA,PHL,96.0,,,False,0
22000,1995-01-31,2,1825.0,1800,1943.0,1920,CO,323,N83872,78.0,...,37.0,23.0,25.0,EWR,DCA,199.0,3.0,38.0,False,0
22000,1996-02-07,3,1216.0,1215,1343.0,1338,US,1419,N437US,87.0,...,65.0,5.0,1.0,LGA,PIT,335.0,9.0,13.0,False,0
22000,1997-01-31,5,747.0,745,1026.0,1003,CO,133,N17321,279.0,...,239.0,23.0,2.0,EWR,DEN,1605.0,7.0,33.0,False,0
22000,1998-02-19,4,2031.0,2025,2146.0,2151,CO,1978,N14245,75.0,...,44.0,-5.0,6.0,EWR,PWM,284.0,3.0,28.0,False,0
22000,1999-02-17,3,1511.0,1450,1830.0,1820,HP,2035,N645AW,319.0,...,290.0,10.0,21.0,EWR,PHX,2133.0,4.0,25.0,False,0


In [19]:
df.index.head()

RangeIndex(start=0, stop=5, step=1)

The term index is used to mean many things in computer science:
* the offset of an element in a data structure, i.e. the array index
* a data structure used for fast access in a data store

When I heard index in dask, I expected it to be the latter. It is not. By default, it is only the row number in the local pandas data frame.
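
The repeated index values can be reproduced in plain pandas by stacking separately numbered tables. This is a sketch of the effect, not of how dask stores partitions; the two small frames below are hypothetical stand-ins for two of the yearly files.

```python
import pandas as pd

# Two hypothetical "partitions", each read separately and given its own 0..n-1 index.
part_1990 = pd.DataFrame({"year": [1990, 1990, 1990]})
part_1991 = pd.DataFrame({"year": [1991, 1991]})

# Stacking the partitions without renumbering keeps each local row number.
stacked = pd.concat([part_1990, part_1991])

n_rows = len(stacked)                      # 5 rows in total
n_unique_labels = stacked.index.nunique()  # only 3 distinct labels: 0, 1, 2
rows_at_1 = stacked.loc[1]                 # one row from each partition
```

This mirrors the output above, where the row count far exceeds the number of unique index values and `df.loc[22000]` returns one row per file.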


My concept of an index comes from relational databases: "A database index is a data structure that improves the speed of data retrieval operations on a database table at the cost of additional writes and storage space to maintain the index data structure."

Real indexes come in many forms, but are most commonly:

* hash tables -- organize the data by the hash value of a key field for constant time $O(1)$ lookup by key.

<img src="https://uploads-ssl.webflow.com/632c5ceb41133222d120b86e/632f1efd49442a2371a6bb8a_hash-function.png" width=256 />

* B+-trees/sorted -- sort the data in a tree to lookup a key in $O(\log n)$ time and be able to scan sequential keys.

<img src="https://www.sentryone.com/hs-fs/hubfs/03_Btree1.png?width=1248&name=03_Btree1.png" width=386 />
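
Both kinds of index can be sketched in a few lines of pure Python. A `dict` plays the role of the hash index, and a sorted list searched with `bisect` stands in for the ordered access a B+-tree provides (a real B+-tree is a paged, balanced tree; the sorted list only imitates its lookup and range-scan behavior). The records and the helper `range_scan` are made up for illustration.

```python
from bisect import bisect_left, bisect_right

# Hypothetical (key, payload) records; the payload strings are placeholders.
records = [("N516UA", "EWR-SFO"), ("N413DA", "LGA-ATL"), ("N504UA", "LGA-DEN")]

# Hash index: a dict from key to record position, giving O(1) expected lookup.
hash_index = {key: pos for pos, (key, _) in enumerate(records)}
hit = records[hash_index["N504UA"]]

# Sorted index: keep the keys in order and binary-search them in O(log n).
sorted_keys = sorted(key for key, _ in records)

def range_scan(lo, hi):
    """Return all keys k with lo <= k <= hi by slicing the sorted keys."""
    return sorted_keys[bisect_left(sorted_keys, lo):bisect_right(sorted_keys, hi)]

n5_keys = range_scan("N5", "N5\uffff")   # every key that starts with 'N5'
```

The hash index answers exact-match lookups only, while the sorted structure also supports the range scan, which is the trade-off between the two designs.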

This is not really a topic for this class, although indexing is one of my favorite parts of computer science.

A simulacrum of indexing can be accomplished in dask by calling <code>set_index</code>, which often results in a global shuffle of the data.  It can be very expensive. After an index is set, all of the data are sorted (across all partitions). This makes it much more efficient to conduct some database-like operations, such as `join()` and `groupby()`.

The data that we have is already sorted by date, so setting the index to that value does not take a long time. 


In [20]:
df.index

Dask Index Structure:
npartitions=10
    int64
      ...
    ...  
      ...
      ...
dtype: int64
Dask Name: read-csv, 20 tasks

In [21]:
%time df.set_index('Date')

CPU times: user 6.19 s, sys: 736 ms, total: 6.93 s
Wall time: 3.29 s


,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=10,,,,,,,,,,,,,,,,,,,,
1990-01-01,int64,float64,int64,float64,int64,object,int64,object,float64,float64,float64,float64,float64,object,object,float64,float64,float64,bool,int64
1991-01-01,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1999-01-01,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1999-12-31,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [22]:
df

,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=10,,,,,,,,,,,,,,,,,,,,,
,datetime64[ns],int64,float64,int64,float64,int64,object,int64,object,float64,float64,float64,float64,float64,object,object,float64,float64,float64,bool,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Dask has implemented some of what I would expect out of indexing:
1. dask can be told that there is existing structure in the data
2. dask reorganizes data to optimize queries

Dask does not support some features that I think it should:
1. hash indexing
2. data readers inferring the existence of an index

Why is this a simulacrum of an index and not a real database structure? Because it only sorts the data; it does not build an additional data structure for fast access. The improved performance comes only from the sortedness of the data, e.g.:
* finding an element by the index field can use binary search rather than a scan
* joining data on the index field can be done using a "two-finger join" in a linear scan of two tables without sorting (data are presorted)
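
The "two-finger join" can be sketched in pure Python. This is a minimal version that assumes keys are unique within each input and that both inputs are already sorted by key; the function name `merge_join` and the sample rows are made up for the example.

```python
def merge_join(left, right):
    """Join two lists of (key, value) pairs that are already sorted by key.

    Assumes keys are unique within each input; duplicates would need extra handling.
    """
    i, j, out = 0, 0, []
    while i < len(left) and j < len(right):
        lkey, lval = left[i]
        rkey, rval = right[j]
        if lkey == rkey:
            out.append((lkey, lval, rval))   # keys match: emit a joined row
            i += 1
            j += 1
        elif lkey < rkey:
            i += 1                           # advance the finger on the smaller key
        else:
            j += 1
    return out

flights = [(1, "EWR"), (3, "LGA"), (5, "JFK")]   # hypothetical, sorted by key
delays = [(1, 41.0), (2, 7.0), (5, -3.0)]        # hypothetical, sorted by key
joined = merge_join(flights, delays)
```

Each finger only moves forward, so the join touches every row at most once, which is where the linear cost comes from.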

## RB this doesn't work the way I want. I will try to fix and present later.


### Why do indexes matter?

Understanding the structure of the data can lead to massive performance differences.
We can create an example in which seemingly identical queries perform
differently because of an implicit index structure.

We create 100 files, each with 500000 entries and two fields, 'A' and 'B'.  'A' contains an integer that identifies the file number (0-99), so 'A' is constant within each file. 'B' contains the sequence of numbers 0-499999 in each file. These data:
  * are too big to fit in memory
  * have one field that will be identical in each pandas dataframe

In [None]:
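df = dd.read_csv('/tmp/csv*.csv')
# head() is a method call; it reads only the first rows of the first partition
print(df.head())
# compute() materializes the whole dataset in memory; skip it if the data do not fit
df.compute()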

Now let's compare how long it takes to sum all the elements grouped by each key for both columns.  In both cases, we touch all of the data.  But when we <code>groupby</code> 'A', each sum can be evaluated within a single chunk.  When we <code>groupby</code> 'B', partial sums for each value must be aggregated across all chunks.
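
The difference between the two groupings can be made concrete with a small pandas sketch. It imitates the per-chunk aggregation followed by a combine step; it is not dask's actual scheduler, and the three tiny chunks and the helper `groupby_sum` are hypothetical.

```python
import pandas as pd

# Three hypothetical chunks shaped like the generated files: 'A' is constant
# within a chunk, while 'B' repeats the same sequence in every chunk.
chunks = [pd.DataFrame({"A": [i] * 4, "B": range(4)}) for i in range(3)]

def groupby_sum(parts, key, value):
    """Sum `value` per `key`: aggregate inside each chunk, then merge the partials."""
    partials = [part.groupby(key)[value].sum() for part in parts]
    combined = pd.concat(partials).groupby(level=0).sum()
    return partials, combined

partials_a, sums_a = groupby_sum(chunks, "A", "B")
partials_b, sums_b = groupby_sum(chunks, "B", "A")

# Count how many chunks contribute a partial sum to each group.
chunks_per_group_a = pd.concat(partials_a).groupby(level=0).size()
chunks_per_group_b = pd.concat(partials_b).groupby(level=0).size()
```

Every group of 'A' receives a partial sum from exactly one chunk, so the combine step has nothing to merge. Every group of 'B' receives one partial from each chunk, so the combine step has to bring results from all chunks together.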

In [None]:
%time df.groupby('A').sum().compute()

In [None]:
# RB for this to run you must create data at bottom of file.
df = dd.read_csv('/tmp/csv*.csv')
%time df.groupby('B').sum().compute()

A couple of observations:
- Dask is parallelizing the computation across multiple nodes.
  - notice the difference between wall time and CPU times.
- the <code>groupby</code> on 'B' is much less efficient
  - it has interference between different parallel chunks
  - it sends data from node to node
  
Now set the index and compare:

In [None]:
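df = dd.read_csv('/tmp/csv*.csv')
# set_index returns a new dataframe and the result is not kept here,
# so the groupby below still runs on the unindexed df
df.set_index('A')
%time df.groupby('A').sum().compute()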

In [None]:
df = dd.read_csv('/tmp/csv*.csv')
# as above, the result of set_index is discarded, so df is still unindexed
df.set_index('B')
%time df.groupby('B').sum().compute()

So, it turns out that the index ends up being not that helpful. It doesn't localize any computation in this specific example, i.e. both versions end up using the same amount of data.

### This doesn't work the way I want.  Warning.

I tried to get the index to accelerate computation. But, I can't get it to work and I can't get Dask to reshuffle the data. 
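
A detail that may be relevant here, offered as a possibility and not as a diagnosis: `set_index` returns a new dataframe and leaves the original untouched, so a bare `df.set_index('A')` has no lasting effect unless the result is assigned. The sketch below shows this in plain pandas with a made-up two-row table; dask dataframes follow the same convention of returning a new object.

```python
import pandas as pd

# Hypothetical two-row table for illustration.
toy = pd.DataFrame({"Date": ["1994-05-21", "1994-05-22"], "DepDelay": [6.0, 41.0]})

toy.set_index("Date")                 # result discarded: `toy` is unchanged
index_name_before = toy.index.name    # still None

toy = toy.set_index("Date")           # keep the returned dataframe
index_name_after = toy.index.name     # now 'Date'
```

Whether assigning the result is enough to speed up the queries in this section has not been measured here.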

In [None]:
df = dd.read_csv('./data/nycflight/*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

In [None]:
dayflights = df[df.Date=='1994-05-21'].compute()
dayflights

In [None]:
df = dd.read_csv('./data/nycflight/*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
%time dayflights = df[df.Date=='1994-05-21'].compute()


In [None]:
df = dd.read_csv('./data/nycflight/*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
# set_index returns a new dataframe, so keep the result
df = df.set_index('Date')
%time dayflights = df.loc['1994-05-21'].compute()

These are big performance differences that come from both:
  * the organization of the data
  * dask knowing about the organization of the data

### Parting Thoughts

Limitations and comments:
  * dask does not have a general sort capability
    * but sorting can be accomplished with set_index
    * shuffle is inefficient, use a different engine
  * dask does not support row indexing by physical offset
    * this is not an important feature

### Generate data

Run this once to make the data.

In [None]:
import csv
for i in range(100):
    with open(f'/tmp/csv{i}.csv', 'w', newline='') as csvfile:
        csvw = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csvw.writerow(['A','B'])
        for j in range(500000):
            csvw.writerow([i,j])