# Loading training data from object store

In [None]:
import pandas as pd

# Open a QuantBook session and resolve the on-disk path of the indicators CSV
# kept in the project's object store.
qb = QuantBook()
path = qb.ObjectStore.GetFilePath(
    "group_4_crypto_trading_with_sentiment_sprin_2024/final_indicators_df.csv"
)

# The CSV's first column has no header, so pandas names it 'Unnamed: 0';
# promote it to the index.
df = pd.read_csv(path).set_index('Unnamed: 0')

# Keep only rows whose index is >= '2021-04-15' and < '2024-04-12', then
# forward-fill missing values. The bounds are compared against the raw index
# values as read from the CSV (presumably ISO date strings -- TODO confirm).
in_window = (df.index >= '2021-04-15') & (df.index < '2024-04-12')
df = df[in_window].ffill()

df.head(10)

# Baseline Random Forest Model

In [None]:
# Build the regression target: the 15-row rolling mean of the 'return' column.
# The first 14 rows have no complete window, so their target is NaN and they
# are dropped.
# NOTE: `df` is reassigned here, so re-running this cell on its own trims
# further rows each time; re-run from the load cell to start clean.
df = df.assign(
    **{'15_day_moving_avg_return': df['return'].rolling(window=15).mean()}
).dropna(subset=['15_day_moving_avg_return'])

df.head(10)

In [None]:
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def _plot_predictions(results, plot_title):
    """Plot actual vs. predicted values plus a 15-row rolling mean of the actuals."""
    sns.set_style("darkgrid")

    fig, ax = plt.subplots(figsize=(15, 8))
    ax.plot(results["y_test"], label="Actual_15DAR", linewidth=2)
    ax.plot(results["y_pred"], label="Predicted_15DAR", linewidth=2)
    ax.plot(results["y_test"].rolling(15).mean(), label="15_SMA", linewidth=2)
    ax.legend(loc="upper center")
    ax.set_title(plot_title, fontsize=16)
    ax.set_ylabel("BTC Returns (percentage change)", fontsize=14)

    # Date-formatted x axis with one major tick every 15 days.
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))
    ax.xaxis.set_major_locator(mdates.DayLocator(interval=15))
    ax.tick_params(axis="x", labelrotation=90)

    plt.show()


def _plot_feature_importances(model, feature_names):
    """Print the feature ranking and draw a bar chart, most important first."""
    importances = model.feature_importances_
    order = np.argsort(importances)[::-1]  # descending importance
    names = np.asarray(feature_names)

    print("Feature ranking:")
    for rank, idx in enumerate(order, start=1):
        print(f"{rank}. {names[idx]} ({importances[idx]})")

    n_features = len(order)
    positions = list(range(n_features))

    fig, ax = plt.subplots(figsize=(15, 8))
    ax.set_title("Feature importances", fontsize=16)
    ax.bar(positions, importances[order], align="center", color="skyblue")
    ax.set_xticks(positions)
    ax.set_xticklabels(names[order], rotation=90)
    ax.set_xlim([-1, n_features])

    plt.show()


def train_evaluate_and_plot(
    X_train, y_train, X_test, y_test, plot_title, random_state=42
):
    """Train a random forest regressor, report test metrics and plot the results.

    Parameters
    ----------
    X_train, y_train
        Training features and target. ``X_train`` must expose ``.columns``
        (it is used to label the feature importances).
    X_test, y_test
        Evaluation features and target. The index of ``y_test`` becomes the
        x axis of the prediction plot, which is formatted as dates.
    plot_title : str
        Title of the actual-vs-predicted plot.
    random_state : int or None, default 42
        Seed forwarded to ``RandomForestRegressor`` so repeated runs build the
        same forest (and therefore the same metrics and feature ranking).
        Pass ``None`` for an unseeded forest.

    Returns
    -------
    model : RandomForestRegressor
        The fitted estimator.
    results : pandas.DataFrame
        Indexed like ``y_test`` with columns ``y_test``, ``y_pred`` and
        ``residuals`` (``y_test - y_pred``).

    Side effects: prints the test MSE, R squared and the feature ranking, and
    shows two matplotlib figures.
    """
    model = RandomForestRegressor(random_state=random_state)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    # Evaluate on the held-out window.
    mse = mean_squared_error(y_test, y_pred)
    print(f"Mean Squared Error: {mse}")

    r_squared = model.score(X_test, y_test)
    print(f"R Squared: {r_squared}")

    # Collect actuals, predictions and residuals under y_test's index.
    results = pd.DataFrame({"y_test": y_test, "y_pred": y_pred})
    results["residuals"] = results["y_test"] - results["y_pred"]

    _plot_predictions(results, plot_title)
    _plot_feature_importances(model, X_train.columns)

    return model, results

