<a href="https://colab.research.google.com/github/luck058/kaggle-gold-price-analysis/blob/model-1/model_1_kaggle_gold_price_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


In [None]:
# Fetch the project repository into the current working directory.
!git clone https://github.com/luck058/kaggle-gold-price-analysis

# Move into the checkout; later cells read 'financial_regression_cleaned.csv'
# by relative path (presumably shipped in this repository - TODO confirm).
# NOTE(review): re-running this cell without restarting clones/cds again from inside the checkout.
%cd kaggle-gold-price-analysis

# Create X, y

In [None]:
# Assets whose open/high/low/close/volume columns are pulled from the CSV.
assets = [
    "gold",
    "sp500",
    "nasdaq",
    "silver",
    "oil",
    "platinum",
    "palladium",
]

# Name of the regression target column (alternative kept for reference).
# y_col = "gold close diff"
y_col = "gold close"

In [None]:
# Build the list of raw columns to keep: five OHLCV fields for every asset.
original_cols = []
for asset in assets:
    original_cols.extend(
        f'{asset} {field}' for field in ('open', 'high', 'low', 'close', 'volume')
    )

# Load the cleaned CSV and keep only those columns, with a fresh 0..n-1 index.
df = pd.read_csv('financial_regression_cleaned.csv')[original_cols].reset_index(drop=True)

display(df.head())
print(df.shape)

In [None]:
# Names of the derived per-asset columns; the rolling-window cells below iterate over this list.
# It must be initialised here: `+=` on an undefined name raises NameError on a fresh kernel,
# and resetting it keeps the cell idempotent when re-run.
other_core_cols = []

for asset in assets:
    # Day-over-day change of the close (first row is NaN).
    df[f"{asset} close diff"] = df[f"{asset} close"].diff()
    # Intraday range and intraday move.
    df[f'{asset} high-low'] = df[f'{asset} high'] - df[f'{asset} low']
    df[f'{asset} close-open'] = df[f'{asset} close'] - df[f'{asset} open']

    other_core_cols += [f'{asset} high-low', f'{asset} close-open', f'{asset} close diff']

# Show the result once, after all assets are processed (not once per asset).
display(df.head())
print(df.shape)

In [None]:
def create_lag(df, column, lookback, include_zero=True):
    """Add lagged copies of `column` to `df` in place.

    A column named '{column}-{lag}' is added for every lag in
    1 .. lookback-1 that is a power of two (1, 2, 4, 8, ...), holding
    `column` shifted down by `lag` rows. When `include_zero` is true, an
    unshifted copy named '{column}-0' is added first.

    Nothing is returned; `df` is mutated.
    """
    assert column in df.columns

    source = df[column]
    if include_zero:
        df[f'{column}-0'] = source

    for lag in range(1, lookback):
        # A positive integer is a power of two exactly when it has a single set bit.
        if (lag & (lag - 1)) != 0:
            continue
        df[f'{column}-{lag}'] = df[column].shift(lag)

In [None]:
# Add the lag-0 copy and power-of-two lags (below 20) of the target column.
create_lag(df, column=y_col, lookback=20, include_zero=True)
display(df.head())

In [None]:
def get_max(df, column, lookback, name_append=None):
    """Add a rolling-maximum column for `column` to `df` in place.

    Args:
        df: DataFrame to mutate.
        column: Name of an existing column in `df`.
        lookback: Rolling window size passed to `Series.rolling`.
        name_append: Optional suffix for the new column name. `None`
            (the default) means no suffix, so the column is '{column} max'
            rather than the literal '{column} maxNone'.

    The new column is named '{column} max{name_append}'.
    """
    assert column in df.columns
    suffix = "" if name_append is None else name_append
    df[f'{column} max{suffix}'] = df[column].rolling(lookback).max()

