# Backtesting

Para este proyecto se utilizará el backtest con:

---

#### Initial State

Para el precio inicial
- Cash
- N Shares

$\theta : 1$

A: Activo 1 : (MSFT)

B: BRK-B

$\beta_{20} : $ HR 

---

### **For Each Day** 

        ↓   

Generate Signals → Kalman → $X^\pi (S_t)$ → $X_t$
        
↓

Close Positions → Check

↓

If Z > 0 --- (Yes) → Buy A , Short B

↓

If Z < 0 → Short A , Buy B


    

---

In [None]:
if vecm_norm < -theta and not active_long_positions and not active_short_positions:
            # Short y
            available = cash * 0.4
            n_shares_short = available // (p1 * (1 + COM))
            if available > n_shares_short * p1 * (1 + COM) and n_shares_short > 0:
                # Do short
                position_value = p1 * n_shares_short * COM
                cash -= position_value
                sell += 1
                operation = Operation(
                    ticker=y,
                    type="SHORT",
                    n_shares=n_shares_short,
                    entry=p1,
                    exit=0.0,
                    time=i
                )
                active_short_positions.append(operation)
                all_positions.append(operation)

            # Long x
            n_shares_long = int(n_shares_short * hr)
            position_value = p2 * n_shares_long * (1 + COM)
            if cash > position_value:
                # Do long
                cash -= position_value
                buy += 1
                operation = Operation(
                    ticker=x,
                    type="LONG",
                    n_shares=n_shares_long,
                    entry=p2,
                    exit=0.0,
                    time=i
                )
                active_long_positions.append(operation)
                all_positions.append(operation)

            else:
                hold += 1

        # Update portfolio value
        portfolio_value.append(get_portfolio_value(
            cash, active_long_positions, active_short_positions, x, y, row[x], row[y]))

BASE BACKTEST

In [None]:
def backtest(data: pd.DataFrame) -> pd.DataFrame:
    
    data = data.copy()

    cash = config.capital
    COM = config.COM
    anual_borrow = config.BR
    borrow_daily  = anual_borrow / 252
    invest_ratio = config.INVEST

    theta_entry = 2         
    theta_exit  = 0.5        


    kalman_1 = KalmanFilter(theta_entry)
    kalman_2 = KalmanFilter(theta_entry)

    vecms_hat = []

    long_ops  = []
    short_ops = []

    results = []

    for i, row in data.iterrows():

        p1 = row[0]   # Y
        p2 = row[1]   # X

        # ====================
        # 1) UPDATE KALMAN 1
        # ====================
        x_1 = p2
        y_1 = p1
        kalman_1.update(x_1, y_1)
        w0, w1 = kalman_1.params()
        hr = w1

        # ====================
        # 2) UPDATE KALMAN 2
        # ====================
        if i < 252:
            continue

        window_data = data.iloc[i-252:i]
        eig = coint_johansen(window_data, det_order=0, k_ar_diff=1)

        e1 = eig.evec[0,0]
        e2 = eig.evec[1,0]

        VECM = e1*p1 + e2*p2

        kalman_2.update(p1, p2, VECM)
        e1_hat, e2_hat = kalman_2.params()

        VECM_hat = e1_hat*p1 + e2_hat*p2
        vecms_hat.append(VECM_hat)

        if len(vecms_hat) < 252:
            continue

        sample = np.array(vecms_hat[-252:])
        VECM_norm = (sample[-1] - sample.mean()) / sample.std()


        ############################################################



        if VECM_norm <-theta and not active_long and not active_short:

            available_cash = cash * 0.4
            n_shares_short = available_cash // (p1 * (1 + COM))
            if available_cash > n_shares_short * p1 * (1 + COM) and n_shares_short > 0:
                position_value = p1 * n_shares_short * COM 
                cash -= position_value 
                sell += 1
                position = Position(
                    ticker = "Y",
                    type_of_trade = "SHORT"
                    n_shares = n_shares_short,
                    entry_price = p1,
                    exit_price = 0,
                    time = i
                )
                active_short.append(position)
                all_positions.append(position)
        
            n_shares_long = int(n_shares_short * hr)
            position_value = p2 * n_shares_long * (1+COM)
            if cash > position_value:
                cash -= position_value
                buy += 1
                position = Position(
                    ticker = "X",
                    type_of_trade = "LONG"
                    n_shares = n_shares_long,
                    entry_price = p2,
                    exit_price = 0,
                    time = i
                )
                active_long.append(position)
                all_positions.append(position)
            
            else : 
                hold += 1

            # Update portfolio value
            portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)


Este se ve bien

In [None]:

