In [1]:
!pip install lightning



In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
import seaborn as sns
import lightning as L
from torch.utils.data import TensorDataset, DataLoader


In [3]:
import pandas as pd
import numpy as np

df = pd.read_csv('iaq_middle5.csv')
column_name = 'Humidnity'

data_array = df[column_name].values
obs=data_array[:-1]
scaled_obs = obs * 0.001


In [4]:
#Data set declaration 
A=1.0
B=2.0
C=3.0
D=4.0
pred_E=5.0

Z=0.06
G=0.07
H=0.08
I=0.09
pred_J=0.10

In [5]:
class LSTMbyHand(L.LightningModule):
    def __init__(self):
        super().__init__()
        mean=torch.tensor(0.0)
        std=torch.tensor(1.0)
        
        self.wlr1=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wlr2=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.blr1=nn.Parameter(torch.tensor(0.0),requires_grad=True)
        self.wpr1=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wpr2=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bpr1=nn.Parameter(torch.tensor(0.0),requires_grad=True)
        self.wp1=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wp2=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bp1=nn.Parameter(torch.tensor(0.0),requires_grad=True)
        self.wo1=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wo2=nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bo1=nn.Parameter(torch.tensor(0.0),requires_grad=True)
        
    def lstm_unit(self,input_value,long_memory,short_memory):
        long_remember_percent=torch.sigmoid((short_memory*self.wlr1)+(input_value*self.wlr2)+self.blr1)
        potential_remember_memory=torch.sigmoid((short_memory*self.wpr1)+(input_value*self.wpr2)+self.bpr1)
        potential_memory=torch.tanh((short_memory*self.wp1)+(input_value*self.wp2)+self.bp1)
        updated_long_memory=((long_memory*long_remember_percent)+(potential_remember_memory*potential_memory))
        output_percent=torch.sigmoid((short_memory*self.wo1)+(input_value*self.wo2)+self.bo1)
        updated_short_memory=torch.tanh(updated_long_memory)*output_percent
        return([updated_long_memory,updated_short_memory])
    
    def forward(self,input):
        long_memory=0
        short_memory=0
        
        indices = list(range(0, len(scaled_obs)))
        
        for index in indices:
            element = input[index]  # Access the element using tensor indexing
            long_memory, short_memory = self.lstm_unit(element, long_memory, short_memory)
        
        return short_memory
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        return optimizer
    
    def training_step(self,batch,batch_idx):
        input_i,label_i=batch
        output_i=self.forward(input_i[0])
        loss=(output_i-label_i)**2
        self.log("train_loss",loss)
        
        if (label_i==0):
            self.log("output_0",output_i)
        
        else:
            self.log("output_1",output_i)
        return loss
    

In [6]:

# Suppose you have a list of NumPy arrays named numpy_array_list
numpy_array_list = [np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])]

# Convert the list of NumPy arrays into a single NumPy array
combined_numpy_array = np.concatenate(numpy_array_list)

# Convert the combined NumPy array into a PyTorch tensor
tensor = torch.tensor(combined_numpy_array)

# Now, 'tensor' contains the data in a PyTorch tensor

In [7]:
model=LSTMbyHand()
print("Company A, observed=0,predicted:",model(torch.tensor(scaled_obs).detach()))
print("Company B, observed=0,predicted:",model(torch.tensor(scaled_obs).detach()))

Company A, observed=0,predicted: tensor(0.0257, dtype=torch.float64, grad_fn=<MulBackward0>)
Company B, observed=0,predicted: tensor(0.0257, dtype=torch.float64, grad_fn=<MulBackward0>)


In [8]:
inputs=torch.tensor([scaled_obs])
#[A,B,C,D],[Z,G,H,I]
labels=torch.tensor([data_array[-1]*0.001])
#pred_E,pred_J

dataset=TensorDataset(inputs,labels)
dataloader=DataLoader(dataset)

  inputs=torch.tensor([scaled_obs])


In [None]:
trainer=L.Trainer(max_epochs=500)
trainer.fit(model,train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name         | Type | Params
--------------------------------------
  | other params | n/a  | 12    
--------------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)
C:\Users\USER-PC\anaconda3\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
C:\Users\USER-PC\anaconda3\lib\site-packages\lightning\pytorch\loops\fit_loop.py:293: The number of training batches (1) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training e

Training: |          | 0/? [00:00<?, ?it/s]

In [None]:
path_to_best_checkpoint=trainer.checkpoint_callback.best_model_path
trainer=L.Trainer(max_epochs=510)
trainer.fit(model,train_dataloaders=dataloader,ckpt_path=path_to_best_checkpoint)

In [None]:
prediction1=model(torch.tensor(scaled_obs).detach())

print("Company A, observed=0,predicted:",prediction1*1000)
print("Company B, observed=0,predicted:",prediction1*1000)