In [None]:
# Data Manipulation
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
# Matplotlib 3.6 renamed the bundled 'seaborn' style sheet to 'seaborn-v0_8'
# and deprecated the old name (newer releases no longer ship it), so pick
# whichever of the two names this installation actually provides.
plt.style.use('seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn')
sns.set(rc={'figure.figsize': (20, 8)})

# Data Preprocessing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (StandardScaler,
                                   MinMaxScaler)
from sklearn.pipeline import Pipeline

# Regressor
from sklearn.linear_model import (LinearRegression,
                                  Lasso,
                                  Ridge,
                                  ElasticNet)

# Metrics
from sklearn.metrics import mean_squared_error

# set display options: floats are rendered with thousands separators and
# 4 decimals, and up to 100 columns are shown before wide frames are truncated
pd.options.display.float_format = "{:,.4f}".format
pd.set_option('display.max_columns', 100)

# Ignore warnings
# NOTE(review): this silences every warning category for the rest of the
# session, including any deprecation warnings raised by the libraries above.
import warnings
warnings.filterwarnings('ignore')

In [None]:
# load nifty index data, using the first CSV column as the index.
# parse_dates=True is needed for dayfirst=True to have any effect; without it
# the index column is left as plain strings instead of being parsed as dates.
df = pd.read_csv(
    'https://raw.githubusercontent.com/kannansingaravelu/datasets/main/niftyindex.csv',
    index_col=0,
    parse_dates=True,
    dayfirst=True,
)
df

In [None]:
# descriptive statistics (count, mean, std, min, quartiles, max) per numeric column
df.describe()

In [None]:
# count the missing entries in each column of the raw data
df.isna().sum()

In [None]:
# create features
def create_features(frame, windows=(7, 14, 28), multiplier=2):
    """Build the feature set and the next-row label from a price/volume frame.

    Parameters
    ----------
    frame : pandas.DataFrame
        Must contain 'Open', 'High', 'Low', 'Close' and 'Volume' columns.
        The frame is copied and never modified.
    windows : iterable of int, default (7, 14, 28)
        Look-back lengths (in rows). Each window ``w`` adds the columns
        PCHG<w>, VCHG<w>, RET<w>, MA<w>, VMA<w>, OC<w>, HC<w>, GAP<w>,
        STD<w>, UB<w> and LB<w>.
    multiplier : float, default 2
        Number of rolling standard deviations of 'Close' added to (UB) and
        subtracted from (LB) the rolling mean of 'Close'.

    Returns
    -------
    pandas.DataFrame
        The feature columns plus 'Label' (the next row's 'Close'). The five
        input price/volume columns are removed and every row that contains a
        NaN (window warm-up rows and the final, label-less row) is dropped.
    """
    df = frame.copy()
    close = df['Close']
    volume = df['Volume']
    prev_close = close.shift(1)

    # single-row features
    df['OC'] = close / df['Open'] - 1          # open-to-close move
    df['HC'] = df['High'] / df['Low'] - 1      # high relative to low
    df['GAP'] = df['Open'] / prev_close - 1    # open relative to previous close
    df['RET'] = np.log(close / prev_close)     # log return

    # rolling features; column order is kept stable because the downstream
    # correlation filter depends on which column comes first
    for w in windows:
        tag = str(w)
        close_mean = close.rolling(w).mean()
        close_band = close.rolling(w).std() * multiplier

        df['PCHG' + tag] = close.pct_change(w)
        df['VCHG' + tag] = volume.pct_change(w)
        df['RET' + tag] = df['RET'].rolling(w).sum()
        df['MA' + tag] = close / close_mean
        df['VMA' + tag] = volume / volume.rolling(w).mean()
        df['OC' + tag] = df['OC'].rolling(w).mean()
        df['HC' + tag] = df['HC'].rolling(w).mean()
        df['GAP' + tag] = df['GAP'].rolling(w).mean()
        df['STD' + tag] = df['RET'].rolling(w).std()
        df['UB' + tag] = close_mean + close_band
        df['LB' + tag] = close_mean - close_band

    # label: the next row's close
    df['Label'] = close.shift(-1)

    # remove the raw inputs, then drop every row with a NaN
    df = df.drop(columns=['Open', 'High', 'Low', 'Close', 'Volume'])
    return df.dropna()

In [None]:
# features: build the derived feature columns (and 'Label') from the raw data
df1 = create_features(df)
display(df1.shape)

# verify the output
df1.head(2)

In [None]:
# label: copy the 'Label' column out of df1 into a standalone NumPy array
y = np.array(df1['Label'])
y

In [None]:
# drop label from dataframe; reassigning (instead of inplace=True) together
# with errors='ignore' keeps this cell safe to re-run
df1 = df1.drop(columns='Label', errors='ignore')