In [None]:
from datetime import datetime

# Baseline random forest: every column except the target and the raw "return"
# column is used as a feature.
X = df.drop(
    columns=[
        "15_day_moving_avg_return",
        "return",
    ]
)
y = df["15_day_moving_avg_return"]

# Inclusive date windows. They are compared against the index *before* it is
# converted to datetimes, i.e. against the raw index values.
# NOTE(review): the test window (2021) lies before the training window
# (2022-2024), so the model is evaluated on data older than what it was
# trained on -- confirm this is intended.
# NOTE(review): test_start is earlier than the 2021-04-15 cut-off applied when
# the data is loaded, so the test window effectively starts at the first
# remaining row.
train_start = "2022-04-10"
train_end = "2024-03-12"
test_start = "2021-04-10"
test_end = "2021-12-10"

X_train = X[(X.index >= train_start) & (X.index <= train_end)]
y_train = y[(y.index >= train_start) & (y.index <= train_end)]
# A datetime index is needed for the date-formatted x axis of the plots.
X_train.index = pd.to_datetime(X_train.index)
y_train.index = pd.to_datetime(y_train.index)

X_test = X[(X.index >= test_start) & (X.index <= test_end)]
y_test = y[(y.index >= test_start) & (y.index <= test_end)]
X_test.index = pd.to_datetime(X_test.index)
y_test.index = pd.to_datetime(y_test.index)

model, results = train_evaluate_and_plot(
    X_train, y_train, X_test, y_test, "Baseline Random Forest Model"
)

In [None]:
# Directional metrics: treat a value > 0 as "up" (1) and anything else as
# "not up" (0), then score the predicted direction against the actual one.
actual_up = results["y_test"] > 0
predicted_up = results["y_pred"] > 0
results["actual_direction"] = actual_up.astype(int)
results["Pred_direction"] = predicted_up.astype(int)

# Confusion-matrix cells.
tp = int((actual_up & predicted_up).sum())
fp = int((~actual_up & predicted_up).sum())
fn = int((actual_up & ~predicted_up).sum())
tn = int((~actual_up & ~predicted_up).sum())