In [None]:
def get_min(df, column, lookback, name_append=None):
    """Add a rolling-minimum column for `column` to `df` in place.

    Args:
        df: DataFrame to mutate.
        column: Name of an existing column in `df`.
        lookback: Rolling window size passed to `Series.rolling`.
        name_append: Optional suffix for the new column name. `None`
            (the default) means no suffix, so the column is '{column} min'
            rather than the literal '{column} minNone'.

    The new column is named '{column} min{name_append}'.
    """
    assert column in df.columns
    suffix = "" if name_append is None else name_append
    df[f'{column} min{suffix}'] = df[column].rolling(lookback).min()

In [None]:
def get_min_max(df, column, lookback, name_append=None):
    """Add both the rolling max and the rolling min of `column` to `df`.

    Delegates to `get_max` and then `get_min` with the same window and
    the same `name_append` suffix; `df` is mutated in place.
    """
    assert column in df.columns
    for add_extreme in (get_max, get_min):
        add_extreme(df, column, lookback, name_append=name_append)

In [None]:
# Short-window (5 rows) rolling min/max for every raw, derived and lag-0 target column.
for column in [*original_cols, *other_core_cols, f"{y_col}-0"]:
    get_min_max(df, column, lookback=5, name_append=" short")


In [None]:
# Long-window (20 rows) rolling min/max for every raw, derived and lag-0 target column.
for column in [*original_cols, *other_core_cols, f"{y_col}-0"]:
    get_min_max(df, column, lookback=20, name_append=" long")


In [None]:
def get_mean(df, column, lookback, name_append=None):
    """Add a rolling-mean column for `column` to `df` in place.

    Args:
        df: DataFrame to mutate.
        column: Name of an existing column in `df`.
        lookback: Rolling window size passed to `Series.rolling`.
        name_append: Optional suffix for the new column name. `None`
            (the default) means no suffix, so the column is '{column} mean'
            rather than the literal '{column} meanNone'.

    The new column is named '{column} mean{name_append}'.
    """
    assert column in df.columns
    suffix = "" if name_append is None else name_append
    df[f'{column} mean{suffix}'] = df[column].rolling(lookback).mean()

In [None]:
# Short-window (5 rows) rolling mean for every raw, derived and lag-0 target column.
# This cell previously repeated the min/max call, so get_mean was never applied.
for column in original_cols + other_core_cols + [f"{y_col}-0"]:
    get_mean(df, column, 5, " short")

In [None]:
# Long-window (20 rows) rolling mean for every raw, derived and lag-0 target column.
# This cell previously repeated the min/max call, so get_mean was never applied.
for column in original_cols + other_core_cols + [f"{y_col}-0"]:
    get_mean(df, column, 20, " long")

## Create y

In [None]:
# Turn the target column into "next row's value" (shift up by one), then drop every
# row containing a NaN (introduced by diff/lag/rolling features and by this shift)
# and renumber the index.
# NOTE(review): not idempotent - each re-run shifts the target one further row.
df = (
    df.assign(**{y_col: df[y_col].shift(-1)})
    .dropna(axis=0)
    .reset_index(drop=True)
)

display(df.head())

In [None]:
# Target is the (shifted) y_col column; features are every other column.
y = df[y_col]
X = df.drop(columns=[y_col])

print("y:")
display(y.head())
print("X:")
display(X.head())

# Sizes of the target vector and the feature matrix.
print("len(y):", len(y), sep="\n")
print("X.shape:", X.shape, sep="\n")

# Models

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

from sklearn.model_selection import cross_validate

from sklearn.preprocessing import StandardScaler



In [None]:
np.random.seed(1)

# Hold out 10% of the rows for the final test.
# NOTE(review): train_test_split shuffles by default; the rows look like an ordered
# price series with lag/rolling features, so a chronological split (shuffle=False)
# may be more appropriate - confirm before trusting the reported scores.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=1)

# Binary targets for the classification models: "above the training-set mean".
# The test labels deliberately use the *training* mean as the threshold.
y_class_train = y_train > y_train.mean()
y_class_test = y_test > y_train.mean()

# Standardise features using statistics fitted on the training rows only.
# Keep the original row index so the scaled frames stay aligned with y_train / y_test
# for any index-based pandas operation (a bare DataFrame(...) would renumber from 0).
scaler = StandardScaler()
X_train_normalized = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index)
X_test_normalized = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns, index=X_test.index)

