In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow import keras
import csv
import numpy as np
from collections import deque
from sklearn.model_selection import train_test_split
import pandas as pd
import math
from tensorflow.keras.layers import Input
from sklearn.preprocessing import StandardScaler



# Number of rows requested from the end of the CSV file (passed to
# get_last_lines by build_generators).
size = 10000


# Number of consecutive rows stacked into each input window; also the
# sequence length declared on the model's Input layer.
timesteps = 30

# Maximum number of windows per batch yielded by tuple_generator
# (the final batch of a pass may be smaller).
batch_size = 16

# Number of training epochs passed to model.fit.
epochs = 100


def tuple_generator(x_data, y_data, timesteps, batch_size=None):
    """Endlessly yield batches of sliding windows and their labels.

    Window ``j`` is ``x_data[j:j + timesteps]`` and is paired with
    ``y_data[j + timesteps - 1]``, i.e. the label of the window's last row.
    After the final (possibly shorter) batch of a pass the generator wraps
    around to the first window, so it never stops on its own.

    Args:
        x_data: Numpy array of rows, indexed along its first axis.
        y_data: Numpy array of labels with the same length as ``x_data``.
        timesteps: Number of consecutive rows in each window.
        batch_size: Maximum number of windows per batch. ``None`` (the
            default) uses the module-level ``batch_size`` setting, which is
            what three-argument calls have always relied on.

    Yields:
        ``(x_batch, y_batch)``: ``x_batch`` stacks up to ``batch_size``
        windows along a new first axis; ``y_batch`` holds one label per
        window.

    Raises:
        ValueError: On the first ``next()`` call, if ``timesteps`` or
            ``batch_size`` is smaller than 1, if ``x_data`` and ``y_data``
            differ in length, or if ``x_data`` has fewer than ``timesteps``
            rows (no complete window exists).
    """
    if batch_size is None:
        # Preserve the original implicit dependency on the notebook setting.
        batch_size = globals()["batch_size"]
    if timesteps < 1 or batch_size < 1:
        raise ValueError("timesteps and batch_size must both be at least 1")
    if len(x_data) != len(y_data):
        raise ValueError(
            f"x_data has {len(x_data)} rows but y_data has {len(y_data)} labels"
        )

    n_windows = len(x_data) + 1 - timesteps
    if n_windows < 1:
        raise ValueError(
            f"need at least {timesteps} rows to build one window, got {len(x_data)}"
        )

    # Label k now belongs to window k (the label of that window's last row).
    labels = y_data[timesteps - 1:]

    start = 0
    while True:
        stop = min(start + batch_size, n_windows)
        x_batch = np.stack([x_data[j:j + timesteps] for j in range(start, stop)])
        y_batch = labels[start:stop]
        # Wrap around once the last window has been emitted.
        start = stop if stop < n_windows else 0
        yield x_batch, y_batch

def get_column_names(name):
    """Return the first row of a CSV file as a list of column names.

    Args:
        name: Path of the CSV file.

    Returns:
        list[str]: The fields of the file's first row.

    Raises:
        ValueError: If the file contains no rows at all.
    """
    # newline='' lets the csv module do its own line-ending handling, so
    # quoted fields that contain line breaks are read back unchanged.
    with open(name, newline='') as f:
        header = next(csv.reader(f), None)
    if header is None:
        # Surface a clear error instead of leaking StopIteration to callers.
        raise ValueError(f"{name!r} is empty; cannot read column names")
    return header