def backtest(data: pd.DataFrame) -> pd.DataFrame:

    data = data.copy()

    # CONFIG
    TDays = config.TDays
    cash = config.capital
    COM = config.COM
    anual_borrow = config.BR
    theta = config.theta
    daily_borrow = anual_borrow / TDays 

    # Kalman 1 → hedge ratio dinámico
    kalman_1 = KalmanFilter(n=2)

    # Kalman 2 → suavizado del VECM
    kalman_2 = KalmanFilter(n=1)

    vecms_hat = []

    active_long = []
    active_short = []
    all_positions = []
    equity_curve = []

    buy = sell = hold = 0

    for i, row in data.iterrows():

        p1 = row[0]  # Y
        p2 = row[1]  # X

        # ====================
        # 1) UPDATE KALMAN 1
        # ====================
        x1_t = np.array([1, p2])   # regresión Y ~ X
        y1_t = p1

        w_pred_1, P_pred_1 = kalman_1.predict()
        kalman_1.update(x1_t, y1_t, w_pred_1, P_pred_1)

        w0_hat, w1_hat = kalman_1.w_t
        hr = w1_hat

        # ====================
        # 2) UPDATE KALMAN 2
        # ====================
        if i < TDays:
            # registrar equity igualmente
            portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)
            equity_curve.append(portfolio_value)
            continue

        window_data = data.iloc[i-TDays:i]
        eig = coint_johansen(window_data, det_order=0, k_ar_diff=1)

        e1 = eig.evec[0, 0]
        e2 = eig.evec[1, 0]

        VECM = e1*p1 + e2*p2

        x2_t = np.array([1])
        y2_t = VECM

        w_pred_2, P_pred_2 = kalman_2.predict()
        kalman_2.update(x2_t, y2_t, w_pred_2, P_pred_2)

        e_hat = kalman_2.w_t[0]
        VECM_hat = e_hat
        vecms_hat.append(VECM_hat)

        if len(vecms_hat) < TDays:
            portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)
            equity_curve.append(portfolio_value)
            continue

        sample = np.array(vecms_hat[-TDays:])
        VECM_norm = (sample[-1] - sample.mean()) / sample.std()

        # ============================================
        # ENTRADA SHORT SPREAD (POR DEBAJO DE -THETA)
        # ============================================

        if VECM_norm < -theta and not active_long and not active_short:

            # SHORT Y
            available_cash = cash * 0.4
            n_shares_short = int(available_cash // (p1 * (1 + COM)))

            if n_shares_short > 0:
                position_value = p1 * n_shares_short * (1 + COM)
                cash -= position_value
                sell += 1

                pos = Position(
                    n_shares=n_shares_short,
                    ticker="Y",
                    entry_price=p1,
                    exit_price=0,
                    type_of_trade="SHORT"
                )
                active_short.append(pos)
                all_positions.append(pos)

            # LONG X
            n_shares_long = int(n_shares_short * hr)
            position_value = p2 * n_shares_long * (1 + COM)

            if cash > position_value:
                cash -= position_value
                buy += 1

                pos = Position(
                    n_shares=n_shares_long,
                    ticker="X",
                    entry_price=p2,
                    exit_price=0,
                    type_of_trade="LONG"
                )
                active_long.append(pos)
                all_positions.append(pos)
            else:
                hold += 1

            portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)
            equity_curve.append(portfolio_value)

        # ===========================================
        # ENTRADA LONG SPREAD (POR ENCIMA DE THETA)
        # ===========================================

        if VECM_norm > theta and not active_long and not active_short:

            # LONG Y
            available_cash = cash * 0.4
            n_shares_long_Y = int(available_cash // (p1 * (1 + COM)))

            if n_shares_long_Y > 0:
                position_value = p1 * n_shares_long_Y * (1 + COM)
                cash -= position_value
                buy += 1

                pos = Position(
                    n_shares=n_shares_long_Y,
                    ticker="Y",
                    entry_price=p1,
                    exit_price=0,
                    type_of_trade="LONG"
                )
                active_long.append(pos)
                all_positions.append(pos)

            # SHORT X
            n_shares_short_X = int(n_shares_long_Y * hr)
            position_value = p2 * n_shares_short_X * (1 + COM)

            if cash > position_value:
                cash -= position_value
                sell += 1

                pos = Position(
                    n_shares=n_shares_short_X,
                    ticker="X",
                    entry_price=p2,
                    exit_price=0,
                    type_of_trade="SHORT"
                )
                active_short.append(pos)
                all_positions.append(pos)
            else:
                hold += 1

            portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)
            equity_curve.append(portfolio_value)


        # ===========================================
        # CIERRE DE POSICIONES (|VECM_norm| <= 0.5)
        # ===========================================


        if active_long or active_short:
            if abs(VECM_norm) <= 0.5:

                # -------------------------------
                # CERRAR LONG POSITIONS
                # -------------------------------
                for pos in active_long[:]:
                    if pos.ticker == "X":
                        exit_price = p2
                    elif pos.ticker == "Y":
                        exit_price = p1
                    cash += pos.n_shares * exit_price * (1 - COM)
                    pos.exit_price = exit_price
                    active_long.remove(pos)

                # -------------------------------
                # CERRAR SHORT POSITIONS
                # -------------------------------
                for pos in active_short[:]:
                    if pos.ticker == "X":
                        exit_price = p2
                    elif pos.ticker == "Y":
                        exit_price = p1
                    profit = (pos.entry_price - exit_price) * pos.n_shares
                    cash += profit
                    cash -= pos.n_shares * exit_price * COM
                    pos.exit_price = exit_price
                    active_short.remove(pos)

                portfolio_value = get_portfolio_value(cash, active_long, active_short, p1, p2)
                equity_curve.append(portfolio_value)


    return pd.DataFrame({
        "cash": [cash],
        "positions": [len(all_positions)],
        "buy": [buy],
        "sell": [sell],
        "hold": [hold],
        "equity_curve": [equity_curve]
    })
