In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


#### Data Prep (Fibonacci)

In [472]:
import random

def generate_fib_seq(n, seq_len=4, rng=None):
    """Sample ``n`` runs of consecutive Fibonacci numbers.

    Each run starts at a randomly chosen position of the sequence
    1, 1, 2, 3, 5, ... and holds ``seq_len`` consecutive terms.

    Args:
        n: Number of runs to generate.
        seq_len: Number of terms per run. Must be at least 2 because every
            run is seeded with two adjacent Fibonacci numbers.
        rng: Optional object exposing ``randint(a, b)`` (for example
            ``random.Random(seed)``) used to pick the start positions, which
            makes the dataset reproducible. Defaults to the global ``random``
            module.

    Returns:
        A list of ``n`` lists, each containing ``seq_len`` Python ints.

    Raises:
        ValueError: If ``seq_len`` is smaller than 2.
    """
    if seq_len < 2:
        raise ValueError(f"seq_len must be at least 2, got {seq_len}")
    if rng is None:
        rng = random

    # Pre-compute the Fibonacci numbers that the start index can address.
    fib_seq = [1, 1]
    for _ in range(100):
        fib_seq.append(fib_seq[-1] + fib_seq[-2])

    dataset = []
    for _ in range(n):
        i = rng.randint(2, 99)
        seq = [fib_seq[i - 1], fib_seq[i]]
        # Extend the seed pair with the recurrence until the run is long enough.
        for _ in range(seq_len - 2):
            seq.append(seq[-1] + seq[-2])
        dataset.append(seq)
    return dataset

In [473]:
# Draw 30 Fibonacci runs (default length 4) and store them as a float array,
# one run per row.
dataset = np.array(generate_fib_seq(30)).astype(float)
# Row-wise split: first 20 rows train, next 5 validation, remaining rows test.
training_set, val_set, test_set = np.split(dataset, [20, 25])

In [474]:
from sklearn.preprocessing import MinMaxScaler

# Compress the dynamic range of the Fibonacci values with a natural log.
training_set_log_scaled = np.log(training_set)
# Fit a min-max scaler on the log values, then apply it as a separate step.
# NOTE(review): training_set_processed does not appear to be used by the later
# cells visible here; they build tensors from training_set_log_scaled instead.
scaler = MinMaxScaler().fit(training_set_log_scaled)
training_set_processed = scaler.transform(training_set_log_scaled)

In [292]:
from torch.utils.data import TensorDataset, DataLoader

# Columns 0-2 of each row are the model inputs; column 3 is the regression target.
training_tensor = torch.tensor(training_set_log_scaled).float()
x_train, y_train = torch.split(training_tensor, 3, dim=1)

# Inputs get a trailing feature axis -> (N, 3, 1); targets stay (N, 1).
# Note that `dataset` is rebound here from the NumPy array to a TensorDataset.
batch_size = 4
dataset = TensorDataset(x_train.unsqueeze(-1), y_train)
dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

In [184]:
class BasicRNN(nn.Module):
    """Single-layer RNN followed by a linear read-out of the final hidden state.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Predict one value per sequence from the last hidden state.

        Args:
            x: Batch-first input sequence(s); the training cell feeds
                tensors of shape ``(N, L, input_size)``.

        Returns:
            The linear layer applied to the RNN's final hidden state. The
            hidden state keeps its leading layer axis, so a batched input
            yields shape ``(1, N, 1)`` rather than ``(N, 1)``.
        """
        # Invoke the sub-modules through __call__ (not .forward) so that any
        # registered hooks still run.
        _, h_n = self.rnn(x)
        return self.linear(h_n)

In [298]:
from tqdm import tqdm

# Train the plain RNN on the log-scaled sequences with Adam and an MSE loss.
model = BasicRNN(1, 30)
optimizer = torch.optim.Adam(model.parameters())
epoch = 50
for _ in tqdm(range(epoch)):
    for x_batch, y_batch in dataloader:
        optimizer.zero_grad()
        # BasicRNN returns (1, N, 1) while the targets are (N, 1). Reshape the
        # prediction to the target's shape so mse_loss no longer has to
        # broadcast (which is what triggers its size-mismatch UserWarning).
        y_pred = model(x_batch).reshape(y_batch.shape)
        loss = F.mse_loss(y_pred, y_batch)
        loss.backward()
        optimizer.step()

  loss = F.mse_loss(y_pred, y_batch)
