In [None]:
# Ignore warnings
import warnings
warnings.filterwarnings('ignore')
# Base Libraries
import pandas as pd
import numpy as np
from functools import partial
import matplotlib.pyplot as plt
# Classifier
from sklearn.linear_model import LogisticRegression
# Preprocessing
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import (
                                    train_test_split,
                                    TimeSeriesSplit
                                    )
# Metrics
from sklearn.metrics import (
                            accuracy_score,
                            classification_report,
                            RocCurveDisplay,
                            ConfusionMatrixDisplay,
                            log_loss
                            )

In [None]:
# Load the index price history; the first CSV column is parsed as day-first dates and used as the index
df = pd.read_csv(
    '../data/niftyindex.csv',
    index_col=0,
    parse_dates=True,
    dayfirst=True,
)
df.head(2)

In [None]:
# Get Info: print the frame's column dtypes and non-null counts
df.info()

In [None]:
# Visualize data: line plot of the 'Close' column (trailing ';' hides the Line2D repr)
plt.plot(df['Close']);

In [None]:
# Descriptive statistics, transposed so each column of df becomes one row
df.describe().transpose()

In [None]:
# Count missing values per column
df.isna().sum()

In [None]:
# Create Features
# Earlier assignments of 'HC' (High - Close) and a 7-day 'OC' were overwritten before
# ever being used, so only the definitions that actually reach the model are kept:
#   HC  : 7-day mean of High/Low - 1        RET : 1-day log return of Close
#   MA7 : Close / 7-day mean Close          VMA : Volume / 7-day mean Volume
#   OC  : 14-day mean of Close/Open - 1     GAP : 7-day mean of Open/prev Close - 1
#   STD : 7-day std of RET                  UB  : 7-day mean Close + 2 * 7-day std Close
# Statement order is chosen so the resulting feature columns keep their original order.
close_roll7 = df['Close'].rolling(7)

df['HC_'] = df['High'] / df['Low'] - 1
df['HC'] = df['HC_'].rolling(7).mean()
df['RET'] = np.log(df['Close'] / df['Close'].shift(1))
df['MA7'] = df['Close'] / close_roll7.mean()
df['VMA'] = df['Volume'] / df['Volume'].rolling(7).mean()
df['OC_'] = df['Close'] / df['Open'] - 1
df['OC'] = df['OC_'].rolling(14).mean()
df['GAP_'] = df['Open'] / df['Close'].shift(1) - 1
df['GAP'] = df['GAP_'].rolling(7).mean()
df['STD'] = df['RET'].rolling(7).std()
df['UB'] = close_roll7.mean() + close_roll7.std() * 2

# Drop the warm-up rows in which any rolling window is still incomplete.
# df itself is rebound because later cells read prices from the trimmed frame.
df = df.dropna()

# Raw prices and the unsmoothed helper columns are not model inputs
features = df.drop(columns=['Open', 'High', 'Low', 'Close', 'Volume', 'OC_', 'HC_', 'GAP_'])
features.head(2)

In [None]:
# Specify X: feature matrix as a plain NumPy array (rows follow the order of `features`)
X = features.to_numpy()

In [None]:
# Specify y: 1 when the next row's close is above 99.5% of the current close, else 0
# NOTE(review): the final row has no next close (shift(-1) gives NaN), so the comparison
# is False and that row is labelled 0 - confirm this is acceptable for the last sample.
close_today = df['Close']
close_next = close_today.shift(-1)
y = np.where(close_next > 0.995 * close_today, 1, 0)

In [None]:
# Check Class Imbalance: number of samples per label
class_counts = pd.Series(y).value_counts()
class_counts

In [None]:
# Split the Data into Training and Testing
# shuffle=False keeps row order, so the last 20% of rows form the test set
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    shuffle=False,
)

In [None]:
# Define a Baseline Model: standardise the features, then fit a class-balanced logistic regression
baseline_steps = [
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(class_weight='balanced')),
]
classifier = Pipeline(baseline_steps)
classifier.fit(X_train, y_train)

In [None]:
# Verify Class Labels: the label values the fitted pipeline learned (order matches predict_proba columns)
classifier.classes_

In [None]:
# Predict the Class Labels for the test set and show the last 20 predictions
y_pred = classifier.predict(X_test)
y_pred[-20:]

In [None]:
# Predict Probabilities: one column per class; show the last 20 rows
y_proba = classifier.predict_proba(X_test)
y_proba[-20:]