def get_last_lines(name, n=5000):
    """Return up to ``n`` rows from the tail of a CSV file, minus the final row.

    The last ``n + 1`` parsed rows are kept and the very last one is then
    dropped, so the result holds the ``n`` rows that precede the file's
    final row. When the file has ``n + 1`` rows or fewer, every row except
    the final one is returned -- including the header row, if the file has
    one.

    NOTE(review): the final row is always discarded; presumably it is
    incomplete in the source data -- confirm.

    Args:
        name: Path of the CSV file.
        n: Maximum number of rows to return.

    Returns:
        numpy.ndarray: The selected rows as an array of strings.
    """
    # newline='' lets the csv module do its own line-ending handling, so
    # quoted fields that contain line breaks are read back unchanged.
    with open(name, newline='') as f:
        # A bounded deque keeps only the most recent n + 1 rows in memory.
        tail = deque(csv.reader(f), maxlen=n + 1)
    return np.array(list(tail))[:-1]

def build_generators(name, timesteps):
    """Load the feature CSV and build the train/test batch generators.

    The function reads the header and the tail of the file (``size`` rows,
    taken from the module-level setting), drops the ``Timestamp`` column,
    casts everything to float and separates the ``Return_Signal`` target.
    The rows are split 80/20 in file order (no shuffling). The selected
    feature columns are then standardised with statistics computed on the
    training portion only, so nothing about the test rows leaks into
    training.

    Args:
        name: Path of the CSV file.
        timesteps: Number of consecutive rows per window, forwarded to
            ``tuple_generator``.

    Returns:
        tuple: ``(train_gen, test_gen, train_shape, test_shape, weights)``
        where the generators come from ``tuple_generator``, the shapes are
        the ``(rows, columns)`` shapes of the train/test feature frames, and
        ``weights`` maps each ``Return_Signal`` value to its relative
        frequency in the loaded data (also printed).
    """
    columns = get_column_names(name)
    data = get_last_lines(name, size)
    # If the file holds no more rows than were requested, the tail still
    # starts with the header row; drop it so the float cast below succeeds.
    if len(data) and list(data[0]) == list(columns):
        data = data[1:]

    frame = pd.DataFrame(data, columns=columns)
    frame.pop('Timestamp')
    frame = frame.astype(float)
    target = frame.pop('Return_Signal')

    # Count first and divide once: summing 1/len(target) repeatedly
    # accumulates floating-point error.
    counts = {}
    for label in target:
        counts[label] = counts.get(label, 0) + 1
    weights = {label: count / len(target) for label, count in counts.items()}
    print(weights)

    xtr, xte, ytr, yte = train_test_split(frame, target, test_size=0.2, shuffle=False)

    features_to_scale = ["SMA_10", "SMA_50", "RSI_14", "Middle_Band", "Upper_Band", "Lower_Band", "TOD"]
    scaler = StandardScaler()
    # Work on copies so the assignments below do not write into views of
    # `frame`.
    xtr = xtr.copy()
    xte = xte.copy()
    # Fit on the training rows only, then apply the same transform to the
    # held-out rows.
    xtr[features_to_scale] = scaler.fit_transform(xtr[features_to_scale])
    xte[features_to_scale] = scaler.transform(xte[features_to_scale])

    train_shape = xtr.shape
    test_shape = xte.shape
    train_gen = tuple_generator(xtr.to_numpy(), ytr.to_numpy(), timesteps)
    test_gen = tuple_generator(xte.to_numpy(), yte.to_numpy(), timesteps)
    return train_gen, test_gen, train_shape, test_shape, weights


# Build the batch generators from the 5-minute XBT/USD feature file and keep
# the frame shapes and class frequencies for the cells below.
(train_gen, test_gen, train_shape, test_shape, weights) = build_generators(
    name="drive/MyDrive/Kraken Coins/XBTUSD_5_with_features.csv",
    timesteps=timesteps,
)




{0.0: 0.493199999999962, 1.0: 0.5057999999999606, 2.0: 0.0010000000000000002}


