In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
from neuralforecast import NeuralForecast
from neuralforecast.models import PatchTST
import logging
import os
from ticker_data import get_df_for_period

#tickers = ['AAPL', '^SPX', 'MSFT', '^NDX']
tickers = ['^SPX','KO']

# NDX is an index: volume=0, and High/Low/Open == Close (no intraday range)
#tickers_with_vol  = ['AAPL', '^SPX', '^NDX', 'MSFT']
#tickers_with_ohlc = ['AAPL', '^SPX', '^NDX', 'MSFT']

period_train = {'start': '2020-06-01', 'end': '2025-01-01'}
period_test = {'start': '2025-01-01', 'end': '2025-09-01'}

df_train = get_df_for_period(tickers, period_train)
df_test = get_df_for_period(tickers, period_test)

print(f"Training endet am: {df_train['ds'].max()}")
print(f"Blind-Test startet am: {df_test['ds'].min()}")


In [None]:
from pytorch_lightning.loggers import CSVLogger
my_logger = CSVLogger("logs", name="patchtst_model")

from neuralforecast.losses.pytorch import MAE
from neuralforecast.losses.pytorch import MQLoss
import torch 
# Logger wie gehabt
my_logger = CSVLogger("logs", name="patchtst_model")
torch.set_float32_matmul_precision( 'high')
# 2. Modell-Konfiguration

#optuna:
#    Best params: {'input_size': 80, 'patch_len': 8, 'stride': 2, 'encoder_layers': 2, 'n_heads': 4, 'hidden_size': 32, 'linear_hidden_size': 256, 'dropout': 0.11277365231240732, 'learning_rate': 0.0002796795622851195}
#    Best hit rate: 0.738095238095238

model_stable = PatchTST(
    h=7,
    input_size=90,
    patch_len=16,
    stride=2,
    max_steps=200,           
    learning_rate=0.0003,
    encoder_layers = 5,
    n_heads = 8,
    hidden_size = 128, 
    linear_hidden_size=64,
    dropout=0.06, 
    loss=MQLoss(level=[80, 95]),
    #early_stop_patience_steps=5,
    # Early Stopping einbauen
    #early_stop_patience_steps=3, # Stoppt, wenn val_loss 3x hintereinander steigt
    val_check_steps=50,
    #optimizer_kwargs={'weight_decay': 0.01} , #Wir bestrafen das Modell für zu komplexe interne Gewichte. Das zwingt es dazu, einfachere (und damit oft stabilere) Lösungen zu finden.
    accelerator='gpu',
    devices=1,
    logger=my_logger, 
    enable_progress_bar=False,  
)

# 3. NeuralForecast initialisieren
nf_refined = NeuralForecast(models=[model_stable], freq='D')

# 4. Fit ausführen
# Wenn val_size > 0 ist, erkennt Lightning meist automatisch den 'valid_loss'
print("Start training with ", len(df_train), "samples")
print(df_train)
nf_refined.fit(df=df_train, val_size=50)

In [None]:
import pandas as pd
import glob
import os
import matplotlib.pyplot as plt
# Nach dem nf.fit(df=train_df, val_size=20) suchen wir die neueste Log-Datei
log_dir = 'logs/patchtst_model/'
latest_log = max(glob.glob(os.path.join(log_dir, 'version_*')), key=os.path.getmtime)
metrics_path = os.path.join(latest_log, 'metrics.csv')

if os.path.exists(metrics_path):
    print(metrics_path)
    # Separate DataFrames für Train und Validation ohne NaNs
    metrics = pd.read_csv(metrics_path)
    train_metrics = metrics[['step', 'train_loss_step']].dropna()
    val_metrics = metrics[['step', 'valid_loss']].dropna()

    plt.figure(figsize=(12, 6))

    # Train Loss (leicht transparent, da verrauscht)
    plt.plot(train_metrics['step'], train_metrics['train_loss_step'], 
            label='Train Loss (Step)', alpha=0.3, color='blue')

    # Rolling Mean zur Visualisierung des Trends
    plt.plot(train_metrics['step'], train_metrics['train_loss_step'].rolling(window=20).mean(), 
            label='Train Loss (Trend)', color='blue', linewidth=2)

    # Validation Loss (deutlich als Punkte)
    plt.scatter(val_metrics['step'], val_metrics['valid_loss'], 
                label='Validation Loss', color='orange', zorder=5)
    plt.plot(val_metrics['step'], val_metrics['valid_loss'], 
            color='orange', linestyle='--', alpha=0.6)

    plt.title('Bereinigter Trainingsverlauf: Signal vs. Rauschen')
    plt.xlabel('Schritte')
    plt.ylabel('MAE Loss')
    plt.legend()
    plt.grid(True, alpha=0.2)
    plt.show()
else:
    print("CSV-Metriken nicht gefunden. Stelle sicher, dass 'val_size' beim fit() angegeben wurde.")