In [None]:
# Get the Scores
# The baseline pipeline was already fitted when it was defined, so it is only
# used for prediction here; refitting inside the scoring expression is unnecessary.
baseline_train_pred = classifier.predict(X_train)
baseline_test_pred = classifier.predict(X_test)
acc_train = accuracy_score(y_train, baseline_train_pred)
acc_test = accuracy_score(y_test, baseline_test_pred)
print(f'Baseline Model -- Train Accuracy: {acc_train:0.4}, Test Accuracy: {acc_test:0.4}')

In [None]:
# Display confusion matrix of the baseline model on the test set
disp = ConfusionMatrixDisplay.from_estimator(classifier, X_test, y_test, cmap=plt.cm.Blues)
plt.title('Confusion matrix')
plt.show()

In [None]:
# Display ROC curve of the baseline model on the test set
disp = RocCurveDisplay.from_estimator(classifier, X_test, y_test, name='Baseline Model')
plt.title("AUC-ROC Curve \n")
# Diagonal reference line from (0, 0) to (1, 1)
diagonal = [0, 1]
plt.plot(diagonal, diagonal, linestyle="--", label='Random 50:50')
plt.legend()
plt.show()

In [None]:
# Classification Report for the baseline model on the test set
baseline_report = classification_report(y_test, classifier.predict(X_test))
print(baseline_report)

In [None]:
# Get Params list: every parameter of the pipeline and its steps
classifier.get_params()

In [None]:
# Use Optuna for Tuning
import optuna

In [None]:
# Define Objective Function
def optimize(trial, x, y):
    """Optuna objective: mean out-of-fold log loss of a scaled logistic regression.

    Parameters
    ----------
    trial : optuna trial
        Used to sample ``tol`` in [0.001, 0.01] and ``C`` in [0.001, 1], both on a log scale.
    x : array-like, indexable by integer arrays
        Feature matrix. Folds are built by row position with ``TimeSeriesSplit``,
        so rows are assumed to be in chronological order.
    y : array-like, indexable by integer arrays
        Labels aligned row-by-row with ``x``.

    Returns
    -------
    float
        Mean log loss over the folds. Lower is better, so the study that
        consumes this objective must minimise it.
    """
    # specify params range
    tolerance = trial.suggest_float("tol", 0.001, 0.01, log=True)
    regularization = trial.suggest_float('C', 0.001, 1, log=True)

    model = Pipeline([
        ("scaler", StandardScaler()),
        ("model", LogisticRegression(
            C=regularization,
            tol=tolerance,
            class_weight='balanced'))
    ])

    # Two expanding-window folds; gap=1 leaves one row between train and test
    tscv = TimeSeriesSplit(n_splits=2, gap=1)
    fold_losses = []

    for train_idx, test_idx in tscv.split(x):
        model.fit(x[train_idx], y[train_idx])
        # log_loss needs class probabilities; hard 0/1 labels would be scored
        # as fully confident predictions and swamp the metric.
        proba = model.predict_proba(x[test_idx])
        # Pass the fitted classes so a test fold containing a single class still scores
        fold_losses.append(
            log_loss(y[test_idx], proba, labels=model.named_steps["model"].classes_)
        )

    # Return the loss itself (not its negative): the optimiser minimises this value
    return float(np.mean(fold_losses))

In [None]:
# Create a Study
# A fixed sampler seed makes the sequence of suggested hyperparameters, and therefore
# the selected best parameters, repeatable across notebook runs.
study = optuna.create_study(
    study_name='hp_lr',
    direction='minimize',
    sampler=optuna.samplers.TPESampler(seed=42),
)

In [None]:
# Specify Optimization function
# Tune on the training partition only: searching over the full X / y would let the
# hold-out rows influence the chosen hyperparameters and bias the test metrics.
optimization_function = partial(optimize, x=X_train, y=y_train)
study.optimize(optimization_function, n_trials=20)

In [None]:
# Get the Best Params: hyperparameters and objective value of the study's best trial
print(f'Best Params: {study.best_params}, Best Value: {study.best_value}')

In [None]:
# Plot Optimization History: objective value of each trial of the study
optuna.visualization.plot_optimization_history(study)

In [None]:
# Plot Param Importances: estimated importance of each tuned hyperparameter
optuna.visualization.plot_param_importances(study)

In [None]:
# Slice plot: objective value against each hyperparameter, one point per trial
optuna.visualization.plot_slice(study)

