In [1]:
from keras.models import Sequential
from keras.layers import Convolution1D, MaxPooling1D,AveragePooling1D, Dense, Conv1D
from keras.optimizers import RMSprop, Adam, SGD, Nadam
from keras.layers.normalization import BatchNormalization
from keras.layers.core import Dense, Dropout, Activation, Flatten
import util.DataUtil as data_util
from dataLayer.DataLayer import DataLayer
import util.Constants as Constants
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


class oneD_CNN():

    def __init__(self):
        self.model = self.create_model()
        self.data_layer = DataLayer()
        self.data = self.data_layer.get_data_with_indicators()
        self.class_target = self.data_layer.get_classification_target()
        self.regression_target = self.data_layer.get_regression_target()

    def prepare_data(self):
        data = self.data
        target = self.regression_target
        print (data.shape , target.shape)
        #x_train, y_train, x_test, y_test = data_util.split_train_testy_test(data,target,split_train_ratio =  Constants.SPLIT_TRAIN_RATIO)
        x_train, y_train, x_test, y_test = train_test_split(data, target, test_size = 0.2)
        print ("number of training samples", str(y_train.shape))
        print ("validation samples", str(y_test.shape))
        x_train = np.asarray(x_train)
        x_test = np.asarray(x_test)
        x_train = np.reshape(x_train,(x_train.shape[0],x_train.shape[1],1))
        x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))
        return x_train, y_train, x_test, y_test

    def create_model(self):
        model = Sequential()
        model.add(Conv1D(filters=128, kernel_size=3, activation='relu', input_shape=(35, 1)))
        model.add(BatchNormalization())
        model.add(AveragePooling1D(pool_size=2))
        model.add(Dropout(0.2))

        model.add(Conv1D(filters=128, kernel_size=3, activation='relu'))
        model.add(BatchNormalization())
        model.add(AveragePooling1D(pool_size=2))
        model.add(Dropout(0.2))

        model.add(Flatten())
        model.add(Dense(1, activation='linear'))
        return model

    def compile_model(self,model):
        model.compile(loss="mae", optimizer="adam", metrics=["accuracy"])

    def train_model(self, x_train, y_train, x_test, y_test,model):
        print('x_train', x_train.shape, 'y_train',y_train.shape,  'x_test', x_test.shape, 'y_test', y_test.shape)
        print('train model', x_train.shape, y_train.shape)
        batch_size = 32
        epochs = 100

        model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1,
                       validation_data=(x_test, y_test))
 

    def accuracy_metrics(self,model,y_test,x_test):
        
        pred_df = pd.DataFrame(columns=['actual', 'predicted'])
        
        pred_df['actual'] = y_test
        pred_df['predicted'] = model.predict(x_test)
        pred_df['true_positive'] = np.zeros(len(pred_df))
        pred_df['false_positive'] = np.zeros(len(pred_df))
        pred_df['true_negative'] = np.zeros(len(pred_df))
        pred_df['false_negative'] = np.zeros(len(pred_df))
        for i in range(len(pred_df)):
            if (pred_df.iloc[i, 0] < 0) and (pred_df.iloc[i, 1] < 0):
                pred_df.iloc[i, 4] = 1
            elif (pred_df.iloc[i, 0] > 0) and (pred_df.iloc[i, 1] > 0):
                pred_df.iloc[i, 2] = 1
            elif (pred_df.iloc[i, 0] < 0) and (pred_df.iloc[i, 1] > 0):
                pred_df.iloc[i, 3] = 1
            elif (pred_df.iloc[i, 0] > 0) and (pred_df.iloc[i, 1] < 0):
                pred_df.iloc[i, 5] = 1
        print('The number of true positives is {}'.format(sum(pred_df.loc[:, 'true_positive'])))
        print('The number of false positives is {}'.format(sum(pred_df.loc[:, 'false_positive'])))
        print('The number of true negatives is {}'.format(sum(pred_df.loc[:, 'true_negative'])))
        print('The number of false negatives is {}'.format(sum(pred_df.loc[:, 'false_negative'])))

    def run_model(self):
        x_train, y_train, x_test, y_test = self.prepare_data()
        model = self.create_model()
        self.compile_model(model)
        print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)
        self.train_model(x_train, y_train, x_test, y_test,model)
        self.accuracy_metrics(model,y_test,x_test)

Using TensorFlow backend.


In [2]:
model=oneD_CNN()

----- Done SMA: Output Shape (2710, 2) -----
----- Done EMA: Output Shape (2710, 1) -----
----- Done Momentum: Output Shape (2710, 1) -----
----- Done Stochastic RSI: Output Shape (2710, 2) -----
----- Done ROC: Output Shape (2710, 1) -----
----- Done BB: Output Shape (2710, 3) -----
----- Done MACD: Output Shape (2710, 3) -----
----- Done AD Oscillator: Output Shape (2710, 1) -----
----- Done CCI: Output Shape (2710, 1) -----
----- Done FFT: Output Shape (2710, 4) -----
----- Done ATR: Output Shape (2710, 1) -----
Shape of the dataframe with indicators: (2710, 20)
Shape of the dataframe after adding deviations: (2710, 27)
Shape after adding all indicators: (2458, 32)
Shape after adding LR: (2457, 35)
Shape of the normalised datset: (2457, 35)
Shape of Regression Target: (2456,)
Shape of Classification Target: (2456,)


In [3]:
model.run_model()

(2456, 35) (2456,)
number of training samples (492, 35)
validation samples (492,)


IndexError: tuple index out of range