print(f"True Positives: {tp}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
print(f"True Negatives: {tn}")

# Each ratio falls back to 0.0 when its denominator is zero (e.g. the model
# never predicts "up") instead of raising ZeroDivisionError.

# calculate precision
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
print(f"Precision: {precision}")

# calculate recall
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
print(f"Recall: {recall}")

# calculate f1 score
f1 = (
    2 * (precision * recall) / (precision + recall)
    if (precision + recall) > 0
    else 0.0
)
print(f"F1 Score: {f1}")

# calculate accuracy
total = tp + tn + fp + fn
accuracy = (tp + tn) / total if total > 0 else 0.0
print(f"Accuracy: {accuracy}")

# Memory Feature Random Forest

In [None]:
# "Memory" features: for every base feature, add rolling sums over windows of
# 10, 20, ..., n_lags * 10 rows (these are rolling sums, not shifted lags).
n_lags = 3

X_now = df.drop(columns=["15_day_moving_avg_return", "return"])
y_now = df["15_day_moving_avg_return"]

# Window lengths and the matching column names, ordered feature by feature.
windows = [lag * 10 for lag in range(1, n_lags + 1)]
lagged_features_cols = [
    f"{column}_rolling_{window}"
    for column in X_now.columns
    for window in windows
]

# One rolling-sum series per (feature, window) pair, in the same order.
lagged_features = pd.concat(
    [
        X_now[column].rolling(window).sum()
        for column in X_now.columns
        for window in windows
    ],
    axis=1,
)
lagged_features.columns = lagged_features_cols

# Append the rolling features, drop the rows with incomplete windows (NaN),
# and keep the target aligned with the surviving rows.
X_now = pd.concat([X_now, lagged_features], axis=1).dropna()
y_now = y_now[X_now.index]

# Inclusive train / test windows (bounds defined in the baseline cell).
X_train = X_now[(X_now.index >= train_start) & (X_now.index <= train_end)]
y_train = y_now[(y_now.index >= train_start) & (y_now.index <= train_end)]
X_test = X_now[(X_now.index >= test_start) & (X_now.index <= test_end)]
y_test = y_now[(y_now.index >= test_start) & (y_now.index <= test_end)]

# Convert every split's index to datetimes for the date-formatted plots.
for split in (X_train, y_train, X_test, y_test):
    split.index = pd.to_datetime(split.index)

# Train, evaluate, and plot the model
model, results = train_evaluate_and_plot(
    X_train, y_train, X_test, y_test, "Fine Tuned Random Forest Model"
)

In [None]:
# Refit using only the 30 most important features of the model trained in the
# previous cell.
importances = model.feature_importances_
indices = np.argsort(importances)[::-1]  # most important first
cols = np.array(X_train.columns)[indices][:30]

X = X_now[cols]
y = y_now

# Same inclusive train / test windows as before.
X_train = X[(X.index >= train_start) & (X.index <= train_end)]
y_train = y[(y.index >= train_start) & (y.index <= train_end)]
X_test = X[(X.index >= test_start) & (X.index <= test_end)]
y_test = y[(y.index >= test_start) & (y.index <= test_end)]

# Convert every split's index to datetimes for the date-formatted plots.
for split in (X_train, y_train, X_test, y_test):
    split.index = pd.to_datetime(split.index)

model, results = train_evaluate_and_plot(
    X_train, y_train, X_test, y_test,
    "Fine Tuned Random Forest Model with Best Features",
)

In [None]:
# Directional metrics for the latest model: a value > 0 counts as "up" (1),
# anything else as "not up" (0).
actual_up = results["y_test"] > 0
predicted_up = results["y_pred"] > 0
results["actual_direction"] = actual_up.astype(int)
results["Pred_direction"] = predicted_up.astype(int)

# Confusion-matrix cells.
tp = int((actual_up & predicted_up).sum())
fp = int((~actual_up & predicted_up).sum())
fn = int((actual_up & ~predicted_up).sum())
tn = int((~actual_up & ~predicted_up).sum())

print(f"True Positives: {tp}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
print(f"True Negatives: {tn}")

# Each ratio falls back to 0.0 when its denominator is zero (e.g. the model
# never predicts "up") instead of raising ZeroDivisionError.

# calculate precision
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
print(f"Precision: {precision}")

# calculate recall
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
print(f"Recall: {recall}")

# calculate f1 score
f1 = (
    2 * (precision * recall) / (precision + recall)
    if (precision + recall) > 0
    else 0.0
)
print(f"F1 Score: {f1}")

# calculate accuracy
total = tp + tn + fp + fn
accuracy = (tp + tn) / total if total > 0 else 0.0
print(f"Accuracy: {accuracy}")

> Visualize a sample tree (up to depth 2)

In [None]:
from sklearn.tree import plot_tree

# Plot the first tree from the forest, truncated to depth 2.
plt.figure(figsize=(15, 10))
# `cols` is a NumPy array; some scikit-learn releases only accept a plain list
# for `feature_names`, so convert explicitly.
plot_tree(
    model.estimators_[0], filled=True, max_depth=2, feature_names=list(cols)
)
plt.show()