In [None]:
import pandas as pd
import numpy as np
import talib
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import classification_report
import shap
import plotly.express as px
import plotly.graph_objects as go
import sys

In [None]:
# Load the raw META price history and normalise it into numeric, date-ordered form.
df = pd.read_csv('meta_historical.csv')
df['Date'] = pd.to_datetime(df['Date'])

# The price columns arrive as strings containing a "$"; strip it and cast to float.
for price_col in ['Close/Last', 'Open', 'High', 'Low']:
    df[price_col] = df[price_col].str.replace("$", "", regex=False).astype(float)

# Sort chronologically BEFORE deriving returns: pct_change() compares each row with
# the row above it, so on a file that is not in ascending date order (e.g. newest
# first) it would divide by the following day's close and leak future prices into
# every return-based feature. The original row labels are kept (no reset_index).
df = df.sort_values(by='Date')
df['Return'] = df['Close/Last'].pct_change()

In [None]:
# Display the cleaned price frame as a visual check.
df

In [None]:
# Fetch S&P 500 (^GSPC) and VIX (^VIX) history via yfinance over one shared date window.
market_window = dict(start='2015-02-09', end='2025-02-07')
sp500 = yf.download('^GSPC', **market_window)
vix = yf.download('^VIX', **market_window)

In [None]:
# Keep only the first level of each downloaded frame's column index.
for market_frame in (sp500, vix):
    market_frame.columns = market_frame.columns.get_level_values(0)

In [None]:
# Display the VIX frame after its columns were flattened.
vix

In [None]:
# Display the S&P 500 frame after its columns were flattened.
sp500

In [None]:
# Left-join the S&P 500 and VIX closes onto the price frame, matching df['Date']
# against each market frame's index.
# NOTE(review): this relies on df having no 'Close' column of its own before the
# first join, so the S&P 500 close arrives as plain 'Close' and only the VIX
# close picks up the '_VIX' suffix on the second join.
sp500_close = sp500[['Close']]
vix_close = vix[['Close']]
df = df.merge(sp500_close, how='left', left_on='Date', right_index=True, suffixes=('', '_SP500'))
df = df.merge(vix_close, how='left', left_on='Date', right_index=True, suffixes=('', '_VIX'))

In [None]:
# Give the un-suffixed 'Close' column an explicit S&P 500 name, then show the frame.
df = df.rename(columns={'Close': 'Close_SP500'})
df

In [None]:
# Forward one-row relative change in the close:
# (next row's close - this row's close) / this row's close.
current_close = df['Close/Last']
following_close = current_close.shift(-1)
df['Price_Change'] = (following_close - current_close) / current_close
df

In [None]:
# Binary label: 1.0 when the next-row price change is at least +1%, else 0.0.
# Rows whose Price_Change is missing (the final row has no "next" close) get NaN
# instead of a spurious 0, so a later dropna() discards them. Masking on the
# missing values replaces the hard-coded `df.at[0, ...]` write, which only hit the
# right row while label 0 happened to be the chronologically last one, and which
# assigned NaN into an integer column.
price_change = df['Price_Change']
df['Target'] = (price_change >= 0.01).astype(float).where(price_change.notna())
df

<h1>Adding Technical Indicators</h1>

---

**Moving Average (MA)** is the average price of a stock over a specific period (e.g., 10 days).
- smooths out price fluctuations and helps identify trends.

Moving averages are used to:
- Identify trends (upward, downward, or sideways).
- Generate buy/sell signals (e.g., when the price crosses above/below the MA).


In [None]:
# 10-row rolling mean of the Return column (the moving average is taken over
# returns here, not over the raw close).
return_window = df['Return'].rolling(window=10)
df['MA_10_Return'] = return_window.mean()
df

---

**Relative Strength Index (RSI)** is a momentum oscillator that measures the speed and change of price movements.
- ranges from 0 to 100
- used to identify *overbought (RSI > 70)* or *oversold (RSI < 30)* conditions.

Why It Matters:
- helps traders identify *potential reversals in price trends*.
- It’s useful for spotting when a stock might be overbought (due for a pullback) or oversold (due for a bounce).

