In [None]:
# Parameter Optimization for Trading Profiles
# This notebook tests different parameter combinations and exports results to CSV

import sys
from pathlib import Path
import pandas as pd
import numpy as np
from itertools import product
from datetime import datetime
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import time
import warnings

# Make the project importable from the notebook.
# The notebook is expected to run from a sub-directory of the project, so the
# project root is the parent of the current working directory.
PROJECT_ROOT = Path.cwd().parent

# The strategy modules are imported as `src.<subpackage>...`, which resolves only
# when the directory that CONTAINS `src` (the project root) is on sys.path.
# The `src` directory itself is still added, and ends up first, so its original
# highest-priority position is preserved.
for _import_dir in (PROJECT_ROOT, PROJECT_ROOT / "src"):
    # Skip entries that are already present so re-running this cell does not
    # keep growing sys.path.
    if str(_import_dir) not in sys.path:
        sys.path.insert(0, str(_import_dir))


# NOTE(review): this silences every warning for the whole kernel session,
# including deprecation and numerical warnings from the libraries used below.
warnings.filterwarnings('ignore')

print(f"Project root: {PROJECT_ROOT}")
print("✅ Imports successful")

In [None]:
# Import strategy components
# These are package-qualified imports (`src.<subpackage>...`), so they resolve
# only when the directory that contains the `src` package is importable.
from src.ingestion.data_fetcher import DataFetcher  # fetch_ohlcv: loads the candles
from src.metrics.technical import TechnicalIndicators  # calculate_all: adds indicator columns
from src.models.ml_model import MLModel  # train / predict on the indicator frames
from src.metrics.backtester import Backtester  # run: produces trades, signals, final equity
from src.metrics.performance import PerformanceMetrics  # calculate: summarises the trades

print("✅ Strategy modules imported")

In [None]:
# Search space for the grid search.
# Each key maps to the list of candidate values that will be tried for that
# setting; edit the lists to widen or narrow the sweep.

param_grid = dict(
    # Fast / slow simple-moving-average periods
    sma_fast=[20, 30, 40],
    sma_slow=[50, 60, 80],

    # Minimum predicted gain required by the ML filter before entering a trade
    ml_threshold=[0.02, 0.03, 0.04, 0.05],  # 2%, 3%, 4%, 5%

    # Exit levels
    profit_target=[0.03, 0.04, 0.05, 0.06],  # 3%, 4%, 5%, 6%
    stop_loss=[0.015, 0.02, 0.025],  # 1.5%, 2%, 2.5%

    # Whether the volume condition must hold on entry
    require_volume=[True, False],

    # Longest time a position may stay open (hours)
    max_hold_hours=[48, 72, 96],
)

# The sweep is the full Cartesian product, so its size is the product of the
# number of candidates per parameter.
candidate_counts = [len(candidates) for candidates in param_grid.values()]
total_combinations = np.prod(candidate_counts)
print(f"Total parameter combinations to test: {total_combinations:,}")
print("\nParameter ranges:")
for param, values in param_grid.items():
    print(f"  {param}: {values}")

In [None]:
# Fetch the price history a single time; every parameter combination below is
# evaluated against these same candles.
print("Fetching 360 days of BTC/USDT 1h data...")
df = DataFetcher.fetch_ohlcv("BTC/USDT", "1h", 360)
n_candles = len(df)
print(f"✅ Loaded {n_candles} candles")

# Positional 70/30 split: the first 70% of rows train the model, the rest is
# held out for the backtest (assumes rows are in time order — TODO confirm).
train_size = int(n_candles * 0.7)
df_train, df_test = df.iloc[:train_size], df.iloc[train_size:]

print(f"Train: {len(df_train)} bars, Test: {len(df_test)} bars")

