## Import Libraries

In [7]:
import pandas_datareader.data as pdr
import datetime
import numpy as np
import unittest
import pandas as pd


# seems to be a problem with pandas_datareader
# use this fix to get data from Yahoo! Finance
import fix_yahoo_finance as yf
yf.pdr_override()

# import tensorflow and print info
import tensorflow as tf
gpu_info = "Not found" if not tf.test.gpu_device_name() else tf.test.gpu_device_name() 
print("Tensorflow version %s, GPU: %s" % (tf.__version__,gpu_info))



Tensorflow version 1.2.1, GPU: Not found


## Helper Functions

In [8]:
def run_unit_tests(testClass):
    suite = unittest.TestLoader().loadTestsFromTestCase(testClass)
    unittest.TextTestRunner().run(suite)

## Data
Create a class that downloads the stock data and sets up the input/output data

In [22]:
class CreateDataSet:
    
    def __init__(self,symbol,number_of_days=10):
        self.symbol = symbol
        self.number_of_days = number_of_days
        self.train_start_date = None
        self.train_end_date = None
        self.test_start_date = None
        self.test_end_date = None
        self.validation_start_date = None
        self.validation_end_date = None
        self.yahoo_data = None
        self.pandas_df = None
        self.training_df = None
        self.test_df = None
        self.validation_df = None
        self.training_x = None
        self.training_y = None
        self.test_x = None
        self.test_y = None
        self.validation_x = None
        self.validation_y = None
        self.batch_size = None

        
    def set_training_dates(self,start_date,end_date):
        self.__check_date_string_format(start_date)
        self.__check_date_string_format(end_date)
        self.__check_start_date_before_end_date(start_date,end_date)
        self.train_start_date = start_date
        self.train_end_date = end_date
        
    def set_test_dates(self,start_date,end_date):
        self.__check_date_string_format(start_date)
        self.__check_date_string_format(end_date)
        self.__check_start_date_before_end_date(start_date,end_date)
        self.test_start_date = start_date
        self.test_end_date = end_date
        
    def set_validation_dates(self,start_date,end_date):
        self.__check_date_string_format(start_date)
        self.__check_date_string_format(end_date)
        self.__check_start_date_before_end_date(start_date,end_date)
        self.validation_start_date = start_date
        self.validation_end_date = end_date
        
    def set_batch_size(self,batch_size):
        self.batch_size = batch_size
        
    def download_data(self):
        assert self.train_start_date != None, "Training start date not set"
        assert self.train_end_date != None, "Training end date not set"
        assert self.test_start_date != None, "Test start date not set"
        assert self.test_end_date != None, "Test end date not set"
        assert self.validation_start_date != None, "Validation start date not set"
        assert self.validation_end_date != None, "Validation end date not set"
        
        start_date, end_date = self.__calculate_download_start_end_dates()

        self.yahoo_data = self.__download_data_yahoo(self.symbol,start_date,end_date)
        
        assert self.yahoo_data.shape != (0,0), "No data was downloaded"
        
    def create_dataset(self):
        
        if self.yahoo_data == None:
            self.download_data()
        
        prices = self.yahoo_data['Adj Close']
        
        # fill in missing data
        prices.fillna(method='ffill', inplace=True)
        prices.fillna(method='bfill', inplace=True)
        
        # scale features to make neural network easier to train
        scaled_prices = self.__scale_data(prices)
        
        # input features named X0, X1, X2, ...
        # output feature named Y (this is what we are predicting, tomorrows stock price)
        x_features = [ 'X%d' % (i) for i in range(self.number_of_days)]
        y_feature = ['Y']
        features = x_features + y_feature
        
        df = pd.DataFrame(index=prices.index,columns=features)
        df['Y'] = scaled_prices.shift(-1)
        
        for i in range(self.number_of_days):
            x_feature = "X%d" % (i)
            df[x_feature] = scaled_prices.shift(i)
            
        # trim the dataset to only use dates with enough data
        #
        # example: if number_of_days=10, then first date with enough
        # information will be day 10 (i.e. array index 9)
        #
        # last date will be next to last day in dataset since predicting tommorrows price
        df = df[self.number_of_days-1:-1]
        
        # there shouldn't be any NaN values
        assert df.isnull().values.any() == False, "Dataset contains unexpected NaN values"
                    
        self.pandas_df = df
        
        # split dataset into training/test/validation sets
        
        # data as pandas dataframes
        self.training_df = df[self.train_start_date:self.train_end_date]
        self.test_df = df[self.test_start_date:self.test_end_date]
        self.validation_df = df[self.validation_start_date:self.validation_end_date]
        
        # data as numpy arrays
        self.training_x = self.training_df[x_features].values
        self.training_y = self.training_df[y_feature].values
        self.test_x = self.test_df[x_features].values
        self.test_y = self.test_df[y_feature].values
        self.validation_x = self.validation_df[x_features].values
        self.validation_y = self.validation_df[y_feature].values
        
    # function used with neural networks
    # training happens on a batch instead of all the data
    # this function is called multiple times using an iterator to get the next batch
    def get_batches(self):
        assert self.batch_size != None
        
        n_batches = len(self.training_x) // self.batch_size
        assert n_batches > 0, "Batch size %d is too big (resulted in 0 batches)" % (self.batch_size)
        
        x = self.training_x[:n_batches*self.batch_size]
        y = self.training_y[:n_batches*self.batch_size]
        
        for ii in range(0, len(x), self.batch_size):
            yield x[ii:ii+self.batch_size], y[ii:ii+self.batch_size]
        
    def __download_data_yahoo(self,symbol,start_date,end_date):
        data = pdr.get_data_yahoo(symbol, start=start_date, end=end_date)
        return data
    
    def __str_to_datetime(self,date_str):
        return datetime.datetime.strptime(date_str,"%Y-%m-%d")
    
    def __check_date_string_format(self,date_str):
        assert datetime.datetime.strptime(date_str,"%Y-%m-%d"), "Dates must be a string in the format YYYY-MM-DD"
        
    def __check_start_date_before_end_date(self,start_date,end_date):
        start = self.__str_to_datetime(start_date)
        end = self.__str_to_datetime(end_date)
        assert start < end, "Start date must be before the end date"
        
    def __calculate_download_start_end_dates(self):
        train_start = self.__str_to_datetime(self.train_start_date)
        test_start = self.__str_to_datetime(self.test_start_date)
        validation_start = self.__str_to_datetime(self.validation_start_date)
        train_end = self.__str_to_datetime(self.train_end_date)
        test_end = self.__str_to_datetime(self.test_end_date)
        validation_end = self.__str_to_datetime(self.validation_end_date)
        
        start_datetime = min(train_start,test_start,validation_start)
        end_datetime = max(train_end,test_end,validation_end)
        
        start_date = start_datetime.strftime("%Y-%m-%d")
        end_date = end_datetime.strftime("%Y-%m-%d")
          
        return start_date,end_date
    
    def __scale_data(self,data):
        mean, std = data.mean(), data.std()
        self.mean = mean
        self.std = std
        
        scaled_data = (data - mean) / std
        return scaled_data
    
    def unscale_data(self,scaled_data):
        return scaled_data * self.std + self.mean
    


