In [1]:
import pandas as pd 
import numpy as np
import keras
import math 
import random 

from matplotlib import pyplot as plt
from sklearn.preprocessing import MinMaxScaler

2023-04-27 14:33:20.442226: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-04-27 14:33:20.578774: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-04-27 14:33:20.578793: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-04-27 14:33:21.349910: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-

In [2]:

# -----------------------------------------------------------------
# Encapsulates a single data point indicating a trend change. 
# 
class TrendPoint: 
    """One marker in a trend metaseries.

    Holds three plain, mutable fields taken straight from the constructor:
    ``index`` (position of the point), ``price`` (price stored for it) and
    ``point_type`` (a string label for the kind of point).
    """

    def __init__(self, index: int, price: float, point_type: str): 
        # Store the constructor arguments unchanged.
        self.index, self.price, self.point_type = index, price, point_type
    
# -----------------------------------------------------------------
# Encapsulates a metaseries indicating trend changes in a price 
# series. 
# 
class TrendData: 
    """Ordered list of trend-change points (a "metaseries") for one price series.

    Points live in ``self.points`` in the order they were appended. The
    ``as_*`` exporters expect the points' ``index`` values to be increasing.
    """

    def __init__(self): 
        self.points = list()

    @property
    def length(self): 
        """Number of points currently stored."""
        return len(self.points)

    @property 
    def last_point(self): 
        """The most recently stored point, or None when there are no points."""
        return self.points[-1] if self.points else None

    def append_point(self, index: int, price: float, ptype: str): 
        """Add a point of type ``ptype`` at the end of the metaseries.

        When the current last point already has that type, it is updated in
        place with the new index and price instead of adding a second point,
        so two consecutive points added through this method never share a type.
        """
        last = self.last_point
        if last is not None and last.point_type == ptype:
            last.index = index
            last.price = price
        else:
            self.points.append(TrendPoint(index, price, ptype))

    def as_price_series(self, start_price: float): 
        """Return the metaseries as a price line that can be overlaid on the prices.

        The line starts at ``start_price`` for index 0 and is linearly
        interpolated from each point to the next, giving one value per index
        from 0 up to and including the last point's index.

        Points whose index is not beyond the position already reached are
        ignored. Returns an empty list when there are no points.
        """
        series = list()
        index = 0
        price = start_price

        for point in self.points:
            gap = point.index - index
            # A gap of exactly 1 is a valid one-step segment. It used to be
            # skipped, which left the point out of the line entirely.
            if gap <= 0:
                continue
            slope = (point.price - price) / gap
            # Emit indexes [index, point.index); the point itself opens the
            # next segment, so nothing is written twice.
            series.extend(price + (n * slope) for n in range(gap))
            price = point.price
            index = point.index

        # Close the line with the price of the last point reached, otherwise
        # the final trend point would never appear in the output.
        if self.points:
            series.append(price)

        return series

    def as_boolean(self, start_price: float, length=None): 
        """Return the metaseries as one value per index of the price series.

        Values:
            0    trend downturn (reversal to the down direction)
            1    trend upturn (reversal to the up direction)
            0.5  continuation (no trend point at that index)

        A point is reported as an upturn (1) when its price is below the
        price of the previous point (``start_price`` for the first point),
        and as a downturn (0) otherwise.

        Args:
            start_price: reference price for classifying the first point.
            length: optional target length. When given, the result is padded
                with 0.5 (or cut) to exactly this many entries so it can be
                aligned with the original price series. When omitted, the
                result ends at the last point's index.
        """
        series = list()
        prev_index = -1  # nothing emitted yet, so index 0 is still pending
        prev_price = start_price

        for point in self.points:
            gap = point.index - prev_index
            if gap < 0:
                # Out-of-order point: it has no slot in an index-ordered series.
                continue

            value = 1 if point.price < prev_price else 0
            if gap == 0:
                # Same index as the previous point: keep one slot per index.
                if series:
                    series[-1] = value
            else:
                # Fill every index strictly between the two points.
                series.extend([0.5] * (gap - 1))
                series.append(value)
                prev_index = point.index
            prev_price = point.price

        if length is not None:
            target = max(length, 0)
            if len(series) < target:
                series.extend([0.5] * (target - len(series)))
            else:
                del series[target:]

        return series

    def as_scatterplot(self, ptype: str):
        """Return the points of one type as parallel x/y lists for plotting.

        Returns a tuple ``(x, y)`` where ``x`` holds the indexes and ``y`` the
        prices of every stored point whose ``point_type`` equals ``ptype``.
        """
        matches = [point for point in self.points if point.point_type == ptype]
        x = [point.index for point in matches]
        y = [point.price for point in matches]
        return x, y
    
    
