<a href="https://colab.research.google.com/github/marikaba/Market-Scenario-Optimization/blob/main/%20Market%20Scenario%20Classification%20Framework.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from scipy.stats import uniform
import pandas as pd
import talib as ta
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from skopt import gp_minimize
from skopt.space import Real

# Calculate Technical Indicators
# Every indicator is derived from the price columns of df ('Close', plus
# 'High'/'Low' where the indicator needs the range) and stored as a new column.
# Calls that pass no period arguments rely on TA-Lib's own defaults.
df['SMA'] = ta.SMA(df['Close'], timeperiod=50)
df['EMA'] = ta.EMA(df['Close'], timeperiod=21)
df['MACD'], df['MACD_Signal'], df['MACD_Hist'] = ta.MACD(df['Close'])
df['RSI'] = ta.RSI(df['Close'], timeperiod=14)
df['Upper_BB'], df['Middle_BB'], df['Lower_BB'] = ta.BBANDS(df['Close'], timeperiod=20)
df['%K'], df['%D'] = ta.STOCH(df['High'], df['Low'], df['Close'])
df['ADX'] = ta.ADX(df['High'], df['Low'], df['Close'], timeperiod=14)
df['DI+'] = ta.PLUS_DI(df['High'], df['Low'], df['Close'], timeperiod=14)
df['DI-'] = ta.MINUS_DI(df['High'], df['Low'], df['Close'], timeperiod=14)

# Create a dummy 'true_scenario' column for testing purposes if not available.
# The guard matters: without it, real labels already present in df would be
# silently replaced by random placeholders every time this cell runs.
if 'true_scenario' not in df.columns:
    df['true_scenario'] = np.random.choice(
        ['Strong Bullish', 'Strong Bearish', 'Overbought', 'Oversold', 'Mixed Signals'],
        size=len(df),
    )

# Function to classify scenarios based on scores and thresholds
def classify_scenario(bullish_score, bearish_score, strong_thresh, over_thresh):
    """Turn a bullish/bearish score pair into a scenario label.

    The checks below run in priority order and the first one that is
    satisfied decides the label; 'Mixed Signals' is the fallback when no
    score reaches its cutoff.

    Args:
        bullish_score: Weighted sum of the bullish indicator votes.
        bearish_score: Weighted sum of the bearish indicator votes.
        strong_thresh: Cutoff for the two 'Strong ...' labels.
        over_thresh: Cutoff for 'Oversold' / 'Overbought'.

    Returns:
        One of 'Strong Bullish', 'Strong Bearish', 'Oversold',
        'Overbought' or 'Mixed Signals'.
    """
    # (score, cutoff, label) in decreasing priority; bullish is tested before
    # bearish at each level, so ties resolve to the bullish-side label.
    ladder = (
        (bullish_score, strong_thresh, 'Strong Bullish'),
        (bearish_score, strong_thresh, 'Strong Bearish'),
        (bullish_score, over_thresh, 'Oversold'),
        (bearish_score, over_thresh, 'Overbought'),
    )
    for score, cutoff, label in ladder:
        if score >= cutoff:
            return label
    return 'Mixed Signals'

# Bayesian optimization for weight and threshold optimization
def bayesian_score_function(params):
    """Objective over the whole DataFrame: negated accuracy of the rule labels.

    Args:
        params: Flat sequence of eight numbers. Positions 0-5 are the
            indicator weights (SMA/EMA, MACD, RSI, ADX/DI, Bollinger Bands,
            stochastic %K); positions 6 and 7 are the strong and over
            thresholds passed on to classify_scenario.

    Returns:
        The accuracy of the predicted labels against df['true_scenario'],
        multiplied by -1 so that a minimiser maximises accuracy.
    """
    strong_thresh, over_thresh = params[6], params[7]
    w_cross, w_macd, w_rsi, w_trend, w_bands, w_stoch = params[:6]

    def label_row(row):
        # The RSI band and the ADX trend filter feed both scores, so they
        # are evaluated once per row and reused.
        rsi_in_band = 25 <= row['RSI'] <= 75
        adx_strong = row['ADX'] > 25

        bull_total = (
            w_cross * (row['SMA'] > row['EMA'])
            + w_macd * (row['MACD'] > row['MACD_Signal'])
            + w_rsi * rsi_in_band
            + w_trend * (adx_strong and row['DI+'] > row['DI-'])
            + w_bands * (row['Close'] < row['Lower_BB'])
            + w_stoch * (row['%K'] < 20)
        )
        bear_total = (
            w_cross * (row['SMA'] < row['EMA'])
            + w_macd * (row['MACD'] < row['MACD_Signal'])
            + w_rsi * rsi_in_band
            + w_trend * (adx_strong and row['DI-'] > row['DI+'])
            + w_bands * (row['Close'] > row['Upper_BB'])
            + w_stoch * (row['%K'] > 80)
        )
        return classify_scenario(bull_total, bear_total, strong_thresh, over_thresh)

    labels = [label_row(row) for _, row in df.iterrows()]

    # Replace 'true_scenario' with your actual label column
    return -accuracy_score(df['true_scenario'], labels)

