In [None]:
from itertools import combinations, accumulate
from functools import partial
from multiprocessing import Pool
import numpy as np
from numpy.linalg import norm
import pandas as pd
# from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt

# Get universe

In [None]:
# Share of missing observations above which a ticker is flagged as too sparse.
THRES = 0.8
# Price panel read from disk; the code below treats each column as one ticker.
prices = pd.read_parquet('prices_yf.parquet')
# Fraction of missing prices per column.
nulls = prices.isna().mean()
# Tickers whose missing share exceeds THRES, sorted by that share (displayed).
stocks_w_nans = nulls.loc[nulls.gt(THRES)].sort_values()
stocks_w_nans

In [None]:
# Keep only the tickers that were not flagged as too sparse, then forward-fill
# gaps of at most two consecutive rows.
# NOTE(review): `prices` is rebound to its own transformation, so re-running
# this cell alone applies another `ffill(limit=2)` to already-filled data.
cols = prices.columns
stocks = cols[~cols.isin(stocks_w_nans.index)]
prices = (
    prices
    .loc[:, stocks]
    .ffill(limit=2)
#     .dropna()
)

In [None]:
# Long-format universe: one row per (date, id) with the price and its
# row-over-row percentage change.
univ_stacked = pd.concat([
    prices.stack(dropna=False).reset_index(),
    prices.pct_change().stack(dropna=False).reset_index()[0]  # column 0; not clean (dividends, splits, ...)
], axis=1)
univ_stacked.columns = ['date', 'id', 'price', 'chg']  # price is mid_price
CUT = '2007'  # '2006-09-19'
# Compare against the cut as a date string rather than splicing it unquoted
# into a query expression: an unquoted '2006-09-19' would not be read as a
# date, whereas a direct comparison parses any date string the same way.
# Assumes `date` is a datetime column — TODO confirm.
univ_stacked = univ_stacked.loc[univ_stacked['date'] >= CUT]

In [None]:
# Wide universe indexed by date. No `values` argument is given, so the columns
# are two-level: the field ('price' / 'chg') on top, the id below. Later cells
# rely on this through `univ['price']`.
univ = univ_stacked.pivot(index='date', columns='id')
univ

# Formation period

In [None]:
def compute_coint_coeff(pair, univ):
    """Return the spread weights of `pair`.

    Each leg is weighted by the inverse of its price on the first row of
    `univ['price']`; the weight of the second leg is negated, so the resulting
    Series holds (+1/p0_first, -1/p0_second) indexed by the pair's ids.
    """
    first_row = univ['price'].loc[:, pair].iloc[0]
    weights = 1 / first_row
    short_leg = pair[1]
    weights.loc[short_leg] = -weights.loc[short_leg]
    return weights

# Example: weights of a single pair, computed on the whole universe.
pair = ('dis', 'pg')
coeff = compute_coint_coeff(pair, univ)
coeff

In [None]:
def form_spread(pair, coeff, prices):
    """Return the weighted sum of the two legs of `pair`, row by row.

    Parameters
    ----------
    pair : sequence of two column labels of `prices`.
    coeff : Series of weights indexed by those labels.
    prices : DataFrame of prices, one column per id.

    Returns
    -------
    Series indexed like `prices`. A row is NaN whenever either weighted leg is
    NaN (missing price or missing weight).
    """
    weighted_legs = prices.loc[:, pair].mul(coeff, axis=1)
    # skipna=False: with the default NaN-skipping sum, a missing leg silently
    # turns the spread into the other leg alone, and two missing weights turn
    # it into a constant 0 — which then gets the best possible (zero) score.
    return weighted_legs.sum(axis=1, skipna=False)

# Spread of the example pair over the whole universe, built from `coeff`.
spread = form_spread(pair, coeff, univ['price'])
spread.plot(grid=True)

In [None]:
def compute_score(spread):
    """Score a spread by the norm of its values (`numpy.linalg.norm` defaults)."""
    values = np.asarray(spread)
    return norm(values)

# Score of the example spread.
compute_score(spread)