print("X_train.shape:", X_train.shape)
print("X_test.shape:", X_test.shape)
print("y_train.shape:", y_train.shape)
print("y_test.shape:", y_test.shape)
print("y_class_train.shape:", y_class_train.shape)
print("y_class_test.shape:", y_class_test.shape)

# Distribution of the regression target in both splits.
plt.hist(y_train, bins=20, label="Train set")
plt.hist(y_test, bins=10, label="Test set")
plt.title("Distribution of y")
plt.legend()
plt.show()

# Class balance (False / True counts) of the binary target in both splits.
plt.bar([0, 1], [len(y_class_train) - y_class_train.sum(), y_class_train.sum()], label="Train set")
plt.bar([0, 1], [len(y_class_test) - y_class_test.sum(), y_class_test.sum()], label="Test set")
plt.xticks([0, 1], ["False", "True"])
plt.ylabel("Count")
plt.title("Distribution of y_class")
plt.legend()
plt.show()

In [None]:
class PredictZero:
    """Baseline model which predicts y as 0 irrespective of X."""

    def fit(self, X, y):
        """Nothing to learn; returns self, following the scikit-learn estimator convention."""
        return self

    def predict(self, X):
        """Return an array of zeros with one entry per row of X."""
        return np.zeros(len(X))

    def score(self, X, y):
        """Return the R^2 of the constant-zero prediction against y."""
        y_pred = self.predict(X)
        return r2_score(y, y_pred)

    def get_params(self, deep=True):
        """No hyper-parameters; an empty dict lets scikit-learn clone the model."""
        return {}

In [None]:
class PredictOne:
    """Baseline model which predicts y as 1 irrespective of X."""

    def fit(self, X, y):
        """Nothing to learn; returns self, following the scikit-learn estimator convention."""
        return self

    def predict(self, X):
        """Return an array of ones with one entry per row of X."""
        return np.ones(len(X))

    def score(self, X, y):
        """Return the R^2 of the constant-one prediction against y."""
        y_pred = self.predict(X)
        return r2_score(y, y_pred)

    def get_params(self, deep=True):
        """No hyper-parameters; an empty dict lets scikit-learn clone the model."""
        return {}

In [None]:
class PredictPrevious:
    """Baseline model which predicts y as the lag-0 target column of X.

    The prediction is simply X[f"{y_col}-0"], i.e. the unshifted copy of the
    target column; `y_col` is read from the notebook's global namespace.
    """

    def fit(self, X, y):
        """Nothing to learn; returns self, following the scikit-learn estimator convention."""
        return self

    def predict(self, X):
        """Return the f"{y_col}-0" column of X (X must support column lookup by name)."""
        return X[f"{y_col}-0"]

    def score(self, X, y):
        """Return the R^2 of the lag-0 prediction against y."""
        y_pred = self.predict(X)
        return r2_score(y, y_pred)

    def get_params(self, deep=True):
        """No hyper-parameters; an empty dict lets scikit-learn clone the model."""
        return {}

In [None]:
class PredictMean:
    """Baseline model which predicts y as the mean of y in the training set irrespective of X."""

    def fit(self, X, y):
        """Store the mean of the training targets; returns self (scikit-learn convention)."""
        self.mean_y = np.mean(y)
        return self

    def predict(self, X):
        """Return the stored training mean repeated once per row of X (requires fit first)."""
        return np.full(len(X), self.mean_y)

    def score(self, X, y):
        """Return the R^2 of the constant-mean prediction against y."""
        y_pred = self.predict(X)
        return r2_score(y, y_pred)

    def get_params(self, deep=True):
        """No hyper-parameters; an empty dict lets scikit-learn clone the model."""
        return {}

In [None]:
%%capture
! pip install ISLP

In [None]:
from sklearn.model_selection import GridSearchCV

from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from ISLP.bart import BART
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import Ridge
from sklearn.linear_model import Lasso
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.neural_network import MLPRegressor