In [None]:
# Cross-Validation auf dem noch nie gesehenen Test-Set
df_full = pd.concat([df_train, df_test]).sort_values(['unique_id', 'ds'])
crossvalidation_results = nf_refined.cross_validation(df=df_full, 
                                       n_windows=len(df_test['ds'].unique()),  # one window per test day
                                       step_size=1, refit=False )


In [None]:
test_ticker = 'KO'
test_start = df_test['ds'].min()
crossvalidation_results = crossvalidation_results[crossvalidation_results['ds'] >= test_start]
 
# After training with MQLoss, nf.predict() returns:
#  unique_id   ds          PatchTST-median  PatchTST-lo-80  PatchTST-hi-80  PatchTST-lo-95  PatchTST-hi-95
#  XXX_price  2025-01-03  0.00312          -0.00821         0.01445         -0.01203         0.01827

cv_test_ticker = crossvalidation_results[crossvalidation_results['unique_id'] == f'{test_ticker}_price'].copy()
cv_test_ticker_clean = cv_test_ticker.groupby('cutoff').first().reset_index()
#print(len(cv_test_ticker), len(cv_test_ticker_clean) )

# Trading-Logik: Long-Short
#check the confidence interval
cv_test_ticker_clean['signal'] = 0
#cv_test_ticker_clean.loc[cv_test_ticker_clean['PatchTST-lo-80'] > 0, 'signal'] = 1   # entire 80% interval is positive
#cv_test_ticker_clean.loc[cv_test_ticker_clean['PatchTST-hi-80'] < 0, 'signal'] = -1  # entire 80% interval is negative
cv_test_ticker_clean['signal'] = np.where(cv_test_ticker_clean['PatchTST-median'] > 0, 1, -1)
#without confidence interval cv_test_ticker_clean['signal'] = np.where(cv_test_ticker_clean['PatchTST'] > 0, 1, -1)

cv_test_ticker_clean['strat_ret'] = cv_test_ticker_clean['signal'] * cv_test_ticker_clean['y']

# Kumulative Performance
cv_test_ticker_clean['cum_strat'] = cv_test_ticker_clean['strat_ret'].cumsum().apply(np.exp)
cv_test_ticker_clean['cum_market'] = cv_test_ticker_clean['y'].cumsum().apply(np.exp)
#print(cv_test_ticker_clean)

In [None]:
plt.figure(figsize=(12, 6))
plt.plot(cv_test_ticker_clean['ds'], cv_test_ticker_clean['cum_market'], label='Markt (Buy & Hold)', color='gray', alpha=0.6)
plt.plot(cv_test_ticker_clean['ds'], cv_test_ticker_clean['cum_strat'], label='Multivariat PatchTST (Long-Short)', color='green', linewidth=2)

plt.title('Blind-Test 2023: Multivariate PatchTST Strategie (Preis+Vol+SPY)')
plt.xlabel('Datum')
plt.ylabel('Portfolio Wert')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print(f"Finale Outperformance (Blind-Test): {(cv_test_ticker_clean['cum_strat'].iloc[-1] - cv_test_ticker_clean['cum_market'].iloc[-1])*100:.2f} Prozentpunkte")

In [None]:
def optimize_threshold(results_df):
    thresholds = np.linspace(0, results_df['PatchTST-median'].std() * 3, 50)
    best_sharpe = -np.inf
    best_threshold = 0
    results_list = []

    for t in thresholds:
        # Signal berechnen
        sig = np.where(results_df['PatchTST-median'] > t, 1, 
              np.where(results_df['PatchTST-median'] < -t, -1, 0))
        
        # Rendite (shifted)
        strat_ret = pd.Series(sig).shift(1).values * results_df['y'].values
        strat_ret = strat_ret[~np.isnan(strat_ret)]
        
        # Sharpe berechnen
        if len(strat_ret) > 0 and strat_ret.std() > 0:
            sharpe = (strat_ret.mean() / strat_ret.std()) * np.sqrt(252)
        else:
            sharpe = 0
            
        results_list.append((t, sharpe))
        
        if sharpe > best_sharpe:
            best_sharpe = sharpe
            best_threshold = t

    # Visualisierung der Optimierung
    plt.figure(figsize=(10, 5))
    ts, ss = zip(*results_list)
    plt.plot(ts, ss, marker='o', color='purple')
    plt.axvline(best_threshold, color='red', linestyle='--', label=f'Optimum: {best_threshold:.4f}')
    plt.title('Threshold vs. Sharpe Ratio')
    plt.xlabel('Threshold (Log-Return Abweichung)')
    plt.ylabel('Sharpe Ratio')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    print(f"Bester Threshold: {best_threshold:.6f}")
    print(f"Maximale Sharpe Ratio: {best_sharpe:.2f}")
    return best_threshold


print(cv_test_ticker_clean)
opt_t = optimize_threshold(cv_test_ticker_clean)
print(opt_t)

