In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from torch.utils.tensorboard import SummaryWriter

In [None]:
# Run on Apple's Metal backend (MPS) when PyTorch reports it as available;
# otherwise fall back to the CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')

In [None]:
# Debugging aid: makes autograd record forward tracebacks so that a failing
# backward pass points at the forward op responsible.
# NOTE(review): this adds overhead to every step of the training loop;
# consider switching it off once the model trains cleanly.
torch.autograd.set_detect_anomaly(mode=True)

In [None]:
# Load the GDP table and index it by its 'date' column.
data = pd.read_csv('./data/GDP.csv').set_index('date')

# Normalise every column by its value in the row labelled 2000.
base = data.loc[2000]
scaled_data = data / base

# Index labels as plain Python ints, used below to size the train/test split.
years = [int(label) for label in data.index]

# One-step-ahead pairs: the row at position k is the input, row k+1 its label.
inputs = torch.tensor(scaled_data.iloc[:-1].to_numpy(), dtype=torch.float32, device=device)
labels = torch.tensor(scaled_data.iloc[1:].to_numpy(), dtype=torch.float32, device=device)

# Number of index labels inside 1970..2000 (train) and after 2000 (test).
# NOTE(review): slicing by this count assumes the table has no rows before
# 1970 - confirm against the CSV.
train_sequence = sum(1 for year in years if 1970 <= year <= 2000)
test_sequence = sum(1 for year in years if year > 2000)

train_data, test_data = inputs[:train_sequence], inputs[train_sequence:]
train_label, test_label = labels[:train_sequence], labels[train_sequence:]

# LSTMCell
<img src="./data/LSTM.png" width="600" height="200">
$$\mathrm{I}_t=\sigma(W_i \cdot x_t+U_i \cdot h_{t-1}+b_i) $$
$$\mathrm{F}_t=\sigma(W_f \cdot x_t+U_f \cdot h_{t-1}+b_f) $$
$$\mathrm{O}_t=\sigma(W_o \cdot x_t+U_o \cdot h_{t-1}+b_o) $$
$$\mathrm{\tilde{C}}_t=\mathrm{tanh}(W_c \cdot x_t+U_c \cdot h_{t-1}+b_c) $$
$$\mathrm{{C}}_t=\mathrm{{C}}_{t-1} \odot \mathrm{F}_t + \mathrm{\tilde{C}}_t \odot \mathrm{I}_t$$
$$\mathrm{{H}}_t=\mathrm{O}_t \odot \mathrm{tanh}(\mathrm{{C}}_t)$$


In [None]:
class LSTMCell(nn.Module):
    """One LSTM time step built from separate per-gate linear maps.

    For an input ``x_t`` and previous states ``h_p`` / ``c_p`` the cell
    computes::

        f = sigmoid(W_f x_t + U_f h_p + b_f)      # forget gate
        i = sigmoid(W_i x_t + U_i h_p + b_i)      # input gate
        o = sigmoid(W_o x_t + U_o h_p + b_o)      # output gate
        g = tanh(W_c x_t + U_c h_p + b_c)         # candidate cell state
        c = f * c_p + i * g
        h = o * tanh(c)

    Args:
        input_size: size of the last dimension of ``x_t``.
        hidden_size: size of the last dimension of ``h_p``, ``c_p`` and of
            both returned tensors.
        device: device on which the cell's parameters are created.
    """
    def __init__(self,input_size,hidden_size,device):
        super().__init__()
        # input Matrix (created on `device`, like the biases below, so the
        # cell is usable without a separate .to(device) call)
        self.W_f = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        self.W_i = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        self.W_c = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        self.W_o = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        # hidden Matrix
        self.U_f = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        self.U_i = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        self.U_c = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        self.U_o = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        # bias (one shared bias per gate, initialised to zero)
        self.b_f = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        self.b_i = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        self.b_c = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        self.b_o = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        # activation
        self.sigmf = nn.Sigmoid()
        self.sigmi = nn.Sigmoid()
        self.sigmo = nn.Sigmoid()
        self.tanh1 = nn.Tanh()
        self.tanh2 = nn.Tanh()

    def forward(self,x_t,h_p,c_p):
        """Advance the cell by one time step.

        Args:
            x_t: input for this step, last dimension ``input_size``.
            h_p: previous hidden state, last dimension ``hidden_size``.
            c_p: previous cell state, last dimension ``hidden_size``.

        Returns:
            Tuple ``(h, c)`` with the new hidden and cell state. Both stay
            attached to the autograd graph, so gradients reach every gate's
            parameters and flow back through ``h_p`` / ``c_p``.
        """
        # Work on private copies of the inputs. A caller may hand in views of
        # a state buffer that it later overwrites in place (``buf[i] = h``);
        # without the copy, the tensors autograd saves here for the backward
        # pass would be invalidated by that write. clone() is differentiable,
        # so gradients still flow to the original tensors.
        x_t, h_p, c_p = x_t.clone(), h_p.clone(), c_p.clone()

        gate_f = self.sigmf(self.W_f(x_t)+self.U_f(h_p)+self.b_f)
        gate_i = self.sigmi(self.W_i(x_t)+self.U_i(h_p)+self.b_i)
        tidl_c = self.tanh1(self.W_c(x_t)+self.U_c(h_p)+self.b_c)
        gate_o = self.sigmo(self.W_o(x_t)+self.U_o(h_p)+self.b_o)

        # No .detach() anywhere: detaching the gates or the cell state would
        # cut the graph and leave the cell's parameters without gradients.
        c = gate_f*c_p+gate_i*tidl_c
        h = gate_o*self.tanh2(c)

        return h,c

