Collection of reusable helper functions. Load them into another notebook with `%run utils.ipynb`.

In [None]:
# Configure the Django project so that later cells can import Django models
# (e.g. predictor.models.Stock) and django.conf.settings in this notebook.
# Run this cell first.
from django_for_jupyter import init_django
init_django(project_name='cryptooracle')

In [6]:
import pandas as pd
import numpy as np
import psycopg2
#import yfinance as yf
import time
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
import plotly.figure_factory as ff
import scipy.stats as stats
import plotly.offline as offline_py

from sklearn import preprocessing
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

from datetime import datetime

In [2]:
# These Stock Django models hold OHLCV data per minute
# Sourced from the CSV files here http://www.cryptodatadownload.com/data/gemini/
# and then updated with the https://docs.coinapi.io/#latest-data coin API
# run ./manage.py import_stocks
# and ./manage.py update_stocks to build that database
from predictor.models import Stock
from django.conf import settings

In [7]:
# Check that TensorFlow 2.x is installed: the model helpers below rely on
# tf.keras and tf.data.
import tensorflow as tf
print(tf.__version__)

2.4.1


In [8]:
from keras.models import Sequential
from keras.layers import Activation, Dense
from keras.layers import LSTM
from keras.layers import Dropout, Conv1D

In [10]:
# psycopg2 connection keyword arguments, taken from Django's 'default' database
# settings (psycopg2 keyword -> Django DATABASES key).
DB_PARAMS = {
    psycopg_kw: settings.DATABASES['default'][django_key]
    for psycopg_kw, django_key in (
        ('host', 'HOST'),
        ('port', 'PORT'),
        ('database', 'NAME'),
        ('user', 'USER'),
        ('password', 'PASSWORD'),
    )
}

In [11]:
def connect(db_params):
    """Open a connection to the PostgreSQL database server.

    Parameters
    ----------
    db_params : dict
        Keyword arguments forwarded to ``psycopg2.connect`` (e.g. host, port,
        database, user, password).

    Returns
    -------
    connection or None
        The open psycopg2 connection, or ``None`` if connecting failed.
        Failures are printed rather than raised (so interactive notebook use
        keeps going); callers must check for ``None``.
    """
    print('Connecting to the PostgreSQL database...')
    try:
        conn = psycopg2.connect(**db_params)
    except Exception as error:  # psycopg2.DatabaseError is itself an Exception subclass
        print(error)
        return None
    # Only report success when a connection was actually established.
    print("Connection successful")
    return conn

In [4]:
# Shared palette for the plotly figures in this notebook (hex / named / rgba colors).
color_scheme = dict(
    index='#B6B2CF',
    etf='#2D3ECF',
    tracking_error='#6F91DE',
    df_header='silver',
    df_value='white',
    df_line='silver',
    # (position, color) stops for a plotly heatmap colorscale
    heatmap_colorscale=[(0, '#6F91DE'), (0.5, 'grey'), (1, 'red')],
    background_label='#9dbdd5',
    low_value='#B6B2CF',
    high_value='#2D3ECF',
    y_axis_2_text_color='grey',
    shadow='rgba(0, 0, 0, 0.75)',
    major_line='#2D3ECF',
    minor_line='#B6B2CF',
    main_line='black',
)

def generate_config():
    """Return a fresh plotly display config: no 'edit in chart studio' link,
    no mode bar, and axis range entry boxes enabled."""
    return dict(showLink=False, displayModeBar=False, showAxisRangeEntryBoxes=True)

def _generate_traces(name_df_color_data):
    traces = []

    for name, df, color in name_df_color_data:
        traces.append(go.Scatter(
            name=name,
            x=df.index,
            y=df,
            mode='lines',
            line={'color': color}))

    return traces

