# Assignment 5

Deadline: 11.06.2025 12:00 CEST

## Task

Develop an investment strategy for the Swiss equity market, backtest it using the provided datasets (`market_data.parquet`, `jkp_data.parquet`, `spi_index.csv`) and analyze its performance by benchmarking it against the SPI index. Work with the existing code infrastructure (`qpmwp-course`) and extend it by implementing any additional components needed for the strategy. Write a report that presents your methodology and the results.

### Coding (15 points)

- Selection:
  Implement selection item builder functions (via `SelectionItemBuilder`) to filter stocks based on specific criteria (e.g., exclude low-quality or high-volatility stocks).

- Optimization Data & Constraints:
  Implement functions to prepare optimization data (via `OptimizationItemBuilder`), including any econometric or machine learning-based predictions. These functions should also define optimization constraints (e.g., stock, sector, or factor exposure limits).

- Optimization Model:
  If you choose to create a custom optimization model, develop a class inheriting from Optimization (similar to `MeanVariance`, `LeastSquares`, or `BlackLitterman`). Your class should include methods set_objective and solve for defining the objective function and solving the optimization problem.

- Machine Learning Prediction:
  Integrate a machine learning model to estimate inputs for the optimization, such as expected returns or risk. This could include regression, classification, or learning-to-rank models. I suggest you to use the provided jkp_data as features, but you may also create your own (e.g., technical indicators computed on the return or price series).

- Simulation:
  Backtest the strategy and simulate portfolio returns. Account for fixed costs (1% per annum) and variable (transaction) costs (0.2% per rebalancing).


### Report (15 points):

Generate an HTML report with the following sections:

- High-level strategy overview: Describe the investment strategy you developed.

- Detailed explanation of the backtesting steps: Offer a more comprehensive breakdown of the backtesting process, including a description of the models implemented (e.g., details of the machine learning method used).

- Backtesting results:
    
    - Charts: Include visual representations (e.g., cumulative performance charts, rolling 3-year returns, etc.).
    - Descriptive statistics: Present key statistics such as mean, standard deviation, drawdown, turnover, and Sharpe ratio (or any other relevant metric) for the full backtest period as well as for subperiods (e.g., the last 5 years, or during bull vs. bear market phases).
    - Compare your strategy against the SPI index.


In [None]:
# LSTM Mean–Variance Portfolio  ·  50 JKP factors 

#  deterministic μ (no MC-Dropout)
#  α-boost 1.8  +  turnover shrink 1.00 / 0.95
#  fixed λ = 1.5   (no dynamic rescale)

import os, sys, random, logging, warnings, types, itertools
from typing import Optional

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

from helper_functions import load_data_spi
from estimation.covariance import Covariance
from optimization.optimization import MeanVariance, Objective
from backtesting.backtest_data import BacktestData
from backtesting.backtest_service import BacktestService
from backtesting.backtest import Backtest
from backtesting.backtest_item_builder_classes import SelectionItemBuilder, OptimizationItemBuilder
from backtesting.backtest_item_builder_functions import (
    bibfn_selection_min_volume, bibfn_selection_gaps, bibfn_return_series,
    bibfn_budget_constraint, bibfn_box_constraints)
from qpsolvers import available_solvers

#  reproducibility and logging 
SEED = 42
random.seed(SEED); np.random.seed(SEED)
torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)

warnings.filterwarnings("ignore")

