# Example of building an ml pipeline for return prediction

In [None]:
from pprint import pprint

import pandas as pd
import numpy as np
from aika import putki
from aika.putki import CalendarChecker, IrregularChecker
from aika.putki.context import Defaults, GraphContext
from aika.putki.graph import Graph, TaskModule
from aika.putki.runners import LocalRunner
from aika.putki.interface import Dependency
from aika.time.calendars import TimeOfDayCalendar, OffsetCalendar
from aika.time.time_of_day import TimeOfDay
from aika.time.time_range import TimeRange#
from aika.time.timestamp import Timestamp
from aika.utilities.fin.macd import macd, ewm_volatility
from aika.utilities.fin.returns import arithmetic_bar_returns
from aika.ml.generators.walkforward import CausalDataSetGenerator
from aika.ml.interface import Pipeline, SklearnEstimator, UnivariateStatelessTransformer, BivariateDataSet, apply_trained_models

from aika.datagraph.persistence.hash_backed import HashBackedPersistanceEngine
from aika.datagraph.persistence.mongo_backed import MongoBackedPersistanceEngine
from pandas_datareader import data
import typing as t
from pandas.tseries.offsets import BDay
import pymongo

## Preamble
Hopefully if you have been following the other tutorials, the below should now be familiar. We just create a few tasks that will acquire and produce the data that we want.

In [None]:
# Persistence engine handed to the context defaults below.
# NOTE(review): presumably an in-process store, unlike the Mongo backed engine
# that is also imported at the top of the notebook - confirm.
engine = HashBackedPersistanceEngine()
# Graph context carrying the defaults given here (version tag, persistence engine
# and time range); every task in this notebook is created through this context.
context = GraphContext(
    defaults=Defaults(
        version="research", 
        persistence_engine=engine, 
        time_range= TimeRange("2010", "2020")
    )
)

In [None]:
def pull_google_finance_data(
    tickers : t.List,
    time_range,
):
    """Download adjusted close prices for ``tickers`` over ``time_range``.

    Despite the function name, the request goes to the "yahoo" source of
    ``pandas_datareader``.

    Parameters
    ----------
    tickers
        Iterable of ticker symbols; it is converted to a list before the request.
    time_range
        Object exposing ``start`` and ``end``, used as the bounds of the request.

    Returns
    -------
    The "Adj Close" selection of the downloaded frame, with an unnamed index
    whose entries have been mapped through ``Timestamp``.
    """
    raw = data.DataReader(
        list(tickers),
        "yahoo",
        start=time_range.start,
        end=time_range.end,
    )
    raw.index.name = None
    # Mapping through Timestamp ensures the index has a timezone.
    raw.index = raw.index.map(Timestamp)
    return raw["Adj Close"]

# Task that pulls the adjusted close prices for the two tickers.
# NOTE(review): the CalendarChecker / TimeOfDayCalendar pair presumably declares
# that one row is expected per day at 00:00 UTC - confirm against the aika docs.
close_prices = context.time_series_task(
    "close_prices",
    pull_google_finance_data,
    tickers=("AAPL", "GOOGL"),
    completion_checker=CalendarChecker(
        TimeOfDayCalendar(time_of_day=TimeOfDay.from_str("00:00 [UTC]"))
    ),
)
close_prices.run()

# Task computing returns from the close prices with step=1 (one bar).
returns = context.time_series_task(
    "returns",
    arithmetic_bar_returns,
    prices=close_prices,
    step=1,
    time_level="end"
)
returns.run()
# NOTE: this is not the last expression of the notebook cell, so the value read
# here is not displayed; it only checks that the output can be read back.
returns.read()

def risk_adjusted_returns(returns):
    """Divide ``returns`` by a lagged volatility estimate.

    The estimate comes from ``ewm_volatility`` with ``span=30`` and is shifted
    by one row before the division, so each row is scaled by the estimate of
    the row before it.
    """
    lagged_volatility = ewm_volatility(returns, span=30).shift(1)
    return returns.divide(lagged_volatility)

# Task wrapping the risk adjustment function, fed by the one-bar returns task.
# NOTE: this assignment rebinds the name `risk_adjusted_returns` from the function
# defined just above to the task object; from here on the name refers to the task.
risk_adjusted_returns = context.time_series_task(
    "returns.risk_adjusted",
    risk_adjusted_returns,
    returns=returns,
    time_level="end"
)
risk_adjusted_returns.run()