100%|██████████| 50/50 [00:00<00:00, 211.52it/s]


In [192]:
# Sanity check on one unbatched sequence: the true successor of
# 89, 144, 233 in the Fibonacci sequence is 377.
with torch.no_grad():
    y_pred = model.forward(
        torch.tensor([89, 144, 233]).float().unsqueeze(1).log()  # (3, 1), log-scaled
    )
    # Undo the log transform before printing.
    print(np.exp(y_pred.numpy()))

[[360.75494]]


#### Eval

In [193]:
# Same preprocessing as the training data: natural log, then split the three
# input columns from the target column.
test_set_log_scaled = np.log(test_set)
test_tensor = torch.tensor(test_set_log_scaled).float()
x_test, y_test = torch.split(test_tensor, 3, dim=1)
# Add the trailing feature axis expected by the batch-first RNN: (N, 3) -> (N, 3, 1).
x_test = x_test.unsqueeze(-1)

In [194]:
# Shape check: after the unsqueeze the inputs should be (num_test_rows, 3, 1).
x_test.shape

torch.Size([10, 3, 1])

In [195]:
from sklearn.metrics import mean_squared_error

# Inference only: switch to eval mode and skip gradient tracking.
model.eval()
with torch.no_grad():
    y_pred = model(x_test)

# squeeze(0) drops the leading axis when it has size 1, so the prediction
# lines up with the (N, 1) targets.
y_test_np = y_test.numpy()
y_pred_np = y_pred.squeeze(0).numpy()

# Undo the log transform on both sides and score in the original scale.
mse = mean_squared_error(y_true=np.exp(y_test_np), y_pred=np.exp(y_pred_np))
print(mse)

4.823124e+33


#### Attention

In [441]:
'''
Attention recap (decoder step t, encoder position i):

score:          e_{t,i} = a(s_{t-1}, h_i), where a is some scoring function

weights:        alpha_{t,i} = softmax_i(e_{t,i})

context vector: c_t = sum_i alpha_{t,i} * h_i
'''
# Module-level slots that the attention layers overwrite on each forward pass
# so the most recent tensors can be inspected from later cells.
debug_attention = None  # latest attention weights (set by both attention layers)
debug_encoder_outputs = None  # latest encoder outputs (set by DotProductAttentionLayer)
debug_h_t = None  # latest query hidden state (set by DotProductAttentionLayer)

class AttentionLayer(nn.Module):
    """Additive attention that scores each encoder step as V(relu(W1 h_i + W2 s)).

    Args:
        hidden_size: Feature width of both the encoder outputs and the query.
    """

    def __init__(self, hidden_size):
        super().__init__()
        # Three bias-free projections: encoder side, query side, scalar score.
        self.W1 = nn.Linear(hidden_size, hidden_size, bias=False)
        self.W2 = nn.Linear(hidden_size, hidden_size, bias=False)
        self.V = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, encoder_outputs, h_t):
        """Return attention weights normalised over dim 1.

        Args:
            encoder_outputs: Per-step encoder states, (N, L, H).
            h_t: Query state, (N, 1, H); broadcast against every step.

        Returns:
            Weights of shape (N, L, 1). The same tensor is also stored in the
            module-level ``debug_attention`` for inspection.
        """
        global debug_attention
        projected_steps = self.W1(encoder_outputs)  # (N, L, H)
        projected_query = self.W2(h_t)  # (N, 1, H)
        score = self.V(F.relu(projected_steps + projected_query))  # (N, L, 1)
        weights = F.softmax(score, dim=1)
        debug_attention = weights
        return weights

class DotProductAttentionLayer(nn.Module):
    """Parameter-free dot-product attention between encoder steps and a query."""

    def forward(self, encoder_outputs, h_t):
        """Return softmax-normalised dot-product scores.

        Args:
            encoder_outputs: Per-step encoder states, (N, L, H).
            h_t: Query state, (N, 1, H).

        Returns:
            Weights of shape (N, L, 1), normalised over dim 1. The weights and
            both inputs are also stored in the module-level ``debug_*``
            variables for inspection.
        """
        global debug_attention, debug_encoder_outputs, debug_h_t
        query_as_column = h_t.permute(0, 2, 1)  # (N, H, 1)
        score = encoder_outputs.bmm(query_as_column)  # (N, L, 1)
        weights = score.softmax(dim=1)
        debug_attention, debug_encoder_outputs, debug_h_t = weights, encoder_outputs, h_t
        return weights

