# Dask DataFrame

 - Dask is a flexible library for parallel computing in Python.
 - Among its many tools, we'll focus on Dask DataFrame.
 - It reuses much of the pandas code and API, but extends it to datasets larger than memory and to multiple cores or machines.
 - A Dask DataFrame is a large parallel DataFrame composed of many smaller pandas DataFrames, split along the index.


In [None]:
from IPython.display import Image

In [None]:
Image(url="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg",width=400)

### Reading the data - National Food Survey UK 

The National Food Survey (NFS), which closed in 2000, was the longest-running continuous survey of household food consumption and expenditure in the world. It was originally set up in 1940 by the then Ministry of Food to monitor the adequacy of the diet of urban 'working class' households in wartime, but it was extended in 1950 to become representative of households throughout Great Britain (the UKDA holds NFS data from 1974-2000 only). 
- Information: https://www.gov.uk/government/statistics/family-food-open-data
- Source: https://uofi.box.com/s/v3pchnfl20i8qw3qwur42eji9f851aak

The source link contains all the .csv files that we'll read. __FIRST__: download the data and save it in a folder you can easily find!

In [None]:
pwd #check current directory

In [None]:
# Replace this path with the folder where you saved the NFS data
cd "C:\Users\Hanna Willwerth\Box\nfs"

In [None]:
pwd #check current directory to ensure it changed

In [None]:
import pandas as pd

In [None]:
df = pd.read_csv('NFS_2000.csv')
df

 ### Some variables from the dataset
 - hhno : household id
 - styr : year column
 - minfd : identifier of food category
 - cq: consumption
 - memhh : number of members of the household

As usual, we can run pandas operations on this DataFrame:

In [None]:
df.groupby('minfd').minfd.count()

In [None]:
df.groupby('minfd').cq.mean()

## What can we do with Dask?

Most Dask DataFrame methods mirror the pandas DataFrame API: https://docs.dask.org/en/latest/dataframe-api.html

Some examples of operations that parallelize well (and are therefore fast):

- Elementwise operations: `df.x + df.y, df * df`
- Row-wise selections: `df[df.x > 0]`
- Loc: `df.loc[4.0:10.5]`
- Common aggregations: `df.x.max(), df.max()`
- Is in: `df[df.x.isin([1, 2, 3])]`


In [None]:
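from dask.distributed import Client, progress
# Start a local cluster: 2 worker processes, each with 2 threads and a 1 GB memory limit
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client  # shows the cluster summary and a link to the Dask dashboard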

In [None]:
import dask
import dask.dataframe as dd

In [None]:
# Notice that we are reading ALL csv files: the pattern NFS*.csv matches every file
df = dd.read_csv("NFS*.csv")

Notice that:
- Nothing has been loaded into memory yet
- Metadata (column names and dtypes, inferred by pandas from a sample of the first file) is already available

In [None]:
df

Notice that `npartitions=27` matches the number of CSV files (one per year, 1974-2000): here each file became exactly one partition.

In [None]:
# DataFrame.head is one operation that is not lazy
df.head(5)

As a reminder, `styr` is the year column and `minfd` identifies the food category.

In [None]:
df.cq.mean() 

Nothing was computed: `df.cq.mean()` only returned a lazy Dask object describing the computation. To actually execute it, we add `.compute()` to the end.

In [None]:
df.cq.mean().compute()

### What did we just do?
We calculated the average of a column across 27 large CSV files without having to load them all into memory and concatenate them first.

### Partitions

- By default, each CSV file becomes (at least) one partition; a file larger than the block size is split into several
- In our case this works well, because the files already split the data naturally by year


In [None]:
df.npartitions
# number of partitions (here, one per CSV file)

#### Each partition is just a pandas DataFrame

In [None]:
df.partitions[5].compute()

In [None]:
type(df.partitions[5].compute())

In [None]:
df.known_divisions

In [None]:
df = df.set_index('styr', sorted=True)

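We know that each CSV file holds a single year, but Dask does not know that unless we tell it explicitly. That is what `df.set_index('styr', sorted=True)` does: it declares that `styr` is already sorted across the partitions, so Dask can compute the divisions (the index boundaries of each partition) from each partition's minimum and maximum instead of shuffling the data.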

In [None]:
df.known_divisions

In [None]:
df.head()

In [None]:
df.info()


In [None]:
df.divisions

## Fast access to subsets of data

In [None]:
df.loc[2000]

Like most Dask operations, this is lazy, so we need to add `.compute()`. Because the divisions are now known, Dask only has to read the partition that contains 2000.

In [None]:
df.loc[2000].compute()  # select the rows whose index value (styr) is the year 2000

In [None]:
len(df)  # total number of rows (len() triggers a computation over every partition): over 833,000

In [None]:
df.groupby('minfd').minfd.count().nlargest(10).compute()
# Remember: 'minfd' is the food category
# Aggregations like this are fast because each partition is reduced in parallel

In [None]:
minfd = 401

In [None]:
import pandas as pd
food_mapping = pd.read_csv("food_mapping.csv", index_col='minfd')

In [None]:
food_mapping

#### What was the most frequently purchased food group in 1974?
#### What was the most frequently purchased food group overall?

In [None]:
df_1974 = df.loc[1974]

In [None]:
top_74 = df_1974.minfd.value_counts().nlargest(10).compute()

In [None]:
top_74

In [None]:
top_74df = top_74.rename('count').to_frame()  # name the counts column explicitly; its default name differs between pandas versions

In [None]:
# These are the top 10 food categories in 1974
top_74df_merged = top_74df.merge(food_mapping, how='left', right_index=True, left_index=True)\
.sort_values(by='count', ascending=False)
top_74df_merged
# This is great, but it includes a lot of unneeded columns.

In [None]:
top_74df_merged[['count', 'fd gp description']]

In [None]:
top = df.minfd.value_counts().nlargest(10).compute()

In [None]:
top

In [None]:
topdf = top.rename('count').to_frame()  # name the counts column explicitly; its default name differs between pandas versions

In [None]:
# These are the top 10 food categories across all years
topdf_merged = topdf.merge(food_mapping, how='left', right_index=True, left_index=True)\
.sort_values(by='count', ascending=False)
topdf_merged[['count', 'fd gp description']]

### map_partitions
- Useful when we have code that works well on a single pandas DataFrame and want to apply it, in an "embarrassingly parallel" way, to the many pandas DataFrames that live inside the Dask DataFrame
- `map_partitions` does what you might expect: it maps a function across the partitions, calling it once on each one

In [None]:
df.map_partitions(len).compute()  # number of rows in each partition

### Let's calculate the most frequently purchased food group for each year

In [None]:
def most_frequent_food(partition):
    # partition is a pandas.DataFrame holding a single year (the index is styr)
    minfd = partition.minfd.value_counts().nlargest(1).index[0]
    # .loc with a list always returns rows, whether or not minfd is duplicated in food_mapping
    description = food_mapping.loc[[minfd], 'minfddesc'].iloc[0]
    year = int(partition.index[0])
    return pd.DataFrame({'year': [year], 'description': [description]})

In [None]:
# meta gives Dask the output column names and dtypes, so it does not have to guess them
mnfd_year = df.map_partitions(most_frequent_food,
                              meta={'year': int,
                                    'description': str})

In [None]:
mnfd_year.compute()