# Cross-validation setup for stability testing
# The two lists collect one entry per fold: the optimised thresholds and the
# optimised weight vector respectively.
weights_results = []
thresholds_results = []
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Define search space for Bayesian optimization
# Dimension order is significant: the objectives read weights from
# positions 0-5 and the two thresholds from positions 6 and 7.
search_space = [
    Real(0, 2, name=f'weight_{indicator}')
    for indicator in ('SMA_EMA', 'MACD', 'RSI', 'ADX_DI', 'BBANDS', 'STOCH')
]
search_space += [
    Real(2.0, 3.0, name='strong_thresh'),
    Real(0.8, 1.2, name='over_thresh'),
]

## Run Bayesian optimization with cross-validation
def _indicator_votes(frame):
    """Evaluate every indicator rule once for all rows of ``frame``.

    The rules do not depend on the parameters being optimised, so they are
    computed once per fold instead of once per objective call.

    Args:
        frame: DataFrame slice holding the indicator and price columns.

    Returns:
        ``(bullish, bearish)``: two lists of six float arrays (1.0 where the
        rule fires, 0.0 otherwise), ordered like the weight dimensions of the
        search space: SMA/EMA, MACD, RSI, ADX/DI, Bollinger Bands, %K.
        Comparisons against NaN are False, so rows with missing indicator
        values contribute no votes.
    """
    rsi_in_band = (frame['RSI'] >= 25) & (frame['RSI'] <= 75)
    trending = frame['ADX'] > 25

    bullish = [
        frame['SMA'] > frame['EMA'],
        frame['MACD'] > frame['MACD_Signal'],
        rsi_in_band,
        trending & (frame['DI+'] > frame['DI-']),
        frame['Close'] < frame['Lower_BB'],
        frame['%K'] < 20,
    ]
    bearish = [
        frame['SMA'] < frame['EMA'],
        frame['MACD'] < frame['MACD_Signal'],
        rsi_in_band,
        trending & (frame['DI-'] > frame['DI+']),
        frame['Close'] > frame['Upper_BB'],
        frame['%K'] > 80,
    ]
    return (
        [flag.to_numpy(dtype=float) for flag in bullish],
        [flag.to_numpy(dtype=float) for flag in bearish],
    )


def _negative_accuracy(params, votes, labels):
    """Score one parameter vector against precomputed indicator votes.

    Args:
        params: Eight numbers; positions 0-5 are the indicator weights,
            6 is the strong threshold and 7 the over threshold.
        votes: The ``(bullish, bearish)`` pair from ``_indicator_votes``.
        labels: True scenario labels aligned with the rows behind ``votes``.

    Returns:
        Accuracy multiplied by -1, because gp_minimize minimises.
    """
    weights = params[:6]
    strong_thresh = params[6]
    over_thresh = params[7]
    bullish_flags, bearish_flags = votes

    # Summed left to right in weight order, matching the row-wise formula.
    bullish_scores = sum(w * flags for w, flags in zip(weights, bullish_flags))
    bearish_scores = sum(w * flags for w, flags in zip(weights, bearish_flags))

    predictions = [
        classify_scenario(bullish, bearish, strong_thresh, over_thresh)
        for bullish, bearish in zip(bullish_scores, bearish_scores)
    ]
    return -accuracy_score(labels, predictions)