def days_to_weeks(open_prices, high_prices, low_prices, close_prices, freq='W'):
    """Converts daily OHLC prices to weekly OHLC prices.

    Parameters
    ----------
    open_prices : DataFrame
        Daily open prices for each ticker and date (DatetimeIndex)
    high_prices : DataFrame
        Daily high prices for each ticker and date
    low_prices : DataFrame
        Daily low prices for each ticker and date
    close_prices : DataFrame
        Daily close prices for each ticker and date
    freq : str, optional
        pandas offset alias of the target bar size. Defaults to 'W'
        (weekly bars labelled by the week-ending Sunday); any other resample
        rule (e.g. 'W-FRI', 'M') can be used to build coarser bars.

    Returns
    -------
    open_prices_weekly : DataFrame
        Weekly open prices for each ticker and date
    high_prices_weekly : DataFrame
        Weekly high prices for each ticker and date
    low_prices_weekly : DataFrame
        Weekly low prices for each ticker and date
    close_prices_weekly : DataFrame
        Weekly close prices for each ticker and date
    """
    # Standard OHLC aggregation: first open, highest high, lowest low, last close.
    open_prices_weekly = open_prices.resample(freq).first()
    high_prices_weekly = high_prices.resample(freq).max()
    low_prices_weekly = low_prices.resample(freq).min()
    close_prices_weekly = close_prices.resample(freq).last()

    return open_prices_weekly, high_prices_weekly, low_prices_weekly, close_prices_weekly

In [None]:
def plot_hist(df, colname, nbins, with_std_norm=True, histnorm=None):
    """Plot a histogram of ``df[colname]`` with a violin marginal.

    Optionally overlays the standard normal pdf, which is useful for eyeballing
    how far standardized data deviates from normality.

    Params:
        df: DataFrame holding the data
        colname: column to plot
        nbins: number of histogram bins
        with_std_norm: overlay the N(0, 1) pdf when True
        histnorm: normalization passed to ``plotly.express.histogram``. When
            None (default) it is 'probability density' if the normal pdf is
            overlaid, so bar heights share the pdf's units, and 'probability'
            (fraction of samples per bin) otherwise.

    Returns:
        None (the figure is shown)
    """
    if histnorm is None:
        # A pdf is a density: comparing it against per-bin probabilities would
        # make the fit look better or worse depending only on the bin width.
        histnorm = 'probability density' if with_std_norm else 'probability'

    fig = px.histogram(df[colname], x=colname, marginal="violin",
                       hover_data=[colname], histnorm=histnorm, nbins=nbins)

    if with_std_norm:
        # Standard normal, plotted over symmetric tails
        ndist = stats.norm(loc=0, scale=1)
        x_norm = np.linspace(ndist.ppf(0.0001), ndist.ppf(0.9999), 201)
        y_norm = ndist.pdf(x_norm)

        fig.add_trace(go.Scatter(x=x_norm, y=y_norm, mode='lines',
                                 line=dict(color='rgba(0,255,0, 0.6)', width=1),
                                 name='normal'))

    fig.update_layout(template='plotly_dark')
    fig.show()

In [None]:
def render_plot(series, title, xlabel, ylabel, plot_type='scatter'):
    """Render a pandas Series as a plotly scatter or bar chart.

    Params:
        series: Series whose index is used for x and values for y
        title: figure title
        xlabel / ylabel: axis titles
        plot_type: 'scatter' or 'bar'

    Raises:
        ValueError: if plot_type is not 'scatter' or 'bar'
    """
    if plot_type == 'scatter':
        fig = px.scatter(x=series.index, y=series.values)
    elif plot_type == 'bar':
        fig = px.bar(x=series.index, y=series.values)
        # `width` on px.bar would be the *figure* width in pixels; the bar width
        # lives on the trace, in axis units (milliseconds on a date axis):
        # 1000 ms * 3600 s * 24 h = one day per bar.
        fig.update_traces(width=1000 * 3600 * 24 * 1)
    else:
        raise ValueError(f"plot_type must be 'scatter' or 'bar', got {plot_type!r}")

    fig.update_layout(
        title=title,
        xaxis_title=xlabel,
        yaxis_title=ylabel,
        font=dict(
            family="Courier New, monospace",
            size=18,
            color="RebeccaPurple"
        )
    )
    fig.show()

