# Dask Delayed ETL Tutorial

In this example we take CSV files exported from Yahoo! Finance, add each file's ticker as a column, and load the combined data into a Parquet dataset partitioned by ticker.

In many cases, if the data already contained the ticker as one of the CSV columns, I would just use `dask.dataframe` (`dd.read_csv`) with a glob pattern for the file names. It would read the files in parallel and save me from having to use `dask.delayed`. But since I need the name of each file as a column value, I need to take this approach.

In [1]:
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
import dask
import os

In [2]:
# 4 worker processes with 2 threads each; memory_limit applies per worker (4 x 2GB = 8 GB total)
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

Client (scheduler: tcp://127.0.0.1:57595, dashboard: http://127.0.0.1:8787/status)
Cluster (workers: 4, cores: 8, memory: 8.00 GB)


In [3]:
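def extract_csv_data(ticker, path_to_csv):
    """Read one Yahoo! Finance export and tag every row with its ticker."""
    # os.path.join works whether or not path_to_csv ends with a slash
    df = pd.read_csv(os.path.join(path_to_csv, f'{ticker}.csv'))

    # Map the Yahoo! column names to snake_case names
    column_dict = {
        'Adj Close': 'close_adj',
        'Date': 'date',
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume'
    }
    df = df.rename(columns=column_dict)
    # The ticker only exists in the file name, so add it as a column
    df['ticker'] = ticker
    return df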
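To see what a single extract call returns, here is the function run against one throwaway CSV. The function is restated (using `os.path.join`) so the block runs on its own, and the data row is copied from the AAPL output shown in this notebook:

```python
import os
import tempfile

import pandas as pd


# Restated from the notebook so this block is self-contained
def extract_csv_data(ticker, path_to_csv):
    df = pd.read_csv(os.path.join(path_to_csv, f"{ticker}.csv"))
    column_dict = {
        "Adj Close": "close_adj", "Date": "date", "Open": "open",
        "High": "high", "Low": "low", "Close": "close", "Volume": "volume",
    }
    df = df.rename(columns=column_dict)
    df["ticker"] = ticker
    return df


# Hypothetical one-row export in the Yahoo! Finance column layout
raw_dir = tempfile.mkdtemp()
with open(os.path.join(raw_dir, "AAPL.csv"), "w") as f:
    f.write("Date,Open,High,Low,Close,Adj Close,Volume\n")
    f.write("2015-10-19,27.700001,27.9375,27.5275,27.932501,25.81282,119036800\n")

aapl = extract_csv_data("AAPL", raw_dir)
print(list(aapl.columns))
# → ['date', 'open', 'high', 'low', 'close', 'close_adj', 'volume', 'ticker']
```

Renaming keeps the original column order, and the new `ticker` column is appended at the end, which matches the structure Dask reports for the combined frame.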

In [4]:
input_file_path = '../../../../data/raw/'
output_file_path = '../../../../data/'
tickers = 'AAPL,AMZN,FB,IBM,MSFT'.split(',')
# One lazy extract task per ticker; no file is read when this list is built
ticker_dfs_list = [dask.delayed(extract_csv_data)(ticker, input_file_path) for ticker in tickers]
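The ticker list above is hardcoded. If the raw folder holds exactly one `<TICKER>.csv` per symbol, the list could instead be derived from the file names, and the delayed calls still stay lazy until computed. A sketch assuming that naming convention, with a hypothetical `load` helper and throwaway files; `files_read` only records when each file is actually opened:

```python
import tempfile
from pathlib import Path

import dask
import pandas as pd

# Hypothetical raw directory with one CSV per ticker
raw_dir = Path(tempfile.mkdtemp())
for name in ["AAPL", "IBM", "MSFT"]:
    pd.DataFrame({"Date": ["2015-10-19"], "Close": [1.0]}).to_csv(
        raw_dir / f"{name}.csv", index=False
    )

files_read = []  # records each file as it is opened


def load(csv_path):
    files_read.append(csv_path.stem)
    df = pd.read_csv(csv_path)
    df["ticker"] = csv_path.stem  # ticker taken from the file name
    return df


# Build the ticker list from the file names instead of hardcoding it
tickers = sorted(p.stem for p in raw_dir.glob("*.csv"))
tasks = [dask.delayed(load)(raw_dir / f"{t}.csv") for t in tickers]
print(len(files_read))  # 0: building the delayed calls reads nothing

frames = dask.compute(*tasks, scheduler="synchronous")
print(len(files_read))  # 3: each file is read once, at compute time
```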

In [5]:
# Combine the delayed pandas DataFrames into one Dask DataFrame (one partition per ticker)
tickers_df = dd.from_delayed(ticker_dfs_list)

In [6]:
tickers_df  # shows the lazy structure; tickers_df.visualize() (requires graphviz) draws the task graph instead

Dask DataFrame Structure:
                 date     open     high      low    close close_adj volume  ticker
npartitions=5                                                                     
               object  float64  float64  float64  float64   float64  int64  object
                  ...      ...      ...      ...      ...       ...    ...     ...
...               ...      ...      ...      ...      ...       ...    ...     ...
                  ...      ...      ...      ...      ...       ...    ...     ...
                  ...      ...      ...      ...      ...       ...    ...     ...
Dask Name: from-delayed, 10 tasks

In [7]:
output_path_full = f'{output_file_path}all-tickers'

In [8]:
tickers_df.head()

|   | date | open | high | low | close | close_adj | volume | ticker |
|---|------|------|------|-----|-------|-----------|--------|--------|
| 0 | 2015-10-19 | 27.700001 | 27.9375 | 27.5275 | 27.932501 | 25.81282 | 119036800 | AAPL |
| 1 | 2015-10-20 | 27.834999 | 28.5425 | 27.705 | 28.442499 | 26.284117 | 195871200 | AAPL |
| 2 | 2015-10-21 | 28.5 | 28.895 | 28.424999 | 28.440001 | 26.281809 | 167180800 | AAPL |
| 3 | 2015-10-22 | 28.5825 | 28.875 | 28.525 | 28.875 | 26.683798 | 166616400 | AAPL |
| 4 | 2015-10-23 | 29.174999 | 29.807501 | 29.0825 | 29.77 | 27.510878 | 237467600 | AAPL |


In [9]:
# One sub-directory per ticker (ticker=AAPL/, ticker=AMZN/, ...); the pandas index is not written
tickers_df.to_parquet(output_path_full, write_index=False, partition_on=['ticker'], compression='snappy')

In [10]:
client.close()