Under the hood, a Dask dataframe coordinates many Pandas dataframes, partitioned along an index. The figure below illustrates this:

<img src="dask_dataframe.svg" width="30%" alt="Dask dataframes are blocked Pandas dataframes">

Because of this design, Dask dataframes support a large subset of the Pandas API, so we can use them almost as if they were Pandas dataframes. This is a real advantage. Many big data technologies come with their own APIs and syntax, and having to learn them is a major obstacle to adoption. Dask's approach is welcome because many data scientists are already familiar with Pandas.

**Note that we'll be using Dask installed on our local machines, not on a cluster**. We'll use the multiple CPU cores of our machines to parallelize the dataframe operations. Keep in mind that all the code we write here should also run on a cluster, given a proper configuration.

# Working with Dask dataframes

We'll start using Dask dataframes by loading a CSV file. Before that, we start a Dask client. As we mentioned in the previous checkpoint, starting a client isn't necessary when working with Dask on a local computer, but it gives us a dashboard where we can inspect our computations.

The link to the dashboard becomes visible when you create the client below. We suggest opening the dashboard and watching what happens while the Dask code runs. It's a good way to learn what is going on.

In [1]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

Client: Scheduler: tcp://127.0.0.1:50164, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 8, Memory: 8.00 GB


# Loading data into Dask dataframes

We first load a relatively large dataset into a Dask dataframe. The dataset we'll use is the [Credit Card Fraud Detection](https://www.kaggle.com/mlg-ulb/creditcardfraud) dataset from Kaggle. It contains financial transactions along with a label that indicates whether each transaction is fraudulent. Several of the features are principal components of the original data: because the data is confidential, the authors published only the principal components instead of the original variables. Since our focus here is to demonstrate the capabilities of Dask dataframes, this isn't a problem for our purposes.

We start by importing the Dask dataframes as:

```python
import dask.dataframe as dd
```

Then we load the CSV file using the `read_csv()` function of `dask.dataframe`. Notice that it is the identically named counterpart of the Pandas `read_csv()` function:

In [4]:
# Dataframes implement the Pandas API
import dask.dataframe as dd

# This loads the data into Dask dataframe
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv')

When working with Dask, we should always keep in mind that Dask objects are evaluated lazily. That is to say, unlike in Pandas, when we load a dataset into a Dask dataframe, the data isn't actually read into memory until we call the `.compute()` method. For example, the code below returns just the names of the columns and their data types:

In [5]:
print(df)

Dask DataFrame Structure:
                Time       V1       V2       V3       V4       V5       V6       V7       V8       V9      V10      V11      V12      V13      V14      V15      V16      V17      V18      V19      V20      V21      V22      V23      V24      V25      V26      V27      V28   Amount  Class
npartitions=3                                                                                                                                                                                                                                                                                   
               int64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  float64  int64
                 ...      ...      ...      ...      ...      ...      ...      ...      ...      ...      

We can also see the data types of the columns by calling the `.dtypes` attribute:

In [6]:
df.dtypes

Time        int64
V1        float64
V2        float64
V3        float64
V4        float64
V5        float64
V6        float64
V7        float64
V8        float64
V9        float64
V10       float64
V11       float64
V12       float64
V13       float64
V14       float64
V15       float64
V16       float64
V17       float64
V18       float64
V19       float64
V20       float64
V21       float64
V22       float64
V23       float64
V24       float64
V25       float64
V26       float64
V27       float64
V28       float64
Amount    float64
Class       int64
dtype: object

Dask dataframes provide many methods similar to those of Pandas dataframes. As an example, let's call the `.head()` method and see what it returns:

In [7]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


The `.head()` method does return some data. So what about lazy evaluation? Some operations on Dask dataframes, such as `.head()`, compute their result right away, while others don't. For example, the `.describe()` method returns only a lazy dataframe that shows the structure of the result, not the statistics themselves:

In [8]:
df.describe()

Unnamed: 0_level_0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Instead, we should call `.compute()` to evaluate it as follows:

In [9]:
df.describe().compute()

ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+--------+---------+----------+
| Column | Found   | Expected |
+--------+---------+----------+
| Time   | float64 | int64    |
+--------+---------+----------+

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Time': 'float64'}

to the call to `read_csv`/`read_table`.

Alternatively, provide `assume_missing=True` to interpret
all unspecified integer columns as floats.

Oops! We got an error. That's because Dask has to infer the data types of the columns it loads, and because of lazy evaluation it doesn't look at all the values in each column when doing so. Instead, it reads a sample from the beginning of the file and guesses the data types from that sample. Here, the sampled `Time` values looked like integers, but a later part of the file was parsed as floats. When we run into this kind of situation, we need to specify the column types manually. Setting the `assume_missing` parameter of the `read_csv()` function to `True` can also help in some situations: it tells Dask to assume that all integer columns not specified in the `dtype` parameter contain missing values, so it converts them to floats.

Now, let's read our dataset again, this time specifying the type of the `Time` column as `float64`:

In [10]:
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv', dtype={'Time': 'float64'})

We can now call `.describe()` followed by `.compute()` to see the descriptive statistics:

In [11]:
df.describe().compute()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
count,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,...,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0
mean,94813.859575,6.003539e-16,6.386744e-18,-3.078411e-15,2.484443e-15,3.832046e-16,1.542399e-15,-1.341216e-15,1.532819e-16,-1.362372e-15,...,3.5127090000000003e-17,1.119277e-15,3.153455e-16,4.394479e-15,1.07936e-15,1.743781e-15,-3.506722e-16,-1.003916e-16,88.349619,0.001727
std,47488.145955,1.958696,1.651309,1.516255,1.415869,1.380247,1.332271,1.237094,1.194353,1.098632,...,0.734524,0.7257016,0.6244603,0.6056471,0.5212781,0.482227,0.4036325,0.3300833,250.120109,0.041527
min,0.0,-56.40751,-72.71573,-48.32559,-5.683171,-113.7433,-26.16051,-43.55724,-73.21672,-13.43407,...,-34.83038,-10.93314,-44.80774,-2.836627,-10.2954,-2.604551,-22.56568,-15.43008,0.0,0.0
25%,49346.0,-0.78686,-0.5536365,-0.7156553,-0.7067841,-0.4093779,-0.6546632,-0.4787135,-0.1339191,-0.5477089,...,-0.2256367,-0.525053,-0.1301482,-0.3241623,-0.2218416,-0.2821043,-0.06089022,-0.02762231,6.84,0.0
50%,94872.0,0.09301532,0.08999754,0.1743346,0.1809941,0.1480477,-0.1649056,0.1590763,0.07790153,0.02583745,...,0.00663631,0.1241855,0.05160336,0.06794774,0.1674311,-0.001290092,0.0108823,0.0234504,24.99,0.0
75%,165830.0,1.984826,0.8916958,1.373682,1.012508,0.8613822,0.4775488,0.7334868,0.3706835,0.7092123,...,0.2385322,0.7308589,0.2346538,0.5274699,0.4190202,0.293847,0.1060637,0.08061929,84.91,0.0
max,172792.0,2.45493,22.05773,9.382558,16.87534,34.80167,73.30163,120.5895,20.00721,15.59499,...,27.20284,10.50309,22.52841,4.584549,7.519589,3.517346,31.6122,33.84781,25691.16,1.0


Voilà!

# Using Dask as if it's Pandas

Next, we demonstrate some commonly used Pandas dataframe operations on Dask dataframes. In a nutshell, most common Pandas operations work almost identically on Dask dataframes. The major difference is that we need to call `.compute()` to evaluate the result.

## Filtering the data

One of the most common tasks in Pandas is filtering a dataframe. Let's filter our Dask dataframe. Specifically, we want to keep only those transactions whose amount is larger than 10000 US dollars:

In [12]:
df2 = df[df["Amount"] > 10000]
df2

Unnamed: 0_level_0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
npartitions=3,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


As usual, we get back only the lazy structure and no data. To see the filtered rows, we need to call `.compute()`:

In [13]:
df[df["Amount"] > 10000].compute()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
46841,42951.0,-23.712839,-42.172688,-13.320825,9.925019,-13.945538,5.564891,15.710644,-2.844253,-1.580725,...,7.9216,-6.32071,-11.310338,0.404175,-4.547278,-1.577118,-2.357385,2.253662,12910.93,0
54018,46253.0,-21.780665,-38.30531,-12.122469,9.752791,-12.880794,4.256017,14.785051,-2.818253,-0.667338,...,7.437478,-5.619439,-10.547038,0.653249,-4.232409,-0.480459,-2.257913,2.082488,11898.09,0
58465,48401.0,-36.80232,-63.344698,-20.645794,16.715537,-20.672064,7.694002,24.956587,-4.730111,-2.687312,...,11.455313,-10.933144,-17.173665,1.1807,-7.025783,-2.53433,-3.602479,3.450224,19656.53,0
30325,95286.0,-34.549296,-60.464618,-21.340854,16.875344,-19.229075,6.335259,24.422716,-4.964566,0.188912,...,11.50258,-9.499423,-16.513186,0.744341,-7.081325,-2.604551,-3.550963,3.250802,18910.0,0
48486,119713.0,-20.924897,-37.943452,-14.060281,10.473005,-10.866639,6.256654,14.960521,-2.392155,-0.597076,...,6.82981,-6.926353,-9.928657,-0.447084,-4.848151,-2.24162,-2.140723,2.001492,11789.84,0
32983,166198.0,-35.548539,-31.850484,-48.325589,15.304184,-113.743307,73.301626,120.589494,-27.34736,-3.872425,...,-21.62012,5.712303,-1.581098,4.584549,4.554683,3.415636,31.612198,-15.430084,25691.16,0
42461,172273.0,-9.030538,-11.112584,-16.233798,3.592021,-40.427726,23.917837,44.054461,-7.277778,-4.210637,...,-0.269048,0.988144,7.040028,0.347693,2.520869,2.342495,3.478175,-2.713136,10199.44,0


If you were to look at the Dask dashboard during the above computation, you would see something like the following on the "Workers" tab:

![dask dashboard](dask_dashboard.png)

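We can also use the `.loc` indexer on Dask dataframes. Note that `.iloc` is more limited than in Pandas: on a Dask dataframe, it only supports selecting columns by position. Again, we shouldn't forget to call the `.compute()` method: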

In [14]:
df.loc[df["Amount"] > 10000,:].compute()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
46841,42951.0,-23.712839,-42.172688,-13.320825,9.925019,-13.945538,5.564891,15.710644,-2.844253,-1.580725,...,7.9216,-6.32071,-11.310338,0.404175,-4.547278,-1.577118,-2.357385,2.253662,12910.93,0
54018,46253.0,-21.780665,-38.30531,-12.122469,9.752791,-12.880794,4.256017,14.785051,-2.818253,-0.667338,...,7.437478,-5.619439,-10.547038,0.653249,-4.232409,-0.480459,-2.257913,2.082488,11898.09,0
58465,48401.0,-36.80232,-63.344698,-20.645794,16.715537,-20.672064,7.694002,24.956587,-4.730111,-2.687312,...,11.455313,-10.933144,-17.173665,1.1807,-7.025783,-2.53433,-3.602479,3.450224,19656.53,0
30325,95286.0,-34.549296,-60.464618,-21.340854,16.875344,-19.229075,6.335259,24.422716,-4.964566,0.188912,...,11.50258,-9.499423,-16.513186,0.744341,-7.081325,-2.604551,-3.550963,3.250802,18910.0,0
48486,119713.0,-20.924897,-37.943452,-14.060281,10.473005,-10.866639,6.256654,14.960521,-2.392155,-0.597076,...,6.82981,-6.926353,-9.928657,-0.447084,-4.848151,-2.24162,-2.140723,2.001492,11789.84,0
32983,166198.0,-35.548539,-31.850484,-48.325589,15.304184,-113.743307,73.301626,120.589494,-27.34736,-3.872425,...,-21.62012,5.712303,-1.581098,4.584549,4.554683,3.415636,31.612198,-15.430084,25691.16,0
42461,172273.0,-9.030538,-11.112584,-16.233798,3.592021,-40.427726,23.917837,44.054461,-7.277778,-4.210637,...,-0.269048,0.988144,7.040028,0.347693,2.520869,2.342495,3.478175,-2.713136,10199.44,0


## Aggregating with group by

Another common operation on dataframes is to aggregate some columns after grouping by a categorical column. To demonstrate this, let's group our dataframe by the `Class` column and see whether the average `Amount` differs between fraudulent and legitimate transactions:

In [15]:
df2 = df.groupby("Class")["Amount"].mean().compute()
df2

Class
0     88.291022
1    122.211321
Name: Amount, dtype: float64

It seems that fraudulent transactions have a higher average amount than legitimate ones.

## Using the `.apply()` method

A good use case for parallel processing is the `.apply()` method. If we use `.apply()` on a column to operate value by value, distributing the workload across many cores can speed up the calculation. As a toy example, say that we want to transform the `Amount` column so that amounts higher than 10000 US dollars become 1 and amounts lower than or equal to 10000 US dollars become 0:

In [16]:
df["Amount2"] = df["Amount"].apply(lambda x: 1 if x>10000 else 0, meta=('Amount2', 'int64'))
df["Amount2"].head(20)

0     0
1     0
2     0
3     0
4     0
5     0
6     0
7     0
8     0
9     0
10    0
11    0
12    0
13    0
14    0
15    0
16    0
17    0
18    0
19    0
Name: Amount2, dtype: int64

This type of operation lends itself well to parallelization because, in the example above, the `.apply()` method operates value by value and each operation is independent of the others. Hence, the rows can be distributed over several cores or even machines.

# Persisting data in memory

So far, we saw that we can load large amounts of data into Dask dataframes and run parallel computations on them. However, Dask can also be convenient when the dataset fits in our computer's RAM. If we have enough memory available for a dataset, we can persist the data in memory and take advantage of memory speed, while Dask still parallelizes the operations. Once we do this, future computations on the persisted dataframe will be much faster, since the data no longer has to be loaded again each time.

To persist a dataframe in memory, we just call the `.persist()` method of the Dask dataframe:

In [17]:
df = df.persist()

# Drawbacks of Dask dataframes

As we saw in this checkpoint, working with Dask dataframes is very similar to working with Pandas. That being said, we should be aware that the `dask.dataframe` package doesn't cover the whole Pandas API, only a well-used portion of it. There are two reasons for this:

1.  The Pandas API is *huge*, and Dask still has a way to go to increase its coverage.
2.  Some operations, like sorting, are hard to do in parallel. Hence, not all of the functionality of Pandas is available on Dask dataframes.

Additionally, some important operations like ``set_index`` work, but they are slower than in Pandas because sorting involves substantial shuffling of the data. Since Dask stores the data of a Dask dataframe in several Pandas dataframes, it sometimes needs to write data out to disk in order to coordinate the sorting operation. This adds overhead.

# Closing the connection

We're done with Dask for this checkpoint, so we can close our connection:

In [15]:
client.close()