In [5]:
def get_stocks_df():
    """Load every row of the ``predictor_stock`` table as a time-indexed DataFrame.

    The id/name/updated/created/open/high/low columns are dropped. The remaining
    columns presumably include 'close' and 'volume', which later cells use;
    verify against the Stock model. The index is the ``dt`` column converted
    to timezone-aware UTC datetimes and sorted ascending.

    Raises:
        RuntimeError: if the database connection could not be opened
    """
    conn = connect(DB_PARAMS)
    if conn is None:
        raise RuntimeError('Could not connect to the database')
    try:
        raw = pd.read_sql_query('select * from "predictor_stock"', con=conn)
    finally:
        # Release the connection even if the query fails.
        conn.close()

    stocks = (raw
              .drop(columns=['id', 'name', 'updated', 'created', 'open', 'high', 'low'])
              .set_index('dt'))
    # Timezone-aware (UTC) dates
    stocks.index = pd.to_datetime(stocks.index, utc=True)
    # Ensure chronological order
    return stocks.sort_index()

In [None]:
def compute_returns(prices):
    """
    Compute simple (arithmetic) returns from prices, e.g. adjusted close prices.

    r_t = (p_t - p_{t-1}) / p_{t-1}; the first row is NaN because it has no
    previous price.

    Parameters
    ----------
    prices : DataFrame or Series
        Prices for each ticker and date

    Returns
    -------
    returns : DataFrame or Series
        Simple returns for each ticker and date
    """
    previous = prices.shift(1)
    return (prices - previous) / previous

In [None]:
def compute_log_returns(prices):
    """
    Compute log returns: log(p_t) - log(p_{t-1}).

    The first row is NaN because it has no previous price.

    Parameters
    ----------
    prices : DataFrame or Series
        Prices for each ticker and date

    Returns
    -------
    log_returns : DataFrame or Series
        Log returns for each ticker and date
    """
    log_prices = np.log(prices)
    return log_prices - log_prices.shift(1)

In [None]:
def log_returns_to_returns(log_returns):
    """
    Convert log returns back into simple returns: r = exp(log_r) - 1.

    Uses ``np.expm1`` rather than ``np.exp(x) - 1``. The latter cancels
    catastrophically for the tiny log returns typical of minute-level price
    data, while expm1 stays accurate near 0.

    Parameters
    ----------
    log_returns : Series, DataFrame, ndarray or scalar
        Log returns

    Returns
    -------
    returns : same type as the input
        The simple (raw) returns
    """
    return np.expm1(log_returns)

In [None]:
def standardize(df, colname):
    """
    Standardize a column by subtracting its mean and dividing by its standard deviation.

    Uses pandas' sample standard deviation (ddof=1). The mean and std are not
    returned, so keep them yourself if you need `unstandardize` later.

    Parameters
    ----------
    df : DataFrame
    colname : column name to apply standardization to

    Returns
    -------
    Series
        The standardized column (z-scores)
    """
    column = df[colname]
    centred = column - column.mean()
    return centred / column.std()

In [None]:
def unstandardize(df, colname, mean, std):
    """
    Undo `standardize`: multiply by the original std and add back the original mean.

    Parameters
    ----------
    df : DataFrame
    colname : column name holding the standardized values
    mean : mean of the original distribution
    std : standard deviation of the original distribution

    Returns
    -------
    Series
        The unstandardized values
    """
    scaled = std * df[colname]
    return scaled + mean

In [None]:
def returns_to_prices(df, colname, price_zero):
    """
    Rebuild a price series from simple returns, starting from the price at time 0.

    p_n = p_0 * (1 + r_1) * (1 + r_2) * ... * (1 + r_n)

    Missing returns (e.g. the leading NaN produced by `compute_returns`) are
    treated as 0, i.e. no price change.

    Parameters
    ----------
    df : DataFrame
    colname : column holding the returns
    price_zero : starting price p_0

    Returns
    -------
    Series
        The reconstructed prices
    """
    growth = (df[colname].fillna(0) + 1).cumprod()
    return price_zero * growth