In [None]:
def get_scores(univ):
    """Score every pair of ids found in `univ['price']`.

    Returns
    -------
    pairs : list of 2-tuples, every combination of two price columns.
    coeffs : dict mapping each pair to its weights.
    spreads : DataFrame with one spread column per pair.
    scores : Series with the score of each spread column.
    """
    px = univ['price']
    pairs = list(combinations(px.columns, 2))
    coeffs = {}
    spread_by_pair = {}
    for candidate in pairs:
        weights = compute_coint_coeff(candidate, univ)
        coeffs[candidate] = weights
        spread_by_pair[candidate] = form_spread(candidate, weights, px)
    spreads = pd.DataFrame(spread_by_pair)
    scores = spreads.apply(compute_score)
    return pairs, coeffs, spreads, scores

In [None]:
# Score all pairs of the full universe and display the scores.
pairs, coeffs, spreads, scores = get_scores(univ)
scores

In [None]:
def selection(scores, **kwargs):
    """Return the labels of the `kwargs['n_pairs']` lowest scores, as a list."""
    n_pairs = kwargs['n_pairs']
    ranked = scores.sort_values()
    return list(ranked.head(n_pairs).index)

# Keep the 10 pairs with the lowest scores.
kwargs = {'n_pairs': 10}
selected = selection(scores, **kwargs)
selected

In [None]:
def fit(univ, **kwargs):
    """Score all pairs of `univ` and keep the ones picked by `selection`.

    `kwargs` is forwarded to `selection` (which reads `n_pairs`).

    Returns
    -------
    selected : list of the retained pairs.
    coeffs : dict of weights, restricted to the retained pairs.
    spreads : DataFrame of spreads, restricted to the retained pairs.
    descs : `describe()` summary of those spreads.
    scores : scores of the retained pairs.
    """
    scored = get_scores(univ)
    all_coeffs, all_spreads, all_scores = scored[1], scored[2], scored[3]
    selected = selection(all_scores, **kwargs)
    kept_coeffs = {pair: all_coeffs[pair] for pair in selected}
    kept_spreads = all_spreads[selected]
    descs = kept_spreads.describe()
    return selected, kept_coeffs, kept_spreads, descs, all_scores[selected]

# Fit on the whole universe with 10 pairs. This rebinds `coeffs`, `spreads` and
# `scores` to the selected pairs only.
kwargs = {'n_pairs': 10}
top_pairs, coeffs, spreads, descs, scores = fit(univ, **kwargs)
spreads

# Trading period

## Trading rule

In [None]:
def trading_rule(cur_pos, st_spread, **kwargs):
    """Return the next position given the current one and a standardised spread.

    Requires a strictly positive `threshold` keyword (checked with `assert`).
    Below -threshold the position is +1; above +threshold it is -1. Inside the
    band the current position is kept only while the spread sign is opposite to
    it (sign * position == -1); in every other case the position is 0.
    """
    assert 'threshold' in kwargs and kwargs['threshold'] > 0
    band = kwargs['threshold']
    if st_spread < -band:
        return +1
    if st_spread > band:
        return -1
    # From here on st_spread is not beyond either threshold.
    if np.sign(st_spread) * cur_pos == -1:
        return cur_pos
    return 0

# Toy example: apply the rule to a sine wave of amplitude 2.
xxx = np.linspace(0, 3, 100)
yyy = 2 * np.sin(3 * xxx)

kwargs = dict(threshold=1)
rule = partial(trading_rule, **kwargs)
# `accumulate` with `initial=0` yields one more element than `yyy`; dropping the
# last one leaves pos[i] as the position decided from the values before index i.
pos = accumulate(yyy, rule, initial=0)
pos = list(pos)[:-1]
plt.plot(xxx, pos)
plt.plot(xxx, yyy)
plt.grid(True)

In [None]:
def trade_series(st_spread, trade_rule, **kwargs):
    """Run `trade_rule` along `st_spread` and return the positions as a Series.

    The rule is called as `trade_rule(previous_position, value, **kwargs)`,
    starting from position 0. The result is indexed like `st_spread`; its
    element i is the position reached before the value at i is processed, so
    the first element is always the initial 0.
    """
    step = partial(trade_rule, **kwargs)
    history = [0]
    for value in st_spread.values:
        history.append(step(history[-1], value))
    # Drop the position produced by the last value to keep the original length.
    return pd.Series(index=st_spread.index, data=history[:-1])

# Same toy example as above, this time going through `trade_series`.
xxx = np.linspace(0, 3, 100)
yyy = pd.Series(2 * np.sin(3 * xxx))

kwargs = dict(threshold=1)
pos = trade_series(yyy, trading_rule, **kwargs)
plt.plot(xxx, pos)
plt.plot(xxx, yyy)
plt.grid(True)