In [26]:
# Test the create data set class
class CreateDataSetTest(unittest.TestCase):
        
    def test_set_dates(self):
        dataset = CreateDataSet('SPY')
        dataset.set_training_dates(start_date="2010-01-01",end_date="2011-01-01")
        dataset.set_test_dates(start_date="2010-01-01",end_date="2011-01-01")
        dataset.set_validation_dates(start_date="2010-01-01",end_date="2011-01-01")
        
    def test_set_dates_not_str(self):
        dataset = CreateDataSet('SPY')
        
        with self.assertRaises(Exception):
            dataset.set_training_dates(start_date=datetime.datetime(2010,1,1),end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_training_dates(start_date="2010-01-01",end_date=datetime.datetime(2011,1,1))
            
        with self.assertRaises(Exception):
            dataset.set_test_dates(start_date=datetime.datetime(2010,1,1),end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_test_dates(start_date="2010-01-01",end_date=datetime.datetime(2011,1,1))
            
        with self.assertRaises(Exception):
            dataset.set_validation_dates(start_date=datetime.datetime(2010,1,1),end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_validation_dates(start_date="2010-01-01",end_date=datetime.datetime(2011,1,1))
            
    def test_set_dates_bad_str_format(self):
        dataset = CreateDataSet('SPY')
        
        with self.assertRaises(Exception):
            dataset.set_training_dates(start_date="01-01-2011",end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_training_dates(start_date="2010-01-01",end_date="01-01-2011")
            
        with self.assertRaises(Exception):
            dataset.set_test_dates(start_date="01-01-2011",end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_test_dates(start_date="2010-01-01",end_date="01-01-2011")
            
        with self.assertRaises(Exception):
            dataset.set_validation_dates(start_date="01-01-2011",end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_validation_dates(start_date="2010-01-01",end_date="01-01-2011")
            
    def test_set_dates_start_date_after_end_date(self):
        dataset = CreateDataSet('SPY')
        
        with self.assertRaises(Exception):
            dataset.set_training_dates(start_date="2011-01-02",end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_test_dates(start_date="2011-01-02",end_date="2011-01-01")
            
        with self.assertRaises(Exception):
            dataset.set_validation_dates(start_date="2011-01-02",end_date="2011-01-01")
            
    def test_download_start_end_dates_1(self):
        dataset = CreateDataSet('SPY')
        dataset.set_training_dates(start_date="2010-01-04",end_date="2010-06-01")
        dataset.set_test_dates(start_date="2011-01-04",end_date="2011-06-01")
        dataset.set_validation_dates(start_date="2012-01-04",end_date="2012-06-01")        
        dataset.download_data()
        
        # (start_date,end_date) should be (training_start,validation_end)
        self.__verify_start_end_date(dataset.yahoo_data,expected_start="2010-01-04",expected_end="2012-06-01")
        
    def test_download_start_end_dates_2(self):
        dataset = CreateDataSet('SPY')
        dataset.set_test_dates(start_date="2010-01-04",end_date="2010-06-01")
        dataset.set_validation_dates(start_date="2011-01-04",end_date="2011-06-01")
        dataset.set_training_dates(start_date="2012-01-04",end_date="2012-06-01")        
        dataset.download_data()
        
        # (start_date,end_date) should be (test_start,training_end)
        self.__verify_start_end_date(dataset.yahoo_data,expected_start="2010-01-04",expected_end="2012-06-01")
        
    def test_download_start_end_dates_3(self):
        dataset = CreateDataSet('SPY')
        dataset.set_validation_dates(start_date="2010-01-04",end_date="2010-06-01")
        dataset.set_training_dates(start_date="2011-01-04",end_date="2011-06-01")
        dataset.set_test_dates(start_date="2012-01-04",end_date="2012-06-01")        
        dataset.download_data()
        
        # (start_date,end_date) should be (validation_start,test_end)
        self.__verify_start_end_date(dataset.yahoo_data,expected_start="2010-01-04",expected_end="2012-06-01")
        
    def __verify_start_end_date(self,dataframe,expected_start,expected_end):
        start_date, end_date = dataframe.index[0], dataframe.index[-1]
        start_date = start_date.strftime("%Y-%m-%d")
        end_date = end_date.strftime("%Y-%m-%d")
        
        self.assertEqual(start_date,expected_start)
        self.assertEqual(end_date,expected_end)
        
    def test_create_dataset(self):
        dataset = CreateDataSet('SPY')
        dataset.set_training_dates(start_date="2010-01-04",end_date="2010-06-01")
        dataset.set_test_dates(start_date="2011-01-04",end_date="2011-06-01")
        dataset.set_validation_dates(start_date="2012-01-04",end_date="2012-06-01")        
        dataset.create_dataset()
        
        # check for 10 x features and 1 y feature
        self.assertEqual(dataset.training_x.shape[1],10)
        self.assertEqual(dataset.training_y.shape[1],1)
        self.assertEqual(dataset.test_x.shape[1],10)
        self.assertEqual(dataset.test_y.shape[1],1)
        self.assertEqual(dataset.validation_x.shape[1],10)
        self.assertEqual(dataset.validation_y.shape[1],1)
        
        # check dates
        self.__check_data_in_date_range(dataset.training_df,start_date="2010-01-04",end_date="2010-06-01")
        self.__check_data_in_date_range(dataset.test_df,start_date="2011-01-04",end_date="2011-06-01")
        self.__check_data_in_date_range(dataset.validation_df,start_date="2012-01-04",end_date="2012-06-01")
        
    def __check_data_in_date_range(self,df,start_date,end_date):
        dates = df.index
        start_timestamp = pd.to_datetime(start_date)        
        end_timestamp = pd.to_datetime(end_date)
        
        for date in dates:
            self.assertTrue(date >= start_timestamp)
            self.assertTrue(date <= end_timestamp)
            
    def test_get_batches(self):
        dataset = CreateDataSet('SPY')
        dataset.set_training_dates(start_date="2010-01-04",end_date="2010-06-01")
        dataset.set_test_dates(start_date="2011-01-04",end_date="2011-06-01")
        dataset.set_validation_dates(start_date="2012-01-04",end_date="2012-06-01")        
        dataset.create_dataset()
        
        dataset.set_batch_size(32)
        for x_batch, y_batch in dataset.get_batches():
            self.assertEqual(x_batch.shape,(32,10))
            self.assertEqual(y_batch.shape,(32,1))
            
    
        
run_unit_tests(CreateDataSetTest)

[*********************100%***********************]  1 of 1 downloaded

.

[*********************100%***********************]  1 of 1 downloaded

.

[*********************100%***********************]  1 of 1 downloaded

.

[*********************100%***********************]  1 of 1 downloaded

.

[*********************100%***********************]  1 of 1 downloaded

.....
----------------------------------------------------------------------
Ran 9 tests in 2.116s

OK


## Create your model
Setup whatever model you plan to use for price prediction. i.e. neural network, linear regression, etc.

In [11]:
class NeuralNetwork:
    
    def __init__(self):
        self.learning_rate = None
        self.number_of_inputs = None
        self.keep_prob = None
        self.hidden_layers = []
        
    def add_inputs(self,number_of_inputs):
        self.number_of_inputs = number_of_inputs
        
    def add_hidden_layer(self,number_of_nodes,dropout=False):
        self.hidden_layers.append((number_of_nodes,dropout))
            
    def set_learning_rate(self,learning_rate):
        self.learning_rate = learning_rate
        
    def set_keep_probability(self,keep_prob):
        self.keep_prob = keep_prob
        
    def build_model(self):
        
        inputs_ = tf.placeholder(tf.float32,[None,self.number_of_inputs],name="inputs")
        targets_ = tf.placeholder(tf.float32,[None,1],name="targets")
        keep_prob = tf.placeholder(tf.float32,name="keep_prob")
        
        # add hidden layers
        layer = inputs_
        for i in range(len(self.hidden_layers)):
            num_nodes, dropout = self.hidden_layers[i]

            layer = tf.layers.dense(layer,num_nodes,activation=tf.nn.relu)
            
            if dropout:
                layer = tf.nn.dropout(layer,keep_prob)
            
            
        # last layer predict the price
        output = tf.layers.dense(layer,1,activation=None,name="output")
        
        # setup loss function and optimizer
        loss = tf.nn.l2_loss(targets_-output)
        cost = tf.reduce_mean(loss)
        opt = tf.train.AdamOptimizer(self.learning_rate).minimize(cost)

nn = NeuralNetwork()
nn.set_learning_rate(.001)
nn.set_keep_probability(0.5)
nn.add_inputs(number_of_inputs=10)
nn.add_hidden_layer(number_of_nodes=20)
nn.build_model()    

  if d.decorator_argspec is not None), _inspect.getargspec(target))


## Train your model
Define the function to train your model using the training data and the test data

In [None]:
def train(model,train_data,test_data):
    pass

## Evaluate your model on the validation set
Return the accuracy of your model

In [None]:
def evaluate(model,validation_data):
    pass

## Display the results
Show graphs and metrics of your model accuracy

In [None]:
def results():
    pass

## Train and evaluate your model here
Call the functions to get the data, train the model, and evaluate it on the validation data