In [None]:
class LSTM(nn.Module):
    """Unidirectional stack of ``LSTMCell`` layers unrolled over a sequence.

    Args:
        input_size: feature size of the input fed to the first layer.
        hidden_size: hidden/cell state size of every layer.
        device: device handed to each ``LSTMCell`` constructor.
        num_layers: number of stacked cells; layer ``i > 0`` consumes the
            hidden state of layer ``i - 1``.
    """
    def __init__(self,input_size,hidden_size,device,num_layers=1):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.device = device
        self.lstmforward = nn.ModuleList(LSTMCell(input_size=input_size if i==0 else hidden_size,
                                                  hidden_size=hidden_size,device=self.device)
                                         for i in range(self.num_layers))
        # Only the forward direction is implemented; there is no backward
        # (bidirectional) pass.

    def forward(self,x):
        """Run the stack over a batch-first sequence.

        Args:
            x: tensor of shape ``(batch_size, seq_len, input_size)``.

        Returns:
            ``(out, (h_n, c_n))`` where ``out`` has shape
            ``(batch_size, seq_len, hidden_size)`` and holds the last layer's
            hidden state at every time step, and ``h_n`` / ``c_n`` are the
            last layer's final hidden and cell state, each of shape
            ``(batch_size, hidden_size)``. States start at zero.
        """
        batch_size, seq_len,_ = x.size()
        # Per-layer states live in Python lists and are *rebound* each step.
        # Writing them into a shared buffer in place would make every stored
        # output a view of that buffer, so all time steps would end up equal
        # to the final state, and autograd's saved tensors would be clobbered.
        h_states = [torch.zeros(batch_size,self.hidden_size,device=x.device)
                    for _ in range(self.num_layers)]
        c_states = [torch.zeros(batch_size,self.hidden_size,device=x.device)
                    for _ in range(self.num_layers)]
        outputs_forward = []
        for t in range(seq_len):
            layer_input = x[:,t,:]
            for i, cell in enumerate(self.lstmforward):
                h_states[i], c_states[i] = cell(layer_input,h_states[i],c_states[i])
                layer_input = h_states[i]
            outputs_forward.append(h_states[-1])

        if outputs_forward:
            out_forward = torch.stack(outputs_forward,dim=1)
        else:
            # Empty sequence: torch.stack rejects an empty list.
            out_forward = torch.zeros(batch_size,0,self.hidden_size,device=x.device)

        return out_forward,(h_states[-1],c_states[-1])

# Example usage
input_size = 10   # Number of input features
hidden_size = 20  # Number of LSTM hidden units
num_layers = 1    # Number of LSTM layers

model = LSTM(input_size, hidden_size, device,num_layers).to(device)

# Dummy input (batch_size=5, seq_length=7, input_size=10)
inputs = torch.randn(5, 7, input_size,device=device)

# Forward pass
outputs, (hn, cn) = model(inputs)

print(outputs.size())  # Should be (5, 7, hidden_size)


