# LSTM for time series prediction

## Importing the libraries

In [93]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader


# Data Preprocessing
we will train the LSTM model on the Penn  Treebank dataset. The Penn Treebank dataset is a dataset of cleaned and annotated English text. The data is split into training, validation, and testing sets.

In [94]:
train_data = open('data/ptb.train.txt', 'r').read()
test_data = open('data/ptb.test.txt', 'r').read()
valid_data = open('data/ptb.valid.txt', 'r').read()
data = train_data + ' ' + test_data + ' ' + valid_data



### let's see what are the most common words in the data

In [95]:
from collections import Counter

leaderboard = Counter(data.split()).most_common(10)
i = 0
for word, freq in leaderboard:
    i+=1
    print(f'{i}.{word}: appears {freq} times')

1.the: appears 59421 times
2.<unk>: appears 53299 times
3.N: appears 37607 times
4.of: appears 28427 times
5.to: appears 27430 times
6.a: appears 24755 times
7.in: appears 21032 times
8.and: appears 20404 times
9.'s: appears 11555 times
10.for: appears 10436 times


## Tokenizing the data
### create a vocabulary of words

In [96]:


def build_vocab(txt, max_vocab_size):
    counter = Counter(txt.split())

    most_common = counter.most_common(max_vocab_size - 1)
    vocab = {word: idx + 1 for idx, (word, _) in enumerate(most_common)}
    vocab['<unk>'] = 0
    return vocab


In [97]:
# concatenate the data

vocab = build_vocab(data, max_vocab_size=100000)

### create a function that converts a word to token index and vice versa

In [98]:

# decode the token i to a word S
def itos(i):
    return list(vocab.keys())[i]

# encode the word S to a token index i
def stoi(s):
    return vocab[s] if s in vocab else vocab['<unk>']

In [99]:
train_data = [stoi(word) for word in train_data.split()]
valid_data = [stoi(word) for word in valid_data.split()]
test_data = [stoi(word) for word in test_data.split()]

## build a dataset and dataloader

In [100]:
batch_size = 32
seq_length = 32

In [101]:
class PTBDataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) // self.seq_length

    def __getitem__(self, idx):
        x = self.data[idx * self.seq_length: (idx + 1) * self.seq_length]
        y = self.data[idx * self.seq_length + 1: (idx + 1) * self.seq_length + 1]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)


def batchify(data, batch_size, seq_length):
    num_batches = len(data) // (batch_size * seq_length)
    data = data[:num_batches * batch_size * seq_length]
    data = np.reshape(data, [batch_size, -1])
    return data

train_data_batched = batchify(train_data, batch_size, seq_length)
valid_data_batched = batchify(valid_data, batch_size, seq_length)
test_data_batched = batchify(test_data, batch_size, seq_length)

train_dataset = PTBDataset(train_data_batched.flatten(), seq_length)
valid_dataset = PTBDataset(valid_data_batched.flatten(), seq_length)
test_dataset = PTBDataset(test_data_batched.flatten(), seq_length)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

test_data_iter = iter(test_loader)
train_data_iter = iter(train_loader)
valid_data_iter = iter(valid_loader)

# Lstm model Architecture

In [144]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LSTM_Cell(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        """
        :param input_size:
        :param hidden_size:
        :param num_layers:
        :param num_classes:
        """
        super(LSTM_Cell, self).__init__()

        # size of the hidden state
        self.hidden_size = hidden_size

        # LSTM gates
        # Forget gate
        self.f_gate = nn.Sequential(
            nn.Linear(input_size + hidden_size, hidden_size),
            nn.Sigmoid() )

        # Candidate gate(input modulation in the original paper)
        self.g_gate = nn.Sequential(
            nn.Linear(input_size + hidden_size, hidden_size),
            nn.Tanh())

        # Input gate
        self.i_gate = nn.Sequential(
            nn.Linear(input_size + hidden_size, hidden_size),
            nn.Sigmoid())

        # Output gate
        self.o_gate = nn.Sequential(
            nn.Linear(input_size + hidden_size,hidden_size),
            nn.Sigmoid())
        self.tanh = nn.Tanh()

    def forward(self,x,h,c):
        """
        :param x: input tensor
        :param h: previous hidden state
        :param c: previous cell state
        :return: (c,h) tuple of new cell state and new hidden state
        """
        print(f"x: {x.size()}, h: {h.size()}")
        # Concatenate input and hidden state
        x_h = torch.cat((x,h),0)

        # Forget
        f = self.f_gate(x_h)
        g = self.g_gate(x_h)
        i = self.i_gate(x_h)
        o = self.o_gate(x_h)

        # update c
        c = c * f + (g*i)
        # THEN, update h
        h = self.tanh(c) * o

        return h,c

class LSTM(nn.Module):
    def __init__(self,
                 input_size,
                 hidden_size,
                 num_layers,
                 num_classes):
        """
        :param input_size:
        :param hidden_size:
        :param num_layers:
        :param num_classes:
        """
        super(LSTM, self).__init__()

        self.LSTM_Cell = LSTM_Cell(input_size, hidden_size, num_layers, num_classes)
        self.hidden_size = hidden_size

    def forward(self,x,seq_length):
        """
        :param x: input tensor
        :param h: previous hidden state
        :param c: previous cell state
        :return: (c,h) tuple of new cell state and new hidden state
        """
        h = []
        # store the initial hidden and cell states
        ht =torch.zeros( self.hidden_size) # store the hidden states (output)
        c = torch.zeros(self.hidden_size)

        # store the hidden states (output)

        for i in range(seq_length):
            ht,c = self.LSTM_Cell(x[i],ht,c)
            h.append(ht)
            print("done")

        return h


# Example

In [148]:
x = next(train_data_iter)[0]

print(x.size())
lstm = LSTM(x.size(0), 200, 1, 10000)


h = lstm(x, 32)

print(h)

torch.Size([32, 32])
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200])
done
x: torch.Size([32]), h: torch.Size([200

2052