In [None]:
def evaluate_model(model, X_train, y_train, other_scoring=None, return_estimator=False):
    """Run 10-fold cross-validation for `model` and print a summary per metric.

    R2 and MSE are always reported (MSE is the sign-flipped
    'neg_mean_squared_error'); each scorer name in `other_scoring` is
    reported as well. Every line is formatted by `print_cv_results`.

    Args:
        model: Estimator accepted by `sklearn.model_selection.cross_validate`.
        X_train: Feature matrix.
        y_train: Target values.
        other_scoring: Optional iterable of extra scorer names. Defaults to
            none (a `None` default avoids a shared mutable default argument).
        return_estimator: Forwarded to `cross_validate`; when true the fitted
            per-fold estimators are available under the 'estimator' key.

    Returns:
        The dict returned by `cross_validate`.
    """
    extra_scoring = [] if other_scoring is None else list(other_scoring)
    cv_results = cross_validate(model, X_train, y_train, scoring=["r2", "neg_mean_squared_error"] + extra_scoring, cv=10, return_estimator=return_estimator)
    print(f"R2: {print_cv_results(cv_results['test_r2'])}")
    # The scorer is negated MSE; flip the sign to report a positive MSE.
    print(f"MSE: {print_cv_results(cv_results['test_neg_mean_squared_error'] * (-1))}")
    for scoring in extra_scoring:
        print(f"{scoring}: {print_cv_results(cv_results[f'test_{scoring}'])}")
    print()
    return cv_results

In [None]:
def print_cv_results(results, z=1.96):
    """Format cross-validation scores as 'mean ±half-width' and return the string.

    Despite the name, nothing is printed. The half-width is `z` times the
    standard error (np.std of the scores divided by sqrt of their count);
    the default z=1.96 corresponds to a 95% interval. Both numbers are
    rounded to two decimals.
    """
    n_scores = len(results)
    centre = np.mean(results)
    half_width = z * (np.std(results) / np.sqrt(n_scores))
    return "{} \u00B1{}".format(round(centre, 2), round(half_width, 2))

### Select models to test

In [None]:
# Switches (Colab form fields) selecting which model families the cells below evaluate.
# Gates the LinearRegression / Ridge / Lasso cells.
run_regression_models = True # @param {type:"boolean"}
# Gates the random-forest and pruned regression-tree cells.
run_tree_based_models = False # @param {type:"boolean"}
# Gates the classification cell (binary "above training mean" target).
run_classification_models = False # @param {type:"boolean"}


### Trivial models

In [None]:
# Cross-validate each trivial baseline in turn; the printed header is the class name.
for baseline_cls in (PredictZero, PredictMean, PredictPrevious):
    print(f"Model: {baseline_cls.__name__}")
    evaluate_model(baseline_cls(), X_train, y_train)

### Linear regression / Lasso / Ridge

In [None]:
if run_regression_models:
    # Ordinary least squares on the unscaled features.
    print("Model: LinearRegression")
    linear_regression = evaluate_model(LinearRegression(), X_train, y_train, return_estimator=True)

    # Coefficients of the first fold's fitted model, sorted ascending by value.
    first_fold_coefs = linear_regression["estimator"][0].coef_.round(3)
    coef_table = pd.DataFrame(zip(X.columns, first_fold_coefs))
    display(coef_table.sort_values(by=1))

In [None]:
if run_regression_models:
    # Ordinary least squares on the standardised features.
    print("Model: LinearRegression")
    linear_regression = evaluate_model(LinearRegression(), X_train_normalized, y_train, return_estimator=True)

    # Coefficients of the first fold's fitted model, sorted ascending by value.
    first_fold_coefs = linear_regression["estimator"][0].coef_.round(3)
    coef_table = pd.DataFrame(zip(X.columns, first_fold_coefs))
    display(coef_table.sort_values(by=1))