# GRUCell
<img src="./data/GRU.png" width="600" height="200">
$$\mathrm{R}_t=\sigma(W_r \cdot x_t+U_r \cdot h_{t-1}+b_r) $$
$$\mathrm{Z}_t=\sigma(W_z \cdot x_t+U_z \cdot h_{t-1}+b_z) $$
$$\mathrm{\tilde{H}}_t=\mathrm{tanh}(W_h \cdot x_t+U_h \cdot (h_{t-1} \odot \mathrm{R}_t)+b_h) $$
$$\mathrm{{H}}_t=h_{t-1} \odot \mathrm{Z}_t + \mathrm{\tilde{H}}_t \odot (1-\mathrm{Z}_t) $$

In [None]:
class GRUCell(nn.Module):
    """One GRU time step built from separate per-gate linear maps.

    For an input ``x_t`` and previous hidden state ``h_p`` the cell computes::

        r = sigmoid(W_r x_t + U_r h_p + b_r)          # reset gate
        z = sigmoid(W_z x_t + U_z h_p + b_z)          # update gate
        n = tanh(W_h x_t + U_h (h_p * r) + b_h)       # candidate state
        h = h_p * z + n * (1 - z)

    Args:
        input_size: size of the last dimension of ``x_t``.
        hidden_size: size of the last dimension of ``h_p`` and of the output.
        device: device on which the cell's parameters are created.
    """
    def __init__(self,input_size,hidden_size,device):
        super().__init__()
        # input Matrix (created on `device`, like the biases below, so the
        # cell is usable without a separate .to(device) call)
        self.W_r = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        self.W_z = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        self.W_h = nn.Linear(in_features=input_size,out_features=hidden_size,bias=False,device=device)
        # hidden Matrix
        self.U_r = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        self.U_z = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        self.U_h = nn.Linear(in_features=hidden_size,out_features=hidden_size,bias=False,device=device)
        # bias (one shared bias per gate, initialised to zero)
        self.b_r = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        self.b_z = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        self.b_h = nn.Parameter(data=torch.zeros(hidden_size,device=device))
        # activation
        self.sigmr = nn.Sigmoid()
        self.sigmz = nn.Sigmoid()
        self.tanhh = nn.Tanh()

    def forward(self,x_t,h_p):
        """Advance the cell by one time step.

        Args:
            x_t: input for this step, last dimension ``input_size``.
            h_p: previous hidden state, last dimension ``hidden_size``.

        Returns:
            The new hidden state ``h``. It stays attached to the autograd
            graph through both gates, so the reset- and update-gate
            parameters receive gradients as well.
        """
        # Work on private copies of the inputs. A caller may hand in views of
        # a state buffer that it later overwrites in place (``buf[i] = h``);
        # without the copy, the tensors autograd saves here for the backward
        # pass would be invalidated by that write. clone() is differentiable,
        # so gradients still flow to the original tensors.
        x_t, h_p = x_t.clone(), h_p.clone()

        gate_r = self.sigmr(self.W_r(x_t)+self.U_r(h_p)+self.b_r)
        gate_z = self.sigmz(self.W_z(x_t)+self.U_z(h_p)+self.b_z)
        # No .detach() on the gates: detaching them would leave
        # W_r/U_r/b_r and W_z/U_z/b_z without gradients.
        tidl_h = self.tanhh(self.W_h(x_t)+self.U_h(h_p*gate_r)+self.b_h)
        h = h_p*gate_z+tidl_h*(1-gate_z)

        return h