In [None]:
# Apply the rule to the first selected spread. The spread is passed as is (not
# standardised), with a threshold of 0.1.
spread = spreads.iloc[:, 0]
pos = trade_series(spread, trading_rule, threshold=0.1)
spread.plot()
# Scale the positions by the spread maximum so both lines share one axis.
pos.mul(spread.max()).plot(grid=True)

## On real data

In [None]:
# In-sample / out-of-sample split at CUT.
CUT = '2014'
# Split with a half-open comparison instead of label slices: on a datetime
# index, slicing with the partial string '2014' is inclusive of the whole year
# on both sides, so `univ.loc[:CUT]` and `univ.loc[CUT:]` would both contain all
# of 2014 and the formation sample would overlap the trading sample.
# Assumes `univ.index` is a DatetimeIndex — TODO confirm.
univ_in = univ.loc[univ.index < CUT]
univ_out = univ.loc[univ.index >= CUT]

In [None]:
def trade(univ_out, top_pairs, descs, trade_rule, **kwargs):
    """Build the spreads of `top_pairs` on `univ_out` and derive their positions.

    Weights are recomputed on `univ_out`; only its first price row is used, so
    this is not forward looking. Each spread is standardised with the 'mean'
    and 'std' rows of `descs` before `trade_rule` is run along it. `kwargs` is
    forwarded to `trade_rule`.

    Returns
    -------
    coeffs : dict mapping each pair to its weights.
    spreads : DataFrame of (non-standardised) spreads, one column per pair.
    pos : DataFrame of spread positions, one column per pair.
    """
    out_prices = univ_out['price']
    coeffs = {}
    spread_by_pair = {}
    for pair in top_pairs:
        weights = compute_coint_coeff(pair, univ_out)
        coeffs[pair] = weights
        spread_by_pair[pair] = form_spread(pair, weights, out_prices)
    spreads = pd.DataFrame(spread_by_pair)
    centred = spreads - descs.loc['mean', :]
    st_spreads = centred / descs.loc['std', :]
    run_rule = partial(trade_series, trade_rule=trade_rule, **kwargs)
    pos = st_spreads.apply(run_rule)
    return coeffs, spreads, pos

# Select 10 pairs in-sample, then trade them out-of-sample with a threshold of
# 2 on the standardised spread.
top_pairs, coeffs, spreads, descs, scores = fit(univ_in, n_pairs=10)
coeffs, spreads, pos = trade(univ_out, top_pairs, descs, trading_rule, threshold=2)

In [None]:
# Out-of-sample spreads of the selected pairs.
spreads.plot()

In [None]:
# Spread positions of the selected pairs.
pos.plot()

In [None]:
def pos_spread_to_pos(pos_spread, coeff):
    """Expand a spread position into per-leg positions.

    Returns a DataFrame indexed like `pos_spread`, with one column per entry of
    `coeff`, where each cell is the spread position times that leg's weight.
    """
    per_leg = np.outer(pos_spread.values, coeff.values)
    return pd.DataFrame(per_leg, index=pos_spread.index, columns=coeff.index)

# Per-leg positions of the fourth selected pair.
pair = pos.columns[3]
pos_spread_to_pos(pos.loc[:, pair], coeffs[pair])

In [None]:
def net_positions(pos):
    """Net the per-leg positions of all pairs into one position per id.

    Parameters
    ----------
    pos : mapping whose values are DataFrames of per-leg positions (rows are
        dates, columns are ids). The same id may appear in several of them.

    Returns
    -------
    DataFrame with the index named 'date' and the columns named 'id', holding
    the sum of all positions per id. NaN contributions are ignored; a cell with
    no valid contribution is NaN, and rows or columns without any valid
    contribution are dropped.
    """
    combined = pd.concat(list(pos.values()), axis=1)
    # Sum the columns that share an id by grouping on the labels directly.
    # Stacking this frame instead would hit the duplicate column labels that
    # appear as soon as one id belongs to several pairs, which the newer
    # pandas `stack` implementation rejects.
    net = combined.T.groupby(level=0).sum(min_count=1).T
    net = net.dropna(how='all', axis=0).dropna(how='all', axis=1)
    return net.sort_index().rename_axis(index='date', columns='id')