In [None]:
# 14-period RSI, fed with the Return column rather than the raw close.
return_series = df['Return']
df['RSI_14_Return'] = talib.RSI(return_series, timeperiod=14)
df

---

**Moving Average Convergence Divergence (MACD)** is a trend-following momentum indicator.

It consists of:
- MACD Line: The difference between a 12-day and 26-day exponential moving average (EMA).
- Signal Line: A 9-day EMA of the MACD Line.
- MACD Histogram: The difference between the MACD Line and the Signal Line.

Helps traders identify:
- Trend direction (upward or downward).
- Buy/sell signals (e.g., when the MACD Line crosses above/below the Signal Line).

In [None]:
# MACD with 12/26/9 periods, fed with the Return column. The three outputs of
# talib.MACD are stored as the MACD, MACD_Signal and MACD_Hist columns.
macd_line, macd_signal, macd_hist = talib.MACD(
    df['Return'], fastperiod=12, slowperiod=26, signalperiod=9
)
df['MACD'] = macd_line
df['MACD_Signal'] = macd_signal
df['MACD_Hist'] = macd_hist
df[['Date', 'Close/Last', 'MACD', 'MACD_Signal', 'MACD_Hist']]

In [None]:
# Show only the rows whose MACD value is positive.
df[df['MACD'] > 0]

**Volatility**
(10-Day Rolling Standard Deviation)

In [None]:
# 10-row rolling standard deviation of the Return column.
rolling_returns = df['Return'].rolling(window=10)
df['Volatility_Return'] = rolling_returns.std()
df

In [None]:
# Count missing values per column.
df.isnull().sum()

**Lagged Returns** (5-row lag of `Return`)

In [None]:
# Return value from five rows earlier; the first five rows are therefore NaN.
lag_rows = 5
df['Return_Lag_5'] = df['Return'].shift(periods=lag_rows)
df.head(10)

**Volume Trends** 
(10-Day Moving Average of Volume):

In [None]:
# 10-row rolling mean of traded volume, shown next to the raw volume.
volume_window = df['Volume'].rolling(10)
df['Volume_MA_10'] = volume_window.mean()
df[['Date', 'Volume', 'Volume_MA_10']].head(15)

**High-Low Range**

In [None]:
# Per-row spread between the High and Low columns.
row_high = df['High']
row_low = df['Low']
df['Range'] = row_high - row_low
df[['Date', 'High', 'Low', 'Range']].head()

**SP500_Return & VIX_MA_10**

In [None]:
# Market-context features: row-over-row S&P 500 return and a 10-row rolling mean of the VIX close.
sp500_close_col = df['Close_SP500']
vix_close_col = df['Close_VIX']
df['SP500_Return'] = sp500_close_col.pct_change()
df['VIX_MA_10'] = vix_close_col.rolling(10).mean()

In [None]:
# Report per-column null counts, drop every row with any missing value, then
# report again to confirm nothing is left.
print(df.isna().sum())

df = df.dropna(axis=0, how='any')

print(df.isna().sum())

In [None]:
# Display the frame after incomplete rows were dropped.
df

In [None]:
# Class balance of the Target column over the whole frame.
df.value_counts('Target')

---

<h1>Building Prediction Model</h1>
<h4>Classification: price will rise by at least 1% (1) or not (0)</h4>

Why It Matters:

- A predictive model can help traders and investors make better decisions by forecasting price movements.
- For market-making firms and banks, such models are used to:
    - Set bid-ask spreads.
    - Manage inventory risk.
    - Identify arbitrage opportunities.

In [None]:
# Chronological split: train on 2023-01-01 .. 2024-04-30 (both inclusive),
# test on every row dated after the training window.
train_start, train_end = '2023-01-01', '2024-04-30'
in_train_window = df['Date'].between(train_start, train_end)
train_df = df[in_train_window]
test_df = df[df['Date'] > train_end]

In [None]:
# Display the held-out test rows.
test_df

In [None]:
# Drop any incomplete rows from both splits (df already went through dropna()
# before the split, so this acts as a safety net).
train_df, test_df = train_df.dropna(), test_df.dropna()

