![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [None]:
# Importer les bibliothèques nécessaires
from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from arch.unitroot.cointegration import engle_granger
import seaborn as sns
import matplotlib.pyplot as plt

# Initialiser QuantBook
qb = QuantBook()

# Sélectionner les ETFs sectoriels
sector_etfs = ["XLF", "XLK", "XLE", "XLY", "XLP", "XLI", "XLV", "XLU", "XLRE", "XLC", "XLB"]
symbols = [qb.AddEquity(etf, Resolution.Daily).Symbol for etf in sector_etfs]

# Obtenir les données historiques (2 ans)
history = qb.History(symbols, 500, Resolution.Daily)

# Préparer les données
prices = history.close.unstack(level=0)

# ** Étape 1 : Matrice de corrélation **
# Calculer les rendements quotidiens
daily_returns = prices.pct_change().dropna()

# Calculer la matrice de corrélation
correlation_matrix = daily_returns.corr()

# Afficher la matrice de corrélation sous forme de carte thermique
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f", cbar=True)
plt.title("Matrice de corrélation des rendements quotidiens")
plt.show()

# ** Étape 2 : Tester la co-intégration entre deux ETFs **
etf1 = "XLK"  # Exemple ETF 1 (Technologie)
etf2 = "XLY"  # Exemple ETF 2 (Consommation discrétionnaire)

# Extraire les prix des deux ETFs
etf1_prices = prices[etf1].dropna()
etf2_prices = prices[etf2].dropna()

# Vérifier la co-intégration avec Engle-Granger
model = engle_granger(etf1_prices, etf2_prices, trend="n", lags=0)
print(f"P-valeur du test de co-intégration entre {etf1} et {etf2} : {model.pvalue}")

# Calculer les coefficients de co-intégration via une régression
X = sm.add_constant(etf2_prices)
regression = sm.OLS(etf1_prices, X).fit()

# Récupérer les coefficients
beta0 = regression.params[0]  # Constante
beta1 = regression.params[1]  # Coefficient du second ETF

# Calculer le spread
spread = etf1_prices - (beta0 + beta1 * etf2_prices)

# Visualiser les prix et les écarts
plt.figure(figsize=(14, 7))

# Visualisation des prix
plt.subplot(2, 1, 1)
plt.plot(etf1_prices, label=f"Prix de {etf1}")
plt.plot(etf2_prices, label=f"Prix ajusté de {etf2}")
plt.title("Prix des ETFs")
plt.legend()

# Visualisation du spread
plt.subplot(2, 1, 2)
plt.plot(spread, label="Écart (spread)")
plt.axhline(spread.mean(), color='red', linestyle='--', label='Moyenne du spread')
plt.title(f"Spread entre {etf1} et {etf2}")
plt.legend()

plt.tight_layout()
plt.show()


In [2]:
from itertools import combinations
# Ajouter un filtre basé sur la volatilité moyenne et le volume
def filter_pairs(prices, threshold=0.05):
    filtered_results = []
    for etf1, etf2 in combinations(prices.columns, 2):
        etf1_prices, etf2_prices = prices[etf1].dropna(), prices[etf2].dropna()
        if len(etf1_prices) == len(etf2_prices):
            model = engle_granger(etf1_prices, etf2_prices, trend="n", lags=0)
            if model.pvalue < threshold:
                volatility = etf1_prices.pct_change().std() + etf2_prices.pct_change().std()
                avg_volume = prices[[etf1, etf2]].mean(axis=0).mean()
                filtered_results.append((etf1, etf2, model.pvalue, volatility, avg_volume))
    return pd.DataFrame(filtered_results, columns=["ETF1", "ETF2", "P-Value", "Volatility", "Avg Volume"])

filtered_pairs = filter_pairs(prices, threshold=0.05)
print(filtered_pairs.sort_values(by="P-Value"))


## 1. Recalibrer la détection de paires

Dans cette première étape, nous allons réutiliser notre fonction `filter_pairs` pour détecter des paires co-intégrées, mais nous allons rendre les conditions **un peu moins strictes** que celles introduites récemment. Notamment :
- Nous allons fixer un seuil de p-value un peu plus élevé (0.10 au lieu de 0.05).
- Nous allons enlever la contrainte de `corr > 0.8` (ou la rendre optionnelle).
- Nous allons nous assurer que la taille de la période est suffisante (par exemple 500 barres daily).

Ensuite, nous observerons la liste des paires retenues pour voir lesquelles sont vraiment prometteuses.


In [3]:
from itertools import combinations
import pandas as pd
from arch.unitroot.cointegration import engle_granger

def filter_pairs_recalibrated(prices, pval_threshold=0.10):
    """
    Identique à filter_pairs, mais avec un p-val threshold un peu moins restrictif.
    """
    results = []
    for etf1, etf2 in combinations(prices.columns, 2):
        etf1_prices = prices[etf1].dropna()
        etf2_prices = prices[etf2].dropna()
        if len(etf1_prices) == len(etf2_prices) and len(etf1_prices) > 50:
            model = engle_granger(etf1_prices, etf2_prices, trend="n", lags=0)
            if model.pvalue < pval_threshold:
                # On pourrait ajouter un test de corrélation si on le souhaite
                # corr = etf1_prices.pct_change().corr(etf2_prices.pct_change())
                results.append(
                    (etf1, etf2, 
                     model.pvalue)
                )
    df = pd.DataFrame(results, columns=["ETF1", "ETF2", "P-Value"])
    return df.sort_values(by="P-Value")

# Recalibrons la liste des paires
filtered_pairs_looser = filter_pairs_recalibrated(prices, pval_threshold=0.10)
filtered_pairs_looser


## 2. Test de co-intégration glissant (rolling Engle-Granger)

Pour être plus robuste, nous allons tester la co-intégration sur une *fenêtre mobile* (rolling window) plutôt que sur l’intégralité de la période. Ainsi, nous vérifions si la relation reste stationnaire au fil du temps. Nous allons illustrer cette approche sur la paire `XLF–XLK` (ou celle de notre choix dans la liste filtrée).


In [4]:
import numpy as np

def rolling_cointegration_test(seriesA, seriesB, window=100, p_threshold=0.10):
    """
    Applique le test Engle-Granger sur des fenêtres mobiles.
    Renvoie un pd.Series des p-values, indexées par la date de fin de la fenêtre.
    """
    p_values = []
    index_vals = seriesA.index
    for i in range(window, len(seriesA)):
        # Fenêtre [i-window : i]
        A_window = seriesA.iloc[i-window:i]
        B_window = seriesB.iloc[i-window:i]
        model = engle_granger(A_window, B_window, trend="n", lags=0)
        p_values.append(model.pvalue)
    # Index des p-values = dates de la fin de la fenêtre
    return pd.Series(p_values, index=index_vals[window:])

# Choix d'une paire (ex. "XLF–XLK") selon la liste filtrée
etfA, etfB = "XLF", "XLK"

A_prices = prices[etfA].dropna()
B_prices = prices[etfB].dropna()

roll_pvals = rolling_cointegration_test(A_prices, B_prices, window=100, p_threshold=0.10)

plt.figure(figsize=(10,4))
roll_pvals.plot()
plt.axhline(0.10, color='red', linestyle='--', label='Threshold 0.10')
plt.title(f"P-values Engle-Granger rolling (fenêtre=100) pour {etfA}-{etfB}")
plt.legend()
plt.show()

roll_pvals.describe()


## 3. Génération de signaux via z-score sur le spread

Nous allons ici implémenter une fonction qui, pour deux séries de prix (A, B), calcule en rolling:
- Une régression (A ~ B) pour obtenir le coefficient β
- Le spread = A - β * B
- Le z-score = (spread - mean(spread)) / std(spread)

Nous fixerons un lookback (ex. 30 barres) pour calculer la moyenne et l’écart-type du spread, et un *threshold* (ex. ±1.5 ou ±2) pour générer des signaux LONG/SHORT.


In [5]:
import statsmodels.api as sm

def compute_signals_zscore(seriesA, seriesB, window=30, z_threshold=1.5):
    """
    Calcule un spread rolling + z-score, et renvoie un DataFrame
    avec les colonnes: ['spread', 'zscore', 'signal'].
    signal = +1 => LONG A / SHORT B
    signal = -1 => SHORT A / LONG B
    """
    # Assurons-nous qu'on a les mêmes dates
    common_index = seriesA.index.intersection(seriesB.index)
    A = seriesA.loc[common_index].copy()
    B = seriesB.loc[common_index].copy()
    
    results = []
    
    # Rolling approach
    for i in range(window, len(A)):
        # Fenêtre [i-window : i]
        A_win = A.iloc[i-window:i]
        B_win = B.iloc[i-window:i]
        
        # OLS pour estimer A ~ B
        X = sm.add_constant(B_win)
        reg = sm.OLS(A_win, X).fit()
        beta0 = reg.params.iloc[0]
        beta1 = reg.params.iloc[1]

        
        spread_arr = (A_win - (beta0 + beta1 * B_win))
        # Dernier point (barre i-1) => on l'utilise pour le signal
        spread_values = spread_arr.values
        spread_mean = np.mean(spread_values)
        spread_std = np.std(spread_values) if np.std(spread_values) != 0 else 1e-9
        
        # Spread actuel = A[i] - (beta0 + beta1*B[i])
        current_spread = A.iloc[i] - (beta0 + beta1 * B.iloc[i])
        zscore = (current_spread - spread_mean) / spread_std
        
        # Génération du signal
        if zscore > z_threshold:
            signal = -1  # SHORT A, LONG B
        elif zscore < -z_threshold:
            signal = +1  # LONG A, SHORT B
        else:
            signal = 0
        
        results.append((A.index[i], current_spread, zscore, signal))
    
    df_signals = pd.DataFrame(results, columns=["Date", "Spread", "Zscore", "Signal"])
    df_signals.set_index("Date", inplace=True)
    return df_signals

df_z = compute_signals_zscore(A_prices, B_prices, window=30, z_threshold=1.5)

plt.figure(figsize=(10,5))
df_z['Zscore'].plot()
plt.axhline(1.5, color='red', linestyle='--', label='+Threshold')
plt.axhline(-1.5, color='red', linestyle='--', label='-Threshold')
plt.title(f"Z-score rolling (win=30) pour {etfA}-{etfB}")
plt.legend()
plt.show()

df_z['Signal'].value_counts()


## 4. Mini backtest manuel sur la paire (A, B)

Nous allons simuler un PnL théorique en supposant qu’à chaque dépassement de z-score (+1 ou -1), on ouvre une position A/B inversée, et on la ferme quand le z-score revient vers 0.

Ceci reste un backtest artisanal, juste pour valider la logique. Pour aller plus loin, nous pourrions l’intégrer à QuantConnect (avec OnData, OnEndOfDay, etc.).


In [10]:
def mini_backtest(df_signals, seriesA, seriesB, capital=100000):
    """
    Hypothèse : 
      - On entre en position quand signal passe de 0 à +1/-1
      - On sort quand signal repasse à 0
      - On achète/vend un nominal fixe (ex. on alloue 50% capital à A, 50% à B en sens inverse).
    Simplifié à l'extrême, sans considération de frais, slippage, etc.
    """
    # On remerge les prix dans df_signals
    df_bt = pd.DataFrame({
        'Signal': df_signals['Signal'],
        'PriceA': seriesA[df_signals.index],
        'PriceB': seriesB[df_signals.index]
    }).dropna()
    
    positions = []  # Stocke les positions pour éviter SettingWithCopyWarning
    pnls = []       # Stocke les PnL cumulés
    current_position = 0  # Position actuelle : 0, +1, ou -1
    
    # Variables de tracking
    entry_priceA = 0
    entry_priceB = 0
    cum_pnl = 0.0  # PnL cumulatif

    # Parcours des lignes du DataFrame
    for i in range(len(df_bt)):
        signal_curr = df_bt['Signal'].iloc[i]
        priceA_curr = df_bt['PriceA'].iloc[i]
        priceB_curr = df_bt['PriceB'].iloc[i]

        # Vérifie si le signal change
        if signal_curr != current_position:
            # Fermer la position actuelle
            if current_position != 0:
                if current_position == +1:
                    # LONG A, SHORT B
                    trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                else:
                    # SHORT A, LONG B
                    trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                
                cum_pnl += trade_pnl
            
            # Ouvrir la nouvelle position
            current_position = signal_curr
            if current_position != 0:  # Enregistre les prix d'entrée
                entry_priceA = priceA_curr
                entry_priceB = priceB_curr
        
        # Ajoute les valeurs actuelles aux listes
        positions.append(current_position)
        pnls.append(cum_pnl)

    # Ajout des colonnes au DataFrame
    df_bt['Position'] = positions
    df_bt['PnL'] = pnls

    return df_bt

df_bt_results = mini_backtest(df_z, A_prices, B_prices)
df_bt_results.tail(20)


#### Visualisation des résultats du mini-backtest

In [7]:
plt.figure(figsize=(10,4))
df_bt_results['PnL'].plot()
plt.title("Évolution du PnL théorique (mini-backtest artisanal)")
plt.show()


### Filtrer la prise de position avec la p-value rolling

Nous allons ici modifier la fonction `compute_signals_zscore` pour inclure aussi la rolling p-value. L'idée est de :
1. Calculer la p-value sur la même fenêtre (ou sur une fenêtre plus longue).
2. N'autoriser le trade que si p-value < 0.10 (ou un autre seuil).
3. Conserver le z-score pour déclencher le signal ±2.


In [8]:
def compute_signals_with_pval(seriesA, seriesB, window=30, z_threshold=2.0, p_threshold=0.10):
    """
    Calcule le z-score comme avant, mais intègre une rolling p-value
    et on ne génère un signal que si la p-value < p_threshold.
    """
    common_index = seriesA.index.intersection(seriesB.index)
    A = seriesA.loc[common_index].copy()
    B = seriesB.loc[common_index].copy()
    
    # Pour la rolling p-value, on peut utiliser la même fenêtre ou un plus grand
    # ex. fenetre de 60 ou 100 barres
    window_pval = max(window, 60)
    
    signals_data = []
    
    for i in range(window_pval, len(A)):
        # Fenêtre [i-window_pval : i]
        A_win = A.iloc[i-window_pval:i]
        B_win = B.iloc[i-window_pval:i]
        
        # Test Engle-Granger sur ce segment
        model = engle_granger(A_win, B_win, trend='n', lags=0)
        pval = model.pvalue
        
        # Ensuite, régression pour le z-score
        X = sm.add_constant(B_win)
        reg = sm.OLS(A_win, X).fit()
        beta0 = reg.params.iloc[0]
        beta1 = reg.params.iloc[1]
        
        # On se base maintenant sur la dernière sous-fenêtre "window" (plus court) pour le z-score
        shortA = A.iloc[i-window:i]
        shortB = B.iloc[i-window:i]
        
        # Spread dans la sous-fenêtre
        spread_arr = shortA - (beta0 + beta1*shortB)
        spread_mean = spread_arr.mean()
        spread_std = spread_arr.std() if spread_arr.std() != 0 else 1e-9
        
        current_spread = A.iloc[i] - (beta0 + beta1 * B.iloc[i])
        zscore = (current_spread - spread_mean)/spread_std
        
        # Condition : pval < p_threshold et |zscore| > z_threshold => signal
        if pval < p_threshold:
            if zscore > z_threshold:
                signal = -1
            elif zscore < -z_threshold:
                signal = +1
            else:
                signal = 0
        else:
            signal = 0
        
        signals_data.append((A.index[i], pval, current_spread, zscore, signal))
    
    df_signals = pd.DataFrame(signals_data, columns=["Date","Pval","Spread","Zscore","Signal"])
    df_signals.set_index("Date", inplace=True)
    return df_signals

df_signals_pv = compute_signals_with_pval(A_prices, B_prices, window=30, z_threshold=2, p_threshold=0.10)
df_signals_pv.tail(10)


### Mini backtest (p-value + z-score)

Nous réutilisons la fonction de backtest basique, mais appliquée à la DataFrame `df_signals_pv`. L'objectif est de voir si la prise en compte de la co-intégration au moment de l'entrée améliore ou non le résultat final.


In [None]:
df_bt_pv = mini_backtest(df_signals_pv, A_prices, B_prices)
df_bt_pv['PnL'].plot(figsize=(10,4), title="PnL cumulatif (co-intégration + z-score)")
plt.show()

df_bt_pv.tail(10)


### A) Comparaison de différents z-score thresholds

Nous allons définir une fonction permettant de faire varier les paramètres de notre stratégie (z-score threshold) afin de comparer l'impact sur le PnL. L'idée est de boucler sur plusieurs valeurs (par exemple 1.5, 1.8, 2.0, 2.2) et d'afficher un résumé des résultats ou un graphique comparatif.


In [12]:
def test_different_zscore_thresholds(seriesA, seriesB, thresholds=[1.5, 1.8, 2.0, 2.2], p_threshold=0.10):
    """
    Boucle sur plusieurs valeurs de z-score threshold.
    Pour chaque threshold, on génère df_signals puis on backteste.
    Retourne un DataFrame résumant le PnL final.
    """
    results = []
    for thr in thresholds:
        df_signals_pv = compute_signals_with_pval(
            seriesA, seriesB, 
            window=30, 
            z_threshold=thr, 
            p_threshold=p_threshold
        )
        df_bt_pv = mini_backtest(df_signals_pv, seriesA, seriesB)
        final_pnl = df_bt_pv['PnL'].iloc[-1] if len(df_bt_pv) > 0 else 0
        results.append((thr, final_pnl))
    
    return pd.DataFrame(results, columns=["ZScoreThreshold", "FinalPnL"])

threshold_values = [1.5, 1.8, 2.0, 2.2]
df_threshold_test = test_different_zscore_thresholds(A_prices, B_prices, threshold_values, p_threshold=0.10)
df_threshold_test


### B) Test sur plusieurs paires en parallèle

Pour aller plus loin, nous allons appliquer la même logique (p-value + z-score) à plusieurs paires simultanément (ex: XLF-XLK, XLC-XLF, XLI-XLK, etc.).
Nous allons ensuite agréger le PnL de chaque paire pour visualiser si la diversification améliore (ou non) le résultat global.


In [13]:
pairs_to_test = [
    ("XLF", "XLK"), 
    ("XLC", "XLF"), 
    ("XLI", "XLK"), 
    # Ajoute éventuellement d'autres paires jugées co-intégrées
]

def backtest_multiple_pairs(pairs, prices, p_threshold=0.10, z_threshold=2.0):
    """
    On parcourt chaque paire, on génère df_signals_pv et on fait un mini_backtest.
    On retourne un DataFrame combiné du PnL cumulé en supposant 
    qu'on additionne les PnL de chaque paire (sans gestion de risque globale).
    """
    combined_pnl = pd.DataFrame()
    
    for (A, B) in pairs:
        if A not in prices.columns or B not in prices.columns:
            continue
        A_prices = prices[A].dropna()
        B_prices = prices[B].dropna()
        
        df_signals_pv = compute_signals_with_pval(
            A_prices, B_prices, 
            window=30, 
            z_threshold=z_threshold, 
            p_threshold=p_threshold
        )
        df_bt_pv = mini_backtest(df_signals_pv, A_prices, B_prices)
        
        # on stocke le PnL sur la colonne "PnL_<A>_<B>"
        col_name = f"PnL_{A}_{B}"
        combined_pnl[col_name] = df_bt_pv['PnL']
    
    # On remplit les trous par 0
    combined_pnl = combined_pnl.fillna(method='ffill').fillna(0)
    
    # On crée une colonne "TotalPnL" qui somme toutes les paires
    combined_pnl['TotalPnL'] = combined_pnl.sum(axis=1)
    
    return combined_pnl

df_multi = backtest_multiple_pairs(pairs_to_test, prices, p_threshold=0.10, z_threshold=2.0)
df_multi[['TotalPnL']].plot(figsize=(10,4), title="PnL total sur multi-paires")
plt.show()

df_multi.tail(10)


### C) Ajout d'un take-profit partiel

Ici, nous allons modifier la fonction `mini_backtest` pour sortir une partie de la position si le z-score atteint un niveau extrême (disons ±2.5). L'objectif est de sécuriser une partie des gains. Cette logique reste simplifiée, mais illustre un mécanisme de money management basique.


In [14]:
def mini_backtest_takeprofit(df_signals, seriesA, seriesB, 
                             zscore_takeprofit=2.5):
    """
    Ajout d'un take-profit partiel : 
      - On suit la position existante (0, +1, -1)
      - Si on est +1 (LONG A, SHORT B) et que le z-score < -zscore_takeprofit,
        on sort la moitié de la position (ou la totalité), 
        etc.
    NOTE : c'est un pseudo-code, à adapter selon la logique désirée.
    """
    df_bt = pd.DataFrame({
        'Signal': df_signals['Signal'],
        'Zscore': df_signals['Zscore'],
        'PriceA': seriesA[df_signals.index],
        'PriceB': seriesB[df_signals.index]
    }).dropna()
    
    positions = []
    pnls = []
    current_position = 0
    entry_priceA = 0
    entry_priceB = 0
    cum_pnl = 0.0
    partial_closed = False  # Indique si le take-profit partiel a été exécuté

    for i in range(len(df_bt)):
        signal_curr = df_bt['Signal'].iloc[i]
        z_curr = df_bt['Zscore'].iloc[i]
        priceA_curr = df_bt['PriceA'].iloc[i]
        priceB_curr = df_bt['PriceB'].iloc[i]

        # Check changement de signal
        if signal_curr != current_position:
            # Fermer la position si existante
            if current_position != 0:
                if current_position == +1:
                    trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                else:
                    trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                cum_pnl += trade_pnl
            # Ouvrir nouvelle
            current_position = signal_curr
            partial_closed = False
            if current_position != 0:
                entry_priceA = priceA_curr
                entry_priceB = priceB_curr
        
        # Take-profit partiel si position +1 et z-score tombe < -2.5
        # ou si position -1 et z-score monte > +2.5
        # => on simule la fermeture de moitié de la position
        if current_position == +1 and not partial_closed:
            if z_curr < -zscore_takeprofit:
                # Fermer la moitié
                trade_pnl_half = 0.5 * ((priceA_curr - entry_priceA) - (priceB_curr - entry_priceB))
                cum_pnl += trade_pnl_half
                partial_closed = True
        elif current_position == -1 and not partial_closed:
            if z_curr > zscore_takeprofit:
                # Fermer la moitié
                trade_pnl_half = 0.5 * (-(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB))
                cum_pnl += trade_pnl_half
                partial_closed = True
        
        positions.append(current_position)
        pnls.append(cum_pnl)
    
    df_bt['Position'] = positions
    df_bt['PnL'] = pnls
    
    return df_bt

# Test:
df_signals_pv = compute_signals_with_pval(A_prices, B_prices, window=30, z_threshold=2, p_threshold=0.10)
df_bt_takeprofit = mini_backtest_takeprofit(df_signals_pv, A_prices, B_prices, zscore_takeprofit=2.5)

plt.figure(figsize=(10,4))
df_bt_takeprofit['PnL'].plot()
plt.title("PnL cumulatif (take-profit partiel)")
plt.show()

df_bt_takeprofit.tail(10)


### D) Mini risk management : stop-loss et cooldown

Enfin, voici un exemple de fonction qui ajoute:
1. Un stop-loss si la position perd plus de 5 (ex. en PnL).
2. Un cooldown de 5 jours après la fermeture avant de rouvrir sur la même paire.


In [15]:
def mini_backtest_risk(df_signals, seriesA, seriesB, 
                       stop_loss=5.0,  # ex. stop-loss = -5
                       cooldown_days=5):
    """
    On sort la position si la perte latente dépasse 'stop_loss'.
    On attend 'cooldown_days' barres après la fermeture d'une position avant de rouvrir.
    """
    df_bt = pd.DataFrame({
        'Signal': df_signals['Signal'],
        'PriceA': seriesA[df_signals.index],
        'PriceB': seriesB[df_signals.index]
    }).dropna()
    
    positions = []
    pnls = []
    current_position = 0
    entry_priceA = 0
    entry_priceB = 0
    cum_pnl = 0.0
    
    # Gestion du cooldown
    days_in_cooldown = 0
    is_in_cooldown = False

    for i in range(len(df_bt)):
        signal_curr = df_bt['Signal'].iloc[i]
        priceA_curr = df_bt['PriceA'].iloc[i]
        priceB_curr = df_bt['PriceB'].iloc[i]

        if is_in_cooldown:
            days_in_cooldown += 1
            # On sort tout signal
            signal_curr = 0  
            if days_in_cooldown >= cooldown_days:
                is_in_cooldown = False
                days_in_cooldown = 0
        
        # Check changement de signal
        if signal_curr != current_position:
            # Fermer la position si existante
            if current_position != 0:
                if current_position == +1:
                    trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                else:
                    trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                cum_pnl += trade_pnl
                
                # On entre en cooldown quand on clôture une position
                is_in_cooldown = True
                days_in_cooldown = 0
            
            # Ouvrir (ou pas)
            if not is_in_cooldown and signal_curr != 0:
                current_position = signal_curr
                entry_priceA = priceA_curr
                entry_priceB = priceB_curr
            else:
                # Si cooldown, pas de nouvelle position
                current_position = 0
        
        else:
            # Vérifier le stop-loss
            if current_position == +1:
                unrealized_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                if unrealized_pnl <= -stop_loss:
                    # Stop-loss
                    cum_pnl += unrealized_pnl
                    current_position = 0
                    is_in_cooldown = True
                    days_in_cooldown = 0
            elif current_position == -1:
                unrealized_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                if unrealized_pnl <= -stop_loss:
                    # Stop-loss
                    cum_pnl += unrealized_pnl
                    current_position = 0
                    is_in_cooldown = True
                    days_in_cooldown = 0

        positions.append(current_position)
        pnls.append(cum_pnl)
    
    df_bt['Position'] = positions
    df_bt['PnL'] = pnls
    return df_bt

# Test
df_signals_pv = compute_signals_with_pval(A_prices, B_prices, window=30, z_threshold=2, p_threshold=0.10)
df_bt_risk = mini_backtest_risk(df_signals_pv, A_prices, B_prices, stop_loss=5.0, cooldown_days=5)

plt.figure(figsize=(10,4))
df_bt_risk['PnL'].plot()
plt.title("PnL cumulatif (Stop-loss=5, cooldown=5j)")
plt.show()

df_bt_risk.tail(10)


### 1) Tester différents "window" pour le calcul du z-score

Nous allons comparer l'impact de différentes longueurs de fenêtre (rolling window) pour le calcul du z-score (ex. 20, 30, 40).  
L'objectif est de voir si un z-score calculé sur une fenêtre plus courte (20) est plus réactif et/ou plus rentable qu'un z-score sur une fenêtre plus longue (40).  

- **Fenêtre courte** : davantage de signaux, potentiellement plus de bruit.  
- **Fenêtre longue** : signaux plus rares, mais peut-être plus robustes.  

Ensuite, nous observerons la colonne `FinalPnL` pour chaque fenêtre testée.


In [None]:
def test_different_zscore_windows(seriesA, seriesB, windows=[20,30,40], z_threshold=2.0, p_threshold=0.10):
    """
    Compare l'impact de différentes tailles de fenêtre (rolling) 
    pour le calcul du z-score, avec un threshold fixe.
    """
    results = []
    for w in windows:
        df_signals_pv = compute_signals_with_pval(
            seriesA, seriesB,
            window=w,         # On varie ici
            z_threshold=z_threshold, 
            p_threshold=p_threshold
        )
        df_bt_pv = mini_backtest(df_signals_pv, seriesA, seriesB)
        final_pnl = df_bt_pv['PnL'].iloc[-1] if len(df_bt_pv) > 0 else 0
        results.append((w, final_pnl))
    
    df_res = pd.DataFrame(results, columns=["ZscoreWindow", "FinalPnL"])
    return df_res

windows_test = [20, 30, 40]
df_win_test = test_different_zscore_windows(A_prices, B_prices, windows=windows_test, z_threshold=2.0)
df_win_test


### 2) Ajouter un "time stop" à la stratégie

Ici, nous introduisons la notion de *time stop* :  
- Si une position reste ouverte plus de `max_bars_in_position` barres (jours), on la clôture de force, peu importe le z-score.  
- L'idée est d'éviter de maintenir une position indéfiniment dans l'espoir que l'écart revienne, alors que la co-intégration peut ne plus être valide.

Nous conservons le reste de la logique :  
- On ouvre la position quand le z-score franchit ±threshold (et p-value < threshold).  
- On la ferme si le z-score repasse le signal ou qu'on dépasse la durée maximale autorisée en position.


In [17]:
def mini_backtest_time_stop(df_signals, seriesA, seriesB, max_bars_in_position=20):
    """
    Identique à mini_backtest, mais on ajoute :
      - on compte le nombre de barres passées en position
      - si on dépasse max_bars_in_position, on sort de force
    """
    df_bt = pd.DataFrame({
        'Signal': df_signals['Signal'],
        'PriceA': seriesA[df_signals.index],
        'PriceB': seriesB[df_signals.index]
    }).dropna()

    positions = []
    pnls = []
    current_position = 0
    entry_priceA = 0
    entry_priceB = 0
    cum_pnl = 0.0
    bars_in_position = 0

    for i in range(len(df_bt)):
        signal_curr = df_bt['Signal'].iloc[i]
        priceA_curr = df_bt['PriceA'].iloc[i]
        priceB_curr = df_bt['PriceB'].iloc[i]

        if signal_curr != current_position:
            # Fermer si besoin
            if current_position != 0:
                if current_position == +1:
                    trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                else:
                    trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                cum_pnl += trade_pnl
            # Ouvrir la nouvelle
            current_position = signal_curr
            bars_in_position = 0
            if current_position != 0:
                entry_priceA = priceA_curr
                entry_priceB = priceB_curr
        else:
            # si on est en position, incrémenter le compteur
            if current_position != 0:
                bars_in_position += 1
                if bars_in_position > max_bars_in_position:
                    # On force la sortie
                    if current_position == +1:
                        trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                    else:
                        trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                    cum_pnl += trade_pnl
                    current_position = 0

        positions.append(current_position)
        pnls.append(cum_pnl)
    
    df_bt['Position'] = positions
    df_bt['PnL'] = pnls
    return df_bt

# Test rapide:
df_signals_time = compute_signals_with_pval(A_prices, B_prices, window=30, z_threshold=2, p_threshold=0.10)
df_bt_time = mini_backtest_time_stop(df_signals_time, A_prices, B_prices, max_bars_in_position=20)
df_bt_time['PnL'].plot(title="PnL (Time Stop = 20 barres)", figsize=(10,4))
plt.show()


### 3) Exprimer le PnL en pourcentage du capital

Pour mieux interpréter la performance, nous allons convertir le PnL absolu en pourcentage du capital initial (ex. 100 000\$).  
- Dans notre version simplifiée, +1 de PnL ≈ +1 \$, car nous simulons l'achat d'1 share de A et la vente d'1 share de B.  
- Dans une approche plus réaliste, il faudrait dimensionner la position en fonction du prix, de la volatilité, etc.  

Cette étape sert surtout à donner un ordre de grandeur du rendement (PnL%) plutôt que de simples montants bruts.


In [18]:
def mini_backtest_in_percentage(df_signals, seriesA, seriesB, capital=100000):
    """
    Convertit le PnL en % du capital initial, 
    en supposant qu'un PnL de +1 = +1$ dans la logique simplifiée actuelle.
    
    On suppose qu'on achète 1 share de A / short 1 share de B (ou l'inverse).
    => En réalité, on voudrait calibrer la taille de position pour être plus cohérent.
    """
    df_bt = pd.DataFrame({
        'Signal': df_signals['Signal'],
        'PriceA': seriesA[df_signals.index],
        'PriceB': seriesB[df_signals.index]
    }).dropna()

    positions = []
    pnls = []
    current_position = 0
    entry_priceA = 0
    entry_priceB = 0
    cum_pnl = 0.0

    for i in range(len(df_bt)):
        signal_curr = df_bt['Signal'].iloc[i]
        priceA_curr = df_bt['PriceA'].iloc[i]
        priceB_curr = df_bt['PriceB'].iloc[i]

        if signal_curr != current_position:
            if current_position != 0:
                # Fermer
                if current_position == +1:
                    trade_pnl = (priceA_curr - entry_priceA) - (priceB_curr - entry_priceB)
                else:
                    trade_pnl = -(priceA_curr - entry_priceA) + (priceB_curr - entry_priceB)
                cum_pnl += trade_pnl
            current_position = signal_curr
            if current_position != 0:
                entry_priceA = priceA_curr
                entry_priceB = priceB_curr
        
        positions.append(current_position)
        pnls.append(cum_pnl)
    
    df_bt['Position'] = positions
    df_bt['PnL'] = pnls
    # Conversion en % du capital
    df_bt['PnLPct'] = (df_bt['PnL'] / capital) * 100
    return df_bt

df_signals_pct = compute_signals_with_pval(A_prices, B_prices, window=30, z_threshold=2, p_threshold=0.10)
df_bt_pct = mini_backtest_in_percentage(df_signals_pct, A_prices, B_prices, capital=100000)
df_bt_pct[['PnL','PnLPct']].tail(10)
