In [4]:
# PyThorch - To create the neural network
# matplotlib & seaborn to drow graphs

In [11]:
import torch # Used to crate tensor to storer all of the numerical values including the raw data
import torch.nn as nn # Used to make the weights and bias tensor part of the neural network
import torch.nn.functional as F # this gives us the activation functions
#from torch.optim import Adan # Then we import Adan to fit the neural network to the data


# A is lot like SGD but not quite as stochastic. As a result, Adan tends to find the optimal values faster than SGD.
# Adan é um implementação do Gradiente Descent

In [12]:
import lightning as L
from torch.utils.data import TensorDataset, DataLoader ## to handler large datasets

In [13]:
import matplotlib.pyplot as plt
import seaborn as sns # (sns Samual Norman SEABORN)

In [10]:
# create our NN class to herdar da L.LightningModule)
class LSTMbyHand(L.LightningModule):
    def __init__(self): # initialization methods for the class
        super().__init__() # call the initialization methods for the parent nn.Module class

        mean = torch.tensor(0.0)
        std  = torch.tensor(1.0)
        
        # Initialize the weights and biases in our neural network
        self.wlr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.wlr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.blr1 = nn.Parameter(torch.tensor(0.), requires_grad=False)

        self.wpr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.wpr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.bpr1 = nn.Parameter(torch.tensor(0.), requires_grad=False)

        self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=False)

        self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=False)
        self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=False)
        

    def lstm_unit(self, input_value, long_memory, short_memory):
        # Do the LSTM Math

        # compute the long term to remember percentage
        longrm = ((short_memory*self.wlr1)+(input_value*self.wlr2) + self.blr1)

        # run that sum throught a sigmoid activation function
        long_remember_precent = torch.sigmoid(longrm)

        # compute the percentual of Memory to remember
        pottential_rp = ((short_memory * self.wpr1) + (input_value * self.wpr2) + self.bpr1)
        potential_remember_percent = torch.sigmoid(pottential_rp)

        # compute the potential long-term memory
        potential_memory = torch.tanh(pottential_rp)

        updated_long_memory = ((long_memory * long_remember_precent) + (potential_remember_percent * potential_memory))

        output_percent = torch.sigmoid((short_memory * self.wo1) + (input_value * self.wo1) + self.bo1)
        updated_short_memory = torch.tanh(updated_long_memory) * output_percent
        
        return([updated_long_memory, updated_short_memory])

    def forward(self, input):
        long_memory = 0
        short_memory = 0 
        day1 = input[0]
        day2 = input[1]
        day3 = input[2]
        day4 = input[3]
        
        long_memory, short_memory = self.lstm_unit(day1, long_memory, short_memory)
        long_memory, short_memory = self.lstm_unit(day2, long_memory, short_memory)
        long_memory, short_memory = self.lstm_unit(day3, long_memory, short_memory)
        long_memory, short_memory = self.lstm_unit(day4, long_memory, short_memory)
        
        return short_memory

    def configure_optimizers(self):
        # Configure the optimizer like ADAN, SGD
        return Adan(self.parameters())

    def training_step(self, batch, batch_idx):
        # Calculate loss and log training progress.
        input_i, label_i = batch
        output_i = self.forward(input_i[0])
        loss = (output_i - label_i)**2
        
        self.log("train_loss", loss)
        # also logs the prediction
        if (label_i == 0):
            self.log("out_0", output_i)
        else:
            self.log("out_1", output_i)
        
        return loss

In [9]:
model = LSTMbyHand()

In [10]:
print("\nNow let's compare the observed and predicted values ... ")
print("\nCompany A: Observed = 0, Predicted = ", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())


Now let's compare the observed and predicted values ... 

Company A: Observed = 0, Predicted =  tensor(0.4408)


In [11]:
print("\nCompany B: Observed = 1, Predicted = ", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())


Company B: Observed = 1, Predicted =  tensor(0.4800)


In [15]:
# train the model
inputs = torch.tensor([0., 0.5, 1.])
labels = torch.tensor([0., 1.0, 0.])

dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset)

In [None]:
#  accelerator : The accelerator to use for training. Can be one of "cpu", "gpu", "tpu", "ipu", "auto".
trainer = L.Trainer(max_epochs=2000)
trainer.fit(model, train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [None]:
#  accelerator : The accelerator to use for training. Can be one of "cpu", "gpu", "tpu", "ipu", "auto".
trainer = L.Trainer(max_epochs=3000)
trainer.fit(model, train_dataloaders=dataloader)

In [14]:
# create our NN class to herdar da L.LightningModule)
class LightningLSTM(L.LightningModule):
    def __init__(self): # initialization methods for the class
        super().__init__() # call the initialization methods for the parent nn.Module class

        self.lstm = nn.LSTM(input_size=1, hidden_size=1)

    def forward(self, input):
        # transpose the input
        input_trans = input.view(len(input),1)
        lstm_out, temp = self.lstm(input_trans)
        prediction = lstm_out[-1]
        return prediction

    def configure_optimizers(self):
        # Configure the optimizer like ADAN, SGD
        return Adan(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        # Calculate loss and log training progress.
        input_i, label_i = batch
        output_i = self.forward(input_i[0])
        loss = (output_i - label_i)**2

        self.log("train_loss", loss)
        # also logs the prediction
        if (label_i == 0):
            self.log("out_0", output_i)
        else:
            self.log("out_1", output_i)
            
        return loss

In [None]:
model = LightningLSTM()
trainer = L.Trainer(max_epochs=300, log_every_n_steps=2)
trainer.fit(model, train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [None]:
print("\nNow let's compare the observed and predicted values ... ")
print("\nCompany A: Observed = 0, Predicted = ", model(torch.tensor([0., 0.5, 0.25, 1.])).detach())
print("\nCompany B: Observed = 1, Predicted = ", model(torch.tensor([1., 0.5, 0.25, 1.])).detach())