# ETF Pipeline

> ETF flow decomposition pipeline.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# | default_exp etf_pipeline

In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# | export
import os

import click
from arcticdb import Arctic, LibraryOptions
from hydra import initialize, initialize_config_module, initialize_config_dir, compose
from omegaconf import OmegaConf
from pathlib import Path
from lobster_tools.config import MainConfig, register_configs
from lobster_tools.preprocessing import *
from lobster_tools.querying import *
from lobster_tools.flow_decomposition import *
import pandas as pd
from sklearn.linear_model import LinearRegression
from itertools import product
import datetime
from dataclasses import dataclass
from functools import partial
import json
from typing import Literal
import numpy as np
from pprint import pprint

In [None]:
# | export

def set_hydra_overrides(execution_environment: Literal["local", "server"]):
    """Return the list of Hydra override strings for an execution environment.

    ``"local"`` selects the simple hyperparameters and universe together with
    the local data config; ``"server"`` selects only the full hyperparameters.
    A fresh list is returned on every call.

    Raises:
        ValueError: if ``execution_environment`` is neither ``"local"`` nor
            ``"server"``.
    """
    if execution_environment == "local":
        return ["hyperparameters=simple", "universe=simple", "data_config=local"]
    if execution_environment == "server":
        # An earlier variant also pinned "universe=XLE" and "data_config=server".
        return ["hyperparameters=full"]
    raise ValueError("execution_environment must be either 'local' or 'server'")

# Force the run mode to "server" for this process. Because of this assignment
# the lookup on the next line always yields "server", whatever the variable
# held before the module was imported.
os.environ["ETF_RUN_MODE"] = "server"
execution_environment = os.environ.get("ETF_RUN_MODE")
overrides = set_hydra_overrides(execution_environment)
print(overrides)
# NOTE(review): this replaces the overrides computed above, and is itself
# reassigned inside the `with` block below before anything is composed.
overrides = ["hyperparameters=simple", "universe=simple", "data_config=local"]

# Imported from lobster_tools.config; presumably registers the config schema
# with Hydra so that compose() can find "config" — confirm there.
register_configs()
with initialize(version_base=None, config_path=None):
    
    # defaults = [{"hyperparameters": "simple"}, "_self_", {"override hydra/launcher": "joblib"}]
    # Only the second assignment takes effect: the csv_files_path override is
    # replaced immediately by "data_config=server".
    overrides = ["data_config.csv_files_path=baz"]
    overrides = ["data_config=server"]
    # The config is composed twice from the same overrides: once kept as the
    # OmegaConf container for printing, once converted with OmegaConf.to_object.
    cfg_omega = compose(config_name="config", overrides=overrides)
    print(cfg_omega)
    cfg: MainConfig = OmegaConf.to_object(compose(config_name="config", overrides=overrides))
    print(cfg)

# Unpack the settings the later cells read as module-level names.
directory_path = cfg.data_config.csv_files_path
etfs = cfg.universe.etfs
equities = cfg.universe.equities
date_range = cfg.data_config.date_range
markouts = cfg.hyperparameters.markouts
finest_resample = cfg.hyperparameters.finest_resample
max_markout = cfg.hyperparameters.max_markout

# Passed as keyword arguments to the Data(...) calls in later cells.
load: Literal["both", "messages", "book"] = "both"
clip_trading_hours = True
add_ticker_column = True

# Not read by the visible cells: the later comprehensions bind their own
# `ticker`, and the single-day cell passes the literal "AIG".
ticker = "AIG"


# print(cfg.overrides)
print(cfg.data_config.csv_files_path)