In [None]:
# 193 test rows
# 333 train rows

In [None]:
# List the available columns before choosing model features.
df.columns

**Removed (according to feature importances):**
- `Return_Lag_5`
- `MACD`
- `MACD_Signal`

In [None]:
# Model inputs. Return_Lag_5, MACD and MACD_Signal were part of an earlier
# candidate list and are intentionally left out here.
features = [
    'MA_10_Return',
    'RSI_14_Return',
    'MACD_Hist',
    'Volatility_Return',
    'Volume_MA_10',
    'Range',
    'SP500_Return',
    'VIX_MA_10',
]
X_train, y_train = train_df[features], train_df['Target']
X_test, y_test = test_df[features], test_df['Target']

In [None]:
# Display the full feature frame.
df

In [None]:
# Class balance of the training labels.
y_train.value_counts()

In [None]:
# Class balance of the test labels.
y_test.value_counts()

In [None]:
# Weight the positive class by the negative/positive ratio of the training labels.
n_negative = len(y_train[y_train == 0])
n_positive = len(y_train[y_train == 1])
scale_pos_weight = n_negative / n_positive

xgb_params = dict(
    scale_pos_weight=scale_pos_weight,
    n_estimators=200,
    max_depth=5,
    learning_rate=0.1,
    subsample=0.8,
    random_state=42,
)
model = XGBClassifier(**xgb_params)
model.fit(X_train, y_train)

# Hard class predictions on the held-out rows.
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

print('Results:')
for metric_label, metric_value in (
    ('Accuracy', accuracy),
    ('Precision', precision),
    ('Recall', recall),
    ('F1-Score', f1),
):
    print(f'{metric_label}: {metric_value:.2f}')

In [None]:
# --- Generate signals ---------------------------------------------------------
# NOTE: this rebinds test_df to a slimmer frame (features + close + predictions);
# a later plotting cell reads the 'Signal' column from it.
test_df = X_test.copy()
test_df['Close/Last'] = df['Close/Last'].loc[y_test.index]
# Probability of the positive class, as the column name promises (this column
# previously held the hard 0/1 predictions, duplicating 'Signal').
test_df['Predicted_Prob'] = model.predict_proba(X_test)[:, 1]
test_df['Signal'] = y_pred

# --- Strategy parameters ------------------------------------------------------
initial_capital = 10000
SLIPPAGE = 0.0035        # fractional cost applied to every buy and sell price
STOP_LOSS = 0.05         # exit once the position is down 5% (slippage included)
MAX_HOLDING_DAYS = 5     # otherwise exit after this many rows in the position


def _simulate_trades(closes, signals, start_capital, slippage, stop_loss, max_holding_days):
    """Run the long-only, all-in strategy and return the portfolio value per row.

    A position is opened with all available cash on a row whose signal is 1 while
    flat. While in a position, each following row increments the holding counter
    and the position is closed when either the slippage-adjusted exit price is
    down by at least ``stop_loss`` versus the slippage-adjusted entry price, or
    the counter reaches ``max_holding_days``. A new position cannot be opened on
    the same row on which one was closed.

    Parameters:
        closes: iterable of close prices, one per row.
        signals: iterable of 0/1 signals aligned with ``closes``.
        start_capital: cash at the start of the simulation.
        slippage: fraction added to the buy price and removed from the sell price.
        stop_loss: positive fraction; loss at which the position is closed.
        max_holding_days: number of rows after which the position is closed.

    Returns:
        list with one portfolio value per row: cash when flat, otherwise shares
        marked at that row's close (without slippage).
    """
    cash = start_capital
    shares = 0
    entry_price = 0
    days_held = 0
    in_position = False
    values = []

    for close_price, signal in zip(closes, signals):
        if signal == 1 and not in_position:
            entry_price = close_price * (1 + slippage)
            shares = cash / entry_price
            cash = 0
            in_position = True
            days_held = 0
        elif in_position:
            days_held += 1
            exit_price = close_price * (1 - slippage)
            hit_stop = (exit_price / entry_price - 1) <= -stop_loss
            if hit_stop or days_held >= max_holding_days:
                cash = shares * exit_price
                in_position = False

        values.append(shares * close_price if in_position else cash)

    return values


