### Dask DataFrames

Similar to the design of dask arrays, dask dataframes are partitioned pandas dataframes. Computations run pandas operators on each partition (chunk) and then aggregate the results of the chunked operations.

<img src="https://upload.wikimedia.org/wikipedia/commons/8/81/Dask-dataframe.svg" width="256" title="    https://upload.wikimedia.org/wikipedia/commons/8/81/Dask-dataframe.svg" />
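The chunk-then-aggregate pattern can be sketched in plain Python. This is an illustration of the idea, not dask's implementation: a list of lists stands in for the partitions, and the helper names (`chunked`, `partial_mean`, `combine_means`) are hypothetical.

```python
def chunked(values, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [values[i:i + size] for i in range(0, len(values), size)]

def partial_mean(chunk):
    """Per-chunk step: return (sum, count) rather than the mean itself."""
    return sum(chunk), len(chunk)

def combine_means(partials):
    """Aggregation step: merge the per-chunk (sum, count) pairs into one mean."""
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count

values = list(range(10))                      # 0..9
chunks = chunked(values, size=4)              # chunks of sizes 4, 4, 2
partials = [partial_mean(c) for c in chunks]  # could run in parallel, one per chunk
mean = combine_means(partials)
```

Note that the per-chunk step returns a sum and a count, not a mean. Averaging the chunk means directly would be wrong whenever the chunks have different sizes.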

For the most part, dask has tried to implement all of pandas, but there are some inefficient operations that it does not support.

Let's load up a dataframe and see what we get.  This is the NYC flight data used in the dask tutorial.

In [2]:
import dask.dataframe as dd
df = dd.read_csv('./data/nycflight/*.csv',
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
df

npartitions=10,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
,int64,int64,int64,int64,float64,int64,float64,int64,string,int64,string,float64,float64,float64,float64,float64,string,string,float64,float64,float64,bool,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [20]:
df.tail()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
269176,1999,12,27,1,1645.0,1645,1830.0,1901,UA,1753,...,205.0,-31.0,0.0,LGA,DEN,1619.0,7.0,13.0,False,0
269177,1999,12,28,2,1726.0,1645,1928.0,1901,UA,1753,...,214.0,27.0,41.0,LGA,DEN,1619.0,5.0,23.0,False,0
269178,1999,12,29,3,1646.0,1645,1846.0,1901,UA,1753,...,220.0,-15.0,1.0,LGA,DEN,1619.0,5.0,15.0,False,0
269179,1999,12,30,4,1651.0,1645,1908.0,1901,UA,1753,...,233.0,7.0,6.0,LGA,DEN,1619.0,5.0,19.0,False,0
269180,1999,12,31,5,1642.0,1645,1851.0,1901,UA,1753,...,232.0,-10.0,-3.0,LGA,DEN,1619.0,6.0,11.0,False,0


In [21]:
df.head()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990,1,1,1,1621.0,1540,1747.0,1701,US,33,...,,46.0,41.0,EWR,PIT,319.0,,,False,0
1,1990,1,2,2,1547.0,1540,1700.0,1701,US,33,...,,-1.0,7.0,EWR,PIT,319.0,,,False,0
2,1990,1,3,3,1546.0,1540,1710.0,1701,US,33,...,,9.0,6.0,EWR,PIT,319.0,,,False,0
3,1990,1,4,4,1542.0,1540,1710.0,1701,US,33,...,,9.0,2.0,EWR,PIT,319.0,,,False,0
4,1990,1,5,5,1549.0,1540,1706.0,1701,US,33,...,,5.0,9.0,EWR,PIT,319.0,,,False,0


The dataframe has 10 chunks (npartitions) that correspond to the ten files that were read.  Dataframes have two key properties:
  * they are tabular (two-dimensional) data
  * each column has a datatype defined by a _schema_
  
The programmer is encouraged to think of a dataframe as a "spreadsheet or SQL table".  It is reasonable to call the data __structured__.
  * this is in contrast to semi-structured data, which has tags and hierarchy but does not enforce types. Examples are XML and JSON.
  * in database parlance, this is a flat data model.

### Data Slicing and Aggregation

The most basic operations in a database and in dask are to <code>select</code> rows and <code>project</code> columns.

#### Selecting rows

Find all flights flown by a specific plane, identified by <code>TailNum</code>

In [22]:
df2 = df[df.TailNum=='N516UA'].compute()
df2

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
2295,1995,1,26,4,1822.0,1815,2147.0,2129,UA,71,...,359.0,18.0,7.0,EWR,SFO,2565.0,7.0,19.0,False,0
2304,1995,1,4,3,1757.0,1715,2103.0,2029,UA,73,...,337.0,34.0,42.0,EWR,SFO,2565.0,6.0,23.0,False,0
2349,1995,1,18,3,1049.0,1050,1348.0,1404,UA,75,...,330.0,-16.0,-1.0,EWR,SFO,2565.0,4.0,25.0,False,0
2351,1995,1,20,5,1119.0,1050,1416.0,1404,UA,75,...,337.0,12.0,29.0,EWR,SFO,2565.0,10.0,10.0,False,0
2369,1995,1,7,6,700.0,700,1041.0,1019,UA,79,...,368.0,22.0,0.0,EWR,SFO,2565.0,7.0,26.0,False,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
248678,1999,12,16,4,701.0,700,1019.0,1024,UA,853,...,337.0,-5.0,1.0,EWR,SFO,2565.0,22.0,19.0,False,0
248680,1999,12,18,6,659.0,700,1033.0,1024,UA,853,...,363.0,9.0,-1.0,EWR,SFO,2565.0,19.0,12.0,False,0
269006,1999,12,5,7,820.0,820,1024.0,1043,UA,329,...,214.0,-19.0,0.0,LGA,DEN,1619.0,14.0,16.0,False,0
269094,1999,12,1,3,616.0,620,814.0,832,UA,875,...,215.0,-18.0,-4.0,LGA,DEN,1619.0,7.0,16.0,False,0


#### Projecting Columns

Build a dataframe that describes the plane ('TailNum') and route ('FlightNum')

In [23]:
routes = df[['FlightNum','TailNum']].compute()

routes

Unnamed: 0,FlightNum,TailNum
0,33,
1,33,
2,33,
3,33,
4,33,
...,...,...
269176,1753,N516UA
269177,1753,N504UA
269178,1753,N592UA
269179,1753,N575UA


### Aggregating Data

A common data science inquiry is to query an aggregate (mean, min, max, sum, etc.) in a group.  This is done with a <code>groupby</code> query. The pattern is to construct a grouping and then aggregate over the grouping.
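A small pandas sketch of the group-then-aggregate pattern, using a hypothetical five-row miniature of the flight table (the tail numbers and values are made up). It also shows a distinction worth keeping in mind: `count()` counts non-null rows in each group, while `nunique()` counts distinct values.

```python
import pandas as pd

# Hypothetical miniature of the flight table: one row per flight flown.
flights = pd.DataFrame({
    "TailNum":   ["N1", "N1", "N1", "N2", "N2"],
    "FlightNum": [33,   33,   71,   71,   75],
})

# Step 1: construct the grouping. Step 2: aggregate over it.
grouped = flights.groupby("TailNum")["FlightNum"]
rows_per_plane = grouped.count()        # rows per plane: N1 -> 3, N2 -> 2
distinct_per_plane = grouped.nunique()  # distinct flight numbers: N1 -> 2, N2 -> 2
```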

__Query:__ How many different flights were flown by each plane?

In [24]:
#routes.groupby('TailNum').FlightNum.count()
routes.groupby('TailNum')['FlightNum'].count()

TailNum
'144DA        1
112          10
A367NW       39
EI-BWD      638
EI-CAL       91
          ...  
NEIDLA      125
NN7324       74
NOZ1AA       27
NXXXXX        4
UNKNOW    33663
Name: FlightNum, Length: 3712, dtype: int64

I have intentionally mixed syntax. Dataframes in both R and Python use two forms of syntax interchangeably.
  * <code>dataframe.columnName</code>
  * <code>dataframe['columnName']</code>

Many functions only accept the bracketed indexing of columns.
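
The attribute form also breaks down for some column names. A short pandas sketch, using a hypothetical frame whose column names were chosen to cause trouble: one collides with a dataframe method and one contains a space.

```python
import pandas as pd

# Hypothetical frame: "count" clashes with DataFrame.count, "flight num" has a space.
frame = pd.DataFrame({
    "TailNum": ["N1", "N2"],
    "count": [3, 2],
    "flight num": [33, 71],
})

same = frame.TailNum.equals(frame["TailNum"])  # both forms select the same column
attr_is_method = callable(frame.count)         # the attribute is the method, not the column
count_column = frame["count"].tolist()         # brackets reach the column
spaced_column = frame["flight num"].tolist()   # no attribute form exists for this name
```

Brackets always work, so they are the safer habit when column names come from the data.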
    
__Query:__ How many times was each flight flown?

In [25]:
routes.groupby('FlightNum')['TailNum'].count()

FlightNum
1       3215
2          0
3       3744
4          0
5       3218
        ... 
9607       1
9608       1
9777       2
9851       0
9899       2
Name: TailNum, Length: 2405, dtype: int64

__Query:__ What is the largest number of routes flown by a single plane?

In [26]:
routes.groupby('TailNum')['FlightNum'].count().max()

33663

__Query:__ What is the maximum number of planes to fly a single route?

In [27]:
routes.groupby('FlightNum')['TailNum'].count().max()

6343

But these aggregates are not really the questions we want answered. The more natural questions are awkward to express.
  * What route was flown by the most planes?
  * What plane flew the most routes?

In [28]:
routes.groupby('FlightNum').TailNum.count().idxmax()

305

In [29]:
routes.groupby('TailNum').FlightNum.count().idxmax()

'UNKNOW'

This reveals problems with the data.  So, let's look for an actual plane.

In [30]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().idxmax()

'N413DA'

and ask how many flights it has flown

In [31]:
routes[routes.TailNum != 'UNKNOW'].groupby('TailNum').FlightNum.count().max()

4844

In [32]:
type(routes)

pandas.core.frame.DataFrame

and verify that this is the right answer.

In [33]:
routecount = routes.groupby('TailNum').FlightNum.count()
routecount.get('N413DA')

4844

In [34]:
type(routecount)

pandas.core.series.Series

We have uncovered what I think is the most annoying part of dask and pandas dataframes. Aggregate functions return series, which are not dataframes and have different methods.  I would have preferred to have written:

<code>routecount[routecount.TailNum=='N413DA'].compute()</code>

But that is dataframe syntax, not series syntax.  Here the result is a pandas series because <code>routes</code> was already materialized as a pandas dataframe by <code>compute()</code>; a groupby aggregate on a dask dataframe stays lazy and yields a pandas series only once it is computed.
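
One way around the mismatch, sketched in pandas on a hypothetical three-row table: look the value up by index label on the series, or call `reset_index` to turn the series back into a two-column dataframe so the familiar filter syntax applies. The column name `flights` is my own choice.

```python
import pandas as pd

# Hypothetical miniature of the routes table.
mini_routes = pd.DataFrame({
    "TailNum":   ["N1", "N1", "N2"],
    "FlightNum": [33,   71,   71],
})
mini_count = mini_routes.groupby("TailNum").FlightNum.count()  # Series indexed by TailNum

by_label = mini_count.loc["N1"]                    # series syntax: look up by index label
as_frame = mini_count.reset_index(name="flights")  # DataFrame with TailNum and flights columns
filtered = as_frame[as_frame.TailNum == "N1"]      # dataframe syntax works again
```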

In [35]:
type(routecount)

pandas.core.series.Series

### Indexes

All dataframes have a __default index__.  In this case, the index was generated when we loaded the data and is the row number in the pandas dataframe. Surprisingly, the index is not unique.  The same index value appears in each pandas dataframe.

In [36]:
print(df.index, "\n")
print("Number of rows in the database\n", len(df))
maxindex = df.index.nunique().compute()
print("Number of unique values in the index\n", maxindex)

# find all entries with index value 22000
df.loc[22000].compute()

<dask_expr.expr.Index: expr=Index(frame=ReadCSV(a8c69cb))> 

Number of rows in the database
 2611892
Number of unique values in the index
 271539


Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
22000,1990,1,17,3,1849.0,1850,1946.0,1945,CO,828,...,,1.0,-1.0,EWR,BDL,116.0,,,False,0
22000,1991,2,28,4,2042.0,2040,2154.0,2156,US,458,...,,-2.0,2.0,LGA,BUF,292.0,,,False,0
22000,1992,2,23,7,1302.0,1305,1419.0,1416,US,132,...,,3.0,-3.0,EWR,BUF,282.0,,,False,0
22000,1993,2,12,5,2258.0,2029,5.0,2210,US,117,...,,115.0,149.0,LGA,CLE,418.0,,,False,0
22000,1994,2,25,5,712.0,700,806.0,753,US,419,...,,13.0,12.0,LGA,PHL,96.0,,,False,0
22000,1995,1,31,2,1825.0,1800,1943.0,1920,CO,323,...,37.0,23.0,25.0,EWR,DCA,199.0,3.0,38.0,False,0
22000,1996,2,7,3,1216.0,1215,1343.0,1338,US,1419,...,65.0,5.0,1.0,LGA,PIT,335.0,9.0,13.0,False,0
22000,1997,1,31,5,747.0,745,1026.0,1003,CO,133,...,239.0,23.0,2.0,EWR,DEN,1605.0,7.0,33.0,False,0
22000,1998,2,19,4,2031.0,2025,2146.0,2151,CO,1978,...,44.0,-5.0,6.0,EWR,PWM,284.0,3.0,28.0,False,0
22000,1999,2,17,3,1511.0,1450,1830.0,1820,HP,2035,...,290.0,10.0,21.0,EWR,PHX,2133.0,4.0,25.0,False,0


In [45]:
df.index.head()

RangeIndex(start=0, stop=5, step=1)
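
The duplicated index values can be reproduced in plain pandas. This sketch assumes two hypothetical "files", each loaded with its own default index, and concatenates them the way partitions sit side by side in a dask dataframe.

```python
import pandas as pd

# Two hypothetical partitions, each with its own default RangeIndex 0..2.
part0 = pd.DataFrame({"Year": [1990, 1990, 1990]})
part1 = pd.DataFrame({"Year": [1991, 1991, 1991]})

combined = pd.concat([part0, part1])          # index is 0, 1, 2, 0, 1, 2
index_is_unique = combined.index.is_unique    # False: each label appears twice
rows_at_1 = combined.loc[1]                   # one row from each partition
renumbered = combined.reset_index(drop=True)  # fresh index 0..5, unique again
```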

The term index is used to mean many things in computer science:
* the offset of an element in a data structure, i.e. the array index
* a data structure used for fast access in a data store

When I heard index in dask, I expected it to be the latter. It is not. By default, it is only the row number in the local pandas data frame.


My concept of an index comes from relational databases: "A database index is a data structure that improves the speed of data retrieval operations on a database table at the cost of additional writes and storage space to maintain the index data structure."

Real indexes come in many forms, but are most commonly:

* hash tables -- organize the data by the hash value of a key field for constant time $O(1)$ lookup by key.

<img src="https://uploads-ssl.webflow.com/632c5ceb41133222d120b86e/632f1efd49442a2371a6bb8a_hash-function.png" width=256 />

* B+-trees/sorted -- sort the data in a tree to lookup a key in $O(\log n)$ time and be able to scan sequential keys.

<img src="https://miro.medium.com/v2/resize:fit:1400/format:webp/1*brGYM6uwjUmAaGLs1QKKxw.png" width=386 />
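
Both kinds of index can be sketched with the Python standard library. This is a toy: a `dict` plays the hash table, and a sorted list searched with `bisect` stands in for a B+-tree (it gives the same $O(\log n)$ lookup and sequential scan, but none of the tree's update behavior). The records and function names are hypothetical.

```python
import bisect

# Hypothetical records stored in arrival order: (key, payload).
records = [(17, "q"), (3, "a"), (42, "z"), (8, "c"), (23, "m")]

# Hash index: key -> position in `records`, for constant-time point lookups.
hash_index = {key: pos for pos, (key, _) in enumerate(records)}

def hash_lookup(key):
    """Return the payload for `key`, or None if the key is absent."""
    pos = hash_index.get(key)
    return None if pos is None else records[pos][1]

# Sorted index: keys kept in order, so ranges can be scanned sequentially.
sorted_keys = sorted(key for key, _ in records)

def range_scan(low, high):
    """Return all keys k with low <= k <= high, found by binary search."""
    start = bisect.bisect_left(sorted_keys, low)
    stop = bisect.bisect_right(sorted_keys, high)
    return sorted_keys[start:stop]

found = hash_lookup(42)
missing = hash_lookup(99)
in_range = range_scan(5, 25)
```

The hash index has no efficient answer to the range question; it would have to test every key.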

This is not really a topic for this class, although indexing is one of my favorite parts of computer science.

A **simulacrum** of indexing can be accomplished in dask by calling <code>set_index</code>, which often results in a global shuffle of the data and can be very expensive. After an index is set, all of the data are sorted (across all partitions). This makes some database-like operations, such as `join()` and `groupby()`, much more efficient.
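
To see why the shuffle is expensive and what it buys, here is a plain-Python sketch of range partitioning. It is not how dask implements `set_index`; the split points, data, and function names are assumptions chosen for illustration.

```python
import bisect

def range_partition(partitions, boundaries):
    """Redistribute (key, value) rows so each output partition holds one key range.

    `boundaries` are sorted split points; a key equal to a split point goes to
    the partition on its right. There are len(boundaries) + 1 output partitions.
    """
    out = [[] for _ in range(len(boundaries) + 1)]
    for part in partitions:       # every input partition ...
        for row in part:          # ... may send rows to every output partition
            out[bisect.bisect_right(boundaries, row[0])].append(row)
    # Sort within each output partition so the data are globally ordered by key.
    return [sorted(p, key=lambda r: r[0]) for p in out]

def lookup(sorted_parts, boundaries, key):
    """Find rows for `key` by inspecting only the one partition that can hold it."""
    target = sorted_parts[bisect.bisect_right(boundaries, key)]
    return [value for k, value in target if k == key]

# Hypothetical input: keys scattered across three partitions.
scattered = [
    [(5, "a"), (1, "b")],
    [(3, "c"), (5, "d")],
    [(0, "e"), (4, "f")],
]
boundaries = [2, 4]   # partition 0: key < 2, partition 1: 2 <= key < 4, partition 2: key >= 4
indexed = range_partition(scattered, boundaries)
keys_per_partition = [[k for k, _ in p] for p in indexed]
hits = lookup(indexed, boundaries, 5)
```

The all-to-all movement in `range_partition` is the shuffle. The payoff is in `lookup`, which touches a single partition instead of all three.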

### Why do indexes matter?

Understanding the structure of the data can lead to massive performance differences.
We can create an example in which seemingly identical queries perform
differently because of an implicit index structure.

We create 100 files, each with 500000 entries and two fields, 'A' and 'B'.  'A' contains an integer that identifies the file number (0-99), so 'A' is constant within each file. 'B' contains the sequence of numbers 0-499999 in each file. These data:
  * are too big to fit in memory
  * have one field that will be identical in each pandas dataframe

To generate the data, run the cell **at the end** of the workbook once.

In [67]:
df = dd.read_csv('/tmp/csv*.csv')
df.head()
df.compute()

Unnamed: 0,A,B
0,0,0
1,0,1
2,0,2
3,0,3
4,0,4
...,...,...
499995,99,499995
499996,99,499996
499997,99,499997
499998,99,499998


Now let's compare how long it takes to sum all the elements grouped by each key for both columns.  In both cases, we touch all of the data.  But when we <code>groupby</code> 'A', each sum can be evaluated entirely within one chunk.  When we <code>groupby</code> 'B', partial sums for each value must be aggregated across all chunks.
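
A scaled-down sketch of why the two groupings differ, in plain Python with 4 "files" of 5 rows instead of 100 files of 500000. The function names are hypothetical and this is not dask's algorithm, only the partial-sum idea. Counting the partial results that must be merged across chunks shows the gap.

```python
from collections import defaultdict

def partial_sums(chunk, key, value):
    """Per-chunk step: sum `value` for each distinct `key` seen in this chunk."""
    sums = defaultdict(int)
    for row in chunk:
        sums[row[key]] += row[value]
    return dict(sums)

def combine(partials):
    """Cross-chunk step: merge the per-chunk dictionaries into final sums."""
    total = defaultdict(int)
    for partial in partials:
        for k, s in partial.items():
            total[k] += s
    return dict(total)

# 4 chunks of 5 rows: A is the chunk number, B runs 0..4 in every chunk.
chunks = [[{"A": i, "B": j} for j in range(5)] for i in range(4)]

by_a = [partial_sums(c, "A", "B") for c in chunks]
by_b = [partial_sums(c, "B", "A") for c in chunks]

entries_a = sum(len(p) for p in by_a)  # one partial per chunk, each already final
entries_b = sum(len(p) for p in by_b)  # one partial per row, none of them final
sum_a = combine(by_a)
sum_b = combine(by_b)
```

Grouping by 'A' produces 4 partial results that are already final. Grouping by 'B' produces 20, and every group needs a contribution from every chunk.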

In [68]:
%time df.groupby('A').sum().compute()

CPU times: user 5.82 s, sys: 686 ms, total: 6.5 s
Wall time: 2.09 s


A,B
74,124999750000
52,124999750000
88,124999750000
93,124999750000
97,124999750000
...,...
15,124999750000
9,124999750000
4,124999750000
44,124999750000


In [69]:
%time df.groupby('B').sum().compute()

CPU times: user 30 s, sys: 8.71 s, total: 38.7 s
Wall time: 11.4 s


B,A
52,4950
74,4950
88,4950
93,4950
97,4950
...,...
499955,4950
499961,4950
499963,4950
499966,4950


A couple of observations:
- Dask is parallelizing the computation across multiple workers.
  - notice that the total CPU time exceeds the wall time.
- summing by `B` is much less efficient
  - partial results from different parallel chunks interfere and must be combined
  - which means sending data from worker to worker
  
Now let's set up an index and compare:

In [6]:
df = dd.read_csv('/tmp/csv*.csv')
%time df = df.set_index('A').compute()
df

CPU times: user 18.4 s, sys: 5.44 s, total: 23.9 s
Wall time: 8.2 s


A,B
0,0
0,1
0,2
0,3
0,4
...,...
99,499995
99,499996
99,499997
99,499998


In [71]:
%time df.groupby('A').sum()

CPU times: user 478 ms, sys: 362 ms, total: 840 ms
Wall time: 835 ms


A,B
0,124999750000
1,124999750000
2,124999750000
3,124999750000
4,124999750000
...,...
95,124999750000
96,124999750000
97,124999750000
98,124999750000


In [4]:
df = dd.read_csv('/tmp/csv*.csv')
%time df = df.set_index('B').compute()
df

CPU times: user 23.2 s, sys: 7.65 s, total: 30.9 s
Wall time: 15.9 s


B,A
0,1
0,45
0,74
0,8
0,18
...,...
499999,2
499999,7
499999,11
499999,50


In [5]:
%time df.groupby('B').sum()

CPU times: user 966 ms, sys: 117 ms, total: 1.08 s
Wall time: 1.08 s


B,A
0,4950
1,4950
2,4950
3,4950
4,4950
...,...
499995,4950
499996,4950
499997,4950
499998,4950


We observe that computing aggregates or building indexes within partitions (`A`) is faster than across partitions (`B`), owing to interference between partitions.

It only makes sense to build an index if you are going to use it multiple times. In this way, you invest in computing once and benefit over multiple invocations.
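
The break-even point can be worked out from the wall times reported above. This is rough arithmetic on single timing runs, so treat the results as an order of magnitude rather than a benchmark; the function name is hypothetical.

```python
import math

def break_even_queries(build_s, indexed_query_s, plain_query_s):
    """Smallest n with build_s + n * indexed_query_s < n * plain_query_s.

    Returns None if the indexed query is not faster, since the index never pays off.
    """
    saving = plain_query_s - indexed_query_s
    if saving <= 0:
        return None
    return math.floor(build_s / saving) + 1

# Wall times in seconds, taken from the cells above.
n_a = break_even_queries(build_s=8.2, indexed_query_s=0.835, plain_query_s=2.09)
n_b = break_even_queries(build_s=15.9, indexed_query_s=1.08, plain_query_s=11.4)
```

With these numbers the index on 'A' pays for itself after 7 queries and the index on 'B' after 2, because the unindexed query on 'B' was so much slower to begin with.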

Dask has implemented some of what I would expect out of indexing:
1. dask can be told that there is existing structure in the data
2. dask reorganizes data to optimize queries

Dask does not support some features that I think it should:
1. hash indexing
2. data readers infer the existence of an index

Why is this a simulacrum of an index, and not a real database structure? Because it only sorts the data; it does not build an additional data structure for fast access. The improved performance comes only from the sortedness of the data, e.g.:
* finding an element by the index field can use binary search, rather than a scan
* joining data on the index field can be done using a "two-finger join" in a linear scan of two tables without sorting (data are presorted)
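
A sketch of the two-finger (merge) join in plain Python, assuming both inputs are lists of `(key, value)` pairs already sorted by key. The function name and sample data are hypothetical.

```python
def merge_join(left, right):
    """Inner-join two lists of (key, value) pairs that are already sorted by key."""
    out = []
    i = j = 0                                  # the two "fingers"
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1                             # left key has no partner yet: advance left
        elif lk > rk:
            j += 1                             # right key has no partner yet: advance right
        else:
            # Find the run of rows on the right that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == lk:
                for _, rvalue in right[j:j_end]:
                    out.append((lk, left[i][1], rvalue))
                i += 1
            j = j_end
    return out

left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (3, "y"), (4, "z"), (4, "w")]
joined = merge_join(left, right)
```

Neither finger ever moves backward, so each table is read once. Without the presorting, the join would first have to sort or hash both inputs.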

### Parting Thoughts

Limitations and comments:
  * dask has historically lacked a general sort capability
    * sorting can be accomplished with set_index, and recent versions also provide <code>sort_values</code>
    * shuffle is inefficient, use a different engine
  * dask does not support row indexing by physical offset
    * this is not an important feature

### Generate data

Run this once to make the data.

In [44]:
import csv
for i in range(100):
    with open(f'/tmp/csv{i}.csv', 'w', newline='') as csvfile:
        csvw = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csvw.writerow(['A','B'])
        for j in range(500000):
            csvw.writerow([i,j])