<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">


# Delayed DataFrames

In two of the previous notebooks we saw two different methods for building parallel computations with Dask:

1.  Use Dask.delayed to wrap custom code
2.  Use Dask.dataframe to handle large dataframes 

Most non-trivial problems require both of these methods.  We often deal with tabular data, but messy situations arise where we need to handle things manually.  For example, our data might be stored in a form that requires special attention, or we may want to execute an algorithm that is not implemented in dask.dataframe.

In this notebook we use Dask.delayed to load some custom data and then convert these delayed values into a Dask dataframe.  This shows us how to use both together.

### How to convert between dask dataframes and dask delayed

If you have a dask.dataframe, you can construct a list of delayed objects, one pointing to each of the dataframe's partitions:

```python
>>> df.to_delayed()
[...]
```

If you have a list of delayed values, each of which creates a single Pandas dataframe, you can construct a Dask.dataframe:

```python
>>> parts = [delayed(read_pandas_dataframe)(arg) for arg in args]
>>> df = dd.from_delayed(parts)
```

Consider the following example ...

In [None]:
import pandas as pd

def f(n):
    """ This returns a simple Pandas DataFrame with n rows """
    return pd.DataFrame({'x': [i for i in range(n)],
                         'y': [i ** 2 for i in range(n)]})

f(5)

In [None]:
import dask

lazy_dataframes = [dask.delayed(f)(n) for n in [1, 3, 5, 7]]
lazy_dataframes

In [None]:
import dask.dataframe as dd
df = dd.from_delayed(lazy_dataframes)
df

In [None]:
df.compute()

### Prepare data

You can ignore this section.  It moves around our data in a way that makes it efficient, but also somewhat more complex.  We'll learn more about what we have to do with this data below.

In [None]:
import glob
import os
filenames = sorted(glob.glob(os.path.join('data', 'stocks', '*', '*.csv')))

In [None]:
dirname = os.path.join('data', 'messy')
if os.path.exists(dirname):
    import shutil
    shutil.rmtree(dirname)
    
os.mkdir(dirname)

In [None]:
import pandas as pd
import feather

def convert(fn):
    data, _, symbol, date = fn.split(os.sep)
    date = date.split('.')[0]
    df = pd.read_csv(fn, parse_dates=['timestamp'])
    df['timestamp'] = ((df.timestamp - df.timestamp.dt.floor('1d')).astype(int)/ 1e9).astype('int32')
    new_fn = os.path.join(data, 'messy', date, symbol + '.feather')
    # exist_ok avoids a race when several workers create the same directory
    os.makedirs(os.path.dirname(new_fn), exist_ok=True)
    feather.write_dataframe(df, new_fn)

import dask
import dask.multiprocessing
values = [dask.delayed(convert)(fn) for fn in filenames]

# scheduler= replaces the deprecated get= keyword
dask.compute(values, scheduler='processes');

## Parallel access to custom data formats

Imagine that you work for a company that cares about financial time series data for many stocks over time.  Last year your company decided to organize data into a special directory structure that puts each day in a separate directory and then each stock in a separate file within this directory.  Your company has chosen to use the new Feather format because it is more efficient than CSV.

This makes your data look something like the following:

```
data/messy/2015-01-01
├── AAPL.feather
├── GOOG.feather
├── MSFT.feather
└── YHOO.feather
data/messy/2015-01-02
├── AAPL.feather
├── GOOG.feather
├── MSFT.feather
└── YHOO.feather
data/messy/2015-01-03
├── AAPL.feather
├── GOOG.feather
├── MSFT.feather
└── YHOO.feather
```

Each file contains the high/low/open/close values, along with the seconds within each day.  Let's look at the data for a single day.

In [None]:
import feather
fn = os.path.join('data', 'messy', '2015-01-02', 'GOOG.feather')
df = feather.read_dataframe(fn)
df.head()

In [None]:
df.dtypes

### Load Data into Pandas

Again for efficiency, your company has decided that each feather file includes neither the stock symbol nor the date, because both are encoded in the file path.  However, when people want to compare many files at the same time they end up adding this information back in.
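
To make the encoding concrete, here is a small sketch that recovers the date and the symbol from a path in the layout shown earlier. The helper name `parse_path` is hypothetical and is not part of the notebook's own code.

```python
import os

def parse_path(fn):
    """ Split a path like data/messy/2015-01-02/GOOG.feather into (date, symbol) """
    dirname, basename = os.path.split(fn)
    date = os.path.basename(dirname)         # the directory name holds the date
    symbol, _ = os.path.splitext(basename)   # the file name holds the stock symbol
    return date, symbol

fn = os.path.join('data', 'messy', '2015-01-02', 'GOOG.feather')
print(parse_path(fn))
```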

Your colleague has written a small routine to load all of the data into a single Pandas DataFrame.  It does the following:

1.  Load each dataframe into memory
2.  Alter the dataframe to include the symbol name and the date, both taken from the file path
3.  Concatenate all of these Pandas dataframes into a larger Pandas dataframe
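
Step 2 is the least obvious of the three, because the files store only the seconds within each day. The sketch below shows the idea on a toy frame: seconds are turned into timedeltas and added to the date taken from the directory name. The column values are made up, and `pd.to_timedelta` is used here as a stand-in for the `astype('m8[s]')` cast in the colleague's code.

```python
import pandas as pd

# Toy stand-in for one file (illustrative values): seconds since midnight
df = pd.DataFrame({'timestamp': pd.Series([0, 60, 34200], dtype='int32'),
                   'close': [10.0, 10.5, 11.0]})

date = pd.Timestamp('2015-01-02')   # recovered from the directory name

# Seconds become timedeltas; timedelta + date gives a full timestamp
full = pd.to_timedelta(df['timestamp'], unit='s') + date
print(full.iloc[-1])
```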

In [None]:
dfs = []
for dir in sorted(glob.glob(os.path.join('data', 'messy', '*'))):
    for fn in sorted(glob.glob(os.path.join(dir, '*'))):
        _, _, date, symbol = fn.split(os.path.sep)
        symbol = symbol[:-len('.feather')]
        date = pd.Timestamp(date)
        df = feather.read_dataframe(fn)
        df['timestamp'] = df.timestamp.astype('m8[s]') + date
        df['symbol'] = symbol
        dfs.append(df)
        
df = pd.concat(dfs, axis=0)

In [None]:
df.head()

### Parallelize 

This routine works well on small datasets and has become popular within the company.  However, your company anticipates much bigger data in the near future and wants to scale this process beyond what Pandas alone can handle.

You have been asked to parallelize your colleague's code so that it can scale out to a cluster.

### Exercise: Delayed + Dataframes

Build a lazy Dask dataframe from the sequential dataframe munging code we had above.  You will have to use dask.delayed to parallelize/lazify the for-loop code from before and then use `dd.from_delayed` to convert these many lazy Pandas dataframes into a dask.dataframe.

*Hint: You may at some point need to rely on [pandas.DataFrame.assign](http://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.DataFrame.assign.html) to avoid mutating a delayed object*
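
As a brief illustration of the hint, shown on a plain Pandas dataframe rather than on the exercise data: `assign` returns a new dataframe with the extra column and leaves the original untouched. Because it is an ordinary method call rather than an in-place assignment, the same call can also be made on a delayed dataframe. The column values below are made up.

```python
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})

# In-place version, which mutates df:
#   df['symbol'] = 'GOOG'

# assign builds a new dataframe instead of modifying df
df2 = df.assign(symbol='GOOG')

print(list(df.columns))
print(list(df2.columns))
```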

In [None]:
# Sequential Code

dfs = []
for dir in sorted(glob.glob(os.path.join('data', 'messy', '*'))):
    for fn in sorted(glob.glob(os.path.join(dir, '*'))):
        _, _, date, symbol = fn.split(os.path.sep)
        symbol = symbol[:-len('.feather')]
        date = pd.Timestamp(date)
        df = feather.read_dataframe(fn)
        df['timestamp'] = df.timestamp.astype('m8[s]') + date
        df['symbol'] = symbol
        dfs.append(df)

In [None]:
%%time

# Parallel code

# TODO: Parallelize the sequential code above using dask.delayed.  
# Get back a Dask.dataframe 


In [None]:
%time df.head()

In [None]:
%load solutions/04-delayed-dataframes.py