#NIFTY50 2dCNNpred

In [40]:
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error

In [41]:
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall
 
def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision
 
def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*((precision*recall)/(precision+recall+K.epsilon()))
 
def f1macro(y_true, y_pred):
    f_pos = f1_m(y_true, y_pred)
    # negative version of the data and prediction
    f_neg = f1_m(1-y_true, 1-K.clip(y_pred,0,1))
    return (f_pos + f_neg)/2

In [42]:
def cnnpred_2d(seq_len=60, n_features=12, n_filters=(8,8,8), droprate=0.1):
    #2D-CNNpred model according to the paper
    model = Sequential([
        Input(shape=(seq_len, n_features, 1)),
        Conv2D(n_filters[0], kernel_size=(1, n_features), activation="relu"),
        Conv2D(n_filters[1], kernel_size=(3,1), activation="relu"),
        MaxPool2D(pool_size=(2,1)),
        Conv2D(n_filters[2], kernel_size=(3,1), activation="relu"),
        MaxPool2D(pool_size=(2,1)),
        Flatten(),
        Dropout(droprate),
        Dense(1, activation="sigmoid")
    ])
    return model
 

In [43]:
def datagen(data, seq_len, batch_size, targetcol, kind):
    #As a generator to produce samples for Keras model
    batch = []
    while True:
        df = data
        input_cols = [c for c in df.columns if c != targetcol]
        index = df.index[df.index < TRAIN_TEST_CUTOFF]
        split = int(len(index) * TRAIN_VALID_RATIO)
        if kind == 'train':
            index = index[:split]   # range for the training set
        elif kind == 'valid':
            index = index[split:]   # range for the validation set
        
        # Pick one position, then clip a sequence length
        while True:
            t = random.choice(index)      # pick one time step
            n = (df.index == t).argmax()  # find its position in the dataframe
            if n-seq_len+1 < 0:
                continue # can't get enough data for one sequence length
            frame = df.iloc[n-seq_len+1:n+1]
            batch.append([frame[input_cols].values, df.loc[t, targetcol]])
            break
        # if we get enough for a batch, dispatch
        if len(batch) == batch_size:
            X, y = zip(*batch)
            X, y = np.expand_dims(np.array(X), 3), np.array(y)
            yield X, y
            batch = []

In [44]:
def testgen(data, seq_len, targetcol):
    #Return array of all test samples
    batch = []
    df=data
    input_cols = [c for c in df.columns if c != targetcol]
    # find the start of test sample
    t = df.index[df.index >= TRAIN_TEST_CUTOFF][0]
    n = (df.index == t).argmax()
    # extract sample using a sliding window
    for i in range(n+1, len(df)+1):
        frame = df.iloc[i-seq_len:i]
        batch.append([frame[input_cols].values, frame[targetcol][-1]])
    X, y = zip(*batch)
    return np.expand_dims(np.array(X),3), np.array(y)

In [45]:
url='https://raw.githubusercontent.com/mandalnilabja/soc2022/main/data/NSEI.csv'