In [1]:
def split_data(data, training_size=0.8):
    """
    Split data into chronological training and test sets.

    The order is preserved (no shuffling): past points are used to predict
    the next ones, and the test set stands in for the unseen future.
    Shuffling happens later, per window, in `windowed_dataset`.

    Params:
        data: any sliceable sequence (list, ndarray, Series, ...)
        training_size: fraction of data in the training set, e.g. 0.8 puts
            the first 80% in training and the remaining 20% in test

    Returns:
        (training, test) slices of `data`
    """
    cut = int(training_size * len(data))
    training, test = data[:cut], data[cut:]
    return training, test

In [4]:
def windowed_dataset(series, shuffle_buffer, window_len, batch_size, window_scaling=False, multivar=False):
    """
    Turn a series into shuffled, batched (inputs, label) pairs for model.fit.

    The series is cut into overlapping windows of window_len + 1 consecutive
    rows (stride 1). The first window_len rows are the inputs and the last row
    supplies the label. For [1, 2, 3, 4, 5, 6] with window_len=2:

        [1, 2, 3] -> ([1, 2], 3)
        [2, 3, 4] -> ([2, 3], 4)
        [3, 4, 5] -> ([3, 4], 5)
        [4, 5, 6] -> ([4, 5], 6)

    Incomplete trailing windows are dropped. Windows are shuffled to avoid
    ordering bias, then batched.

    Params:
        series: 1-D array when multivar=False, or a 2-D (time, features) array
            when multivar=True
        shuffle_buffer: buffer size used when shuffling windows
            (see https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle)
        window_len: how many previous rows are used to predict the next
        batch_size: number of windows per batch
            (see https://www.tensorflow.org/api_docs/python/tf/data/Dataset#batch)
        window_scaling: when True, each window (label included) is expressed
            relative to its own first row, w / w[0] - 1. A first row of 0
            therefore yields inf/nan.
        multivar: when True the label is feature 0 of the last row
            (e.g. close when columns are [close, volume])

    Returns:
        A prefetched tf.data.Dataset of (inputs, label) batches
    """
    span = window_len + 1  # inputs plus the one label step

    # A univariate series has shape (N,); add a trailing feature axis -> (N, 1).
    data = series if multivar else tf.expand_dims(series, axis=-1)

    # window() yields sub-datasets; batch(span) flattens each into one tensor.
    windows = (tf.data.Dataset.from_tensor_slices(data)
               .window(span, shift=1, drop_remainder=True)
               .flat_map(lambda sub_ds: sub_ds.batch(span)))

    if window_scaling:
        # Alternative to global scaling: values relative to the window's first row.
        windows = windows.map(lambda w: (w / w[0]) - 1)

    windows = windows.shuffle(shuffle_buffer)

    if multivar:
        def to_inputs_and_label(w):
            # All rows but the last (every feature) -> feature 0 of the last row
            return w[:-1, :], w[-1, 0]
    else:
        def to_inputs_and_label(w):
            return w[:-1], w[-1]

    return windows.map(to_inputs_and_label).batch(batch_size).prefetch(1)

In [2]:
def build_model(output_size, neurons, activ_func, dropout, loss, optimizer, input_shape=1,
                output_activation=None, conv_filters=25, conv_kernel_size=5):
    """
    Build and compile the Keras model.

    The architecture is a causal Conv1D layer followed by two LSTM layers, each
    followed by dropout, and then a Dense output layer.

    Params:
        output_size: number of output units, e.g. 1 to predict one step ahead
        neurons: number of units in each LSTM layer
        activ_func: activation of the LSTM layers, e.g. 'tanh'
        dropout: dropout rate applied after each LSTM layer
        loss: loss function, e.g. 'mse'
        optimizer: e.g. 'adam'
        input_shape: number of input features per time step (1 for univariate)
        output_activation: activation of the final Dense layer. None (default)
            reuses activ_func, as before. A bounded activation such as tanh
            limits predictions to (-1, 1), so pass 'linear' when the targets
            (e.g. scaled prices) can leave that range.
        conv_filters: number of Conv1D filters (default 25)
        conv_kernel_size: Conv1D kernel size (default 5)

    Returns:
        The compiled model (its summary is printed)
    """
    if output_activation is None:
        output_activation = activ_func

    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv1D(filters=conv_filters, kernel_size=conv_kernel_size,
                               strides=1, padding="causal",
                               activation="relu",
                               input_shape=[None, input_shape]),
        tf.keras.layers.LSTM(neurons, return_sequences=True, activation=activ_func),
        tf.keras.layers.Dropout(dropout),
        # The last LSTM must return only its final state (return_sequences=False)
        # so the Dense layer emits one prediction per window.
        tf.keras.layers.LSTM(neurons, return_sequences=False, activation=activ_func),
        tf.keras.layers.Dropout(dropout),
        tf.keras.layers.Dense(units=output_size, activation=output_activation),
    ])
    model.compile(loss=loss, optimizer=optimizer, metrics=['mae'])
    model.summary()
    return model

