# 3. Dask Delayed

## Read in + Clean the Time Series data

In [None]:
import pandas as pd

# Load only the columns needed for forecasting, parsing "date" as datetime
df = pd.read_csv(
    "./data/store-sales-time-series-forecasting/train.csv",
    usecols=["date", "store_nbr", "family", "sales"],
    parse_dates=["date"],
)

# One identifier per series, formatted as "<store_nbr>-<family>"
df["store_family"] = df["store_nbr"].astype(str) + "-" + df["family"]

# The identifier replaces the two columns it was built from
df = df.drop(columns=["store_nbr", "family"])

# Rename to the "ds" (timestamp) / "y" (target) names used for Prophet
df = df.rename(columns={"date": "ds", "sales": "y"})

# Order rows by series, then chronologically, with a fresh 0..n-1 index
df = df.sort_values(by=["store_family", "ds"]).reset_index(drop=True)

# Keep only the first 500 distinct series (in the sorted order above)
first_500 = df["store_family"].unique()[:500].tolist()
df = df[df["store_family"].isin(first_500)]

## Pandas Speed Test

In [None]:
from prophet import Prophet

def create_ts(df):
    """
    Fit a Prophet model on ``df`` and forecast 365 periods ahead.

    Can be called directly on a DataFrame or handed to
    ``groupby(...).apply()``.

    Returns the DataFrame produced by ``Prophet.predict`` for the
    historical dates plus the 365 future periods.
    """

    # Build the model with all three seasonal components switched on,
    # then fit it on the supplied history
    model = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=True,
    ).fit(df)

    # Dates to predict: the training history followed by 365 new periods
    horizon = model.make_future_dataframe(periods=365, include_history=True)

    return model.predict(horizon)

In [None]:
%%time

# Baseline: run create_ts once per store_family group and stack the results
tab = df.groupby(by="store_family").apply(create_ts).reset_index(drop=True)

## Dask Speed Test

In [None]:
from dask.distributed import Client

# Start a local Dask cluster with 16 single-threaded workers.
# NOTE: per the dask.distributed docs, memory_limit applies to each worker.
client = Client(n_workers=16, threads_per_worker=1, memory_limit="1 GiB")

# Leave the client as the last expression so the notebook renders its summary
client

In [None]:
%%time

from dask import delayed
from prophet import Prophet

@delayed
def create_ts(df):
    """
    Lazy Prophet forecast for a single time series.

    Because of ``@delayed``, calling this function only records a task;
    the model is fitted and the forecast produced when the returned
    Delayed object is computed.

    The computed result is the DataFrame produced by ``Prophet.predict``
    for the historical dates plus 365 future periods.
    """

    # Build the model with all three seasonal components switched on,
    # then fit it on the supplied history
    model = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=True,
    ).fit(df)

    # Dates to predict: the training history followed by 365 new periods
    horizon = model.make_future_dataframe(periods=365, include_history=True)

    return model.predict(horizon)

In [None]:
%%time

from dask import compute

# create_ts is wrapped in @delayed here, so .apply() does not run Prophet:
# it collects one lazy task per store_family group.
tab = (
    df.groupby("store_family")
    .apply(create_ts)
    .reset_index(drop=True)
)

# compute(*tab) executes every task and returns a tuple holding one forecast
# DataFrame per series. Stack all of them (the previous `[0]` kept only the
# first series' forecast), mirroring the shape of the pandas speed test.
tab_out = pd.concat(compute(*tab), ignore_index=True)

## Dask Alternative Version (Quicker!)

In [None]:
%time

from dask import delayed
from prophet import Prophet
from prophet.plot import plot
from dask import compute

groups = df.groupby("store_family")

@delayed
def create_ts(group_name):
    """Runs a Prophet time series forecast"""
    
    # Retrive the group
    df_gp = groups.get_group(group_name)
    
    # Drop the store_family column
    df_gp = df_gp.drop("store_family", axis=1)
    
    # Run the time series forecast
    model = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=True
    )

    # Fit the Model
    model.fit(df_gp)
    
    # Generate a future df
    forecast = model.make_future_dataframe(
        periods=365,
        include_history=True
    )
    
    # Make the forecast
    df_ts = model.predict(forecast)
    
    
    df_ts["store_family"] = group_name
    
    return df_ts


df_list = [create_ts(group_name) for group_name in group_names]
df_computed = pd.concat(compute(df_list)[0])