logging.basicConfig(level=logging.INFO,
                    
                    format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

sys.modules["xgboost"] = types.ModuleType("xgboost")   # qpsolvers workaround

# paths/dates

DATA_PATH  = r"C:\Valentino\UZH\Quant portfolio management with Python\quant-python-last-assing\data"
START_DATE = "2016-01-01"
REB_PERIOD = 84 # quarterly
TRADING_DAYS = 252

# model and optimiser knobs
INIT_LAMBDA = 1.5
ALPHA_BOOST   = 1.80
GAMMA         = 0.002

HIDDEN, DROPOUT = 128, 0.30
MAX_EPOCHS, PATIENCE = 100, 10
LR, WD  = 1e-3, 1e-4
SEQ_LEN = 30   # look-back 30 days
FCAST_HORIZON = 1

TURN_SHRINK_LOW, TURN_SHRINK_HIGH = 1.00, 0.95
DISP_TH = 0.05

FIXED_ANNUAL, VC = 0.01, 0.002
FC = FIXED_ANNUAL * (REB_PERIOD / TRADING_DAYS)


#  50 JKP factors 
FUND_COLS = [
    
    "be_me", "op_at", "inv_gr1a", "ret_12_1", "beta_60m",
    
    "gp_at", "at_me", "rd_sale", "turnover_126d", "ivol_capm_252d", "f_score",
    
    "niq_be", "cash_at", "sale_gr1", "sale_gr3", "sale_me", "ocf_me",
    "netdebt_me", "gp_atl1", "op_atl1", "oaccruals_at", "taccruals_at",
    "capx_gr1", "capx_gr2", "capx_gr3", "inv_gr1", "rd_me", "rd5_at",
    "age", "tangibility", "kz_index", "ivol_ff3_21d", "ivol_hxz4_21d",
    "rvol_21d", "betadown_252d", "bidaskhl_21d", "rmax1_21d",
    "iskew_capm_21d", "coskew_21d", "turnover_var_126d", "dolvol_126d",
    "ami_126d", "zero_trades_252d", "corr_1260d", "rmax5_21d", "ret_6_1",
    "ret_9_1", "ret_12_7", "mispricing_mgmt", "qmj_growth"
]  

technical_features: pd.DataFrame = pd.DataFrame()

# helper to build tech panel 
def create_tech(ret: pd.DataFrame, vol: pd.DataFrame) -> pd.DataFrame:

    f, cum = pd.DataFrame(index=ret.index), (1+ret.fillna(0)).cumprod()
    for m in [1,3,6,12]:
        d=m*21; f[f"mom_{m}m"]=(cum/cum.shift(d)-1).mean(axis=1)

    for m in [1,3,6]:
        d=m*21; f[f"vol_{m}m"]=(ret.rolling(d,int(.7*d)).std().mean(axis=1)*np.sqrt(252))

    f["vol_trend"] = (vol.rolling(21,15).mean()/vol.rolling(63,45).mean()-1).mean(axis=1)
    f["breadth"]    = (ret>0).mean(axis=1)
    f["dispersion"] = ret.std(axis=1)

    return f.fillna(0)

class LSTMReturnPredictor(nn.Module):

    def __init__(self, inp:int, n:int):
        super().__init__()
        self.lstm=nn.LSTM(inp,HIDDEN,2,dropout=DROPOUT,batch_first=True)
        self.dp=nn.Dropout(DROPOUT); self.fc=nn.Linear(HIDDEN,n)

    def forward(self,x):
        out,_=self.lstm(x)
        return torch.tanh(self.fc(self.dp(out[:,-1,:])))*0.02

def pick_solver(pref="osqp"):
    solvers=available_solvers() if callable(available_solvers) else available_solvers
    return pref if pref in solvers else (solvers[0] if solvers else pref)

# optimisation class
class LSTMOptimization(MeanVariance):

    def __init__(self, **kw):
        super().__init__(solver_name=pick_solver(), **kw)
        self.net:Optional[LSTMReturnPredictor]=None
        self.input_dim:Optional[int]=None
        self.data:Optional[BacktestData]=None

    def _train(self,net,x,y):
        dev=torch.device("cuda" if torch.cuda.is_available() else "cpu")
        net.to(dev); x,y=x.to(dev),y.to(dev)
        opt=torch.optim.AdamW(net.parameters(),lr=LR,weight_decay=WD)
        crit=nn.MSELoss(); best=np.inf; pat=0

        for epoch in range(MAX_EPOCHS):
            opt.zero_grad(); loss=crit(net(x),y); loss.backward(); opt.step()

            if loss.item()+1e-8<best: best,pat=loss.item(),0
            else:
                pat+=1
                if pat>=PATIENCE: break
        net.cpu()

    def _flat_funda(self,date,ids):
        jkp=self.data.jkp_data

        if jkp is None: return None
        dates=jkp.index.get_level_values(0).unique()
        idx=dates.searchsorted(date,side="right")-1

        if idx<0: return None
        eff=dates[idx]
        try: mat=jkp[FUND_COLS].xs(eff,level=0).loc[ids].fillna(0)
        except KeyError: return None
        mat=(mat-mat.mean())/mat.std().replace(0,1)
        logger.info("Fundamentals %s → %s | k=%d",eff.date(),date.date(),len(mat.columns))

        return mat.to_numpy().ravel()
    
    def set_objective(self,optimization_data):

        X=optimization_data["return_series"]; ids=X.columns.tolist()
        cov=self.covariance.estimate(X=X,inplace=False).fillna(0)
        tech=technical_features.reindex(X.index).fillna(0)
        reb=X.index[-1]; funda=self._flat_funda(reb,ids)

        seqs,tgt=[],[]

        for i in range(SEQ_LEN,len(X)-FCAST_HORIZON):
            blk=np.hstack([X.values[i-SEQ_LEN:i],tech.values[i-SEQ_LEN:i]])
            if funda is not None:
                blk=np.hstack([blk,np.repeat(funda[np.newaxis,:],SEQ_LEN,0)])
            seqs.append(blk); tgt.append(X.values[i+FCAST_HORIZON])
        mu=pd.Series(0.,index=ids)

        if len(seqs)>=50:
            x_t=torch.tensor(np.stack(seqs),dtype=torch.float32)
            y_t=torch.tensor(np.stack(tgt ),dtype=torch.float32)
            d=x_t.shape[-1]
            if (self.net is None) or (d!=self.input_dim):
                self.input_dim=d; self.net=LSTMReturnPredictor(d,len(ids))
            self._train(self.net,x_t,y_t)
            with torch.no_grad():
                mu_hat=self.net(torch.tensor(seqs[-1:],dtype=torch.float32)).cpu().numpy().flatten()
            mu=pd.Series(mu_hat,index=ids).clip(-0.05,0.05)

        sig=pd.Series(np.sqrt(np.diag(cov)),index=ids)
        mu=(mu/sig.pow(0.5)*ALPHA_BOOST).fillna(0)
        disp=technical_features.loc[reb,"dispersion"]
        mu*=TURN_SHRINK_LOW if disp<DISP_TH else TURN_SHRINK_HIGH

        lam=INIT_LAMBDA  
        P=cov.values*2*lam + np.eye(len(ids))*2*GAMMA
        self.objective=Objective(q=-mu,
                                 P=pd.DataFrame(P,index=ids,columns=ids))

# performance helper
def perf(ret,bench):
    df=pd.concat([ret.rename("LSTM"),bench.rename("SPI")],axis=1).dropna()
    cum=(1+df).cumprod(); out={}
    for c in df:
        r=df[c]; ar, av=r.mean()*252, r.std()*np.sqrt(252)
        dd=(cum[c]-cum[c].cummax())/cum[c].cummax()
        out[c]=[ar,av,ar/av if av>0 else np.nan,dd.min()]
    return pd.DataFrame(out,index=["AnnRet","AnnVol","Sharpe","MaxDD"]).T,cum

# back-test runner 
def run_backtest(plot=True):
    global technical_features
    data=BacktestData()
    data.market_data=pd.read_parquet(os.path.join(DATA_PATH,"market_data.parquet"))
    try: data.jkp_data=pd.read_parquet(os.path.join(DATA_PATH,"jkp_data.parquet"))
    except FileNotFoundError: data.jkp_data=None
    data.bm_series=load_data_spi(path=DATA_PATH)
    technical_features=create_tech(data.get_return_series(),
                                   data.get_volume_series())

    dates=data.market_data.index.get_level_values("date").unique().sort_values()
    rdates=dates[dates>=pd.to_datetime(START_DATE)][::REB_PERIOD]

    sel={"vol":SelectionItemBuilder(bibfn=bibfn_selection_min_volume,width=252,
                                    min_volume=500_000,agg_fn=np.median),
         "gaps":SelectionItemBuilder(bibfn=bibfn_selection_gaps,width=252,n_days=7)}
    opt={"ret":OptimizationItemBuilder(bibfn=bibfn_return_series,width=252,fill_value=0),
         "budget":OptimizationItemBuilder(bibfn=bibfn_budget_constraint,budget=1.0),
         "box":OptimizationItemBuilder(bibfn=bibfn_box_constraints,lower=0,upper=0.20)}

    optimization=LSTMOptimization()
    optimization.data=data
    bs=BacktestService(data=data,
                       selection_item_builders=sel,
                       optimization_item_builders=opt,
                       optimization=optimization,
                       rebdates=[d.strftime("%Y-%m-%d") for d in rdates])
    bt=Backtest(); bt.run(bs)

    sim=bt.strategy.simulate(return_series=data.get_return_series(),fc=FC,vc=VC)
    metrics,cum=perf(sim,data.bm_series)
    if plot:
        print("\nPerformance metrics\n"); print(metrics.round(4))
        cum.plot(figsize=(12,6),title="Cumulative return – LSTM vs SPI")
        plt.ylabel("Cumulative return"); plt.tight_layout(); plt.show()
    return metrics,cum

# CLI 
if __name__=="__main__":
    run_backtest(plot=True)
