<a href="https://colab.research.google.com/github/rbadi76/SimulatorWithONNX2/blob/master/DL_assignment_LO_RB3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Group members:**

- Luke O'Brien
- Róbert Badí Baldursson

In [None]:
# imports

from google.colab import drive
from os.path import join
import numpy as np
import torch
import torch.nn as nn
import pandas as pd
from sklearn.model_selection import train_test_split

ANN to predict day-to-day stock movements for 3 companies based on stock prices, market analysis, market segments of the companies, and info given to us by UpUpUp Inc.

In [None]:
# set working directory

ROOT='/content/drive'
drive.mount(ROOT)
PROJ='My Drive/Colab Notebooks/Deep Learning/UpAssignment'
PROJ_PATH=join(ROOT,PROJ)

!rsync -aP "{PROJ_PATH}"/* ./

In [None]:
# read in data from files

with open('data/stock_prices.txt') as f:
  stock_prices_df = pd.read_csv(f, delimiter=r"\s+", names=['company', 'year', 'day', 'quarter', 'stock_price'])

with open('data/market_analysis.txt') as f:
  market_analysis_df = pd.read_csv(f, delimiter=r"\s+", names=['segment', 'year', 'quarter', 'trend'])

with open('data/market_segments.txt') as f:
  market_segments_df = pd.read_csv(f, delimiter=r"\s+", names=['company', 'segment'])

with open('data/info.txt') as f:
  info_df = pd.read_csv(f, delimiter=r"\s+", names=['company', 'year', 'day', 'quarter', 'expert1_prediction', 'expert2_prediction', 'sentiment_analysis', 'm1', 'm2', 'm3', 'm4'])

In [None]:
# join data files

full_data = pd.merge(stock_prices_df, info_df, on=['company', 'year', 'day', 'quarter'])
full_data = pd.merge(full_data, market_segments_df, on=['company'])
full_data = pd.merge(full_data, market_analysis_df, on=['segment', 'year', 'quarter'])

print(full_data)

In [None]:
# normalise relevant columns

def normalise(dataset):
    dataNorm=((dataset-dataset.min())/(dataset.max()-dataset.min()))
    return dataNorm

extr_m1 = [full_data[['m1']].max(), full_data[['m1']].min()]
extr_m2 = [full_data[['m2']].max(), full_data[['m2']].min()]

full_data[['m1', 'm2']]=normalise(full_data[['m1', 'm2']])

print(extr_m1, extr_m2)

In [None]:
c1_df = full_data

pd.set_option('display.max_columns', None)

# create y vector by getting diff with yesterday's stock price and a boolean if the diff > 0
stock_diffs_c1 = c1_df['stock_price'].diff().fillna(c1_df['stock_price'].iloc[0]-100) ### update this to show first cell - 100 instead of 0, better use iloc, otherwise company 1 and 2 will fail

c1_df.insert(14, "stock_diff", stock_diffs_c1, True)

c1_df['stock_diff_bool'] = np.where(c1_df['stock_diff'] > 0, True, False)

# split dataframe into test and training sets
train_c1, test_c1 = train_test_split(c1_df, test_size=0.2, random_state=40, shuffle=True)

# Convert dataframes to tensor input formats

BATCH_SIZE = 25 # was 50

t_train_X = torch.Tensor(train_c1[['expert1_prediction', 'expert2_prediction', 'sentiment_analysis', 'trend', 'm1', 'm2', 'm3', 'm4']].values)
t_train_y = torch.Tensor(train_c1['stock_diff_bool'].values)

t_test_X = torch.Tensor(test_c1[['expert1_prediction', 'expert2_prediction', 'sentiment_analysis', 'trend', 'm1', 'm2', 'm3', 'm4']].values)
t_test_y = torch.Tensor(test_c1['stock_diff_bool'].values)

train_dataset = torch.utils.data.TensorDataset(t_train_X, t_train_y)
test_dataset = torch.utils.data.TensorDataset(t_test_X, t_test_y)

training_set = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_set = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Create our ANN (artifical neural network).
# inherit from base class (Module) and then overwrite what you need to, we'll almost always be doing it this way
class MyANN(nn.Module):
  def __init__(self):
    super().__init__()
    # simple linear layer
    self.fc1 = nn.Linear(8, 32)
    # input to next layer matches size of output from previous layer
    # fc means "fully connected"
    self.av1 = nn.ReLU()
    self.fc2 = nn.Linear(32, 32)
    self.av2 = nn.ReLU()
    self.fc3 = nn.Linear(32, 32)
    self.av3 = nn.ReLU()
    self.fc4 = nn.Linear(32, 32)
    self.av4 = nn.ReLU()
    self.fc5 = nn.Linear(32, 1)
    return

# forward pass - how you send your data through the network, first-layer, then 2nd, then 3rd. Passes out the tensor we are going to use.
  def forward(self, x):
    x = self.fc1(x)
    x = self.av1(x)
    x = self.fc2(x)
    x = self.av2(x)
    x = self.fc3(x)
    x = self.av3(x)
    x = self.fc4(x)
    x = self.av4(x)
    x = self.fc5(x)
    return torch.sigmoid(x)

net = MyANN()
print(net) # just printing out structure of the network

In [None]:
# Train the network.

#optimizer = torch.optim.Adam(net.parameters(), lr=0.01) # "if you don't know what to do, use Adam optimiser" adapts the learning rate
optimizer = torch.optim.RMSprop(net.parameters(), lr=0.01, alpha=0.95)
criterion = nn.BCELoss()
for epocs in range(70):
  for data in training_set:
    X, y = data
    net.zero_grad() # gradients in network must be "zero-ed out".
    output = net(X)
    y = torch.Tensor([[el] for el in y])
    loss = criterion(output, y)# Computing loss with Binary cross-entropy
    loss.backward() # compute gradients for Back-propagation
    optimizer.step() # update the weights (based on calculated gradients)
  print(loss)

In [None]:
# Evaluate training
total = 0
correct = 0

net.eval()
with torch.no_grad():
  for data in training_set:
    X, y = data
    output = net(X)
    for idx, val in enumerate(output):
      if (torch.round(val) == y[idx]):
        correct+=1
      total+=1

print('Accuracy: ', round(correct/total, 100))
print('Total:', total)

Accuracy:  0.9167776298268975
Total: 1502


In [None]:
# Evaluate test
total = 0
correct = 0
net.eval()
with torch.no_grad():
  for data in test_set:
    X, y = data
    output = net(X)
    for idx, val in enumerate(output):
      if (torch.round(val) == y[idx]):
        correct += 1
      total += 1

print('Accuracy: ', round(correct/total, 100))
print('Total:', total)

Accuracy:  0.875
Total: 376


In [None]:
# Specify a path
PATH = "state_dict_model.pt"

# Save
torch.save(net.state_dict(), PATH)

# Load
torch_model = MyANN()
torch_model.load_state_dict(torch.load(PATH))
torch_model.eval()

MyANN(
  (fc1): Linear(in_features=8, out_features=32, bias=True)
  (av1): ReLU()
  (fc2): Linear(in_features=32, out_features=32, bias=True)
  (av2): ReLU()
  (fc3): Linear(in_features=32, out_features=32, bias=True)
  (av3): ReLU()
  (fc4): Linear(in_features=32, out_features=32, bias=True)
  (av4): ReLU()
  (fc5): Linear(in_features=32, out_features=1, bias=True)
)

In [None]:
## Example evaluating with saved state_dict

total = 0
correct = 0
torch_model.eval()
with torch.no_grad():
  for data in test_set:
    X, y = data
    output = torch_model(X)
    for idx, val in enumerate(output):
      if (torch.round(val) == y[idx]):
        correct += 1
      total += 1
print('Accuracy: ', round(correct/total, 100))
print('Total:', total)

In [None]:
for data in training_set:
  X, y = data
  x = X[0:25]
  break
torch_out = torch_model(x)

# Export the model
torch.onnx.export(torch_model,               # model being run
                  x,                         # model input (or a tuple for multiple inputs)
                  "upupup.onnx",   # where to save the model (can be a file or file-like object)
                  export_params=True,        # store the trained parameter weights inside the model file
                  opset_version=10,          # the ONNX version to export the model to
                  do_constant_folding=True,  # whether to execute constant folding for optimization
                  input_names = ['input'],   # the model's input names
                  output_names = ['output'], # the model's output names
                  dynamic_axes={'input' : {0 : 'batch_size'},    # variable lenght axes
                                'output' : {0 : 'batch_size'}})

In [None]:
!pip install onnx
import onnx

onnx_model = onnx.load("upupup.onnx")
onnx.checker.check_model(onnx_model)

In [None]:
!pip install onnxruntime
import onnxruntime

ort_session = onnxruntime.InferenceSession("upupup.onnx")

def to_numpy(tensor):
    return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()

# compute ONNX Runtime output prediction
ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(x)}
ort_outs = ort_session.run(None, ort_inputs)

# compare ONNX Runtime and PyTorch results
np.testing.assert_allclose(to_numpy(torch_out), ort_outs[0], rtol=1e-03, atol=1e-05)

print("Exported model has been tested with ONNXRuntime, and the result looks good!")

In [None]:
input_name = ort_session.get_inputs()[0].name
print("input name", input_name)
input_shape = ort_session.get_inputs()[0].shape
print("input shape", input_shape)
input_type = ort_session.get_inputs()[0].type
print("input type", input_type)

input name input
input shape ['batch_size', 8]
input type tensor(float)


In [None]:
output_name = ort_session.get_outputs()[0].name
print("output name", output_name)
output_shape = ort_session.get_outputs()[0].shape
print("output shape", output_shape)
output_type = ort_session.get_outputs()[0].type
print("output type", output_type)

output name output
output shape ['batch_size', 1]
output type tensor(float)


In [None]:
import numpy.random
x = numpy.random.random((25,8))
x = x.astype(numpy.float32)
res = ort_session.run([output_name], {input_name: x})
print(res)

In [None]:
traced_script_module = torch.jit.trace(torch_model, torch.Tensor(x))
traced_script_module.save("torchscript.pt")