# --- Simulate trades ----------------------------------------------------------
portfolio_value = _simulate_trades(
    test_df['Close/Last'],
    test_df['Signal'],
    start_capital=initial_capital,
    slippage=SLIPPAGE,
    stop_loss=STOP_LOSS,
    max_holding_days=MAX_HOLDING_DAYS,
)
portfolio = pd.Series(portfolio_value, index=range(len(test_df)))

# --- Performance metrics ------------------------------------------------------
total_profit = portfolio.iloc[-1] - initial_capital
daily_returns = portfolio.pct_change().dropna()
return_std = daily_returns.std()
# A flat equity curve (e.g. no position was ever opened) has zero or undefined
# volatility; report a Sharpe ratio of 0 instead of NaN/inf in that case.
if return_std > 0:
    sharpe_ratio = np.sqrt(252) * (daily_returns.mean() / return_std)
else:
    sharpe_ratio = 0.0
running_max = portfolio.expanding().max()
drawdown = (portfolio - running_max) / running_max
max_drawdown = drawdown.min()

print("Backtesting Results:")
print(f"Total Profit: ${total_profit:.2f}")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Maximum Drawdown: {max_drawdown:.2%}")

In [None]:
# Portfolio value over the test period.
# Dates are looked up from df via test_df's row labels so this cell does not
# depend on `vis`, which is only created in a later cell (using it here raised
# NameError on a fresh Restart & Run All).
portfolio_dates = df.loc[test_df.index, 'Date']

fig = go.Figure()
fig.add_trace(go.Scatter(x=portfolio_dates, y=portfolio, name='Portfolio Value'))
fig.update_layout(
    title='Portfolio Performance Over Time',
    xaxis_title='Date',  # the x-axis carries calendar dates, not day counts
    yaxis_title='Value ($)',
)
fig.show()

# Sharpe ratio and maximum drawdown as two side-by-side number indicators.
metrics_fig = go.Figure()
metrics_fig.add_trace(go.Indicator(
    mode="number",
    value=sharpe_ratio,
    title={"text": "Sharpe Ratio:"},
    domain={'row': 0, 'column': 0}
))
metrics_fig.add_trace(go.Indicator(
    mode="number",
    value=max_drawdown,
    title={"text": "Max Drawdown:"},
    domain={'row': 0, 'column': 1}
))
metrics_fig.update_layout(grid={'rows': 1, 'columns': 2})
metrics_fig.show()

In [None]:
# Display the per-row portfolio value series produced by the backtest.
portfolio

In [None]:
# Rank the input features by the fitted model's feature_importances_, highest first.
importances = model.feature_importances_

importance_columns = {'Feature': features, 'Importance': importances}
feature_importance_df = pd.DataFrame(importance_columns).sort_values('Importance', ascending=False)

feature_importance_df

In [None]:
# Overlay the model's per-row signals on the actual close for rows dated after
# 2024-04-30. Requires test_df to already carry the 'Signal' column.
test_og = df[df['Date'] > '2024-04-30']
vis = test_df.copy()
vis['Date'] = df.loc[vis.index, 'Date']
vis['Close'] = df.loc[vis.index, 'Close/Last']

signal_on = vis[vis['Signal'] == 1]
signal_off = vis[vis['Signal'] == 0]

fig = go.Figure()

# Actual price line.
fig.add_trace(go.Scatter(
    x=test_og['Date'],
    y=test_og['Close/Last'],
    mode='lines',
    name='Actual Price',
    line=dict(color='blue', width=2),
))

# Rows where Signal == 1.
fig.add_trace(go.Scatter(
    x=signal_on['Date'],
    y=signal_on['Close'],
    mode='markers',
    name='Buy Signal',
    marker=dict(color='green', symbol='triangle-up', size=10),
))

# Rows where Signal == 0 (shown in the legend as 'Sell Signal').
fig.add_trace(go.Scatter(
    x=signal_off['Date'],
    y=signal_off['Close'],
    mode='markers',
    name='Sell Signal',
    marker=dict(color='red', symbol='triangle-down', size=10),
))