# -----------------------------------------------------------------
# Extracts an approximation of the trend and trend changes over 
# time of the given price series. 
# 
# series: pandas Series containing the price series
# period: a lower value will result in more granular trend changes 
# 
def extract_trend(series: pd.Series, period: int): 
    """Approximate the trend of a price series as alternating 'hi'/'lo' points.

    The first ``period`` values seed the result with their highest and lowest
    value, appended in index order. The rest of the series is then scanned in
    windows of ``period`` values starting at the last recorded point:

    * a value beyond the last point (above a 'hi', below a 'lo') replaces
      that point, moving the extreme forward;
    * otherwise the most extreme opposite value in the window (lowest after
      a 'hi', highest after a 'lo') is appended as the next point, provided
      it lies beyond the value at the window start.

    Args:
        series: pandas Series of prices; only ``series.values`` is read.
        period: window length. A lower value gives more granular trend
            changes. Values below 2 are scanned with a window of 2, because a
            one-value window can never advance.

    Returns:
        A TrendData holding the detected points. It is empty when ``series``
        is empty.
    """
    values = series.values
    count = len(values)
    data = TrendData()
    if count == 0:
        return data

    # Clamp the seed window so a period longer than the series cannot index
    # past the end.
    first_window = min(max(period, 1), count)
    window = max(period, 2)

    # Get the first high and low of the seed window.
    first_hi = values[0]
    first_hi_index = 0
    first_lo = values[0]
    first_lo_index = 0
    for i in range(first_window):
        if (values[i] > first_hi):
            first_hi = values[i]
            first_hi_index = i
        if (values[i] < first_lo):
            first_lo = values[i]
            first_lo_index = i

    # Append the first high and low in index order.
    if (first_hi_index > first_lo_index):
        data.append_point(first_lo_index, first_lo, 'lo')
        data.append_point(first_hi_index, first_hi, 'hi')
    else:
        data.append_point(first_hi_index, first_hi, 'hi')
        data.append_point(first_lo_index, first_lo, 'lo')

    # Get the remaining trend points.
    start_index = first_window

    while (start_index < count - 1):
        last_point = data.last_point
        end_index = min(start_index + window, count)

        new_lo = start_index
        new_hi = start_index
        point_added = False

        # Find highs and lows in the current window.
        for i in range(start_index, end_index):
            val = values[i]
            if (last_point.point_type == 'hi'):
                if (val > last_point.price):
                    # Higher high: same type, so the last point is replaced.
                    data.append_point(i, val, 'hi')
                    point_added = True
                    break
                if (val < values[new_lo]):
                    new_lo = i
            else:
                if (val < last_point.price):
                    # Lower low: same type, so the last point is replaced.
                    data.append_point(i, val, 'lo')
                    point_added = True
                    break
                if (val > values[new_hi]):
                    new_hi = i

        if not point_added:
            if (values[new_lo] < values[start_index]):
                data.append_point(new_lo, values[new_lo], 'lo')
                point_added = True

            if (values[new_hi] > values[start_index]):
                data.append_point(new_hi, values[new_hi], 'hi')
                point_added = True

        if not point_added and start_index == last_point.index:
            # The window anchored at the last point held nothing above or
            # below it. Restarting from the same index would repeat this scan
            # forever, so move the last point to the window's final index
            # (same type and price) to force the scan forward.
            data.append_point(end_index - 1, last_point.price, last_point.point_type)

        start_index = data.last_point.index

    return data

In [4]:
def scale_col_values(df, col_name, min_value=0, max_value=1): 
    """Rescale one column of ``df`` to ``(min_value, max_value)`` with MinMaxScaler.

    The column is overwritten on ``df`` itself, and that same frame is returned.
    """
    # The scaler is fitted on a 2-D, single-column array.
    column_2d = df[col_name].values.reshape(-1, 1)
    scaled = MinMaxScaler(feature_range=(min_value, max_value)).fit_transform(column_2d)
    # Flatten back to one value per row before writing over the column.
    df[col_name] = scaled[:, 0]
    return df