# show which feature pairs are highly correlated in either direction, using
# the same |corr| > 0.9 rule that correlated_features applies
sns.heatmap(df1.corr().abs() > 0.9,
            annot=True,
            annot_kws={"size": 8},
            fmt=".2f",
            linewidth=.5,
            cmap="coolwarm",
            cbar=True); # alternative cmaps: "crest", "viridis", "magma"

plt.title('Features Set Correlations');

In [None]:
# flag every feature that is highly correlated with a feature listed before it
def correlated_features(data, threshold=0.9):
    """Return the names of columns that are redundant under a correlation cut-off.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose pairwise column correlations are inspected.
    threshold : float, default 0.9
        A column is flagged when the absolute correlation with any column
        positioned before it is strictly greater than this value.

    Returns
    -------
    set
        Names of the flagged columns. For each correlated pair it is the
        later column that is flagged, so the earlier one is kept.
    """
    # use the frame that was passed in, not a notebook-level global
    corr_matrix = data.corr().abs()
    columns = corr_matrix.columns

    col_corr = set()
    for i in range(len(columns)):
        # only look at columns to the left (the lower triangle of the matrix)
        if (corr_matrix.iloc[i, :i] > threshold).any():
            col_corr.add(columns[i])

    return col_corr

In [None]:
# names of the features flagged as highly correlated with an earlier column
drop_correlated_features = correlated_features(df1)

# keep only the remaining columns as model inputs
X = df1.drop(columns=list(drop_correlated_features))

# remember the surviving column labels for later tables and legends
feature_names = X.columns

In [None]:
# display the reduced feature set left after dropping the correlated columns
X

In [None]:
# summary statistics per feature, used to decide which scaling to apply
X.describe()

In [None]:
# compare the feature distributions side by side: reshape X to long format
# (one 'variable'/'value' row per cell) and draw one box per feature
sns.boxplot(data=X.melt(), x='variable', y='value')
plt.title('Boxplot of Features')
plt.xlabel(' ');

In [None]:
class Regression:
    """Fit, score and visualise a scaler + linear-model pipeline.

    The data is split once, in row order (no shuffling), when the object is
    created; every method then works on that same train/test split.

    Parameters
    ----------
    X : feature matrix accepted by scikit-learn (a DataFrame in this notebook)
    y : target values aligned with the rows of ``X``
    testsize : float, default 0.20
        Fraction of rows, taken from the end, held out for testing.
    """

    def __init__(self, X, y, testsize=0.20):
        self.X = X
        self.y = y
        self.testsize = testsize

        # shuffle=False keeps the original row order, so the hold-out set is
        # the final `testsize` share of the rows
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=self.testsize, random_state=0, shuffle=False)

    # fit and predict
    def fit_predict(self, estimator, transformer, alpha=None, l1_ratio=None):
        """Fit ``transformer`` + ``estimator`` on the training split and predict the test split.

        ``alpha`` and ``l1_ratio`` are accepted for backward compatibility but
        are not used; configure them on ``estimator`` itself.

        Returns
        -------
        tuple
            ``(fitted_pipeline, intercept, coefficients, y_pred)``.

        Any error raised while fitting or predicting propagates unchanged
        rather than being printed and then hidden behind an unbound-variable
        error on return.
        """
        # subsume estimator and transformer into a pipeline
        model = Pipeline([
            ('scaler', transformer),
            ('regressor', estimator),
        ])

        # fit/train model, then predict labels for the hold-out rows
        model.fit(self.X_train, self.y_train)
        y_pred = model.predict(self.X_test)

        regressor = model['regressor']
        return model, regressor.intercept_, regressor.coef_, y_pred

    # evaluate metrics
    def eval_metrics(self, model, y_pred):
        """Return ``(mse, rmse, r2_train, r2_test)`` for a fitted pipeline.

        ``y_pred`` must be the predictions for this object's test split.
        """
        # The `squared` keyword of mean_squared_error is deprecated in recent
        # scikit-learn releases, so RMSE is derived from MSE directly.
        mse = mean_squared_error(self.y_test, y_pred)
        rmse = float(np.sqrt(mse))
        r2train = model.score(self.X_train, self.y_train)
        r2test = model.score(self.X_test, self.y_test)

        return mse, rmse, r2train, r2test

    # plot coefficients as a function of lambda
    def plot_coeff(self, modelname):
        """Plot the coefficient paths of a regularised model over a grid of alphas.

        Parameters
        ----------
        modelname : {'Lasso', 'Ridge', 'ElasticNet'}
            Ridge is swept over 1e6..1e-2, the others over 1e2..1e-2
            (200 log-spaced values each).

        Raises
        ------
        ValueError
            If ``modelname`` is not one of the supported names.
        """
        # exponents (start, stop) of the log-spaced alpha grid per model
        alpha_exponents = {
            'Lasso': (2, -2),
            'Ridge': (6, -2),
            'ElasticNet': (2, -2),
        }
        if modelname not in alpha_exponents:
            raise ValueError(
                f"Unknown modelname {modelname!r}; expected one of {sorted(alpha_exponents)}")

        estimators = {'Lasso': Lasso, 'Ridge': Ridge, 'ElasticNet': ElasticNet}
        make_estimator = estimators[modelname]
        start, stop = alpha_exponents[modelname]
        alpha_range = np.logspace(start, stop, 200)

        # refit on this object's own split (honours self.testsize) per alpha
        coef = np.array([
            self.fit_predict(make_estimator(alpha=a, random_state=0), MinMaxScaler())[2]
            for a in alpha_range
        ])

        # legend labels come from the data itself rather than a notebook global
        labels = getattr(self.X, 'columns', None)
        if labels is None:
            labels = [f'x{k}' for k in range(coef.shape[1])]

        # Plot Coefficients
        fig, ax = plt.subplots(figsize=(20, 8))
        ax.plot(alpha_range, coef)
        ax.set_xscale('log')
        ax.legend(list(labels), loc=0)
        # ax.set_xlim(ax.get_xlim()[::-1]) # reverse axis
        ax.set_title(f'{modelname} coefficients as a function of the regularization')

        # raw strings: '\l' and '\m' are not valid escape sequences
        ax.set_xlabel(r'$\lambda$')
        ax.set_ylabel(r'$\mathbf{w}$')
        return plt.show()