fig.update_layout(
    title='Actual Price vs. Model Predictions',
    xaxis_title='Date',
    yaxis_title='Price ($)',
    legend=dict(x=0, y=1, traceorder='normal'),
)
fig.show()

In [None]:
# Explain the fitted model on the test features with SHAP's tree explainer.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)

# Bar-type summary plot, limited to at most 10 features.
shap.summary_plot(shap_values, X_test, plot_type="bar", max_display=10, color='orange')

LogisticRegression as ML model

In [None]:
# param_grid = {
#     'C': [0.01, 0.1, 1, 10],  # Regularization strength
#     'penalty': ['l1', 'l2']    # Regularization type
# }

# model = LogisticRegression(random_state=42, solver='liblinear')

# grid_search = GridSearchCV(model, param_grid, cv=3, scoring='accuracy')
# grid_search.fit(X_train, y_train)

# print(f'Best Parameters: {grid_search.best_params_}')

# best_model = grid_search.best_estimator_
# best_model.fit(X_train, y_train)

# y_pred = best_model.predict(X_test)

# accuracy = accuracy_score(y_test, y_pred)
# precision = precision_score(y_test, y_pred)
# recall = recall_score(y_test, y_pred)
# f1 = f1_score(y_test, y_pred)

# # Print the results
# print(f'Logistic Regression Results:')
# print(f'Accuracy: {accuracy:.2f}')
# print(f'Precision: {precision:.2f}')
# print(f'Recall: {recall:.2f}')
# print(f'F1-Score: {f1:.2f}')

RandomForest as ML model

In [None]:
# param_grid = {
#     'n_estimators': [50, 100, 200, 500, 750],
#     'max_depth': [None, 2, 5, 10, 20, 50]
# }

# grid_search = GridSearchCV(RandomForestClassifier(), param_grid, cv=3)

# grid_search.fit(X_train, y_train)
# print(grid_search.best_params_)

In [None]:
# model = RandomForestClassifier(max_depth=10, n_estimators=500, random_state=42)

# model.fit(X_train, y_train)
# y_pred = model.predict(X_test)

# accuracy = accuracy_score(y_test, y_pred)
# precision = precision_score(y_test, y_pred)
# recall = recall_score(y_test, y_pred)
# f1 = f1_score(y_test, y_pred)

# print(f'Accuracy: {accuracy:.2f}')
# print(f'Precision: {precision:.2f}')
# print(f'Recall: {recall:.2f}')
# print(f'F1-Score: {f1:.2f}')

In [None]:
# param_grid = {
#     'n_estimators': [100, 200, 250],
#     'max_depth': [3, 5, 7, 10, None],
#     'min_samples_split': [2, 5, 7, 10]
# }

# model = RandomForestClassifier(random_state=42)

# grid_search = GridSearchCV(model, param_grid, cv=3, scoring='accuracy')
# grid_search.fit(X_train, y_train)

# print(f'Best Parameters: {grid_search.best_params_}')

# # best_model = grid_search.best_estimator_
# best_model.fit(X_train, y_train)

# y_pred = best_model.predict(X_test)

# accuracy = accuracy_score(y_test, y_pred)
# precision = precision_score(y_test, y_pred)
# recall = recall_score(y_test, y_pred)
# f1 = f1_score(y_test, y_pred)

# print(f'Random Forest Results:')
# print(f'Accuracy: {accuracy:.2f}')
# print(f'Precision: {precision:.2f}')
# print(f'Recall: {recall:.2f}')
# print(f'F1-Score: {f1:.2f}')

In [None]:
# RandomForestClassifier(max_depth=3, min_samples_split=2, n_estimators=250, random_state=42)

# model.fit(X_train, y_train)
# y_pred = model.predict(X_test)

# accuracy = accuracy_score(y_test, y_pred)
# precision = precision_score(y_test, y_pred)
# recall = recall_score(y_test, y_pred)
# f1 = f1_score(y_test, y_pred)

# print(f'Accuracy: {accuracy:.2f}')
# print(f'Precision: {precision:.2f}')
# print(f'Recall: {recall:.2f}')
# print(f'F1-Score: {f1:.2f}')