In [2]:
def _inverse_first_column(scaler, values, n_features):
    """Inverse-scale `values` as column 0 of a scaler fitted on `n_features` columns."""
    # sklearn scalers transform each column independently, so zero padding in the
    # other columns does not affect column 0 and keeps the result deterministic.
    padded = np.zeros((len(values), n_features))
    padded[:, 0] = values
    return scaler.inverse_transform(padded)[:, 0]


def display_results(model, scaler, dataset, dates, window_len, output_size, multivar=False):
    """
    Forecast over `dataset`, plot predictions against actuals and print the MAE.

    Predictions are de-normalized with the scaler's inverse transform before
    plotting. The MAE is computed on the model-space (scaled) values y/yhat,
    not on the de-normalized prices.

    Params:
        model: the trained model
        scaler: the fitted scaler used to invert the normalization, or None
        dataset: the scaled data to forecast over (e.g. training_data or
            test_data from preprocessing). 1-D when multivar=False, otherwise
            (time, features) with the target in column 0.
        dates: the dates aligned with the targets (len(dataset) - window_len)
        window_len: how many previous points are used to predict the next
        output_size: how many steps forward are predicted
        multivar: whether `dataset` has several feature columns
    Returns:
        DataFrame with columns y, yhat, y_prices, yhat_prices
    """
    preds = model_forecast(model, dataset, window_len, multivar=multivar)

    # With window_len=5 the targets are dataset[5:], since [0..4] -> [5] etc.
    # The final prediction uses the last window_len points to forecast beyond
    # the data, so there is nothing to compare it with and it is dropped.
    # NOTE(review): this alignment assumes output_size == 1; for larger values
    # the flattened predictions no longer line up with y.
    if not multivar:
        res_df = pd.DataFrame({'y': dataset.flatten()[window_len:], 'yhat': preds.flatten()[:-output_size]})
    else:
        res_df = pd.DataFrame({'y': dataset[window_len:, 0], 'yhat': preds[:-output_size, 0]})

    # Invert the normalization transform
    if scaler is not None:
        if not multivar:
            res_df['y_prices'] = scaler.inverse_transform(res_df['y'].values.reshape(-1, 1)).flatten()
            res_df['yhat_prices'] = scaler.inverse_transform(res_df['yhat'].values.reshape(-1, 1)).flatten()
        else:
            # The scaler was fitted on every feature column; pad to that width.
            n_features = dataset.shape[1]
            res_df['y_prices'] = _inverse_first_column(scaler, res_df['y'].values, n_features)
            res_df['yhat_prices'] = _inverse_first_column(scaler, res_df['yhat'].values, n_features)
    else:
        # Window scaling
        res_df['y_prices'] = res_df['y']
        # NOTE(review): `denormalize_forecast` is not defined in this notebook;
        # this branch raises NameError unless it is defined elsewhere.
        res_df['yhat_prices'] = denormalize_forecast(res_df['yhat'], dataset)

    # Plot
    fig = go.Figure()
    fig.add_scatter(x=dates, y=res_df['y_prices'], mode='lines', name="Actual")
    fig.add_scatter(x=dates, y=res_df['yhat_prices'], mode='lines', name="Predicted")
    fig.update_layout(template='plotly_dark',
                      xaxis_title="Time",
                      yaxis_title="Price",)
    fig.show()

    # Print the MAE (in scaled units)
    mae = mean_absolute_error(res_df['y'], res_df['yhat'])
    print(f'The MAE is {mae}')

    return res_df