In [None]:
if run_regression_models:
    print("Model: Ridge")
    alphas = [0.00001, 0.00003, 0.0001, 0.0003, 0.001, 0.003, 0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30, 100]

    # Track the alpha with the highest mean negated MSE (i.e. lowest MSE).
    # Start from -inf so the first finite result always wins, whatever the MSE scale
    # (a fixed -100 sentinel fails when every mean MSE exceeds 100).
    best_error = -np.inf
    best_alpha = None
    best_estimator = None
    for alpha in alphas:
        print(f"alpha: {alpha}")
        ridge_results = evaluate_model(Ridge(alpha=alpha), X_train_normalized, y_train, return_estimator=True)
        mean_neg_mse = ridge_results["test_neg_mean_squared_error"].mean()
        if mean_neg_mse > best_error:
            best_error = mean_neg_mse
            best_alpha = alpha
            # Keep the first fold's fitted model for coefficient inspection.
            best_estimator = ridge_results["estimator"][0]

    print("best_alpha:", best_alpha)
    print("Best MSE:", round(-best_error, 2))
    if best_estimator is None:
        # Only reachable if no alpha produced a comparable (non-NaN) score.
        print("No alpha produced a valid cross-validation score.")
    else:
        display(pd.DataFrame(zip(X.columns, best_estimator.coef_.round(3))).sort_values(by=1))


In [None]:
if run_regression_models:
    print("Model: Lasso")
    alphas = [0.001, 0.003, 0.01, 0.03, 0.1, 0.3, 1, 3, 10]

    # Track the alpha with the highest mean negated MSE (i.e. lowest MSE).
    # Start from -inf so the first finite result always wins, whatever the MSE scale
    # (a fixed -100 sentinel fails when every mean MSE exceeds 100).
    best_error = -np.inf
    best_alpha = None
    best_estimator = None
    for alpha in alphas:
        print(f"alpha: {alpha}")
        lasso_results = evaluate_model(Lasso(alpha=alpha, max_iter=100000), X_train_normalized, y_train, return_estimator=True)
        mean_neg_mse = lasso_results["test_neg_mean_squared_error"].mean()
        if mean_neg_mse > best_error:
            best_error = mean_neg_mse
            best_alpha = alpha
            # Keep the first fold's fitted model for coefficient inspection.
            best_estimator = lasso_results["estimator"][0]

    print("best_alpha:", best_alpha)
    print("Best MSE:", round(-best_error, 2))
    if best_estimator is None:
        # Only reachable if no alpha produced a comparable (non-NaN) score.
        print("No alpha produced a valid cross-validation score.")
    else:
        display(pd.DataFrame(zip(X.columns, best_estimator.coef_.round(3))).sort_values(by=1))


### Tree-based models

In [54]:
# if run_tree_based_models:
#     print(f"Model: BART")
#     evaluate_model(BART(), X_train, y_train)
#     pass

Model: BART
R2: 0.99 ±0.0
MSE: 7.16 ±0.77



In [None]:
if run_tree_based_models:
    print("Model: RandomForestRegressor")

    # Pick the forest size with a quick 2-fold grid search ...
    random_forest = RandomForestRegressor()
    grid_search = GridSearchCV(
        random_forest,
        param_grid={'n_estimators': [30, 100, 300]},
        cv=2,
    )
    grid_search.fit(X_train, y_train)
    best_n_estimators = grid_search.best_params_['n_estimators']
    print("best_n_estimators:", best_n_estimators)

    # ... then report 10-fold scores for a fresh forest of that size.
    evaluate_model(RandomForestRegressor(n_estimators=best_n_estimators), X_train, y_train)

Model: RandomForestRegressor
best_n_estimators: 300