for train_index, test_index in kf.split(df):
    train_df = df.iloc[train_index]
    test_df = df.iloc[test_index]

    train_votes = _indicator_votes(train_df)
    test_votes = _indicator_votes(test_df)

    # Parameters are fitted on the training fold only; the held-out fold is
    # reserved for the out-of-sample check below. Fold data is bound through
    # default arguments so the objective never sees a later iteration's data.
    def score_function_cv(params, votes=train_votes, labels=train_df['true_scenario']):
        return _negative_accuracy(params, votes, labels)

    result = gp_minimize(score_function_cv, search_space, n_calls=50, random_state=42)
    optimal_params = result.x

    thresholds_results.append({
        'strong_thresh': optimal_params[6],
        'over_thresh': optimal_params[7]
    })
    weights_results.append(optimal_params[:6])

    # Out-of-sample accuracy of this fold's optimum on the unseen test rows.
    held_out_accuracy = -_negative_accuracy(
        optimal_params, test_votes, test_df['true_scenario']
    )
    print(f"Fold {len(weights_results)} held-out accuracy: {held_out_accuracy:.4f}")

# Average the thresholds and weights across folds
# Each threshold is averaged over the per-fold dictionaries; the weight
# vectors are averaged position by position (axis 0 runs over folds).
final_thresholds = {
    key: np.mean([fold[key] for fold in thresholds_results])
    for key in ('strong_thresh', 'over_thresh')
}
final_weights = np.asarray(weights_results).mean(axis=0)

print("Final Averaged Thresholds:", final_thresholds)
print("Final Averaged Weights:", final_weights)

# Define updated weights dictionary
# Keys follow the order of the weight dimensions in the search space.
weights = {
    label: final_weights[position]
    for position, label in enumerate(
        ('SMA_EMA', 'MACD', 'RSI', 'ADX_DI', 'BBANDS', 'STOCH')
    )
}

# Function to determine market scenario using updated weights and thresholds
def determine_scenario(row):
    """Label one row with a market scenario.

    Each indicator adds its weight (from the module-level ``weights`` dict)
    to the bullish tally, the bearish tally, both (RSI inside its band) or
    neither. The tallies are then compared with the module-level
    ``final_thresholds``.

    Args:
        row: Mapping-like row exposing the indicator and price columns
            ('SMA', 'EMA', 'MACD', 'MACD_Signal', 'RSI', 'ADX', 'DI+',
            'DI-', 'Close', 'Upper_BB', 'Lower_BB', '%K').

    Returns:
        'Strong Bullish', 'Strong Bearish', 'Oversold', 'Overbought' or
        'Mixed Signals'.
    """
    tally = {'bull': 0, 'bear': 0}

    def vote(side, indicator):
        tally[side] += weights[indicator]

    # Moving-average relationship
    sma, ema = row['SMA'], row['EMA']
    if sma > ema:
        vote('bull', 'SMA_EMA')
    elif sma < ema:
        vote('bear', 'SMA_EMA')

    # MACD line against its signal line
    macd, macd_signal = row['MACD'], row['MACD_Signal']
    if macd > macd_signal:
        vote('bull', 'MACD')
    elif macd < macd_signal:
        vote('bear', 'MACD')

    # RSI inside the 25-75 band counts for both sides
    if 25 <= row['RSI'] <= 75:
        vote('bull', 'RSI')
        vote('bear', 'RSI')

    # Directional indicators only count when ADX is above 25
    if row['ADX'] > 25:
        di_plus, di_minus = row['DI+'], row['DI-']
        if di_plus > di_minus:
            vote('bull', 'ADX_DI')
        elif di_minus > di_plus:
            vote('bear', 'ADX_DI')

    # Close outside the Bollinger Bands
    close = row['Close']
    if close > row['Upper_BB']:
        vote('bear', 'BBANDS')
    elif close < row['Lower_BB']:
        vote('bull', 'BBANDS')

    # Stochastic %K extremes
    percent_k = row['%K']
    if percent_k > 80:
        vote('bear', 'STOCH')
    elif percent_k < 20:
        vote('bull', 'STOCH')

    bullish_score, bearish_score = tally['bull'], tally['bear']

    # Classify scenario based on optimized thresholds; bullish is checked
    # before bearish at each level, and the strong level before the over level.
    strong_cutoff = final_thresholds['strong_thresh']
    if bullish_score >= strong_cutoff:
        return 'Strong Bullish'
    if bearish_score >= strong_cutoff:
        return 'Strong Bearish'

    over_cutoff = final_thresholds['over_thresh']
    if bullish_score >= over_cutoff:
        return 'Oversold'
    if bearish_score >= over_cutoff:
        return 'Overbought'
    return 'Mixed Signals'

# Apply function to DataFrame
# axis=1 makes apply pass each row to determine_scenario; the label it
# returns for that row is stored in the new 'Scenario' column.
df['Scenario'] = df.apply(determine_scenario, axis=1)