In [7]:
!pip3 install seaborn tqdm

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [8]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import coint
from itertools import combinations
import matplotlib.gridspec as gridspec
from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')

# Plot settings: start from the 'classic' style, then override size, resolution and fonts
plt.style.use('classic')
plt.rcParams.update({
    'figure.figsize': [16, 8],
    'figure.dpi': 100,
    # LaTeX-like typography: serif text, Times New Roman preferred
    'font.family': 'serif',
    'font.serif': ['Times New Roman', 'Computer Modern Roman'],
    # Computer Modern for math text
    'mathtext.fontset': 'cm',
})

In [9]:
# Configure the time periods (training window, end of test window) and the trading-day count
TRAIN_START = pd.Timestamp('2021-02-02')
TRAIN_END = pd.Timestamp('2024-01-01')
TEST_END = pd.Timestamp('2025-01-01')
TRADING_DAYS_PER_YEAR = 252

# The same values collected in a single mapping, keyed by constant name
DATE_CONFIG = dict(
    TRAIN_START=TRAIN_START,
    TRAIN_END=TRAIN_END,
    TEST_END=TEST_END,
    TRADING_DAYS_PER_YEAR=TRADING_DAYS_PER_YEAR,
)

def get_training_period():
    """Return the training window as a dict with keys 'start' and 'end', read from DATE_CONFIG."""
    start, end = DATE_CONFIG['TRAIN_START'], DATE_CONFIG['TRAIN_END']
    return dict(start=start, end=end)

def get_test_period():
    """
    Return the test window as a dict with keys 'start' and 'end', read from DATE_CONFIG.

    The window starts at TRAIN_END (the same value the training window ends on)
    and runs to TEST_END.
    """
    start, end = DATE_CONFIG['TRAIN_END'], DATE_CONFIG['TEST_END']
    return dict(start=start, end=end)

In [10]:
def load_and_prepare_data(file_path):
    """
    Read a parquet file of prices and reshape it into a date x symbol matrix of closes.

    Parameters:
    file_path: path handed to pd.read_parquet; the file must provide the columns
        'date', 'symbol' and 'close'

    Returns:
    tuple: price_matrix, symbols
        price_matrix is indexed by date with one column per symbol; symbols is the
        list of its column labels.
    """
    first_day = DATE_CONFIG['TRAIN_START']
    last_day = DATE_CONFIG['TEST_END']

    raw = pd.read_parquet(file_path)
    raw['date'] = pd.to_datetime(raw['date'])

    # Keep only rows dated within [TRAIN_START, TEST_END], both ends included
    in_window = raw[(raw['date'] >= first_day) & (raw['date'] <= last_day)]

    # Fill missing values: forward-fill first, then back-fill whatever is still
    # empty (back-filling copies the next available close into earlier dates)
    price_matrix = (
        in_window
        .pivot(index='date', columns='symbol', values='close')
        .ffill()
        .bfill()
    )
    symbols = price_matrix.columns.tolist()

    # The reported range is the configured one, not the range found in the data
    print(f"Daten geladen von {DATE_CONFIG['TRAIN_START']} bis {DATE_CONFIG['TEST_END']}")
    print(f"Anzahl Symbole: {len(symbols)}")
    print(f"Anzahl Handelstage: {len(price_matrix)}")

    return price_matrix, symbols