class BasicRNNWithAttention(nn.Module):
    """Single-layer RNN whose per-step outputs are pooled by dot-product attention.

    The final hidden state is used as the attention query over all RNN
    outputs; the resulting context vector is mapped to one value by a linear
    layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.attention = DotProductAttentionLayer()
        self.linear = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Predict one value per sequence.

        Args:
            x: Batched, batch-first input of shape (N, L, input_size).

        Returns:
            Tensor of shape (N, 1).
        """
        # Sub-modules are invoked through __call__ (not .forward) so that any
        # registered hooks still run.
        outputs, h_n = self.rnn(x)  # outputs: (N, L, H), h_n: (1, N, H)
        # Attention is applied once, after the RNN has consumed the whole
        # sequence. Computing it at every time step would require stepping
        # through the sequence manually.
        query = h_n.transpose(0, 1)  # (N, 1, H)
        attention_weights = self.attention(outputs, query)  # (N, L, 1)
        # Weighted sum of the RNN outputs: (N, 1, L) @ (N, L, H) -> (N, 1, H).
        context_vector = torch.bmm(attention_weights.transpose(1, 2), outputs)
        context_vector = context_vector.squeeze(1)  # (N, H)
        return self.linear(context_vector)

#### Data Prep

In [500]:
# Log-scale every split, then separate the three input columns from the
# target column. Inputs get a trailing feature axis: (N, 3) -> (N, 3, 1).
training_tensor = torch.tensor(np.log(training_set)).float()
x_train, y_train = torch.split(training_tensor, 3, dim=1)

batch_size = 4
dataset = TensorDataset(x_train.unsqueeze(-1), y_train)
dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

x_val, y_val = torch.split(torch.tensor(np.log(val_set)).float(), 3, dim=1)
x_val = x_val.unsqueeze(-1)

x_test, y_test = torch.split(torch.tensor(np.log(test_set)).float(), 3, dim=1)
x_test = x_test.unsqueeze(-1)


In [495]:
# Raw-scale (no log) variant of the data prep, kept for comparison.
# Every tensor/loader name carries a `_raw` suffix so that running the
# notebook top to bottom does not overwrite the log-scaled x_train / x_val /
# x_test / dataloader built in the preceding cell. The later prediction cell
# feeds torch.log(...) inputs to the model, i.e. it assumes log-scaled training.
training_tensor_raw = torch.tensor(training_set).float()
x_train_raw, y_train_raw = training_tensor_raw.split(3, dim=1)

dataset_raw = TensorDataset(x_train_raw.unsqueeze(2), y_train_raw)
batch_size = 4
dataloader_raw = DataLoader(dataset_raw, batch_size=batch_size, shuffle=True)

x_val_raw, y_val_raw = torch.tensor(val_set).float().split(3, dim=1)
x_val_raw = x_val_raw.unsqueeze(2)

x_test_raw, y_test_raw = torch.tensor(test_set).float().split(3, dim=1)
x_test_raw = x_test_raw.unsqueeze(2)

In [503]:
# Training budget and early-stopping state.
epoch = 1000
best_val_loss = float('inf')

# One input feature per time step, seven hidden units.
model = BasicRNNWithAttention(1, 7)
optimizer = torch.optim.Adam(params=model.parameters())

def compute_log_val_loss(model, x_val, y_val, inputs_are_log=True):
    """Return the validation MSE in log space as a Python float.

    Args:
        model: Module (or callable) mapping ``x_val`` to predictions shaped
            like ``y_val``.
        x_val: Validation inputs.
        y_val: Validation targets.
        inputs_are_log: When True (default) the data, and therefore the
            predictions, are taken to be log-scaled already, so the MSE is
            computed on them directly. Taking another log here would compare
            log-of-log values and produce NaN for any non-positive
            prediction. Pass False for raw-scale data to have ``torch.log``
            applied to predictions and targets first.

    Returns:
        The mean squared error as a float. Gradients are not tracked.
    """
    with torch.no_grad():
        y_pred = model(x_val)
        if not inputs_are_log:
            y_pred, y_val = torch.log(y_pred), torch.log(y_val)
        return F.mse_loss(y_pred, y_val).item()

