# Dask Arrays

<img src="images/dask-array-black-text.svg" 
     align="right"
     alt="Dask arrays are blocked numpy arrays">
     
Dask arrays coordinate many NumPy arrays, arranged as chunks within a grid. They support a large subset of the NumPy API.

## Start Dask Client for Dashboard

Starting the Dask Client is optional. It provides a dashboard that is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend keeping it open on one side of your screen while using your notebook on the other side. Arranging your windows can take some effort, but seeing both at the same time is very useful when learning.

```python
from dask.distributed import Client, progress
client = Client()
client
```

| Client | Cluster |
| --- | --- |
| Scheduler: tcp://127.0.0.1:38633 | Workers: 2 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 2 |
| | Memory: 4.14 GB |


When you are finished with this notebook, shut down the client and its workers:

```python
client.close()
```

## Create Random Array

This creates a 10000x10000 array of random numbers, represented as many NumPy arrays of size 1000x1000 (or smaller if the array cannot be divided evenly). In this case there are 100 (10x10) NumPy arrays of size 1000x1000.

```python
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x
```

| | Array | Chunk |
| --- | --- | --- |
| Bytes | 800.00 MB | 8.00 MB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |
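The chunk summary above follows from simple arithmetic. The sketch below reproduces it in plain Python; the helper `chunk_grid` is hypothetical (Dask computes this layout itself and exposes it on an array as `x.chunks` and `x.numblocks`).

```python
from math import prod

def chunk_grid(shape, chunks):
    """Hypothetical helper: block sizes along each axis, with a smaller final
    block when an axis length is not a multiple of the chunk size."""
    grid = []
    for length, size in zip(shape, chunks):
        full, remainder = divmod(length, size)
        grid.append((size,) * full + ((remainder,) if remainder else ()))
    return tuple(grid)

shape, chunks = (10000, 10000), (1000, 1000)
itemsize = 8  # bytes per float64 element

grid = chunk_grid(shape, chunks)
numblocks = tuple(len(axis) for axis in grid)  # blocks along each axis
n_chunks = prod(numblocks)                     # one NumPy array per block
chunk_bytes = prod(chunks) * itemsize          # size of one full chunk
total_bytes = prod(shape) * itemsize           # size of the whole array

print(numblocks, n_chunks, chunk_bytes / 1e6, "MB", total_bytes / 1e6, "MB")
# An axis of length 10500 with chunks of 1000 ends in one 500-element block.
print(chunk_grid((10500,), (1000,)))
```

The 8 MB per chunk and 800 MB in total match the `Bytes` row of the summary.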


Use NumPy syntax as usual:

```python
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z
```

| | Array | Chunk |
| --- | --- | --- |
| Bytes | 40.00 kB | 4.00 kB |
| Shape | (5000,) | (500,) |
| Count | 430 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |
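Dask evaluates an expression like `y[::2, 5000:].mean(axis=1)` by creating small tasks that work on individual chunks and then combining their partial results. As a plain-NumPy sketch of that idea (not Dask's actual code), the hypothetical function below computes row means chunk by chunk from partial sums and counts, on a small stand-in array:

```python
import numpy as np

def blocked_row_mean(arr, block):
    """Row means of a 2-D array, computed one (block x block) chunk at a time.
    Each chunk contributes partial sums and counts; dividing the combined
    totals gives the same answer as arr.mean(axis=1)."""
    n_rows, n_cols = arr.shape
    sums = np.zeros(n_rows)
    counts = np.zeros(n_rows)
    for i in range(0, n_rows, block):
        for j in range(0, n_cols, block):
            chunk = arr[i:i + block, j:j + block]
            sums[i:i + block] += chunk.sum(axis=1)
            counts[i:i + block] += chunk.shape[1]
    return sums / counts

rng = np.random.default_rng(0)
x = rng.random((8, 8))                 # small stand-in for the 10000 x 10000 array
y = x + x.T
sub = y[::2, 4:]                       # analogous to y[::2, 5000:]
z_blocked = blocked_row_mean(sub, 3)   # 3 does not divide 4 evenly: edge chunks are smaller
z_numpy = sub.mean(axis=1)
print(np.allclose(z_blocked, z_numpy))
```

A mean cannot be combined from per-chunk means of unequal-sized chunks directly, which is why the sketch carries sums and counts instead.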


Call `.compute()` when you want your result as a NumPy array.

If you started `Client()` above then you may want to watch the status page during computation.

```python
z.compute()
```

```
array([0.9966184 , 1.00112557, 0.99718862, ..., 1.00401453, 0.99628741,
       0.99328764])
```
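Until `.compute()` is called, `y` and `z` are only task graphs: recipes saying which function to apply to which inputs. Dask documents a graph format in which keys map to values or to tuples `(function, *arguments)`. The toy evaluator below is a deliberately simplified, single-threaded stand-in for Dask's schedulers (it only handles string keys), meant to show what "computing" such a graph means:

```python
from operator import add

def inc(i):
    return i + 1

# Keys map either to plain values or to tasks: tuples of (callable, *arguments),
# where an argument that names another key refers to that key's result.
dsk = {
    "x": 1,
    "y": (inc, "x"),
    "z": (add, "y", 10),
}

def toy_get(graph, key, cache=None):
    """Evaluate `key`, recursively evaluating its dependencies first.
    Results are cached so a shared dependency is computed only once."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        values = [
            toy_get(graph, arg, cache) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        result = func(*values)
    else:
        result = task
    cache[key] = result
    return result

# Building `dsk` ran nothing; work happens only when a result is requested.
print(toy_get(dsk, "z"))
```

Dask's real schedulers walk the same kind of graph, but run independent tasks in parallel.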

## Persist data in memory

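If you have enough RAM for your dataset, you can persist it in memory.

Persisting computes the data once and keeps the resulting chunks in memory, so future computations reuse them instead of recomputing everything from scratch, which makes them much faster.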
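A rough way to picture the difference, using a hypothetical `expensive_y` function in place of the real computation of `y` (this only illustrates the caching idea; it is not how Dask stores data):

```python
build_count = 0

def expensive_y():
    """Hypothetical stand-in for computing y = x + x.T from scratch."""
    global build_count
    build_count += 1
    return [[i + j for j in range(3)] for i in range(3)]

# Lazy: y is only a recipe, so each result triggers a full rebuild.
corner = expensive_y()[0][0]
total = sum(map(sum, expensive_y()))
print(build_count)  # one build per result so far

# Persisted: materialize y once, keep it in memory, then reuse it.
y_in_memory = expensive_y()
corner_again = y_in_memory[0][0]
total_again = sum(map(sum, y_in_memory))
print(build_count)  # only one more build, shared by both results
```

With the distributed `Client`, `persist()` returns immediately and the computation continues in the background on the workers.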

```python
y = y.persist()
```

```python
%time y[0, 0].compute()
```

```
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 5.94 ms

0.7532757803366623
```

```python
%time y.sum().compute()
```

```
CPU times: user 232 ms, sys: 16 ms, total: 248 ms
Wall time: 179 ms

100010844.4288536
```

## Create Random Dataframe

We create a random timeseries of data with the following attributes:

1.  It stores a record for every second from 2000-01-01 up to 2000-01-31
2.  It splits that range by day, keeping every day as a separate Pandas dataframe (30 partitions)
3.  Along with a datetime index it has columns for names, ids, and numeric values

This is a small dataset of about 240 MB. Extend the date range or shorten the interval between records to practice with a larger dataset.

```python
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries()
```

Unlike Pandas, Dask DataFrames are lazy and so no data is printed here.

```python
df
```

| npartitions=30 | id | name | x | y |
| --- | --- | --- | --- | --- |
| 2000-01-01 | int64 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |
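The `npartitions=30` and the boundary dates in the summary above follow from how the data is generated. The stdlib-only sketch below reproduces that bookkeeping. It assumes the defaults we believe `dask.datasets.timeseries` uses (`start='2000-01-01'`, `end='2000-01-31'`, `freq='1s'`, `partition_freq='1d'`); passing different values for these arguments changes the size of the dataset.

```python
from datetime import datetime, timedelta

# Assumed generator settings (see the lead-in).
start = datetime(2000, 1, 1)
end = datetime(2000, 1, 31)
freq = timedelta(seconds=1)          # one record per second
partition_freq = timedelta(days=1)   # one partition per day

# Partition boundaries ("divisions"): one more boundary than partitions.
divisions = []
t = start
while t <= end:
    divisions.append(t)
    t += partition_freq

npartitions = len(divisions) - 1
rows_per_partition = partition_freq // freq   # timedelta // timedelta -> int
total_rows = npartitions * rows_per_partition

print(npartitions, divisions[0].date(), divisions[-1].date())
print(rows_per_partition, total_rows)
```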


But the column names and dtypes are known.

```python
df.dtypes
```

```
id        int64
name     object
x       float64
y       float64
dtype: object
```

Some operations, such as `head()`, compute and display data immediately. By default `head()` only reads the first partition, so it is cheap.

```python
import pandas as pd
pd.options.display.precision = 2
pd.options.display.max_rows = 10
```

```python
df.head(3)
```

| timestamp | id | name | x | y |
| --- | --- | --- | --- | --- |
| 2000-01-01 00:00:00 | 1007 | Ursula | -0.28 | -0.19 |
| 2000-01-01 00:00:01 | 984 | Michael | -0.81 | 0.83 |
| 2000-01-01 00:00:02 | 966 | Tim | 0.3 | -0.24 |


## Use Standard Pandas Operations

Most common Pandas operations work identically on Dask dataframes.

```python
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3
```

```
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks
```
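A standard deviation cannot be obtained by simply merging per-partition standard deviations, but it can be built from per-partition counts, sums, and sums of squares. The stdlib sketch below shows that chunk-then-aggregate pattern on two tiny hypothetical partitions. It is our illustration, not Dask's implementation, although a final square-root step is consistent with the `Dask Name: sqrt` shown above. The sum-of-squares formula is numerically naive and is used here only for clarity.

```python
import math
from collections import defaultdict

def partial_moments(partition):
    """Chunk step: for each name, the count, sum and sum of squares of x."""
    acc = defaultdict(lambda: [0, 0.0, 0.0])
    for name, x in partition:
        moments = acc[name]
        moments[0] += 1
        moments[1] += x
        moments[2] += x * x
    return acc

def combine_std(partials):
    """Aggregate step: add the partial moments, then form the sample std (ddof=1)."""
    total = defaultdict(lambda: [0, 0.0, 0.0])
    for acc in partials:
        for name, (n, s, ss) in acc.items():
            moments = total[name]
            moments[0] += n
            moments[1] += s
            moments[2] += ss
    return {
        name: math.sqrt((ss - s * s / n) / (n - 1))
        for name, (n, s, ss) in total.items()
    }

# Two tiny hypothetical partitions of (name, x) rows.
partitions = [
    [("Alice", 1.0), ("Bob", 2.0), ("Alice", 3.0)],
    [("Bob", 4.0), ("Alice", 5.0), ("Bob", 6.0)],
]
result = combine_std([partial_moments(p) for p in partitions])
print(result)
```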

Call `.compute()` when you want your result as a Pandas object; here the result is a Pandas Series.

If you started `Client()` above then you may want to watch the status page during computation.

```python
computed_df = df3.compute()
type(computed_df)
```

```
pandas.core.series.Series
```

```python
computed_df
```

```
name
Alice      0.58
Bob        0.58
Charlie    0.58
Dan        0.58
Edith      0.58
           ... 
Victor     0.58
Wendy      0.58
Xavier     0.58
Yvonne     0.58
Zelda      0.57
Name: x, Length: 26, dtype: float64
```

## Persist data in memory

If your dataframe fits in the available RAM, you can persist it in memory.

Future computations then start from the partitions already held in memory instead of regenerating them, which makes them much faster.

```python
df = df.persist()
```

## Time Series Operations

Because we have a datetime index, time-series operations work efficiently.

```python
%matplotlib inline
```

```python
df[['x', 'y']].resample('1h').mean().head()
```

| timestamp | x | y |
| --- | --- | --- |
| 2000-01-01 00:00:00 | 0.00754 | 0.00965 |
| 2000-01-01 01:00:00 | -0.00175 | -0.0054 |
| 2000-01-01 02:00:00 | 0.012 | 0.0111 |
| 2000-01-01 03:00:00 | 0.00253 | 0.00974 |
| 2000-01-01 04:00:00 | 0.0114 | 0.00416 |

```python
df[['x', 'y']].resample('24h').mean().compute().plot()
```

```python
df[['x', 'y']].rolling(window='24h').mean().head()
```

| timestamp | x | y |
| --- | --- | --- |
| 2000-01-01 00:00:00 | -0.28 | -0.19 |
| 2000-01-01 00:00:01 | -0.55 | 0.32 |
| 2000-01-01 00:00:02 | -0.27 | 0.13 |
| 2000-01-01 00:00:03 | -0.34 | 0.3 |
| 2000-01-01 00:00:04 | -0.3 | 0.23 |
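Resampling into hourly bins can run partition by partition, because no hourly bin crosses a day boundary. A 24-hour rolling window is different: the first rows of each day need values from the end of the previous day. As we understand it, Dask handles this by giving each partition an overlapping slice of its predecessor. The stdlib sketch below (integer timestamps, hypothetical helper names) shows why that overlap makes the partitioned result match a single pass. It assumes the window is no longer than one partition, so one neighbour is enough, and uses the `(t - window, t]` window that pandas applies to time-based rolling by default.

```python
def rolling_mean(points, window):
    """Mean of the values with timestamp in (t - window, t] for each sorted (t, value)."""
    out = []
    for t, _ in points:
        values = [v for s, v in points if t - window < s <= t]
        out.append(sum(values) / len(values))
    return out

def partitioned_rolling_mean(partitions, window):
    """Apply rolling_mean partition by partition, prepending the tail of the
    previous partition (the overlap) and then dropping those borrowed rows."""
    results, previous = [], []
    for part in partitions:
        start = part[0][0]
        overlap = [(s, v) for s, v in previous if s > start - window]
        means = rolling_mean(overlap + part, window)
        results.extend(means[len(overlap):])
        previous = part
    return results

points = [(t, float(t % 5)) for t in range(12)]
partitions = [points[i:i + 4] for i in range(0, 12, 4)]  # three partitions of four rows
window = 3

print(partitioned_rolling_mean(partitions, window) == rolling_mean(points, window))
```

Dropping the overlap and running `rolling_mean` on each partition alone gives different values at the start of every partition after the first.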


Random access along the index is cheap, but the result must still be computed.

```python
df.loc['2000-01-05']
```

| npartitions=1 | id | name | x | y |
| --- | --- | --- | --- | --- |
| 2000-01-05 00:00:00.000000000 | int64 | object | float64 | float64 |
| 2000-01-05 23:59:59.999999999 | ... | ... | ... | ... |


```python
%time df.loc['2000-01-05'].compute()
```

```
CPU times: user 8 ms, sys: 32 ms, total: 40 ms
Wall time: 62.4 ms
```

| timestamp | id | name | x | y |
| --- | --- | --- | --- | --- |
| 2000-01-05 00:00:00 | 983 | Ingrid | 0.96 | 0.94 |
| 2000-01-05 00:00:01 | 1040 | Frank | -0.14 | -0.09 |
| 2000-01-05 00:00:02 | 983 | Frank | 0.66 | 0.53 |
| 2000-01-05 00:00:03 | 1023 | Norbert | 0.20 | -0.11 |
| 2000-01-05 00:00:04 | 982 | Laura | 1.00 | -0.62 |
| ... | ... | ... | ... | ... |
| 2000-01-05 23:59:55 | 994 | George | 0.09 | 0.26 |
| 2000-01-05 23:59:56 | 956 | Kevin | -0.75 | 0.16 |
| 2000-01-05 23:59:57 | 949 | Bob | -0.33 | -0.72 |
| 2000-01-05 23:59:58 | 989 | Ray | 0.06 | 0.96 |
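Index lookups are cheap because Dask records the index boundaries of every partition (its `divisions`) and only needs to load the partitions whose range covers the requested label, which is consistent with `npartitions=1` in the summary of `df.loc['2000-01-05']`. A stdlib sketch of that lookup, with a hypothetical `partition_for` helper and the 31 daily boundaries of this dataset:

```python
from bisect import bisect_right
from datetime import date, timedelta

# 31 daily boundaries -> 30 partitions, as in the dataframe summary.
divisions = [date(2000, 1, 1) + timedelta(days=d) for d in range(31)]

def partition_for(label, divisions):
    """Index i of the partition covering divisions[i] <= label < divisions[i + 1];
    the last partition also includes its upper boundary."""
    if not divisions[0] <= label <= divisions[-1]:
        raise KeyError(label)
    return min(bisect_right(divisions, label) - 1, len(divisions) - 2)

print(partition_for(date(2000, 1, 5), divisions))  # the only partition .loc must read
```

A binary search over 31 boundaries costs almost nothing, so the remaining work is reading a single day of data.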


## Set Index

Data is sorted by the index column. This allows faster access, joins, groupby-apply operations, and so on. However, sorting data in parallel can be costly, so setting the index is important, but it should be done infrequently.
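Parallel sorting is expensive because nearly every row may have to move between partitions. Roughly, setting a new index means choosing new partition boundaries from approximate quantiles of the new index column, sending each row to the partition whose range contains its key, and sorting each partition locally. The stdlib sketch below walks through those three steps with hypothetical helper names and data; it illustrates the idea and is not Dask's code.

```python
import random
from bisect import bisect_right

def choose_divisions(sample, npartitions):
    """Step 1: pick npartitions + 1 boundaries from quantiles of a sample of keys."""
    ordered = sorted(sample)
    last = len(ordered) - 1
    return [ordered[round(i * last / npartitions)] for i in range(npartitions + 1)]

def shuffle_to_partitions(records, key, divisions):
    """Steps 2 and 3: route each record to the partition whose range holds its key
    (keys outside the sampled range go to the first or last partition),
    then sort every partition locally."""
    npartitions = len(divisions) - 1
    parts = [[] for _ in range(npartitions)]
    for record in records:
        i = bisect_right(divisions, key(record)) - 1
        parts[min(max(i, 0), npartitions - 1)].append(record)
    return [sorted(part, key=key) for part in parts]

rng = random.Random(0)
names = ["Alice", "Bob", "Charlie", "Dan", "Edith", "Frank", "George"]
records = [(rng.choice(names), i) for i in range(200)]   # hypothetical (name, id) rows
divisions = choose_divisions(rng.sample([r[0] for r in records], 40), npartitions=4)
parts = shuffle_to_partitions(records, key=lambda r: r[0], divisions=divisions)

flat = [name for part in parts for name, _ in part]
print(divisions, [len(p) for p in parts])
```

Concatenating the partitions in order yields a globally sorted index, which is what later makes lookups by name cheap.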

```python
df = df.set_index('name')
df
```

| npartitions=30 | id | x | y |
| --- | --- | --- | --- |
| Alice | int64 | float64 | float64 |
| Alice | ... | ... | ... |
| ... | ... | ... | ... |
| Zelda | ... | ... | ... |
| Zelda | ... | ... | ... |


Because computing this dataset is expensive and we can fit it in our available RAM, we persist the dataset to memory.

```python
df = df.persist()
```

Dask now knows where all data lives, indexed cleanly by name. As a result, operations like random access are cheap and efficient.

```python
%time df.loc['Alice'].compute()
```

```
CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 57.1 ms
```

| name | id | x | y |
| --- | --- | --- | --- |
| Alice | 995 | 0.09 | -0.41 |
| Alice | 1002 | -0.39 | 0.83 |
| Alice | 1024 | 0.20 | 0.02 |
| Alice | 984 | 0.55 | 0.92 |
| Alice | 999 | 0.48 | 0.71 |
| ... | ... | ... | ... |
| Alice | 992 | 0.52 | 0.72 |
| Alice | 1040 | -0.90 | -0.53 |
| Alice | 994 | -0.20 | -0.50 |
| Alice | 1020 | -0.94 | 0.92 |


## Groupby Apply with Scikit-Learn

Now that our data is sorted by name we can easily do operations like random access on name, or groupby-apply with custom functions.

Here we train a different Scikit-Learn linear regression model on each name.

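```python
from sklearn.linear_model import LinearRegression

def train(partition):
    # Fit y as a linear function of x using only the rows for one name.
    est = LinearRegression()
    est.fit(partition[['x']].values, partition.y.values)
    # Return the fitted estimator; Dask collects one per group.
    return est
```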

```python
df.groupby('name').apply(train, meta=object).compute()
```

```
name
Alice      LinearRegression(copy_X=True, fit_intercept=Tr...
Bob        LinearRegression(copy_X=True, fit_intercept=Tr...
Charlie    LinearRegression(copy_X=True, fit_intercept=Tr...
Dan        LinearRegression(copy_X=True, fit_intercept=Tr...
Edith      LinearRegression(copy_X=True, fit_intercept=Tr...
                                 ...                        
Victor     LinearRegression(copy_X=True, fit_intercept=Tr...
Wendy      LinearRegression(copy_X=True, fit_intercept=Tr...
Xavier     LinearRegression(copy_X=True, fit_intercept=Tr...
Yvonne     LinearRegression(copy_X=True, fit_intercept=Tr...
Zelda      LinearRegression(copy_X=True, fit_intercept=Tr...
Length: 26, dtype: object
```
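Each call to `train` fits an ordinary least-squares line, y = slope * x + intercept, to one name's rows. With a single feature that fit has a closed form, so the same split-apply-combine pattern can be sketched without scikit-learn. The data and the `fit_line` helper below are hypothetical and only illustrate what `groupby(...).apply(...)` does per group.

```python
from collections import defaultdict

def fit_line(xs, ys):
    """Closed-form ordinary least squares for one feature: returns (slope, intercept)."""
    n = len(xs)
    x_mean, y_mean = sum(xs) / n, sum(ys) / n
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    sxx = sum((x - x_mean) ** 2 for x in xs)
    slope = sxy / sxx
    return slope, y_mean - slope * x_mean

# Hypothetical rows of (name, x, y): Alice follows y = 2x + 1, Bob follows y = -x + 3.
rows = [("Alice", x, 2 * x + 1) for x in range(4)] + [("Bob", x, -x + 3) for x in range(4)]

# Split: collect each name's x and y values.
groups = defaultdict(lambda: ([], []))
for name, x, y in rows:
    groups[name][0].append(x)
    groups[name][1].append(y)

# Apply and combine: one fitted model (here just its coefficients) per name.
models = {name: fit_line(xs, ys) for name, (xs, ys) in groups.items()}
print(models)
```

Because the data is already partitioned by name, Dask can run this per-group step without first moving rows between partitions.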

## Further Reading

For a more in-depth introduction to Dask dataframes, see the [dask tutorial](https://github.com/dask/dask-tutorial), notebooks 04 and 07.
