In [43]:
import yfinance as yf
import pandas as pd
import numpy as np
from collections.abc import Callable
import torch
import torch.nn as nn
from torch.utils.data import Dataset
import cvxpy as cp

In [2]:
# Close prices for VTI, AGG, DBC and ^VIX.
# The date range is passed explicitly so the request does not rely on
# yfinance's default look-back period (which is version-dependent) and only
# the history that is actually used gets downloaded. `end` is exclusive in
# yfinance, hence the first day of the following month.
prices = yf.download(
    ["VTI", "AGG", "DBC", "^VIX"],
    start="2006-03-01",
    end="2024-11-01",
).loc["2006-03":"2024-10", "Close"]
# Period-over-period percentage change; rows containing NaN are dropped.
returns = prices.pct_change().dropna()

[*********************100%%**********************]  4 of 4 completed


In [None]:
# Show the price table, then count the missing values in each column.
print(prices)
prices.isnull().sum()

In [67]:
# Custom allocations: fixed target weight per ticker, each set summing to 1.

# Equal weight across all four tickers.
alloc_1 = dict.fromkeys(["AGG", "DBC", "VTI", "^VIX"], 0.25)

# Half of the portfolio in VTI.
alloc_2 = {
    "AGG": 0.1,
    "DBC": 0.2,
    "VTI": 0.5,
    "^VIX": 0.2,
}

# Half of the portfolio in AGG.
alloc_3 = {
    "AGG": 0.5,
    "DBC": 0.2,
    "VTI": 0.1,
    "^VIX": 0.2,
}

# 40/40 split between AGG and VTI, remainder split evenly.
alloc_4 = {
    "AGG": 0.4,
    "DBC": 0.1,
    "VTI": 0.4,
    "^VIX": 0.1,
}


def weight_from_alloc(returns: pd.DataFrame, prices: pd.DataFrame, weight_dict: dict[str, float]) -> np.ndarray:
    """Turn a fixed allocation mapping into a weight vector.

    Parameters
    ----------
    returns : pd.DataFrame
        Only its columns are used; they define the order of the result.
    prices : pd.DataFrame
        Unused. Accepted so the signature matches the
        ``weight_fct(returns, prices, **kwargs)`` call made by ``backtest``.
    weight_dict : dict[str, float]
        Target weight for each column label of ``returns``. Keys that are not
        columns of ``returns`` are ignored.

    Returns
    -------
    np.ndarray
        1-D float array with one weight per column of ``returns``, in column
        order.

    Raises
    ------
    KeyError
        If a column of ``returns`` has no entry in ``weight_dict``.
    """
    # dtype=float keeps the result a float array even for integer weights or
    # an empty column set, as the zero-initialised float buffer did before.
    return np.array([weight_dict[col] for col in returns.columns], dtype=float)


def mvo(returns: pd.DataFrame, prices: pd.DataFrame):
    """Mean-variance optimisation weight function (unfinished).

    So far this only prepares the inputs of the optimisation: the per-column
    mean and the covariance matrix of ``returns``, plus one cvxpy decision
    variable per column. No objective or constraints are defined and nothing
    is solved, so the function currently returns ``None``.

    ``prices`` is not used yet.
    """
    expected_returns = returns.mean()
    covariance = returns.cov()
    asset_count = returns.shape[1]

    # One decision variable (weight) per asset.
    w = cp.Variable(asset_count)
    # todo


def plot_rets(returns: pd.DataFrame):
    """Plot the compounded growth of ``returns``.

    Each return is turned into a growth factor (1 + r); the running product
    of those factors is drawn with the pandas plotting API.
    """
    growth_factors = returns + 1
    growth_factors.cumprod().plot()