In [None]:
class GRU(nn.Module):
    """Unidirectional stack of ``GRUCell`` layers unrolled over a sequence.

    Args:
        input_size: feature size of the input fed to the first layer.
        hidden_size: hidden state size of every layer.
        device: device handed to each ``GRUCell`` constructor.
        num_layers: number of stacked cells; layer ``i > 0`` consumes the
            hidden state of layer ``i - 1``.
    """
    def __init__(self,input_size,hidden_size,device,num_layers=1):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.device = device
        self.gruforward = nn.ModuleList(GRUCell(input_size=input_size if i==0 else hidden_size,
                                                  hidden_size=hidden_size,device=self.device)
                                         for i in range(self.num_layers))
        # Only the forward direction is implemented; there is no backward
        # (bidirectional) pass.

    def forward(self,x):
        """Run the stack over a batch-first sequence.

        Args:
            x: tensor of shape ``(batch_size, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, hidden_size)`` holding the
            last layer's hidden state at every time step. States start at
            zero.
        """
        batch_size, seq_len,_ = x.size()
        # Per-layer states live in a Python list and are *rebound* each step.
        # Writing them into a shared buffer in place would make every stored
        # output a view of that buffer, so all time steps would end up equal
        # to the final state, and autograd's saved tensors would be clobbered.
        h_states = [torch.zeros(batch_size,self.hidden_size,device=x.device)
                    for _ in range(self.num_layers)]
        outputs_forward = []
        for t in range(seq_len):
            layer_input = x[:,t,:]
            for i, cell in enumerate(self.gruforward):
                h_states[i] = cell(layer_input,h_states[i])
                layer_input = h_states[i]
            outputs_forward.append(h_states[-1])

        if outputs_forward:
            out_forward = torch.stack(outputs_forward,dim=1)
        else:
            # Empty sequence: torch.stack rejects an empty list.
            out_forward = torch.zeros(batch_size,0,self.hidden_size,device=x.device)

        return out_forward

# Example usage
input_size = 10   # Number of input features
hidden_size = 20  # Number of GRU hidden units
num_layers = 1    # Number of GRU layers

model = GRU(input_size, hidden_size, device,num_layers).to(device)

# Dummy input (batch_size=5, seq_length=7, input_size=10)
inputs = torch.randn(5, 7, input_size,device=device)

# Forward pass
outputs = model(inputs)

print(outputs.size())  # Should be (5, 7, hidden_size)


In [None]:
class Net(nn.Module):
    """LSTM followed by a linear read-out that emits one value per time step.

    Args:
        input_size: feature size passed to the LSTM. ``forward`` always feeds
            a single feature per step, so this is expected to be 1.
        hidden_size: LSTM hidden size and input width of the read-out layer.

    The LSTM is built on the notebook-level ``device``.
    """
    def __init__(self,input_size,hidden_size):
        super().__init__()
        self.rnn = LSTM(input_size=input_size,hidden_size=hidden_size,device=device)
        self.fc = nn.Linear(hidden_size,1)

    def forward(self,X):
        """Map ``X`` to one prediction per position.

        A size-1 feature axis is inserted at dim 2 before the LSTM, and index
        0 of dim 2 is taken after the read-out.
        NOTE(review): assumes ``X`` is 2-D ``(batch, seq_len)`` - confirm with
        the caller.
        """
        step_features = X.unsqueeze(2)
        hidden_seq, _ = self.rnn(step_features)
        per_step = self.fc(hidden_seq)
        return per_step.select(2, 0)

In [None]:
# Build the model and the loss on the target device first and only then
# create the optimizer, so the optimizer is constructed from the parameters
# as they live on that device (the order recommended by torch.optim).
model = Net(input_size=1,hidden_size=5).to(device)
criterion = nn.MSELoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
steps = 50000
# Optional hooks, left disabled:
#writer = SummaryWriter(log_dir='./log')
#scheduler = optim.lr_scheduler.StepLR(optimizer,step_size=1000,gamma=0.1)

# Loop-invariant: place the tensors on the device once instead of every step.
train_data, train_label = train_data.to(device), train_label.to(device)
test_data, test_label = test_data.to(device), test_label.to(device)

for step in range(steps):
    # train
    # Re-enter training mode every step: the evaluation branch below switches
    # the model to eval mode and nothing else would switch it back.
    model.train()
    optimizer.zero_grad()
    train_output = model(train_data)
    train_loss = criterion(train_output,train_label)
    train_loss.backward()
    optimizer.step()
    if (step+1)%1000==0:
        # Learning rate of the last parameter group, looked up only when logged.
        current_lr = optimizer.param_groups[-1]['lr']
        print(f'{step+1}/{steps} lr={current_lr} train_loss={train_loss.item()}')
        #writer.add_scalar('Loss/train',train_loss.item(),step+1)
    #scheduler.step()

    # eval
    if (step+1)%10000==0:
        model.eval()
        with torch.no_grad():
            test_output = model(test_data)
            test_loss = criterion(test_output,test_label)
            print(f'{step+1}/{steps} test_loss={test_loss.item()}')
            #writer.add_scalar('Loss/test',test_loss.item(),step+1)