In [None]:
# Contour plot of the objective value over the ('tol', 'C') plane
optuna.visualization.plot_contour(study, params=['tol', 'C'])

In [None]:
# Parallel-coordinate plot of the hyperparameters and objective value of the trials
optuna.visualization.plot_parallel_coordinate(study)

In [None]:
# Scale and fit the model using the hyperparameters of the study's best trial
best_hp = study.best_params
tuned_estimator = LogisticRegression(
    tol=best_hp['tol'],
    C=best_hp['C'],
    class_weight='balanced',
)
clf = Pipeline([("scaler", StandardScaler()), ("estimator", tuned_estimator)])

clf.fit(X_train, y_train)

In [None]:
# Predict class labels with the tuned pipeline on both partitions
y_pred = clf.predict(X_test)
y_pred_train = clf.predict(X_train)

# Measure accuracy in-sample and on the test set
acc_train = accuracy_score(y_train, y_pred_train)
acc_test = accuracy_score(y_test, y_pred)

# Print Accuracy
print(f'\n Training Accuracy \t: {acc_train :0.4} \n Test Accuracy \t\t: {acc_test :0.4}')

In [None]:
# Display confusion matrix of the tuned model on the test set
disp = ConfusionMatrixDisplay.from_estimator(clf, X_test, y_test, cmap=plt.cm.Blues)
plt.title('Confusion matrix')
plt.show()

In [None]:
# Display ROC curve of the tuned model on the test set
disp = RocCurveDisplay.from_estimator(clf, X_test, y_test, name='Tuned Model')
plt.title("AUC-ROC Curve \n")
# Diagonal reference line from (0, 0) to (1, 1)
diagonal = [0, 1]
plt.plot(diagonal, diagonal, linestyle="--", label='Random 50:50')
plt.legend()
plt.show()

In [None]:
# Classification Report for the tuned model (uses the test-set predictions in y_pred)
tuned_report = classification_report(y_test, y_pred)
print(tuned_report)

In [None]:
# Reload the raw prices for the backtest
df2 = pd.read_csv('../data/niftyindex.csv', index_col=0, parse_dates=True, dayfirst=True)
# Feature engineering dropped the leading warm-up rows, so keep only the last len(X)
# rows to line prices up with X. Copy so the column assignments below write to an
# independent frame rather than a slice of the freshly read one.
df2 = df2.iloc[len(df2) - len(X):].copy()

# Get Prediction (one signal per row of X, which covers both train and test periods)
df2['Signal'] = clf.predict(X)

# Define Entry Logic: enter at the close on rows with signal 1, otherwise no trade (0)
df2['Entry'] = np.where(df2['Signal'] == 1, df2['Close'], 0)

# Defining Exit Logic
# When a trade is open: exit at the next open if it is at or below the entry close,
# otherwise exit at the next close. All prices come from df2 itself.
next_open = df2['Open'].shift(-1)
next_close = df2['Close'].shift(-1)
in_trade = df2['Entry'] != 0
df2['Exit'] = np.where(in_trade & (next_open <= df2['Close']), next_open, 0)
df2['Exit'] = np.where(in_trade & (next_open > df2['Close']), next_close, df2['Exit'])

# Calculate MTM
df2['P&L'] = df2['Exit'] - df2['Entry']

# Generate Equity Curve, starting from the first close (explicit positional access)
df2['Equity'] = df2['P&L'].cumsum() + df2['Close'].iloc[0]

# Calculate Benchmark Return
# NOTE(review): this is a log return while 'Strategy' below is a simple return -
# confirm the downstream reports are meant to compare the two conventions.
df2['Returns'] = np.log(df2['Close']).diff().fillna(0)

# Calculate Strategy Return
df2['Strategy'] = (df2['Equity']/df2['Equity'].shift(1) - 1).fillna(0)
# The last row has no next-day prices, so its Exit stays 0; drop it from the results
df2 = df2.iloc[:-1]

In [None]:
# Generate HTML Strategy Report
# Refer HTML file for report
import quantstats as qs

# Strategy returns first, benchmark returns second
strategy_returns = df2['Strategy']
benchmark_returns = df2['Returns']
qs.reports.html(strategy_returns, benchmark_returns)

In [None]:
# Can also use pyfolio for analysis
import pyfolio as pf

# Work on a UTC-localized copy so df2 keeps its original index
df3 = df2.tz_localize('utc')
pf.create_returns_tear_sheet(
    df3['Strategy'],
    live_start_date='2020-04-07',
    benchmark_rets=df3['Returns'],
)