# Train with Adam on the MSE loss; after every epoch compare the validation
# loss with the best value seen so far.
for e in tqdm(range(epoch)):
    for x_batch, y_batch in dataloader:
        optimizer.zero_grad()
        # Call the module itself (not .forward) so registered hooks still run.
        y_pred = model(x_batch)
        loss = F.mse_loss(y_pred, y_batch)
        loss.backward()
        optimizer.step()

    val_loss = compute_log_val_loss(model, x_val, y_val)
    # Zero-patience early stopping: the first epoch whose validation loss does
    # not strictly improve ends training. A NaN validation loss also lands in
    # the else branch, because every comparison with NaN is False.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
    else:
        print(f'early stopping at epoch {e}')
        break

100%|██████████| 1000/1000 [00:06<00:00, 163.13it/s]


In [504]:
# Sanity check: the true successor of 89, 144, 233 is 377.
seq = [89, 144, 233]
# One batch entry, three time steps, one feature -> (1, 3, 1).
x_input = torch.tensor(seq).float().view(1, 3, 1)

with torch.no_grad():
    y_pred = model.forward(x_input.log())
    # Undo the log transform before printing.
    print(np.exp(y_pred.numpy()))

[[468.68658]]


#### eval

In [505]:
from sklearn.metrics import mean_squared_error

# Inference only: switch to eval mode and skip gradient tracking.
model.eval()
with torch.no_grad():
    y_pred = model(x_test)

# squeeze(0) only removes the leading axis when it has size 1.
y_test_np = y_test.numpy()
y_pred_np = y_pred.squeeze(0).numpy()

# NOTE(review): no exp() is applied here, unlike the BasicRNN eval cell, so the
# score is in whatever scale x_test / y_test currently use.
mse = mean_squared_error(y_true=y_test_np, y_pred=y_pred_np)
print(mse)

84.10379


* Notes on using an RNN with attention to predict Fibonacci numbers
  * The values span a very large range of scales (from 1 to billions), so numerical stability becomes a problem. Possible mitigations:
    * log-scaling the inputs
    * gradient clipping
  * The recurrent activations saturate: hidden states are pushed towards 1 or -1 even after log-scaling, so not much is learned.
  * The number of training samples is low (20 sequences).

In [465]:
# Inspect the learned parameters of the current model.
model.state_dict()