# End-to-end example: fit in-sample, trade out-of-sample, expand each spread
# position into its legs and net them per id.
# Note: `pos` is rebound here from a DataFrame to a dict keyed by pair.
top_pairs, _, _, descs, _ = fit(univ_in, n_pairs=10)
coeffs, _, pos_spread = trade(univ_out, top_pairs, descs, trading_rule, threshold=2)
pos = {pair: pos_spread_to_pos(pos_spread.loc[:, pair], coeffs[pair]) for pair in top_pairs}
net_pos = net_positions(pos)
net_pos

In [None]:
def fit_n_trade(univ_in, univ_out, n_pairs, thres):
    """Select `n_pairs` pairs on `univ_in` and return their net positions on `univ_out`.

    Pairs are traded with `trading_rule` using `thres` as its threshold; the
    per-pair leg positions are then netted per id.
    """
    fitted = fit(univ_in, n_pairs=n_pairs)
    top_pairs, descs = fitted[0], fitted[3]
    coeffs, _, pos_spread = trade(univ_out, top_pairs, descs, trading_rule, threshold=thres)
    pos = {}
    for pair in top_pairs:
        pos[pair] = pos_spread_to_pos(pos_spread.loc[:, pair], coeffs[pair])
    return net_positions(pos)

In [None]:
%%timeit
# Time one fit-and-trade pass with 10 pairs and a threshold of 2.
_ = fit_n_trade(univ_in, univ_out, 10, 2)

# 205 ms ± 1.25 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Rolling

In [None]:
# Parameters of the rolling run.
univ = univ.copy()
n_pairs = 10  # number of pairs selected in each formation window
thres = 2  # threshold passed to the trading rule
gcd = '2Q'  # resampling frequency used to cut the sample into windows

In [None]:
%%time
# Cut dates at frequency `gcd`. Each split pairs a formation window covering
# two consecutive steps with the trading window covering the step after it.
dates = pd.Series(index=univ.index, data=0)
cuts = dates.resample(gcd).first().index
splits = [
    ((start, start_2), (start_2, start_3))
    for start, start_2, start_3
    in zip(cuts, cuts[2:], cuts[3:])]


def _window(start, end):
    """Rows of `univ` whose date lies in the half-open interval [start, end)."""
    in_window = (univ.index >= start) & (univ.index < end)
    return univ.loc[in_window]


def fit_n_trade_split(split):
    """Fit on the first window of `split` and trade on the second.

    Windows are half-open: with inclusive label slices, a cut date present in
    the index would belong both to the formation window and to the trading
    window that follows, and to two consecutive trading windows, producing
    duplicated dates in the concatenated positions.
    """
    (fit_start, fit_end), (trade_start, trade_end) = split
    return fit_n_trade(
        _window(fit_start, fit_end),
        _window(trade_start, trade_end),
        n_pairs, thres)


# Results arrive in arbitrary order from the pool, hence the final sort.
with Pool() as pool:
    positions = pd.concat(pool.imap_unordered(fit_n_trade_split, splits)).sort_index()

In [None]:
# Number of ids with a non-zero position on each date.
positions[positions.abs().gt(0)].count(axis=1).plot(grid=True)

In [None]:
# Net exposure: sum of the positions on each date.
positions.sum(axis=1).plot(grid=True)

In [None]:
# Gross exposure: sum of the absolute positions on each date.
positions.abs().sum(axis=1).plot(grid=True)

In [None]:
# Rescale each date to a gross exposure of 1. Cells left undefined by the
# division (e.g. dates with zero gross exposure) are set to 0.
positions = positions.pipe(
    lambda frame: frame.div(frame.abs().sum(axis=1), axis=0)
).fillna(0)
# Gross exposure after rescaling.
positions.abs().sum(axis=1).plot(grid=True)

In [None]:
# Net exposure of the rescaled positions.
positions.sum(axis=1).plot(grid=True)

In [None]:
# Sum over ids of the absolute change in position from one row to the next.
positions.diff().abs().sum(axis=1).plot(grid=True)

In [None]:
# # Cointegration criteria

# def returns_corr(cpl, method, returns):
#     # method in {‘pearson’, ‘kendall’, ‘spearman’}
#     return returns.loc[:, cpl].corr(method=method).iloc[0, 1]

# def diff_prices(cpl, normalized_prices):
#     prices_0 = normalized_prices.loc[:, cpl[0]]
#     prices_1 = normalized_prices.loc[:, cpl[1]]
#     return norm(prices_0 - prices_1)