def squash_col_outliers(df, col_name, min_quantile=0.01, max_quantile=0.99): 
    """Clamp one column of ``df`` to the values at two of its quantiles.

    Values at or above the ``max_quantile`` value are set to it first, then
    values at or below the ``min_quantile`` value are set to that. The frame
    is modified in place and returned.
    """
    bounds = df[col_name].quantile([min_quantile, max_quantile])
    lower, upper = bounds.iloc[0], bounds.iloc[1]

    too_high = df[col_name] >= upper
    df.loc[too_high, col_name] = upper

    # Evaluated after the upper clamp, on the already-updated column.
    too_low = df[col_name] <= lower
    df.loc[too_low, col_name] = lower
    return df

# Load the daily TSLA prices; the first CSV column becomes the index.
df = pd.read_csv("data/TSLA-d.csv", index_col=0)

# Derived features: intraday range relative to the open, and the
# day-over-day percentage change of the adjusted close.
df["Range"] = (df["High"] - df["Low"]) / df["Open"]
df['Change'] = df["Adj Close"].pct_change()

# Drop the raw columns that are no longer needed, and the first row
# (pct_change has no previous value for it).
df = df.drop(columns=["Volume", "Open", "High", "Low", "Close"]).iloc[1:]

# Rebuild on a fresh 0-based index with explicit column names.
df = pd.DataFrame(df.values, columns=['Adj Close', 'Range', 'Change'])

# Clamp outliers first, then scale both features with scale_col_values' defaults.
df = (
    df.pipe(squash_col_outliers, 'Change')
      .pipe(squash_col_outliers, "Range", min_quantile=0.0, max_quantile=0.97)
      .pipe(scale_col_values, 'Change')
      .pipe(scale_col_values, 'Range')
)

trend = extract_trend(df['Adj Close'], 100)
# NOTE(review): the Trend column is currently disabled; confirm whether it
# should be restored before relying on a third feature downstream.
#df['Trend'] = trend.as_boolean(df['Adj Close'][0])
del df["Adj Close"]
df.head()

Unnamed: 0,Range,Change
0,1.0,0.437608
1,1.0,0.057881
2,1.0,0.0
3,1.0,0.0
4,0.989125,0.353968


In [None]:
# VI: Shaping the Data for LSTM Input

'''
Finally, we have 3 columns (or features): Range, Change, and Trend. 
Let's pretend that Change is what we want the model to predict. 

The input for a keras LSTM requires a three dimensional array with the shape: 
(s, t, f) 

s = samples: the number of samples in the data set (i.e. the number of sliding windows of t consecutive rows, roughly one per row of data) 
t = timesteps: the number of timesteps to be input for each sample (also sometimes called the 'lag')
f = features: the number of distinct features to be considered; in this case, 3 (Range, Change, Trend)

An LSTM can predict multiple output features, and can do so with a variable offset and width. But just to 
keep things simple, we'll assume for this example that the output offset is 1, the LSTM will predict only 
one output feature (Change), and it will predict for only one timestep: the next day's Change. 

Note also that the output feature need not be one of the input features as well. In this case, Change is 
present in both the input and the output. 

X represents the input values. 
y represents the predicted or expected values. 

X: Range(t[-10:0]), Change(t[-10:0]), Trend(t[-10:0])
y: Change (t+1)
'''

In [None]:
# extract y column (col to be predicted)
def extract_y(df, colname, ntimesteps): 
    """Return the target values: ``colname`` at the row after each input window.

    Window ``k`` covers rows ``k .. k + ntimesteps - 1``, so its target is row
    ``k + ntimesteps``. The last row of ``df`` is left out, which gives
    ``len(df) - ntimesteps - 1`` targets.

    Args:
        df: DataFrame holding the column to predict.
        colname: name of the column to predict.
        ntimesteps: number of timesteps in each input window (at least 1).

    Returns:
        A new 1-D numpy array of target values. It is empty when ``df`` has
        too few rows for a single window.

    Raises:
        ValueError: if ``ntimesteps`` is less than 1.
    """
    if ntimesteps < 1:
        raise ValueError("ntimesteps must be at least 1")

    # A direct slice gives the same rows as shifting the whole frame by one
    # and dropping the leading rows, without copying every column.
    return df[colname].iloc[ntimesteps:-1].to_numpy(copy=True)