In [None]:
def display_history(hist):
    """
    Plot the training curves stored in a Keras History object.

    Shows two dark-themed line charts, loss per epoch and then MAE per epoch.
    Reads hist.history['loss'] and hist.history['mae'].

    Params:
       hist: the History object returned by model.fit

    Returns:
        None
    """
    panels = (
        ('loss', 'Loss over time', 'Loss'),
        ('mae', 'MAE over time', 'MAE'),
    )
    for history_key, chart_title, y_label in panels:
        figure = px.line(hist.history[history_key], title=chart_title)
        figure.update_layout(template='plotly_dark', xaxis_title="Time", yaxis_title=y_label)
        figure.show()

In [1]:
def preprocessing(df, scaler='standard', window_scaling=False, colname='close', window_len=5, start_date=None,
                  end_date=None, shuffle_buffer=1000,
                  batch_size=128, **kwargs):
    """
    Prepare a single price column for univariate training.

    Steps:
     - keep only rows between start_date and end_date (inclusive)
     - take the raw `colname` prices (no returns are computed)
     - split chronologically into training and test sets (80/20)
     - fit the chosen scaler on the training set and apply it to both sets
     - window/shuffle/batch the training set for model.fit

    NOTE: defining this function rebinds the name `preprocessing`, which the
    imports cell took from sklearn (`from sklearn import preprocessing`).

    Params:
        df - the OHLCV DataFrame with a DatetimeIndex
        scaler - 'standard', 'robust' or 'minmax'; any other value means no scaling
        window_scaling - normalize each window relative to its first element
            (see windowed_dataset)
        colname - the column to predict (close prices)
        window_len - how many elements of the series are used to predict the next
        start_date/end_date - the date range to model over (for example to
            exclude the early days of bitcoin with the long tail)
        shuffle_buffer - buffer size for shuffling
        batch_size - size of batches
        **kwargs - ignored; lets callers pass a larger config dict

    Returns (in this order):
        sc - the fitted scaler, or None (use sc.inverse_transform later)
        windowed_training_data - windowed (inputs, label) batches to train on
        training_data - the (scaled) training prices as a 1-D array
        test_data - the (scaled) test prices as a 1-D array
        training_dates - dates aligned with the training targets
        test_dates - dates aligned with the test targets
    """
    # Date range of interest
    in_range = df
    if start_date is not None:
        in_range = in_range[in_range.index >= start_date]
    if end_date is not None:
        in_range = in_range[in_range.index <= end_date]

    # Chronological train/test split of the raw prices
    training_df, test_df = split_data(in_range[colname])

    # Choose the scaler by name; unrecognized names fall through to "no scaling"
    sc = None
    scaler_choices = (
        ('standard', StandardScaler, 'Standard scaler'),
        ('robust', RobustScaler, 'Robust scaler'),
        ('minmax', MinMaxScaler, 'MinMax scaler'),
    )
    for choice, scaler_cls, banner in scaler_choices:
        if scaler == choice:
            print(banner)
            sc = scaler_cls()
            break

    if sc is None:
        print('No scaling')
        training_data = training_df.values
        test_data = test_df.values
    else:
        # Fit on the training split only; the test split reuses those statistics
        # so no information leaks from the (unseen) future.
        training_data = sc.fit_transform(training_df.values.reshape(-1, 1)).flatten()
        test_data = sc.transform(test_df.values.reshape(-1, 1)).flatten()

    # Windowed/batched training data to feed the model
    windowed_training_data = windowed_dataset(training_data, shuffle_buffer,
                                              window_len, batch_size, window_scaling=window_scaling)

    # The first window_len rows of each split only serve as inputs, so the
    # targets (and their dates) start at position window_len.
    training_dates = training_df.iloc[window_len:].index
    test_dates = test_df.iloc[window_len:].index

    return sc, windowed_training_data, training_data, test_data, training_dates, test_dates