# Same returns function as the "returns" task, but with step=5 instead of step=1.
weekly_returns = context.time_series_task(
    "weekly_returns",
    arithmetic_bar_returns,
    prices=close_prices,
    step=5,
    time_level="end"
)
weekly_returns.run()

In [None]:
def macd_multi_horizon(
    prices : pd.DataFrame,
    horizons : t.List[t.Tuple[int, int]],
    vol_span : int
):
    """Compute ``macd`` for several (fast, slow) pairs and join the results.

    Parameters
    ----------
    prices
        Price frame with one column per symbol.
    horizons
        Iterable of ``(fast, slow)`` pairs, each passed on to ``macd``.
    vol_span
        Passed to ``macd`` unchanged for every pair.

    Returns
    -------
    A frame with the per-pair results side by side. Its columns are a
    MultiIndex with levels ("Symbols", "fast", "slow"), built from the columns
    of ``prices`` and the pair that produced each result.
    """

    def _labelled_signal(fast, slow):
        # Relabel the macd output so the horizon is part of the column key.
        signal = macd(prices, fast, slow, vol_span)
        signal.columns = pd.MultiIndex.from_tuples(
            [(symbol, fast, slow) for symbol in prices.columns],
            names=("Symbols", "fast", "slow"),
        )
        return signal

    per_horizon = [_labelled_signal(fast, slow) for fast, slow in horizons]
    return pd.concat(per_horizon, axis=1)

# Task computing the MACD signals on the close prices for five (fast, slow)
# pairs, each slow span being twice the fast one, with vol_span=90 throughout.
all_macd = context.time_series_task(
    "all_macd",
    macd_multi_horizon,
    prices=close_prices,
    horizons=(
        (10,20),
        (20,40),
        (40,80),
        (80,160),
        (160,320)
    ),
    vol_span=90
)
all_macd.run()
# Last expression of the cell, so the stored signals are displayed.
all_macd.read()

## Using the pipeline model
Here we create a pipeline. It has the following steps:
1. fill na with zeros
2. stack the data so time and asset are the index and the columns are the various macd measures.
3. Fit a regression from those signals onto the one day ahead future returns (note the use of index_level="start" in the dataset generator, so we index signals (features) to the start of the next bar return (response)).
4. Unstack the result.

To use this pipeline we first need a dataset generator. Here we use the causal dataset generator, which in effect aligns and then slices the data. This one is set up to provide the model with an expanding series of data, with a minimum period of 300 days and an essentially infinite maximum, and to step forward by 100 days, so there is a new dataset every time we have 100 complete days.

These are fed into the pipeline so that we get a new model every 100 days.

Finally, we go back to the original data and apply the trained models. Trained models become available at the point of the last row of data in the training set, so we apply them only after they were causally available. The function "apply_trained_models" takes the list of trained models and data that should be equivalent in form to the "features" that were given to the dataset generator; the pipeline's transform method then takes that data and produces a y.

Note that the sklearn estimator will learn the column names when fitting, so that the output y data will have the same column names as the data that it was fitted to. These functions are put into aika tasks so their outputs are persisted and can be inspected.

In [None]:
from sklearn.linear_model import LinearRegression

def fill_zeros(df : pd.DataFrame):
    """Return a copy of ``df`` in which every missing value is replaced by 0.0."""
    filled = df.fillna(value=0.0)
    return filled

def stack(df : pd.DataFrame):
    """Move the "Symbols" column level of ``df`` into the row index."""
    long_form = df.stack(level="Symbols")
    return long_form

def unstack(df : pd.DataFrame):
    """Move the "Symbols" row index level of ``df`` back into the columns."""
    wide_form = df.unstack(level="Symbols")
    return wide_form