In [None]:
# Optimization function
def test_parameters(params, df_train, df_test):
    """
    Test a single parameter combination
    
    Returns dict with parameters and performance metrics
    """
    # Build config from parameters
    indicator_config = {
        'sma_fast': params['sma_fast'],
        'sma_slow': params['sma_slow'],
        'volume_ma_period': 20,
        'rsi_period': 14,
        'bb_period': 20,
        'bb_std': 2,
    }
    
    profile_config = {
        'market': {'symbol': 'BTC/USDT', 'timeframe': '1h'},
        'indicators': indicator_config,
        'ml': {
            'enabled': True,
            'prediction_horizon': 48,
            'threshold': params['ml_threshold'],
            'model_params': {
                'n_estimators': 200,
                'max_depth': 4,
                'learning_rate': 0.05,
                'subsample': 0.8,
                'colsample_bytree': 0.8,
                'random_state': 42,
            }
        },
        'entry': {
            'require_sma_cross': True,
            'require_volume': params['require_volume'],
            'require_ml': True,
            'entry_fee': 0.001,
        },
        'exit': {
            'profit_target': params['profit_target'],
            'stop_loss': params['stop_loss'],
            'max_hold_hours': params['max_hold_hours'],
            'exit_on_sma_cross_down': True,
            'exit_fee': 0.001,
        },
    }
    
    output_config = {'base_path': '../output', 'save_model': False}
    
    try:
        # Calculate indicators
        df_train_ind = TechnicalIndicators.calculate_all(df_train.copy(), indicator_config)
        df_test_ind = TechnicalIndicators.calculate_all(df_test.copy(), indicator_config)
        
        # Train ML model
        ml_model = MLModel(profile_config['ml'], output_config, 'OPTIMIZE')
        ml_model.train(df_train_ind)
        ml_predictions = ml_model.predict(df_test_ind)
        
        # Run backtest
        backtester = Backtester(profile_config, ml_predictions)
        trades_df, signals_df, final_equity = backtester.run(df_test_ind)
        
        # Calculate metrics
        metrics = PerformanceMetrics.calculate(trades_df, profile_config)
        
        if not metrics:
            return None
        
        # Combine parameters and results
        result = {
            **params,  # All parameters
            'net_return_pct': round(metrics['net_return'] * 100, 2),
            'annualized_pct': round((metrics['net_return'] / (len(df_test_ind) / 24)) * 365 * 100, 1),
            'total_trades': metrics['num_trades'],
            'win_rate_pct': round(metrics['win_rate'] * 100, 1),
            'profit_factor': round(metrics['profit_factor'], 2),
            'sharpe_ratio': round(metrics['sharpe_ratio'], 2),
            'max_drawdown_pct': round(metrics['max_drawdown'] * 100, 2),
            'avg_win_pct': round(metrics['avg_win'] * 100, 2),
            'avg_loss_pct': round(metrics['avg_loss'] * 100, 2),
        }
        
        return result
        
    except Exception as e:
        print(f"Error with params {params}: {str(e)}")
        return None

print("✅ Optimization function defined")

In [None]:
# Run optimization (PARALLEL - uses all CPU cores)
# Set n_jobs to specific number if you want to limit cores (e.g., n_jobs=4)
# Set n_jobs=-1 to use all available cores
n_jobs = -1  # single knob shared by the timing sample and the full run

import multiprocessing
n_cores = multiprocessing.cpu_count()
print(f"🚀 Using {n_cores} CPU cores for parallel processing\n")

# Full Cartesian product of the grid; each tuple is zipped back with the keys
# into a params dict right before it is dispatched.
param_combinations = list(product(*param_grid.values()))
param_keys = list(param_grid.keys())

if not param_combinations:
    # An empty candidate list makes the product empty; fail with a clear
    # message instead of a ZeroDivisionError in the timing code below.
    raise ValueError("param_grid produced no combinations; every parameter needs at least one value")

print(f"Starting optimization of {len(param_combinations)} combinations...")

# Time a small sample to estimate total time
print("\n⏱️  Running sample batch to estimate completion time...")
sample_size = min(n_cores * 2, len(param_combinations))
sample_start = time.time()

sample_results = Parallel(n_jobs=n_jobs)(
    delayed(test_parameters)(dict(zip(param_keys, values)), df_train, df_test)
    for values in param_combinations[:sample_size]
)

sample_time = time.time() - sample_start
# The sample already ran in parallel, so this is wall-clock time per combination
# WITH parallelism included. Scaling it by the number of combinations gives the
# projected wall-clock total; dividing by the core count again would
# under-estimate the run by roughly that factor.
avg_time_per_combo = sample_time / sample_size
estimated_total_time = avg_time_per_combo * len(param_combinations)

print(f"✅ Sample completed in {sample_time:.1f}s")
print(f"📊 Average time per combination: {avg_time_per_combo:.2f}s")
print(f"⏰ Estimated total time: {estimated_total_time/60:.1f} minutes\n")

