In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Dropout
from keras.models import load_model

#from err import mape
from abc import ABCMeta, abstractmethod

In [8]:
import numpy as np
import random
from keras import backend as K
from keras.engine.topology import Layer
from keras.initializers import RandomUniform, Initializer, Orthogonal, Constant

class InitCentersRandom(Initializer):
    """ Initializer for initialization of centers of RBF network
        as random samples from the given data set.
    # Arguments
        X: matrix, dataset to choose the centers from (random rows
          are taken as centers)
    """
    def __init__(self, X):
        self.X = X

    def __call__(self, shape, dtype=None):
        assert shape[1] == self.X.shape[1]
        idx = np.random.randint(self.X.shape[0], size=shape[0])
        return self.X[idx,:]


class RBFLayer(Layer):
    """ Layer of Gaussian RBF units.
    # Example
    ```python
        model = Sequential()
        model.add(RBFLayer(10,
                           initializer=InitCentersRandom(X),
                           betas=1.0,
                           input_shape=(1,)))
        model.add(Dense(1))
    ```
    # Arguments
        output_dim: number of hidden units (i.e. number of outputs of the layer)
        initializer: instance of initiliazer to initialize centers
        betas: float, initial value for betas
    """
    def __init__(self, output_dim, initializer=None, betas=1.0, **kwargs):
        self.output_dim = output_dim
        self.init_betas = betas
        if not initializer:
            self.initializer = RandomUniform(0.0, 1.0)
            #self.initializer = Orthogonal()
        else:
            self.initializer = initializer
        super(RBFLayer, self).__init__()

    def build(self, input_shape):
        self.centers = self.add_weight(name='centers',shape=(self.output_dim, input_shape[1]),initializer=self.initializer,trainable=True)
        self.betas = self.add_weight(name='betas',shape=(self.output_dim,),initializer=Constant(value=self.init_betas),trainable=True)
        super(RBFLayer, self).build(input_shape)

    def call(self, x):
        C = K.expand_dims(self.centers)
        H = K.transpose(C-K.transpose(x))
        return K.exp( -self.betas * K.sum(H**2, axis=1))

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)


    def get_config(self):
        config = {
            'output_dim': self.output_dim
        }
        base_config = super(RBFLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


In [9]:
def mape(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100



class Generalnn(object):
    
    """
    Attributes:
    csv_path: Path to the dataset file.
    feature: extracted feature data
    X, Y: splited Feature (input & output).
    X_scaled, X_scaled: scaled Feature.
    X_train, X_test, y_train, y_test: Scaled X, Y after spliting into test & training Dataset.
    scaler
    test_size
    epochs, regressor, batch_size, epochs
    """

    __metaclass__ = ABCMeta

    type = ""

    def __init__(self):
        pass


    def preprocess(self, csv_path, test_size=0.2):
        self.csv_path = csv_path
        self.test_size = test_size
        self.feature_extract()
        self.feature_scale()
        self.train_test_split()

        return self


    def train_test_split(self):
        self.X_train, self.X_test, self.y_train, self.y_test \
                                    = train_test_split(self.X_scaled, self.Y_scaled, test_size=self.test_size, random_state=0)


    def feature_scale(self):
        # Feature Scaling
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        feature_scaled = self.scaler.fit_transform(self.feature)

        self.X_scaled = feature_scaled[:, 0]
        self.Y_scaled = feature_scaled[:, 1]


    def load_dataset(self):
        ds = pd.read_csv(self.csv_path)
        return ds


    def feature_extract(self):
        ds = self.load_dataset()
        self.feature = ds.iloc[:, [2, 5]].values
        self.X = ds.iloc[:, 2].values
        self.y = ds.iloc[:, 5].values



    def save_model(self, name):
        path ='{}-{}.h5'.format(name, self.type)
        self.regressor.save(path)
        return path


    @abstractmethod
    def load_model(self, model_path=None):
        if self.regressor is None:
            self.regressor = load_model(model_path)
        else:
            return self.regressor

    @abstractmethod
    def train(self):
        pass

    @abstractmethod
    def test(self, model_path=None):
        pass

    @abstractmethod
    def predict(self, X, model_path=None):
        pass


In [28]:
class stocknn(Generalnn):
    
    def __init__(self):
        
        self.RNN=self.RNN()
        self.BKP=self.BKP()
        self.RBF=self.RBF()

    class BKP(Generalnn):
        type = 'BKP'


        def __init__(self):
            super().__init__()
            print("backpropagation")


        def train(self, batch_size=32, epochs=50):
            regressor = Sequential()

            regressor.add(Dense(units=500, kernel_initializer='uniform', activation='relu', input_dim=1))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=500, kernel_initializer='uniform', activation='relu'))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=500, kernel_initializer='uniform', activation='relu'))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=500, kernel_initializer='uniform', activation='relu'))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=1, kernel_initializer='uniform', activation='sigmoid'))

            regressor.compile(optimizer='adam', loss='mean_squared_error')
            regressor.fit(self.X_train, self.y_train, batch_size=batch_size, epochs=epochs)

            self.regressor = regressor

            return self


        def test(self, model_path=None):
            self.load_model(model_path)

            inputs = np.array(self.X_train)
            y_hat = self.regressor.predict(inputs)

            df = pd.DataFrame()
            df['X'] = inputs
            df['y_hat'] = y_hat

            predicted_price = self.scaler.inverse_transform(df)
            _mape = mape(predicted_price[:, 0], predicted_price[:, 1])
            _mse = mean_squared_error(predicted_price[:, 0], predicted_price[:, 1])

            return _mape, _mse


        def predict(self, X, model_path=None):
            self.load_model(model_path)

            x = np.array([X])
            yh = self.regressor.predict(x)

            x_yh = pd.DataFrame()
            x_yh['x'] = x
            x_yh['yh'] = yh

            x_yh = self.scaler.inverse_transform(x_yh)

            return x_yh[0]


    class RNN(Generalnn):
        type = 'RNN'


        def __init__(self):
            super().__init__()
            print("RNN")


        def train(self, batch_size=32, epochs=50):
            train_sz = self.X_train.shape[0]

            X_train = np.reshape(self.X_train, (train_sz, 1, 1))
            y_train = np.reshape(self.y_train, (train_sz, 1))

            regressor = Sequential()

            regressor.add(LSTM(units=500, return_sequences=True, input_shape=(X_train.shape[1], 1)))
            regressor.add(Dropout(.2))

            regressor.add(LSTM(units=500, return_sequences=True))
            regressor.add(Dropout(.2))

            regressor.add(LSTM(units=500, return_sequences=True))
            regressor.add(Dropout(.2))

            regressor.add(LSTM(units=500, return_sequences=False))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=1, activation='sigmoid'))

            regressor.compile(optimizer='adam', loss='mean_squared_error')

            regressor.fit(X_train, y_train, epochs=epochs, batch_size=32)

            self.regressor = regressor

            return self


        def test(self, model_path=None):
            self.load_model(model_path)

            inputs_ = np.array(self.X_test)
            inputs = np.reshape(inputs_, (self.X_test.shape[0], 1, 1))
            predicted_price = self.regressor.predict(inputs)

            df = pd.DataFrame()
            df['real'] = inputs_
            df['predicted'] = predicted_price

            predicted_price = self.scaler.inverse_transform(df)
            _mape = mape(predicted_price[:, 0], predicted_price[:, 1])
            _mse = mean_squared_error(predicted_price[:, 0], predicted_price[:, 1])

            return _mse, _mape


        def predict(self, X, model_path=None):
            self.load_model(model_path)

            x = np.array([X])
            x = np.reshape(x, (len(x), 1, 1))
            yh = self.regressor.predict(x)

            # rebuild the Structure
            x_yh = pd.DataFrame()
            x_yh['x'] = x[0][0]
            x_yh['yh'] = yh[0]

            # real test data price VS. predicted price
            x_yh = self.scaler.inverse_transform(x_yh)

            return x_yh[0]


    class RBF(Generalnn):
        type = 'RBF'


        def __init__(self):
            super().__init__()
            print("RBF")


        def train(self, batch_size=32, epochs=50):
            train_sz = self.X_train.shape[0]

            X_train = np.reshape(self.X_train, (train_sz, 1))
            y_train = np.reshape(self.y_train, (train_sz, 1))

            regressor = Sequential()

            regressor.add(RBFLayer(500, initializer=InitCentersRandom(X_train), betas=2.0, input_shape=(1,)))
            regressor.add(Dropout(.2))

            regressor.add(Dense(units=1, kernel_initializer='uniform', activation='sigmoid'))

            regressor.compile(optimizer='adam', loss='mean_squared_error')

            regressor.fit(X_train, y_train, batch_size=32, epochs=epochs)

            self.regressor = regressor

            return self


        def load_model(self, model_path=None):
            if self.regressor is None:
                self.regressor = load_model(model_path, custom_objects={'RBFLayer': RBFLayer})


        def test(self, model_path=None):
            self.load_model(model_path)

            inputs = np.array(self.X_test)
            predicted_price = self.regressor.predict(inputs)

            df = pd.DataFrame()
            df['X'] = inputs
            df['y_hat'] = predicted_price

            predicted_price = self.scaler.inverse_transform(df)
            _mape = mape(predicted_price[:, 0], predicted_price[:, 1])
            _mse = mean_squared_error(predicted_price[:, 0], predicted_price[:, 1])

            return _mse, _mape


        def predict(self, X, model_path=None):
            self.load_model(model_path)

            x = np.array([X])
            yh = self.regressor.predict(x)

            x_yh = pd.DataFrame()
            x_yh['x'] = x
            x_yh['yh'] = yh

            x_yh = self.scaler.inverse_transform(x_yh)

            return x_yh[0]




In [31]:
obj = stocknn().RBF
obj.preprocess('AAPL.csv', test_size=0.2).train(batch_size=32, epochs=1)
model = obj.save_model('AAPL')
mape, _ = obj.test(model)
_, pred = obj.predict(100)

RNN
backpropagation
RBF


FileNotFoundError: [Errno 2] File b'AAPL.csv' does not exist: b'AAPL.csv'