In [None]:
def run_backtest_with_fees(results_df, threshold, fee_per_trade=0.0005):
    """
    Simulation mit Gebühren (0.0005 = 0.05% pro Kauf/Verkauf)
    """
    df = results_df.copy()
    
    # Signal mit optimalem Threshold
    df['signal'] = 0
    df.loc[df['PatchTST-median'] > threshold, 'signal'] = 1
    df.loc[df['PatchTST-median'] < -threshold, 'signal'] = -1
    
    # Trades identifizieren (Wechsel des Signals)
    # Ein Trade findet statt, wenn sich das Signal im Vergleich zum Vortag ändert
    df['trade_action'] = df['signal'].diff().abs()
    
    # Brutto-Rendite
    df['strat_ret_brutto'] = df['signal'].shift(1) * df['y']
    
    # Netto-Rendite (Brutto - (Anzahl Trades * Gebühr))
    df['strat_ret_netto'] = df['strat_ret_brutto'] - (df['trade_action'] * fee_per_trade)
    
    # Kumulierte Werte
    df['cum_strat_brutto'] = np.exp(df['strat_ret_brutto'].fillna(0).cumsum())
    df['cum_strat_netto'] = np.exp(df['strat_ret_netto'].fillna(0).cumsum())
    #print(df)
    # Plot
    plt.figure(figsize=(12, 6))
    plt.plot(df['ds'], df['cum_strat_brutto'], label='Strategie Brutto', alpha=0.5)
    plt.plot(df['ds'], df['cum_strat_netto'], label='Strategie Netto (inkl. Gebühren)', color='darkgreen', linewidth=2)
    plt.plot(df['ds'], np.exp(df['y'].cumsum()), label='Markt', color='gray', linestyle='--')
    plt.title(f'Backtest mit Gebühren ({fee_per_trade*100:.2f}% pro Trade)')
    plt.legend()
    plt.show()
    
    return df

# Simulation mit 0.05% Gebühren (typisch für viele Online-Broker/Krypto)

df_netto = run_backtest_with_fees(cv_test_ticker_clean, threshold=0.003, fee_per_trade=0.0005)

In [None]:
#Wir führen eine Confidence-Schwelle ein.
# Wenn das Modell nur eine minimale Bewegung vorhersagt, bleiben wir an der Seitenlinie (Signal 0). 
# Wir handeln nur bei "starken" Vorhersagen.    
# Berechne die Standardabweichung der Vorhersagen, um eine Schwelle zu finden
long_results = df_netto.copy()
threshold = long_results['PatchTST-median'].std() * 0.5

# Neues Signal-Design:
# 1  bei starkem Aufwärtstrend
# -1 bei starkem Abwärtstrend
# 0  bei Unsicherheit (Rauschen)
long_results['signal_filtered'] = 0
long_results.loc[long_results['PatchTST-median'] > threshold, 'signal_filtered'] = 1
long_results.loc[long_results['PatchTST-median'] < -threshold, 'signal_filtered'] = -1

# Neue Performance berechnen
long_results['strat_ret_filtered'] = long_results['signal_filtered'].shift(1) * long_results['y']
long_results['cum_strat_filtered'] = np.exp(long_results['strat_ret_filtered'].cumsum())

def print_stats_filtered(results):
    # Annualisierte Sharpe Ratio (ausgehend von 252 Handelstagen)
    std = results['strat_ret_filtered'].std()
    mean = results['strat_ret_filtered'].mean()
    sharpe = (mean / std) * np.sqrt(252) if std != 0 else 0
    
    # Maximaler Drawdown
    cum_ret = results['cum_strat_filtered']
    running_max = cum_ret.cummax()
    drawdown = (cum_ret - running_max) / running_max
    max_dd = drawdown.min() * 100
    
    print(f"--- Strategie-Check cum_strat_filtered ---")
    print(f"Sharpe Ratio:      {sharpe:.2f}")
    print(f"Max Drawdown:      {max_dd:.2f}%")
    print(f"Win Rate (Tage):   {(results['strat_ret'] > 0).mean()*100:.1f}%")
    
def print_stats(results):
    # Annualisierte Sharpe Ratio (ausgehend von 252 Handelstagen)
    std = results['strat_ret'].std()
    mean = results['strat_ret'].mean()
    sharpe = (mean / std) * np.sqrt(252) if std != 0 else 0
    
    # Maximaler Drawdown
    cum_ret = results['cum_strat_netto']
    running_max = cum_ret.cummax()
    drawdown = (cum_ret - running_max) / running_max
    max_dd = drawdown.min() * 100
    
    print(f"--- Strategie-Check unfiltered ---")
    print(f"Sharpe Ratio:      {sharpe:.2f}")
    print(f"Max Drawdown:      {max_dd:.2f}%")
    print(f"Win Rate (Tage):   {(results['strat_ret'] > 0).mean()*100:.1f}%")

print_stats(long_results)
print_stats_filtered(long_results)

In [None]:
# Das Modell lokal speichern
nf_refined.save(path='./checkpoints/patchtst_momentum_model_multivar_100days_KO_conf_interval/')

print("Modell erfolgreich gespeichert. Du kannst es nun jederzeit laden.")