In [68]:
def backtest(
    prices: pd.DataFrame,
    returns: pd.DataFrame,
    est_window_size: int,
    weight_fct: Callable,
    rebalance_freq: int = 1,    # todo how to handle daily, monthly, yearly
    tcost: float = 0,
    **kwargs
) -> pd.Series:
    """Rolling-window backtest of a weighting rule.

    For each out-of-sample row ``t`` (every row after the first
    ``est_window_size`` rows of ``returns``) the weights in force are those
    computed by ``weight_fct`` from the ``est_window_size`` rows ending just
    before ``t``. Weights are recomputed every ``rebalance_freq`` rows and
    carried over unchanged in between.

    Parameters
    ----------
    prices : pd.DataFrame
        Price history. It is re-indexed to ``returns.index`` when the two
        indexes differ, so the price and return windows handed to
        ``weight_fct`` cover the same rows.
    returns : pd.DataFrame
        Asset returns, one column per asset.
    est_window_size : int
        Number of rows in each estimation window.
    weight_fct : Callable
        Called as ``weight_fct(return_window, price_window, **kwargs)``; must
        return one weight per column of ``returns``.
    rebalance_freq : int, default 1
        Recompute the weights every this many rows. Must be >= 1.
    tcost : float, default 0
        Reserved for transaction costs; not applied yet.
    **kwargs
        Forwarded to ``weight_fct``.

    Returns
    -------
    pd.Series
        Portfolio return for each out-of-sample row, indexed by
        ``returns.index[est_window_size:]``.

    Raises
    ------
    ValueError
        If ``rebalance_freq`` is smaller than 1.
    """
    if rebalance_freq < 1:
        raise ValueError(f"rebalance_freq must be >= 1, got {rebalance_freq}")

    horizon = len(returns)
    assets = returns.columns

    # Positional slicing below is only meaningful if row k of `prices` and row
    # k of `returns` refer to the same label. Returns derived via pct_change()
    # lose the leading row, so align the prices first.
    if not prices.index.equals(returns.index):
        prices = prices.reindex(returns.index)

    weights_arr = []
    for i in range(horizon - est_window_size):
        if i % rebalance_freq == 0:
            # Window [i, i + est_window_size) ends one row before the row the
            # weights are applied to, so no future information is used.
            window = slice(i, i + est_window_size)
            weights = weight_fct(returns.iloc[window], prices.iloc[window], **kwargs)
            # todo calculate turnover
        else:
            weights = weights_arr[-1]
            # todo calculate drift

        weights_arr.append(weights)

    # Restrict to the out-of-sample rows before multiplying. Multiplying by
    # the full `returns` frame would align on the union of both indexes and
    # report a spurious 0.0 return for every estimation-only row.
    oos_returns = returns.iloc[est_window_size:]
    weights_df = pd.DataFrame(weights_arr, columns=assets, index=oos_returns.index)
    pf_ret = (weights_df * oos_returns).sum(axis=1)

    return pf_ret

In [69]:
# Restrict both frames to the rows labelled 2006 for a quick backtest run.
test_price, test_rets = (frame.loc["2006"] for frame in (prices, returns))

In [None]:
# Fixed-allocation backtest on the 2006 sample with a 100-row estimation window.
pf_ret = backtest(
    prices=test_price,
    returns=test_rets,
    est_window_size=100,
    weight_fct=weight_from_alloc,
    weight_dict=alloc_2,
)

### Create dataset

In [39]:
# input shape:
# batch_first = True -> (N, seq_len, input_size)
lookback_window = 50

# Price columns followed by return columns; rows with a NaN in any column are
# dropped. (Despite the name this is a NumPy array, kept for compatibility.)
feature_df = pd.concat([prices, returns], axis=1).dropna().values

# Targets are taken from the return columns of the cleaned matrix rather than
# from `returns` by position: `returns` may still hold rows that dropna()
# removed above, which would shift every later target against its window.
target_arr = feature_df[:, prices.shape[1]:]

# Sample i: features of rows [i, i + lookback_window) -> returns of the next row.
n_samples = len(feature_df) - lookback_window
X_arr = [feature_df[i : i + lookback_window] for i in range(n_samples)]
y_arr = [target_arr[i + lookback_window] for i in range(n_samples)]

# Stack once in NumPy and convert in one go (much cheaper than nested Python
# lists). float32 is the dtype torch.Tensor(...) produces by default.
X = torch.tensor(np.array(X_arr), dtype=torch.float32)
y = torch.tensor(np.array(y_arr), dtype=torch.float32)

# Chronological split: first 90% of the samples for training, rest for testing.
train_test_cutoff = round(0.90 * X.shape[0])

X_train = X[:train_test_cutoff]
X_test = X[train_test_cutoff:]
y_train = y[:train_test_cutoff]
y_test = y[train_test_cutoff:]


### Define model

In [40]:
class LSTM(nn.Module):
    """Recurrent network that maps a feature window to portfolio weights.

    The LSTM reads ``2 * n_assets`` features per time step. The final hidden
    state of its last layer is projected to ``n_assets`` scores and passed
    through a softmax, so every output row is non-negative and sums to 1.

    Parameters
    ----------
    n_assets : number of assets, i.e. the size of the output weight vector.
    hidden_size : width of the LSTM hidden state.
    num_layers : number of stacked LSTM layers.
    batch_first : forwarded to ``nn.LSTM``.
    """

    def __init__(self, n_assets, hidden_size, num_layers=1, batch_first=True):
        super().__init__()

        # One price and one return feature per asset.
        self.input_size = 2 * n_assets

        self.lstm = nn.LSTM(
            self.input_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=batch_first,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=n_assets)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return softmax-normalised weights for each sequence in ``x``."""
        # Only the final hidden states are needed; per-step outputs and cell
        # states are discarded.
        _, (final_hidden, _) = self.lstm(x)
        last_layer_state = final_hidden[-1]
        scores = self.fc(last_layer_state)
        return self.softmax(scores)

In [41]:
# Model sized for four assets with a 64-unit hidden state, trained with Adam.
lstm = LSTM(hidden_size=64, n_assets=4)
optimizer = torch.optim.Adam(params=lstm.parameters(), lr=1e-3)