In [1]:
def model_forecast(model, series, window_len, multivar=False, batch_size=32):
    """
    Predict the value following every window of `series` with a trained model.

    The series is cut into overlapping windows of window_len rows (stride 1,
    incomplete windows dropped). Windows are NOT shuffled, because predictions
    must stay in time order to be compared with actual prices.

    Parameters:
        model: the trained model
        series: the series to predict on, 1-D when multivar=False or
            (time, features) when multivar=True
        window_len: number of previous elements used to predict the next one
        multivar: whether `series` already has a feature axis
        batch_size: windows per predict batch (default 32, the previous fixed value)

    Returns:
        The output of model.predict: one prediction row per window,
        i.e. len(series) - window_len + 1 rows.
    """
    if not multivar:
        # A univariate series has shape (N,); add a feature axis -> (N, 1)
        series = tf.expand_dims(series, axis=-1)

    # Windows of window_len (not +1): inputs only, the model predicts the label
    ds = tf.data.Dataset.from_tensor_slices(series)
    ds = ds.window(window_len, shift=1, drop_remainder=True)
    ds = ds.flat_map(lambda w: w.batch(window_len))

    ds = ds.batch(batch_size).prefetch(1)

    return model.predict(ds)

In [3]:
def multivar_preprocessing(df, scaler='standard', window_scaling=False, colnames=None, window_len=5, start_date=None,
                           end_date=None, shuffle_buffer=1000,
                           batch_size=128, **kwargs):
    """
    Prepare several feature columns for multivariate training.

    Steps:
     - keep only rows between start_date and end_date (inclusive)
     - take the raw `colnames` columns (no returns are computed)
     - split chronologically into training and test sets (80/20)
     - fit ONE scaler on all training columns and apply it to both sets
     - window/shuffle/batch the training set; the label is column 0

    Params:
        df - the OHLCV DataFrame with a DatetimeIndex
        scaler - 'standard', 'robust' or 'minmax'; any other value means no scaling
        window_scaling - normalize each window relative to its first row
            (see windowed_dataset)
        colnames - list of feature columns; the first one is the prediction
            target. Defaults to ['close', 'volume'].
        window_len - how many rows are used to predict the next
        start_date/end_date - the date range to model over (for example to
            exclude the early days of bitcoin with the long tail)
        shuffle_buffer - buffer size for shuffling
        batch_size - size of batches
        **kwargs - ignored; lets callers pass a larger config dict

    Returns (in this order):
        sc - the fitted scaler over all columns, or None
        windowed_training_data - windowed (inputs, label) batches to train on
        training_data - (scaled) training features, shape (time, len(colnames))
        test_data - (scaled) test features, shape (time, len(colnames))
        training_dates - dates aligned with the training targets
        test_dates - dates aligned with the test targets
    """
    if colnames is None:
        # Avoid a shared mutable default argument
        colnames = ['close', 'volume']

    # Date range of interest
    temp_df = df
    if start_date is not None:
        temp_df = temp_df[temp_df.index >= start_date]
    if end_date is not None:
        temp_df = temp_df[temp_df.index <= end_date]

    # Chronological train/test split of the raw feature columns
    training_df, test_df = split_data(temp_df[colnames])

    # One scaler for all columns, fitted on the training set only
    if scaler == 'standard':
        print('Standard scaler')
        sc = StandardScaler()
    elif scaler == 'robust':
        sc = RobustScaler()
        print('Robust scaler')
    elif scaler == 'minmax':
        sc = MinMaxScaler()
        print('MinMax scaler')
    else:
        sc = None

    if sc is not None:
        # Fit on training, transform only the test. Pass plain arrays on both
        # sides: fitting on .values but transforming a DataFrame makes sklearn
        # warn about feature names the scaler never saw.
        training_data = sc.fit_transform(training_df.values)
        test_data = sc.transform(test_df.values)
    else:
        print('No scaling')
        training_data = training_df.values
        test_data = test_df.values

    # Windowed/batched training data to feed the model
    windowed_training_data = windowed_dataset(training_data, shuffle_buffer,
                                              window_len, batch_size, window_scaling=window_scaling, multivar=True)

    # Targets (and their dates) start after the first window_len input rows
    training_dates = training_df.iloc[window_len:].index
    test_dates = test_df.iloc[window_len:].index

    return sc, windowed_training_data, training_data, test_data, training_dates, test_dates