In [11]:
def find_cointegrated_pairs(price_matrix, p_threshold=0.05, symbols=None):
    """
    Run a cointegration test on every unordered pair of symbols.

    Parameters:
    price_matrix (DataFrame): price data, one column per symbol
    p_threshold (float): a pair is reported when its p-value is strictly below this value
    symbols (list): symbols to consider (optional); defaults to all columns of price_matrix

    Returns:
    tuple: score_matrix, pvalue_matrix, pairs, symbols
        score_matrix and pvalue_matrix are n x n arrays filled symmetrically; cells
        that were never tested (the diagonal, skipped pairs) keep their initial
        values of 0 and 1 respectively. pairs lists (symbol_i, symbol_j) tuples
        with i < j in the order of symbols.
    """
    if symbols is None:
        symbols = price_matrix.columns

    n = len(symbols)
    score_matrix = np.zeros((n, n))
    pvalue_matrix = np.ones((n, n))
    pairs = []

    # Progress bar sized to the number of unordered pairs
    with tqdm(total=n * (n - 1) // 2, desc="Berechne Cointegration") as pbar:
        for i, j in combinations(range(n), 2):
            first = price_matrix[symbols[i]]
            second = price_matrix[symbols[j]]

            # Rows where both series hold a value
            both_present = ~np.isnan(first) & ~np.isnan(second)

            # With fewer than two shared observations the pair is skipped and the
            # matrices keep their defaults
            if both_present.sum() > 1:
                # statsmodels' coint (Engle-Granger cointegration test):
                # first element is the test statistic, second the p-value
                score, pvalue = coint(first[both_present], second[both_present])[:2]

                score_matrix[i, j] = score_matrix[j, i] = score
                pvalue_matrix[i, j] = pvalue_matrix[j, i] = pvalue

                if pvalue < p_threshold:
                    pairs.append((symbols[i], symbols[j]))

            pbar.update(1)

    return score_matrix, pvalue_matrix, pairs, symbols

def analyze_pairs(price_matrix, pvalue_threshold=0.05, symbols=None):
    """
    Convenience entry point for the pairwise cointegration scan.

    Parameters:
    price_matrix (DataFrame): price data, one column per symbol
    pvalue_threshold (float): significance threshold, passed on as p_threshold
    symbols (list): symbols to consider (optional); defaults to every column, as a list

    Returns:
    tuple: score_matrix, pvalue_matrix, pairs, symbols
        (exactly what find_cointegrated_pairs returns)
    """
    chosen = price_matrix.columns.tolist() if symbols is None else symbols
    return find_cointegrated_pairs(price_matrix, p_threshold=pvalue_threshold, symbols=chosen)

def plot_cointegration_heatmap(pvalue_matrix, symbols, title=None, ax=None, cmap="YlGnBu_r"):
    """
    Draw a heatmap of cointegration p-values on a -log10 scale.

    Only the strictly lower triangle is drawn; the diagonal and the upper
    triangle are masked out.

    Parameters:
    pvalue_matrix (ndarray): matrix of p-values
    symbols (list): symbol labels used for both axes
    title (str): plot title (optional)
    ax (Axes): Matplotlib Axes to draw on (optional); a new 12x10 figure is
        created when omitted
    cmap (str): colormap for the heatmap

    Returns:
    Axes: the Axes that was drawn on
    """
    if ax is None:
        _, ax = plt.subplots(figsize=(12, 10))

    # Mask the diagonal and the upper triangle so that only the lower triangle is shown
    mask = np.triu(np.ones_like(pvalue_matrix, dtype=bool), k=0)

    # Logarithmic colour scale for better contrast among small p-values;
    # the 1e-10 offset keeps log10 finite for a p-value of exactly 0
    pvalue_log = -np.log10(pvalue_matrix + 1e-10)

    # Draw the heatmap
    sns.heatmap(pvalue_log, mask=mask, cmap=cmap,
                square=True, linewidths=.5, ax=ax,
                xticklabels=symbols, yticklabels=symbols)

    # The heatmap mesh is the collection added last. Indexing from the end keeps
    # the lookup correct when a caller-supplied Axes already held other collections
    # (index 0 would then be an older artist, not this heatmap).
    cbar = ax.collections[-1].colorbar
    cbar.set_label('-log10(p-value)', rotation=270, labelpad=20)

    # Add the title
    if title:
        ax.set_title(title, fontsize=14, pad=20)

    return ax

In [12]:
def _count_significant_pairs(pvalue_matrix, n_symbols, p_threshold):
    """Count cells above the diagonal of the leading n_symbols x n_symbols block with p < p_threshold."""
    block = pvalue_matrix[:n_symbols, :n_symbols]
    return int(np.triu(block < p_threshold, k=1).sum())


def _draw_pvalue_heatmap(ax, pvalue_matrix, symbols, cmap, norm, **heatmap_kwargs):
    """Draw the lower triangle of one p-value matrix on ax and frame it with a thick black border."""
    # Mask the diagonal and the upper triangle (only the lower triangle is shown)
    hidden = np.triu(np.ones_like(pvalue_matrix, dtype=bool))

    sns.heatmap(pvalue_matrix,
                mask=hidden,
                cmap=cmap,
                square=True,
                linewidths=0.1,
                xticklabels=symbols,
                yticklabels=symbols,
                norm=norm,
                ax=ax,
                **heatmap_kwargs)

    # Black border around the heatmap
    for spine in ax.spines.values():
        spine.set_visible(True)
        spine.set_color('black')
        spine.set_linewidth(3.5)


def plot_combined_heatmaps(result_market1, result_market2, 
                        market_name1="FTSE", market_name2="NASDAQ", 
                        p_threshold=0.05, max_symbols=None, cmap="viridis", figsize=(22, 10)):
    """
    Draw two p-value heatmaps side by side with one shared colorbar.

    Parameters:
    result_market1, result_market2: result 4-tuples of the cointegration analysis
        (score_matrix, pvalue_matrix, pairs, symbols); only the p-value matrix
        and the symbols are used here
    market_name1, market_name2: market names shown in the titles
    p_threshold: significance threshold; only used for the pair count in the titles
    max_symbols: optional cap; when set, only the first max_symbols symbols
        (and the matching leading block of the p-value matrix) are shown
    cmap: colormap for the heatmaps
    figsize: size of the whole figure

    Returns:
    fig, (ax1, ax2): the created Figure and the two Axes
    """
    # Normalize is not among the notebook's top-level imports, so it is imported here
    from matplotlib.colors import Normalize

    _, pvalues1, _, symbols1 = result_market1
    _, pvalues2, _, symbols2 = result_market2

    # Optional: limit the number of symbols for readability
    if max_symbols and len(symbols1) > max_symbols:
        symbols1 = symbols1[:max_symbols]
        pvalues1 = pvalues1[:max_symbols, :max_symbols]

    if max_symbols and len(symbols2) > max_symbols:
        symbols2 = symbols2[:max_symbols]
        pvalues2 = pvalues2[:max_symbols, :max_symbols]

    # Number of significant pairs among the displayed symbols
    num_pairs1 = _count_significant_pairs(pvalues1, len(symbols1), p_threshold)
    num_pairs2 = _count_significant_pairs(pvalues2, len(symbols2), p_threshold)

    # Figure with two subplots on a white background
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
    fig.patch.set_facecolor('white')

    # Shared colour scale for both heatmaps. The range is taken over the full
    # matrices, i.e. it also includes the masked cells (diagonal, upper triangle).
    norm = Normalize(vmin=min(np.min(pvalues1), np.min(pvalues2)),
                     vmax=max(np.max(pvalues1), np.max(pvalues2)))

    # First heatmap: no colorbar of its own
    _draw_pvalue_heatmap(ax1, pvalues1, symbols1, cmap, norm, cbar=False)

    # Second heatmap carries the colorbar that serves as the shared legend
    _draw_pvalue_heatmap(ax2, pvalues2, symbols2, cmap, norm,
                         cbar_kws={"shrink": 0.9,
                                   "label": "p-value",
                                   "aspect": 15})  # aspect controls the colorbar width

    # Enlarge the colorbar ticks and label; this has to happen after the heatmap was drawn
    cbar = ax2.collections[0].colorbar
    cbar.ax.tick_params(labelsize=25)
    cbar.set_label("p-value", fontsize=30)

    # Tick label sizes and axis labels
    for ax in (ax1, ax2):
        plt.sca(ax)
        plt.xticks(fontsize=20, rotation=90)
        plt.yticks(fontsize=20)
        ax.set_xlabel("Symbol", fontsize=30)

    # Y label only on the first (left) axis
    ax1.set_ylabel("Symbol", fontsize=30)

    # Titles
    ax1.set_title(f"{market_name1} - Cointegration\n({num_pairs1} Pairs found with p < {p_threshold})", fontsize=30, pad=20)
    ax2.set_title(f"{market_name2} - Cointegration\n({num_pairs2} Pairs found with p < {p_threshold})", fontsize=30, pad=20)

    # Tighten the layout
    plt.tight_layout()

    return fig, (ax1, ax2)

In [None]:
# NOTE(review): this cell reads score_matrix_*, pvalue_matrix_*, pairs_* and symbols_* for
# both markets, but no cell shown in this notebook assigns them, and the recorded run of
# this cell ended in a NameError. Presumably each 4-tuple is what analyze_pairs() returns
# for the corresponding market's price matrix -- TODO confirm and add that step above.
result_ftse = (score_matrix_ftse, pvalue_matrix_ftse, pairs_ftse, symbols_ftse)
result_nasdaq = (score_matrix_nasdaq, pvalue_matrix_nasdaq, pairs_nasdaq, symbols_nasdaq)


# Example using the "plasma" colormap
fig, (ax1, ax2) = plot_combined_heatmaps(
    result_ftse, result_nasdaq, 
    market_name1="FTSE", market_name2="NASDAQ", 
    p_threshold=0.05, max_symbols=20, cmap="plasma"
)

# To save the figure (optional)
# fig.savefig('cointegration_comparison.png', dpi=300, bbox_inches='tight')

plt.show()

NameError: name 'score_matrix_ftse' is not defined