# def ratio_prices(cpl, normalized_prices):
#     prices_0 = normalized_prices.loc[:, cpl[0]]
#     prices_1 = normalized_prices.loc[:, cpl[1]]
#     return norm(prices_0 / prices_1 - 1)

# def diff_ranks(cpl, normalized_prices):
#     prices_ranks_0 = normalized_prices.loc[:, cpl[0]].rank()
#     prices_ranks_1 = normalized_prices.loc[:, cpl[1]].rank()
#     return norm(prices_ranks_0 - prices_ranks_1)

# def ratio_ranks(cpl, normalized_prices):
#     prices_ranks_0 = normalized_prices.loc[:, cpl[0]].rank()
#     prices_ranks_1 = normalized_prices.loc[:, cpl[1]].rank()
#     return norm(prices_ranks_0 / prices_ranks_1 - 1)

# methods = ['pearson', 'kendall', 'spearman']
# funs = [
#     diff_prices,
#     diff_ranks,
#     ratio_prices,
#     ratio_ranks]

# def cointegration_criteria(cpl, returns, normalized_prices):
#     crits = {
#         f'corr_returns_{method}': returns_corr(cpl, method, returns)
#         for method in methods}
#     crits.update({fun.__name__: fun(cpl, normalized_prices) for fun in funs})
#     return pd.Series(crits)

# def eval_crits_n_hierarchical_agg(returns, normalized_prices, agg='mean'):
#     stocks = normalized_prices.columns
#     index = pd.DataFrame(data=combinations(stocks, 2))
#     index.columns = 'stock_' + index.columns.astype(str)
#     coint_crit = partial(
#         cointegration_criteria, returns=returns, normalized_prices=normalized_prices)
#     ranks_crits = index.apply(coint_crit, axis=1).rank()
#     groups = [fun.__name__.split('_')[-1] for fun in funs] + ['corr', agg]
#     for group in groups:
#         members = ranks_crits.columns.str.contains(group)
#         ranks_crits[f'{group}_{agg}'] = getattr(
#             ranks_crits.loc[:, members], agg)(axis=1)
#     ranks_crits = pd.concat([index, ranks_crits], axis=1).set_index(list(index.columns))
#     return (
#         ranks_crits
#         .sort_values(by=f'{agg}_{agg}')
# #         .reset_index(drop=True)
#     )

# %%time
# ranks_crits = eval_crits_n_hierarchical_agg(returns, normalized_prices)
# ranks_crits

# cpl = ranks_crits.iloc[0].name
# normalized_prices.loc[:, cpl].plot(grid=True)

# cpl = ranks_crits.iloc[-1].name
# normalized_prices.loc[:, cpl].plot(grid=True)

# # Compute spread

# def compute_spreads(ranks_crits, spread_comp, **kwargs):
#     index = ranks_crits.index
#     spreads = (
#         index
#         .to_frame(index=False)
#         .T
#         .apply(partial(spread_comp, **kwargs)))
#     spreads.columns = index
#     return spreads

# def spread_simple(cpl, prices):
#     prices_0 = prices.loc[:, cpl[0]]
#     prices_1 = prices.loc[:, cpl[1]]
#     return prices_0 - prices_1

# def spread_lin_reg(cpl, prices, fit_intercept=False):
#     xxx = prices.loc[:, cpl[0]].values.reshape(-1, 1)
#     yyy = prices.loc[:, cpl[1]]
#     lin_reg = LinearRegression(fit_intercept=fit_intercept)
#     lin_reg.fit(xxx, yyy)
#     spread = np.dot(xxx, lin_reg.coef_) + lin_reg.intercept_ - yyy
#     return spread

# spreads_simple = compute_spreads(ranks_crits, spread_simple, prices=normalized_prices)
# spread = spreads_simple.iloc[:, 0]
# spread.plot(grid=True)
# spread.describe()

# prcs = {'prices': prices, 'n_prices': normalized_prices}
# spreads_lin_regs = {
#     (name_prc, fit_intercept): compute_spreads(
#         ranks_crits, spread_lin_reg, prices=prc, fit_intercept=fit_intercept)
#     for name_prc, prc in prcs.items()
#     for fit_intercept in [True, False]
# }

# for params, spreads in spreads_lin_regs.items():
#     print(params)
#     spread = spreads.iloc[:, 0]
#     spread.plot(grid=True)
#     print(spread.describe())
#     plt.show()