OrderedDict([('rnn.weight_ih_l0',
              tensor([[ 0.0253],
                      [ 0.0096],
                      [-0.3131],
                      [-0.0291],
                      [-0.0164],
                      [-0.0147],
                      [ 0.2367]])),
             ('rnn.weight_hh_l0',
              tensor([[ 0.2606,  0.3990, -0.0763, -0.2116, -0.0966,  0.4863,  0.5038],
                      [ 0.1999,  0.1977, -0.5518, -0.3686,  0.1758, -0.1812,  0.0813],
                      [ 0.3376, -0.3192, -0.1182,  0.1830,  0.0291,  0.2075, -0.0565],
                      [ 0.3539, -0.3627,  0.0881,  0.0058, -0.3877, -0.0663, -0.5361],
                      [-0.4692, -0.1756,  0.5350,  0.0281, -0.3063, -0.0777, -0.3484],
                      [-0.1327, -0.3040,  0.2115,  0.4709, -0.1651, -0.3135, -0.0269],
                      [ 0.1180,  0.3517, -0.1059,  0.0586,  0.1548,  0.1565,  0.1477]])),
             ('rnn.bias_ih_l0',
              tensor([ 0.3482,  0.5256, -0.0538, -0.56

In [510]:
# Show the test inputs that were fed to the model in the eval cell.
x_test

tensor([[[ 6.4135],
         [ 6.8947],
         [ 7.3759]],

        [[29.0304],
         [29.5116],
         [29.9928]],

        [[ 1.0986],
         [ 1.6094],
         [ 2.0794]],

        [[45.3916],
         [45.8728],
         [46.3540]],

        [[25.1807],
         [25.6619],
         [26.1431]]])

In [509]:
# Encoder outputs captured by DotProductAttentionLayer.forward on its most recent call.
debug_encoder_outputs

tensor([[[ 0.9385,  0.7197,  0.9772,  0.9476, -0.9605,  0.9699, -0.5150],
         [ 0.9717, -0.9840,  0.7426,  0.0812, -0.6823,  0.9971,  0.9998],
         [ 0.7367, -0.9138,  0.6385, -0.2419, -0.9479,  0.9969,  0.9990]],

        [[ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000],
         [ 1.0000,  0.9992,  1.0000,  1.0000, -1.0000,  1.0000, -0.9839],
         [ 1.0000,  0.9994,  1.0000,  1.0000, -1.0000,  1.0000, -0.9876]],

        [[ 0.4824, -0.5419,  0.6573,  0.4467, -0.2499,  0.8659,  0.7758],
         [ 0.1869, -0.9881, -0.4207, -0.8575, -0.0675,  0.9683,  0.9998],
         [-0.3154, -0.2252,  0.3518, -0.0150, -0.6101,  0.9236,  0.8786]],

        [[ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000],
         [ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000],
         [ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000]],

        [[ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  0.9999, -1.0000],
         [ 1.0000,  0.9932,  1

In [507]:
# Query hidden state captured by DotProductAttentionLayer.forward on its most recent call.
debug_h_t

tensor([[[ 0.7367, -0.9138,  0.6385, -0.2419, -0.9479,  0.9969,  0.9990]],

        [[ 1.0000,  0.9994,  1.0000,  1.0000, -1.0000,  1.0000, -0.9876]],

        [[-0.3154, -0.2252,  0.3518, -0.0150, -0.6101,  0.9236,  0.8786]],

        [[ 1.0000,  1.0000,  1.0000,  1.0000, -1.0000,  1.0000, -1.0000]],

        [[ 1.0000,  0.9945,  1.0000,  0.9999, -1.0000,  1.0000, -0.8568]]])

In [508]:
# Attention weights captured on the most recent attention forward pass.
debug_attention

tensor([[[0.0260],
         [0.4809],
         [0.4932]],

        [[0.3366],
         [0.3310],
         [0.3323]],

        [[0.2800],
         [0.2839],
         [0.4361]],

        [[0.3333],
         [0.3333],
         [0.3333]],

        [[0.3635],
         [0.3166],
         [0.3198]]])

In [459]:
# Stand-alone layers for stepping through the additive-attention shapes by hand.
hidden_size = 7
# Two bias-free H -> H projections (encoder side and query side), created in order.
W1, W2 = (nn.Linear(hidden_size, hidden_size, bias=False) for _ in range(2))
# Bias-free H -> 1 projection that turns each step into a scalar score.
V = nn.Linear(in_features=hidden_size, out_features=1, bias=False)

rnn = nn.RNN(input_size=1, hidden_size=hidden_size, batch_first=True)

In [460]:
# Run the scratch RNN on the test inputs: `output` holds the per-step states,
# `h_t` the final hidden state (with a leading layer axis).
output, h_t = rnn.forward(x_test)

In [264]:
# Shape check for the two projections; h_t is transposed to batch-first so the
# query projection can broadcast against the per-step projection.
print(f'{W1(output).shape=}')
print(f'{W2(h_t.transpose(0, 1)).shape=}')

W1(output).shape=torch.Size([10, 3, 7])
W2(h_t.transpose(0, 1)).shape=torch.Size([10, 1, 7])


In [265]:
# Sum of the projected encoder steps and the broadcast projected query.
# NOTE(review): unlike AttentionLayer.forward, no relu is applied here.
x = (W1(output) + W2(h_t.transpose(0, 1)))

In [266]:
# Scalar score per time step, normalised over dim 1 (the sequence axis).
attn_w = F.softmax(V(x), dim=1)

In [269]:
# Shape check for the context vector: (N, 1, L) @ (N, L, H) -> (N, 1, H).
print(f'{attn_w.shape=}') # N, L, 1
print(f'{h_t.transpose(0, 1).shape=}') # N, 1, H
torch.bmm(attn_w.transpose(1, 2), output).shape

attn_w.shape=torch.Size([10, 3, 1])
h_t.transpose(0, 1).shape=torch.Size([10, 1, 7])


torch.Size([10, 1, 7])