# Run full optimization with progress bar
print("Running full optimization with progress bar...\n")
start_time = time.time()

# NOTE: the bar advances as tasks are handed to the workers, so it can run
# slightly ahead of the work that has actually finished.
results = Parallel(n_jobs=n_jobs)(
    delayed(test_parameters)(dict(zip(param_keys, values)), df_train, df_test)
    for values in tqdm(param_combinations, desc="Optimizing", unit="combo")
)

# Filter out None results (failed runs or runs that produced no metrics)
results = [r for r in results if r is not None]
n_failed = len(param_combinations) - len(results)

elapsed_time = time.time() - start_time
print(f"\n✅ Optimization complete in {elapsed_time/60:.1f} minutes!")
print(f"Successfully tested {len(results)} combinations")
if n_failed:
    print(f"⚠️  {n_failed} combinations failed or returned no metrics")
print(f"Average time per combination: {elapsed_time/len(param_combinations):.2f}s")

if not results:
    # Without this guard the later analysis cells fail with an opaque KeyError
    # on the missing metric columns of an empty frame.
    raise RuntimeError("No parameter combination produced results; see the errors reported above")

# Convert to DataFrame
results_df = pd.DataFrame(results)
print(f"\nResults DataFrame shape: {results_df.shape}")
display(results_df.head())

In [None]:
# Export results to CSV
# The timestamp in the file name keeps earlier optimization runs from being overwritten.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = PROJECT_ROOT / "output" / f"optimization_results_{timestamp}.csv"

# to_csv does not create missing parent directories, so make sure output/
# exists (e.g. on a fresh checkout) before writing.
output_path.parent.mkdir(parents=True, exist_ok=True)

results_df.to_csv(output_path, index=False)
print(f"✅ Results exported to: {output_path}")
print(f"\nFile contains {len(results_df)} rows with {len(results_df.columns)} columns")
print(f"\nColumns: {', '.join(results_df.columns.tolist())}")

In [None]:
# Quick analysis - Top performers
def _print_section_header(title, leading_blank_line=True):
    """Print ``title`` framed by two 80-character rules, optionally after a blank line."""
    rule = "=" * 80
    print("\n" + rule if leading_blank_line else rule)
    print(title)
    print(rule)


# Ten best rows by annualized return
_print_section_header("TOP 10 by Annualized Return", leading_blank_line=False)
top_annualized = results_df.nlargest(10, 'annualized_pct')
display(top_annualized)

# Ten best rows by Sharpe ratio
_print_section_header("TOP 10 by Sharpe Ratio")
top_sharpe = results_df.nlargest(10, 'sharpe_ratio')
display(top_sharpe)

# Ten best rows by profit factor, ignoring runs with fewer than three trades
_print_section_header("TOP 10 by Profit Factor (with at least 3 trades)")
has_enough_trades = results_df['total_trades'] >= 3
top_pf = results_df[has_enough_trades].nlargest(10, 'profit_factor')
display(top_pf)

In [None]:
# Custom filtering examples
# Adjust the conditions below to narrow the results to what you care about.
rule = "=" * 80

print(rule)
print("CUSTOM FILTER: High return, low drawdown, decent trade count")
print(rule)

# One named mask per criterion so each threshold can be tuned on its own
high_return = results_df['annualized_pct'] > 30
# Keeps rows whose drawdown value is above -5; presumably drawdowns are stored
# as negative percentages — TODO confirm against PerformanceMetrics
shallow_drawdown = results_df['max_drawdown_pct'] > -5
enough_trades = results_df['total_trades'] >= 3
wins_at_least_half = results_df['win_rate_pct'] >= 50

custom_filter = results_df[
    high_return & shallow_drawdown & enough_trades & wins_at_least_half
].sort_values('annualized_pct', ascending=False)

print(f"Found {len(custom_filter)} combinations matching criteria\n")
display(custom_filter)

# Further filters can be added below.
# Example: restrict the results to one specific fast/slow SMA pair.
print("\n" + rule)
print("Filter by specific SMA combination (30/60)")
print(rule)
is_sma_30_60 = (results_df['sma_fast'] == 30) & (results_df['sma_slow'] == 60)
sma_filter = results_df[is_sma_30_60].sort_values('annualized_pct', ascending=False)
display(sma_filter.head(10))