In [None]:
# baseline model: wrap the data in its own train/test split
lr = Regression(X, y)

# ordinary least squares on min-max scaled features
lr_model, lr_intercept, lr_coef, lr_y_pred = lr.fit_predict(
    estimator=LinearRegression(),
    transformer=MinMaxScaler(),
)
print(f"\n Model: {lr_model} \n Intercept: {lr_intercept} \n Coefficients: \n {lr_coef}")

In [None]:
# L1-regularised model: wrap the data in its own train/test split
lasso = Regression(X, y)

# Lasso with alpha=0.3 on min-max scaled features
lasso_model, lasso_intercept, lasso_coef, lasso_y_pred = lasso.fit_predict(
    estimator=Lasso(alpha=0.3, random_state=0),
    transformer=MinMaxScaler(),
)
print(f"\n Model: {lasso_model} \n Intercept: {lasso_intercept} \n Coefficients:\n {lasso_coef}")

In [None]:
# plot Lasso coefficients against the regularisation strengths swept by plot_coeff
lasso.plot_coeff('Lasso')

In [None]:
# instantiate
ridge = Regression(X,y)

# fit Ridge (alpha=1) on min-max scaled features
ridge_model , ridge_intercept, ridge_coef, ridge_y_pred = ridge.fit_predict(Ridge(alpha=1, random_state=0), MinMaxScaler())
# the message no longer contains the stray line-wrap marker left by the export
print(f"\n Model: {ridge_model} \n Intercept: {ridge_intercept} \n Coefficients: \n {ridge_coef}")

In [None]:
# plot Ridge coefficients against the regularisation strengths swept by plot_coeff
ridge.plot_coeff('Ridge')

In [None]:
# mixed-penalty model: wrap the data in its own train/test split
elasticnet = Regression(X, y)

# ElasticNet with alpha=0.1 and l1_ratio=1e-10 on min-max scaled features
elasticnet_model, elasticnet_intercept, elasticnet_coef, elasticnet_y_pred = elasticnet.fit_predict(
    estimator=ElasticNet(alpha=0.1, l1_ratio=1e-10, random_state=0),
    transformer=MinMaxScaler(),
)
print(f"\n Model: {elasticnet_model} \n Intercept: {elasticnet_intercept} \n Coefficients: \n {elasticnet_coef}")

In [None]:
# plot ElasticNet coefficients against the regularisation strengths swept by plot_coeff
elasticnet.plot_coeff('ElasticNet')

In [None]:
# side-by-side table of the fitted coefficients: one row per feature,
# one column per model
coef_df = pd.DataFrame(
    dict(LR=lr_coef, Lasso=lasso_coef, Ridge=ridge_coef, ElasticNet=elasticnet_coef),
    index=feature_names,
)

coef_df

In [None]:
# side-by-side table of the evaluation metrics: one column per model,
# each scored by its own Regression wrapper on that wrapper's test split
_fits = {
    'LR': (lr, lr_model, lr_y_pred),
    'Lasso': (lasso, lasso_model, lasso_y_pred),
    'Ridge': (ridge, ridge_model, ridge_y_pred),
    'ElasticNet': (elasticnet, elasticnet_model, elasticnet_y_pred),
}
eval_df = pd.DataFrame(
    {name: reg.eval_metrics(fitted, pred) for name, (reg, fitted, pred) in _fits.items()},
    index=['MSE', 'RMSE', 'R2_train', 'R2_test'],
)

eval_df