In [None]:
class ClassSpecificAccuracy(tf.keras.metrics.Metric):
    """Accuracy restricted to the samples whose true label is one class.

    The metric accumulates, across batches, how many samples carry the
    label ``target_class`` and how many of those the model assigned to that
    same class (arg-max of the prediction vector). ``result()`` is the ratio
    of the two, i.e. the recall of ``target_class``.

    Args:
        target_class: Integer label of the class to track.
        name: Base metric name; ``_<target_class>`` is appended to it so
            several instances can be reported side by side.
        **kwargs: Forwarded to ``tf.keras.metrics.Metric``.
    """

    def __init__(self, target_class=2, name="class_accuracy", **kwargs):
        name = f"{name}_{target_class}"
        super().__init__(name=name, **kwargs)
        self.target_class = target_class
        # Running counts over all batches since the last reset.
        self.correct = self.add_weight(name="correct", initializer="zeros")
        self.total = self.add_weight(name="total", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Add one batch to the running counts.

        Args:
            y_true: Integer-valued class labels, one per sample; any shape
                holding exactly one label per sample, e.g. ``(batch,)`` or
                ``(batch, 1)``.
            y_pred: Per-class scores with the classes on the last axis.
            sample_weight: Accepted for API compatibility; ignored.
        """
        # Flatten both sides to one value per sample. Labels may arrive as
        # (batch, 1); comparing that with (batch,) predictions would
        # broadcast to (batch, batch) and corrupt the counts.
        y_true = tf.reshape(tf.cast(y_true, tf.int64), [-1])
        y_pred_labels = tf.reshape(
            tf.argmax(y_pred, axis=-1, output_type=tf.int64), [-1]
        )

        is_target = tf.equal(y_true, self.target_class)
        is_hit = tf.logical_and(is_target, tf.equal(y_pred_labels, y_true))

        self.correct.assign_add(tf.reduce_sum(tf.cast(is_hit, tf.float32)))
        self.total.assign_add(tf.reduce_sum(tf.cast(is_target, tf.float32)))

    def result(self):
        """Return correct / total; epsilon keeps it finite (0) when the class never occurred."""
        return self.correct / (self.total + tf.keras.backend.epsilon())

    def reset_state(self):
        """Zero both counters (called by Keras between epochs/evaluations)."""
        self.correct.assign(0.0)
        self.total.assign(0.0)

    # Older Keras releases call the plural name; keep it as an alias.
    reset_states = reset_state

In [None]:
# Three stacked bidirectional LSTM layers over each (timesteps, features)
# window, followed by a dense head that outputs probabilities for 3 classes.
layer_list = [
    Input(shape=(timesteps, train_shape[1])),
    Bidirectional(layers.LSTM(128, return_sequences=True)),
    Bidirectional(layers.LSTM(128, return_sequences=True)),
    Bidirectional(layers.LSTM(128, return_sequences=False)),
    layers.Dense(100, activation='relu'),
    layers.Dense(3, activation='softmax')

]

model = models.Sequential(layer_list)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy', ClassSpecificAccuracy(target_class=2), ClassSpecificAccuracy(target_class=1), ClassSpecificAccuracy(target_class=0)])

# One pass over a split consists of (rows - timesteps + 1) windows, not
# `rows` samples. Using the window count makes every epoch / validation run
# cover each window exactly once instead of wrapping around and re-reading
# the first batch.
train_steps = math.ceil((train_shape[0] - timesteps + 1) / batch_size)
test_steps = math.ceil((test_shape[0] - timesteps + 1) / batch_size)

model.fit(train_gen, epochs=epochs, steps_per_epoch=train_steps, validation_data=test_gen, validation_steps=test_steps)

# evaluate() returns the loss followed by one value per compiled metric
# (accuracy, then the three per-class accuracies), so unpacking into just two
# names would raise ValueError.
test_loss, test_acc, *class_accs = model.evaluate(test_gen, steps=test_steps)
print('Test accuracy:', test_acc)
print('Per-class accuracy (classes 2, 1, 0):', class_accs)

NameError: name 'Input' is not defined

In [None]:
# Final cell: after a 15-second pause, ask Colab to unassign this runtime.
import time
from google.colab import runtime

time.sleep(15)
runtime.unassign()