['hyperparameters=full']
{'hyperparameters': {'tolerances': ['150us', '500us'], 'resample_freq': '5min', 'markouts': ['30S', '5min']}, 'universe': {'etfs': ['SPY'], 'equities': ['AIG', 'GE']}, 'data_config': {'date_range': ['2020-01-02', '2020-01-02'], 'csv_files_path': '/nfs/home/nicolasp/home/data/tmp', 'zip_files_path': '/nfs/lobster_data/lobster_raw'}, 'db': {'db_path': '/nfs/home/nicolasp/home/data/arctic', 'library_name': 'lobster', 'columns_per_segment': 63}, 'sample_data': {'ticker': 'AMZN', 'levels': 5}}
MainConfig(defaults=[{'hyperparameters': 'simple'}, {'universe': 'simple'}, {'data_config': 'local'}, '_self_'], db=ArcticDBConfig(db_path='/nfs/home/nicolasp/home/data/arctic', library_name='lobster', columns_per_segment=63), data_config=DataConfig(date_range=['2020-01-02', '2020-01-02'], csv_files_path='/nfs/home/nicolasp/home/data/tmp'), hyperparameters=HyperparametersConfig(tolerances=['150us', '500us'], resample_freq='5min', markouts=['30S', '5min']), universe=UniverseCon

In [None]:
# with initialize_config_module(version_base=None, config_module="lobster_tools.config"):
#     cfg = compose(overrides=["data_config=server"])
#     print(cfg)

In [None]:
cfg.data_config.csv_files_path

'/home/petit/Documents/data/lobster/csv'

In [None]:
overrides


In [None]:
os.environ.get("ETF_RUN_MODE")

In [None]:
directory_path

In [None]:
# Rebind `Data` to a partial with default keyword arguments pre-filled.
# Later calls may still override any of them by keyword.
Data = partial(
    Data,
    ticker="GE",
    date_range=date_range,
    directory_path=directory_path,
    load=load,
)

### Single day

In [None]:
# | eval: false
# Build the Data object for a single ticker ("AIG") over the configured date
# range, then construct a Lobster from it.
equity_data = Data(
    directory_path=directory_path,
    ticker="AIG",
    date_range=date_range,
    load=load,
    clip_trading_hours=clip_trading_hours,
    add_ticker_column=add_ticker_column,
)

equity_lobsters = Lobster(equity_data)

### Multi-day

In [None]:
# | eval: false
# One Data object, and one Lobster built from it, per equity in the universe.
equity_data = [
    Data(
        directory_path=directory_path,
        ticker=symbol,
        date_range=date_range,
        load=load,
        clip_trading_hours=clip_trading_hours,
        add_ticker_column=add_ticker_column,
    )
    for symbol in equities
]
equity_lobsters = list(map(Lobster, equity_data))

# Concatenate the get_executions output of every equity and order it by index.
equity_executions = pd.concat(
    [lob.messages.pipe(get_executions) for lob in equity_lobsters]
).sort_index()

In [None]:
# | eval: false
# One Data object, and one Lobster built from it, per ETF in the universe.
etf_data = [
    Data(
        directory_path=directory_path,
        ticker=symbol,
        date_range=date_range,
        load=load,
        clip_trading_hours=clip_trading_hours,
        add_ticker_column=add_ticker_column,
    )
    for symbol in etfs
]
etf_lobsters = list(map(Lobster, etf_data))

# Concatenate the get_executions output of every ETF and order it by index.
etf_executions = pd.concat(
    [lob.messages.pipe(get_executions) for lob in etf_lobsters]
).sort_index()

In [None]:
# TODO: pickle and load pickles; it looks like nbdev doesn't work with cell magics.

In [None]:
# | eval: false
%store etf_executions
%store equity_executions

In [None]:
# | eval: true
%store -r

In [None]:
# | eval: false
# Compute `ofi` over all ETF executions; the result is displayed by the bare
# expression below.
# NOTE(review): the resample frequency is hard-coded to "5T" here rather than
# read from the config — confirm this is intended.
ofi_all = ofi(etf_executions, resample_freq="5T", suffix="all")
ofi_all

In [None]:
# | eval: false
markout_times = markout_returns(ofi_all, markouts=markouts)
markout_times

In [None]:
# | eval: false
# Same markout_returns call as the previous cell, repeated here.
markout_times = markout_returns(ofi_all, markouts=markouts)

# One `resample_mid` result per ETF lobster, renamed to that lobster's ticker
# and concatenated along axis 1.
mids = pd.concat(
    [
        resample_mid(lob.book, resample_freq=finest_resample).rename(lob.data.ticker)
        for lob in etf_lobsters
    ],
    axis=1,
)
mids

In [None]:
# | eval: false
def compute_returns():
    """Build one table of returns per ETF, keyed by ticker.

    Reads the module-level ``etf_executions``, ``mids``, ``etfs``, ``markouts``,
    ``resample_freq`` and ``max_markout``. For each ETF the table has one column
    per markout, holding ``mids`` at ``t + markout`` divided by ``mids`` at ``t``
    minus 1, plus a ``contemp`` column. Column names that would start with a
    digit are prefixed with an underscore.

    NOTE(review): ``resample_freq`` is not assigned in any visible cell —
    confirm where it is defined.
    """
    resampled = etf_executions.resample(resample_freq, label="right").last()
    grid = clip_for_markout(resampled, max_markout=max_markout).index

    def _short_name(column):
        # "return_30S" -> "_30S"; "return_contemp" -> "contemp".
        stripped = column.replace("return_", "")
        return "_" + stripped if stripped[0].isdigit() else stripped

    returns = {}
    for ticker in etfs:
        frame = pd.DataFrame(index=grid)

        # Value of `mids` at each horizon, including the zero-offset base "_0S".
        for horizon in ["0S"] + markouts:
            shifted = frame.index + pd.Timedelta(horizon)
            frame[f"_{horizon}"] = mids.loc[shifted, ticker].values

        for horizon in markouts:
            frame.eval(f"return_{horizon} = (_{horizon} / _0S ) - 1", inplace=True)

        # NOTE(review): "5T" is hard-coded while the grid above is built from
        # resample_freq — confirm the two are meant to agree.
        frame["return_contemp"] = mids[ticker].resample("5T").first().pct_change()

        selected = frame.filter(regex="return")
        selected.columns = [_short_name(name) for name in selected.columns]
        returns[ticker] = selected
    return returns

returns = compute_returns()
returns[etfs[0]]

In [None]:
# | eval: false
def regression_table(
    X: pd.DataFrame,  # covariates; here the OFI columns for a single ETF
    y: pd.DataFrame,  # responses; here mid-to-mid returns at the various markouts
):
    """Fit one univariate regression for every (X column, y column) pair.

    Both frames are first passed through `restrict_common_index`. Each model is
    fitted without an intercept term. The result is indexed by
    "<X column>_<y column>" and has `intercept`, `coefficient` and `r2` columns.
    """
    X, y = restrict_common_index(X, y)

    def _fit(x_name, y_name):
        # scikit-learn expects a 2-D feature array, hence the reshape.
        features = X[x_name].values.reshape(-1, 1)
        target = y[y_name].values

        model = LinearRegression(fit_intercept=False)
        model.fit(features, target)

        return {
            "id": x_name + "_" + y_name,
            "intercept": model.intercept_,
            "coefficient": model.coef_[0],
            "r2": model.score(features, target),
        }

    rows = [_fit(x_name, y_name) for x_name, y_name in product(X.columns, y.columns)]
    return pd.DataFrame(rows).set_index("id")

regression_table(ofi_all, returns[etfs[0]])

In [None]:
# | eval: false
drop_all_neighbor_cols(etf_executions)

In [None]:
# | eval: false
# NOTE(review): `tolerances` is not assigned in any visible cell (the config
# cell unpacks markouts, finest_resample and max_markout only) — confirm where
# it is defined before running this cell.
etf_executions_neighbors = add_neighbors(etf_executions=etf_executions, equity_executions=equity_executions, tolerance=tolerances)
etf_executions_neighbors

In [None]:
# | eval: false
def compute_neighbor_statistics(etf_executions_neighbors: pd.DataFrame):
    """Return, per underscore-prefixed column, the fraction of non-null rows.

    Only columns whose name starts with "_" are considered. Each count of
    non-null values is divided by the total number of rows in the frame.
    """
    underscore_columns = etf_executions_neighbors.filter(regex="^_")
    non_null_counts = underscore_columns.notna().sum()
    total_rows = len(etf_executions_neighbors)
    return non_null_counts / total_rows

neighbor_statistics = compute_neighbor_statistics(etf_executions_neighbors)
pd.DataFrame({'has_neighbor':neighbor_statistics}).style.format("{:.2%}")

In [None]:
# | eval: false
etf_executions_features = append_features(etf_executions=etf_executions_neighbors, equity_executions=equity_executions)

In [None]:
%store etf_executions_neighbors
%store etf_executions_features

In [None]:
# etf_executions_features = marginalise(etf_executions_features, over='same_sign/opposite_sign')

In [None]:
# etf_executions_features.columns

In [None]:
# etf_executions_features["_500us_num_trades"] = etf_executions_features._500us_num_trades_os_af + etf_executions_features._500us_num_trades_os_bf + etf_executions_features._500us_num_trades_ss_af + etf_executions_features._500us_num_trades_ss_bf

In [None]:
# etf_executions_features.filter(regex="^_").hist()

In [None]:
# | hide
import nbdev

nbdev.nbdev_export()