X = pd.read_csv(url, index_col="Date", parse_dates=True)
X=X[X.Volume>0]
X.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 2316 entries, 2013-01-21 to 2022-07-27
Data columns (total 6 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Open       2316 non-null   float64
 1   High       2316 non-null   float64
 2   Low        2316 non-null   float64
 3   Close      2316 non-null   float64
 4   Adj Close  2316 non-null   float64
 5   Volume     2316 non-null   float64
dtypes: float64(6)
memory usage: 126.7 KB


In [46]:
TRAIN_TEST_CUTOFF = '2020-2-11'
TRAIN_VALID_RATIO = 0.75
    
# get the names of columns
cols = X.columns
    
# Save the target variable as a column in dataframe and drop NaN values
X["Target"] = (X["Close"].pct_change().shift(-1) > 0).astype(int)
X.dropna(inplace=True)
   
# Fit the standard scaler using the training dataset not whole data set
index = X.index[X.index > TRAIN_TEST_CUTOFF]
index = index[:int(len(index) * TRAIN_VALID_RATIO)]
scaler = StandardScaler().fit(X.loc[index, cols])
    
# Save scale transformed dataframe
X[cols] = scaler.transform(X[cols])
data = X

In [48]:
data['RoM3']=data.Close.rolling(3).mean() #Rolling Mean Closing Values of 3 days
data['RoM5']=data.Close.rolling(5).mean() #Rolling Mean Closing Values of 5 days
data['RoM15']=data.Close.rolling(15).mean() #Rolling Mean Closing Values of 15 days
data['RoM30']=data.Close.rolling(30).mean() #Rolling Mean Closing Values of 30 days
data['Vol_Diff'] = data["Volume"].pct_change().shift(-1) #Daily Volume Difference
data['Close_Diff'] = data["Close"].pct_change().shift(-7) #Weekly Closing Value Difference

#Dropping NaN rows
data.dropna(inplace=True)

data.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 2244 entries, 2013-04-15 to 2022-07-07
Data columns (total 13 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Open        2244 non-null   float64
 1   High        2244 non-null   float64
 2   Low         2244 non-null   float64
 3   Close       2244 non-null   float64
 4   Adj Close   2244 non-null   float64
 5   Volume      2244 non-null   float64
 6   Target      2244 non-null   int64  
 7   RoM3        2244 non-null   float64
 8   RoM5        2244 non-null   float64
 9   RoM15       2244 non-null   float64
 10  RoM30       2244 non-null   float64
 11  Vol_Diff    2244 non-null   float64
 12  Close_Diff  2244 non-null   float64
dtypes: float64(12), int64(1)
memory usage: 245.4 KB


In [None]:
#CNNpred paper parameters
seq_len = 60 # for 60 past days data
n_features = 12 # raw features used


index=0

accuracy=[]
MAE=[]
F1=[]
parameters=[]


param_list = [
 ['sgd', 'mae', 0.05], ['adam', 'mae', 0.1], ['adagrad', 'mae',  0.05], ['adamax', 'binary_focal_crossentropy', 0.1], 
 ['sgd', 'binary_focal_crossentropy', 0.1], ['adam', 'binary_focal_crossentropy', 0.15], ['adagrad', 'mae', 0.1], ['adamax', 'binary_focal_crossentropy', 0.15], 
 ['sgd', 'binary_crossentropy', 0.15], ['adam', 'binary_crossentropy', 0.2], ['adagrad', 'hinge', 0.2], ['adamax', 'binary_crossentropy', 0.15],
 ['sgd', 'hinge', 0.2], ['adam', 'hinge',  0.05], ['adagrad', 'hinge',  0.05], ['adamax', 'binary_crossentropy', 0.2]
 ]






for batch_size in [64, 32]:
  for n_epochs in [20, 30]:
    for optim, lossfxn, dr in param_list:
      # Produce CNNpred as a binary classification problem

      model = cnnpred_2d(seq_len, n_features, droprate=dr)
      model.compile(optimizer=optim, loss=lossfxn, metrics=["acc", f1macro])
      model.summary()  # print model structure to console
      
      # Set up callbacks and fit the model
      
      # We use custom validation score f1macro() and hence monitor for "val_f1macro"
      checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.h5"
      callbacks = [
          ModelCheckpoint(checkpoint_path,
                          monitor='val_f1macro', mode="max",
                          verbose=0, save_best_only=True, save_weights_only=False, save_freq="epoch")
      ]
      #fitting the model
      model.fit(datagen(data, seq_len, batch_size, "Target", "train"),
                validation_data=datagen(data, seq_len, batch_size, "Target", "valid"),
                epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1, callbacks=callbacks)
      

      # Prepare test data
      test_data, test_target = testgen(data, seq_len, "Target")
      
      # Test the model
      test_out = model.predict(test_data)
      test_pred = (test_out > 0.5).astype(int)
      
      #index of model
      index = index + 1 

      #measuring performance
      accuracy.append(accuracy_score(test_pred, test_target))
      MAE.append(mean_absolute_error(test_pred, test_target))
      F1.append(f1_score(test_pred, test_target))
      #saving parameters of each model
      parameters.append('{}--{}--{}--{}--{}--{}'.format(index,n_epochs,optim,batch_size,dr,lossfxn))
      #saving the model
      model.save('2dCNNpredNIFTY50m{}.h5'.format(index))

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_9 (Conv2D)           (None, 60, 1, 8)          104       
                                                                 
 conv2d_10 (Conv2D)          (None, 58, 1, 8)          200       
                                                                 
 max_pooling2d_6 (MaxPooling  (None, 29, 1, 8)         0         
 2D)                                                             
                                                                 
 conv2d_11 (Conv2D)          (None, 27, 1, 8)          200       
                                                                 
 max_pooling2d_7 (MaxPooling  (None, 13, 1, 8)         0         
 2D)                                                             
                                                                 
 flatten_3 (Flatten)         (None, 104)              

In [None]:
# del model
model = tf.keras.models.load_model('2dCNNpredNIFTY50m.h5')

In [None]:
print(parameters)
print(accuracy)
print(MAE)
print(F1)

#ARIMA

#SARIMA

#FBProphet