In [4]:
# extract X 
def extract_X(df, ntimesteps): 
    """Build the LSTM input: one window of consecutive rows per sample.

    Sample ``k`` holds rows ``k .. k + ntimesteps - 1`` of ``df`` with all of
    its columns as features. The last row of ``df`` is never used, so there
    are ``len(df) - ntimesteps - 1`` samples.

    Args:
        df: DataFrame whose columns are the features. Rows are taken by
            position, so the index labels do not matter.
        ntimesteps: number of timesteps in each window (at least 1).

    Returns:
        A numpy array of shape ``(samples, ntimesteps, features)``. With too
        few rows for a single window it has shape ``(0, ntimesteps, features)``.

    Raises:
        ValueError: if ``ntimesteps`` is less than 1.
    """
    if ntimesteps < 1:
        raise ValueError("ntimesteps must be at least 1")

    values = df.to_numpy()
    if values.dtype.kind in "iu":
        # Integer input is returned as floats.
        values = values.astype(float)

    nsamples = max(len(values) - ntimesteps - 1, 0)

    # row_positions[k, t] = k + t: the row used at timestep t of sample k.
    row_positions = np.arange(nsamples)[:, None] + np.arange(ntimesteps)[None, :]

    # Indexing with the 2-D position array gathers whole rows, which yields
    # the 3-D (samples, timesteps, features) array in a single step.
    return values[row_positions]

In [5]:
# EXTRACT X and y
timesteps = 10  # window length passed to both extractors
X = extract_X(df, timesteps)
y = extract_y(df, 'Change', timesteps)

# One shape per line: X first, then y.
print(X.shape, y.shape, sep="\n")

(3200, 10, 3)
(3200,)


In [6]:
class DataSet:
    """Paired model inputs ``X`` and targets ``y`` that can be split in place.

    ``X`` must be a 3-dimensional array and ``y`` a 1-dimensional array with
    the same length along the first axis.
    """

    def __init__(self, X, y): 
        # ValueError is a subclass of Exception, so callers that catch
        # Exception still catch these.
        if X.ndim != 3: 
            raise ValueError("Expected a 3-dimensional array for X")
        if y.ndim != 1: 
            raise ValueError("Expected a 1-dimensional array for y")
        if len(X) != len(y): 
            raise ValueError("Length of X and y must be the same")

        self.X = X
        self.y = y

    def split(self, pct): 
        """Move the last ``pct`` of the samples into a new DataSet.

        ``int(size * pct)`` samples are taken from the end and returned as a
        new DataSet; this DataSet keeps the samples before them. The two sets
        never share a sample, and the held-out part is always the latest one.

        Args:
            pct: fraction of the samples to hold out, between 0 and 1.

        Returns:
            The new DataSet holding the held-out samples (possibly empty).

        Raises:
            ValueError: if ``pct`` is outside the range 0..1.
        """
        if not 0 <= pct <= 1:
            raise ValueError("pct must be between 0 and 1")

        count = int(self.size * pct)
        # Slice from an explicit cut position: [:-count] would empty this
        # set when count is 0.
        cut = self.size - count
        new_dataset = DataSet(self.X[cut:], self.y[cut:])
        self.X = self.X[:cut]
        self.y = self.y[:cut]
        return new_dataset

    @property
    def size(self): 
        """Number of samples (length of ``X`` along its first axis)."""
        return len(self.X)


In [7]:
# SPLIT TO TEST & TRAIN
# Wrap the full sample set, then carve it up with two successive splits.
# Each split() call returns a new DataSet and shrinks the one it is called on.
train = DataSet(X, y)
# val gets int(train.size * 0.3) samples; train is reduced in place.
val = train.split(0.3)
# test gets int(val.size * 0.3) samples taken from val; val is reduced in place.
test = val.split(0.3)

In [5]:
# Sample counts after splitting, one per line: train, val, test.
print(train.size, val.size, test.size, sep="\n")

NameError: name 'train' is not defined