In [None]:
if run_tree_based_models:
    print("Model: RegressionTree with Pruning")
    regressor = DecisionTreeRegressor()

    # Candidate pruning strengths come from the cost-complexity pruning path.
    ccp_path = regressor.cost_complexity_pruning_path(X_train, y_train)

    # Thin the path by taking every (len // 3)-th alpha so the grid search stays cheap.
    # The thinned array must be the one passed to GridSearchCV (previously it was stored
    # in an unused attribute and the full path was searched); max(1, ...) avoids a
    # zero slice step when the path has fewer than three alphas.
    step = max(1, len(ccp_path.ccp_alphas) // 3)
    candidate_alphas = ccp_path.ccp_alphas[::step]

    grid_search = GridSearchCV(regressor, param_grid={'ccp_alpha': candidate_alphas}, cv=3)
    grid_search.fit(X_train, y_train)
    best_ccp_alpha = grid_search.best_params_['ccp_alpha']
    print("best_ccp_alpha:", best_ccp_alpha)

    # Report 10-fold scores for a fresh tree pruned with the selected alpha.
    evaluate_model(DecisionTreeRegressor(ccp_alpha=best_ccp_alpha), X_train, y_train)

In [None]:
# print(f"Model: MLPRegressor")
# evaluate_model(MLPRegressor(max_iter=2000, hidden_layer_sizes=(100,100,100)), X_train, y_train)

### Classification models

In [None]:
if run_classification_models:
    # Constant baselines for the binary "above training mean" target.
    print("Model: PredictZero")
    evaluate_model(PredictZero(), X_train, y_class_train, other_scoring=["accuracy"])

    print("Model: PredictOne")
    evaluate_model(PredictOne(), X_train, y_class_train, other_scoring=["accuracy"])

    print("Model: LogisticRegression")
    evaluate_model(LogisticRegression(), X_train, y_class_train, other_scoring=["accuracy"])

    # Each section evaluates the model named in its header (the LDA and QDA sections
    # previously re-ran LogisticRegression).
    print("Model: LinearDiscriminantAnalysis")
    evaluate_model(LinearDiscriminantAnalysis(), X_train, y_class_train, other_scoring=["accuracy"])

    print("Model: QuadraticDiscriminantAnalysis")
    evaluate_model(QuadraticDiscriminantAnalysis(), X_train, y_class_train, other_scoring=["accuracy"])

In [None]:
# Sanity-check frame: reload the raw CSV and rebuild two derived columns for one asset.
df2 = pd.read_csv('financial_regression_cleaned.csv')

# Only care about "asset". Pin it explicitly instead of relying on the loop variable
# leaked by an earlier cell; the last entry of `assets` is the value that leaked.
# NOTE(review): confirm this is the asset intended for the check.
asset = assets[-1]

# NOTE(review): this rebinds `original_cols` to the single-asset list, which the
# df3 cell below then picks up - confirm that is intended.
original_cols = [f'{asset} open', f'{asset} high', f'{asset} low', f'{asset} close', f'{asset} volume']
df2 = df2[original_cols].reset_index(drop=True)
df2[f'{asset} high-low'] = df2[f'{asset} high'] - df2[f'{asset} low']
df2[f'{asset} close-open'] = df2[f'{asset} close'] - df2[f'{asset} open']

display(df2.head())
print(df2.shape)

In [None]:
# Subset of the engineered frame: the current `original_cols` plus the derived columns.
df3 = df[[*original_cols, *other_core_cols]]

In [None]:
# Rows 20-24 (by position) of the reloaded raw frame, for comparison with df3.
display(df2.iloc[20:25])

In [None]:
# First five rows and overall (rows, columns) size of the df3 subset.
display(df3.head(n=5))
print(df3.shape)

### Use best model

In [None]:
# Final model choice - keep exactly one assignment active.
# (LinearRegression was previously constructed and immediately overwritten.)
# model_1 = LinearRegression()
# model_1 = PredictPrevious()
model_1 = Lasso(alpha=0.001, max_iter=100000)

In [None]:
# Fit the chosen model on the training split and score it on the held-out test split.
# NOTE(review): this uses the unscaled X_train / X_test, whereas the Ridge/Lasso
# searches above were run on the standardised features - confirm which is intended.
model_1.fit(X_train, y_train)
predictions = model_1.predict(X_test)
actual = y_test

for metric_label, metric_fn in (("R2", r2_score), ("MSE", mean_squared_error)):
    print(f"{metric_label}:", round(metric_fn(actual, predictions), 3))