def fit_model(all_macd : pd.DataFrame, returns : pd.DataFrame):
    """Fit the regression pipeline on successive datasets of features and responses.

    A ``CausalDataSetGenerator`` is built with ``all_macd`` as features and
    ``returns`` as responses (step_size=100, window_size=50000, min_periods=300,
    strict_step_size=True, causal kwargs index_level="start" and contemp=True).
    The pipeline fills missing values with zeros, stacks the "Symbols" level
    into the index, fits a ``LinearRegression`` with an intercept, and unstacks
    "Symbols" again.

    Returns whatever ``Pipeline.apply_to_dataset_generator`` produces for that
    generator with ``time_level="end"``.
    """
    causal_options = {
        "index_level": "start",
        "contemp": True,
    }
    walk_forward = CausalDataSetGenerator(
        features=all_macd,
        responses=returns,
        step_size=100,
        window_size=50000,
        min_periods=300,
        strict_step_size=True,
        causal_kwargs=causal_options,
    )

    regression = LinearRegression(fit_intercept=True, copy_X=True)
    regression_steps = [
        UnivariateStatelessTransformer(fill_zeros),
        UnivariateStatelessTransformer(stack),
        SklearnEstimator(regression),
        UnivariateStatelessTransformer(unstack),
    ]
    pipeline = Pipeline(steps=regression_steps)

    return pipeline.apply_to_dataset_generator(walk_forward, time_level="end")


# Task that fits the models from the MACD features and the risk adjusted returns.
# NOTE(review): IrregularChecker is presumably used because fitted models appear
# at irregular points rather than on a fixed calendar - confirm.
fitted_models = context.time_series_task(
    "fitted_models",
    fit_model,
    all_macd=all_macd,
    returns=risk_adjusted_returns,
    completion_checker=IrregularChecker()
)
fitted_models.run()

# Task that applies the fitted models to the MACD features to produce predictions.
# NOTE(review): the 200 business day lookback on the models dependency presumably
# guarantees that an already trained model is in range for each output - confirm.
model_outputs = context.time_series_task(
    "model_outputs",
    apply_trained_models,
    models=Dependency(fitted_models, lookback=200 * BDay(), inherit_frequency=False),
    data=all_macd,
    time_level=0,
    contemp=True
)
model_outputs.run()
# Last expression of the cell, so the predictions are displayed.
model_outputs.read()

## Inspect the Outputs

Here we can inspect the coefficients of the regression of the learned model by looking inside the pipeline.

In [None]:
# Take the last stored entry, then its second-to-last step (the SklearnEstimator
# wrapping LinearRegression in fit_model), then the fitted coefficients.
# NOTE: `_estimator` is a private attribute of that step, so this may break if
# the library changes its internals.
fitted_models.read().iloc[-1].steps[-2]._estimator.coef_

Or we can compare the predictions with the actual returns.

In [None]:
# Work on copies so relabelling the columns below does not touch the stored data.
o = model_outputs.read().copy()
# Drop the "end" index level so the returns can be lined up with the predictions.
# NOTE(review): assumes the remaining index level matches the prediction index - confirm.
r = risk_adjusted_returns.read().copy().droplevel("end")

# Prefix each symbol column with its source so both frames can share one table.
o.columns = pd.MultiIndex.from_tuples([("Prediction", symbol) for symbol in o.columns])
r.columns = pd.MultiIndex.from_tuples([("Returns", symbol) for symbol in r.columns])

# Side by side: ("Prediction", symbol) and ("Returns", symbol) columns.
results = pd.concat([o,r], axis=1)
results

Or we can look at e.g. deciles. Clearly, trying to find the optimal way to combine one day ahead momentum signals needs a little more than two assets over a reasonably short time period, so don't spend any time wondering why the results are so feeble!

In [None]:
# Long format: one row per (time, symbol) with "Prediction" and "Returns" columns;
# rows where either value is missing are dropped.
foo = results.stack(level=1).dropna()
# Lower edges of the prediction deciles (0%, 10%, ..., 90%), labelled 0..9.
quantiles = foo["Prediction"].quantile([x/10 for x in range(10)])
quantiles.index = range(len(quantiles))
# searchsorted(side="right") returns 1..10 (the number of edges <= prediction).
# Subtract 1 so bucket k is the one whose lower edge is quantiles[k]; without it
# the labels never match edge 0 and the top decile is dropped from the plot.
grouper = np.searchsorted(quantiles.values, foo["Prediction"].values, side="right") - 1
# Mean realised return per prediction decile, indexed 0..9 like `quantiles`.
r = foo.groupby(grouper).mean()["Returns"]
display(foo.Returns.rolling(10).mean().hist(bins=50))
# One point per decile: lower edge of the bucket against its mean return.
pd.DataFrame({"p":quantiles, "r": r}).plot.scatter(x="p", y="r")

In [None]:
# Correlation matrix between the "Prediction" and "Returns" columns of the long table.
display(foo.corr())
# Raw scatter of every prediction against the corresponding return.
foo.plot.scatter(x="Prediction", y="Returns")