<a href="https://colab.research.google.com/github/vishal-burman/PyTorch-Architectures/blob/master/misc/Transformer_TimeSeries.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install wget
! pip install datasets
! git clone https://github.com/vishal-burman/PyTorch-Architectures.git # --> QOL helper methods
%cd PyTorch-Architectures/

/content/PyTorch-Architectures


In [14]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from toolkit.utils import get_optimal_batchsize, dict_to_device

In [3]:
class CustomDataset(Dataset):
  """FordA split loaded from a remote tab-separated file.

  Column 0 of every row is read as the class label and the remaining columns
  as the series values. Labels equal to -1 are remapped to 0; every other
  label value is kept as-is (presumably the only other label is 1 -- verify
  against the data file).

  Args:
    train: when True load ``FordA_TRAIN.tsv``, otherwise ``FordA_TEST.tsv``.
  """
  def __init__(self, train=True):
    self.root_url = "https://raw.githubusercontent.com/hfawaz/cd-diagram/master/FordA/"
    self.url = self.root_url + ("FordA_TRAIN.tsv" if train else "FordA_TEST.tsv")
    # np.loadtxt is handed the URL directly, so constructing the dataset
    # needs network access.
    self.data = np.loadtxt(self.url, delimiter="\t")
    self.x = self.data[:, 1:]
    self.y = self.data[:, 0].astype(int)
    self.y[self.y == -1] = 0

  def __len__(self,):
    """Number of rows (samples) in the loaded split."""
    return len(self.data)

  def __getitem__(self, idx):
    """Return ``(series, label)`` for row ``idx``."""
    inputs = self.x[idx]
    labels = self.y[idx]
    return (inputs, labels)

  def collate_fn(self, batch):
    """Merge ``(series, label)`` samples into one batch dictionary.

    Args:
      batch: non-empty sequence of ``(series, label)`` pairs whose series
        all have the same length.

    Returns:
      dict with
        'inputs': tensor of shape (batch, length, 1), same dtype as the
          stacked series,
        'labels': int64 tensor of shape (batch, 1, 1).
    """
    series, targets = zip(*batch)
    # Stack into a single ndarray first: building a tensor straight from a
    # Python list of ndarrays goes through a slow element-by-element path and
    # makes PyTorch emit a UserWarning on every batch.
    inputs = torch.from_numpy(np.stack(series))
    labels = torch.as_tensor(np.asarray(targets), dtype=torch.long)
    return {
        'inputs': inputs[:, :, None],
        'labels': labels[:, None, None],
    }

In [4]:
class PreNorm(nn.Module):
  """Apply layer normalization to the input, then hand it to ``fn``.

  Args:
    dim: size of the last dimension that ``nn.LayerNorm`` normalizes over.
    fn: callable invoked on the normalized tensor.
  """
  def __init__(self, dim, fn):
    super().__init__()
    self.layernorm = nn.LayerNorm(dim)
    self.fn = fn

  def forward(self, x, **kwargs):
    """Return ``fn(layernorm(x), **kwargs)``; keyword arguments go to ``fn`` untouched."""
    normalized = self.layernorm(x)
    return self.fn(normalized, **kwargs)

In [5]:
class Residual(nn.Module):
  """Add a skip connection around ``fn``.

  Args:
    fn: callable whose output is summed with its own input.
  """
  def __init__(self, fn):
    super().__init__()
    self.fn = fn

  def forward(self, x, **kwargs):
    """Return ``fn(x, **kwargs) + x``."""
    branch = self.fn(x, **kwargs)
    return branch + x

In [6]:
class FeedForward(nn.Module):
  """Two-layer MLP: Linear -> GELU -> Dropout -> Linear -> Dropout.

  Args:
    dim: input and output feature size.
    hidden_dim: feature size between the two linear layers.
    dropout: probability used by both dropout layers.
  """
  def __init__(self, dim, hidden_dim, dropout=0.):
    super().__init__()
    stages = [
        nn.Linear(dim, hidden_dim),  # project up to the hidden width
        nn.GELU(),
        nn.Dropout(dropout),
        nn.Linear(hidden_dim, dim),  # project back to the input width
        nn.Dropout(dropout),
    ]
    self.net = nn.Sequential(*stages)

  def forward(self, x):
    """Run ``x`` through the MLP; the last dimension stays ``dim``."""
    out = self.net(x)
    return out

In [7]:
# Multi-head self-attention
class Attention(nn.Module):
  """Multi-head scaled dot-product self-attention.

  Args:
    dim: feature size of the input and of the output.
    heads: number of attention heads; must divide ``dim`` evenly.
    dropout: dropout probability applied after the output projection.

  Raises:
    ValueError: if ``dim`` is not divisible by ``heads``.
  """
  def __init__(self, dim, heads=8, dropout=0.):
    super().__init__()
    if dim % heads != 0:
      # Without this check the failure only shows up later as an opaque
      # reshape error inside forward().
      raise ValueError(f"dim ({dim}) must be divisible by heads ({heads})")
    self.heads = heads
    # Scale by the per-head width (1 / sqrt(d_k)), not by the full model
    # width: each dot product runs over dim // heads features.
    self.scale = (dim // heads) ** -0.5

    self.to_qkv = nn.Linear(dim, dim * 3, bias=False)
    self.to_out = nn.Sequential(
        nn.Linear(dim, dim),
        nn.Dropout(dropout),
    )

  def forward(self, x):
    """Attend over the sequence dimension.

    Args:
      x: tensor of shape (batch, seq_len, dim).

    Returns:
      Tensor of shape (batch, seq_len, dim).
    """
    b, n, dim, h = *x.shape, self.heads
    # One projection produces queries, keys and values; split it in three.
    qkv = self.to_qkv(x).chunk(3, dim=-1)
    # [b, n, dim] -> [b, h, n, dim // h]
    q, k, v = map(lambda t: t.reshape(b, n, h, dim // h).transpose(1, 2), qkv)

    dots = q @ k.transpose(-2, -1) * self.scale
    attn = dots.softmax(dim=-1) # attn ~ [b, h, n, n]
    # Merge the heads back: [b, h, n, dim // h] -> [b, n, dim]
    out = (attn @ v).transpose(1, 2).reshape(b, n, -1)

    out = self.to_out(out)
    return out

In [16]:
class TransformerTimeSeries(nn.Module):
  """Transformer encoder that classifies a univariate series via a CLS token.

  Every scalar time step is embedded with a linear layer, a learned CLS token
  is prepended, the sequence passes through ``depth`` pre-norm
  attention/feed-forward blocks, and the CLS position feeds an MLP head.

  Args:
    dim: embedding size used throughout the encoder.
    hidden_dim: accepted for interface compatibility; not read by this class.
    mlp_dim: hidden size of each feed-forward block and of the classifier head.
    num_classes: number of output logits.
    depth: number of attention/feed-forward blocks.
    dropout: dropout probability used by every dropout layer.
  """
  def __init__(self, dim, hidden_dim, mlp_dim, num_classes, depth=4, dropout=0.):
    super().__init__()
    self.inputs_to_embeds = nn.Linear(1, dim)
    self.dropout = nn.Dropout(dropout)
    self.cls_token = nn.Parameter(torch.randn(1, 1, dim))

    def make_block():
      # One encoder block: residual pre-norm attention followed by a
      # residual pre-norm feed-forward network.
      attention = Residual(PreNorm(dim, Attention(dim, dropout=dropout)))
      feedforward = Residual(PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout)))
      return nn.ModuleList([attention, feedforward])

    self.layers = nn.ModuleList([make_block() for _ in range(depth)])

    head = [
        nn.LayerNorm(dim),
        nn.Linear(dim, mlp_dim),
        nn.GELU(),
        nn.Dropout(dropout),
        nn.Linear(mlp_dim, num_classes),
    ]
    self.mlp_head = nn.Sequential(*head)

  def forward(self, inputs, labels=None):
    """Classify a batch of series.

    Args:
      inputs: tensor of shape (batch, seq_len, 1).
      labels: optional class indices; flattened with ``view(-1)`` before the
        loss, so any shape holding one index per sample works.

    Returns:
      ``(loss, logits)`` where ``loss`` is the cross-entropy loss (``None``
      when ``labels`` is not given) and ``logits`` has shape
      (batch, num_classes).
    """
    tokens = self.inputs_to_embeds(inputs)
    batch_size, _, _ = tokens.shape
    # Broadcast the single learned CLS vector over the batch and put it at
    # position 0 of every sequence.
    cls = self.cls_token.expand(batch_size, -1, -1)
    tokens = self.dropout(torch.cat([cls, tokens], dim=1))
    for attention, feedforward in self.layers:
      tokens = feedforward(attention(tokens))
    # Only the CLS position is used as the sequence summary.
    logits = self.mlp_head(tokens[:, 0])
    if labels is None:
      return (None, logits)
    criterion = nn.CrossEntropyLoss()
    loss = criterion(logits.view(logits.size(0), -1), labels.view(-1))
    return (loss, logits)

In [None]:
# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")

# hidden_dim is passed for interface compatibility; TransformerTimeSeries does
# not read it.
model = TransformerTimeSeries(
    dim=1024,
    hidden_dim=2048,
    mlp_dim=512,
    num_classes=2,
    depth=4,
    dropout=0.5,
).double()  # float64 weights, matching the float64 arrays np.loadtxt produces
model.to(device)

In [18]:
# Add up the element counts of every parameter that receives gradients.
params = sum(
    map(
        lambda weight: weight.numel(),
        filter(lambda weight: weight.requires_grad, model.parameters()),
    )
)
print(f"Trainable Parameters: {params}")

Trainable Parameters: 21529090


In [19]:
# Hyperparameter section
# BS: mini-batch size handed to the train and validation DataLoaders.
BS = 64

In [20]:
# Build the train/validation splits (each constructor downloads its TSV file).
dataset_train = CustomDataset(train=True)
dataset_valid = CustomDataset(train=False)

# Shuffle the training split so every epoch sees differently composed batches
# instead of the fixed file order; validation order is kept fixed so metrics
# stay comparable between runs.
train_loader = DataLoader(dataset_train, batch_size=BS, shuffle=True,
                          collate_fn=dataset_train.collate_fn)
valid_loader = DataLoader(dataset_valid, batch_size=BS, shuffle=False,
                          collate_fn=dataset_valid.collate_fn)

# A DataLoader's length is its number of batches, not its number of samples.
print(f"Length of Train Loader: {len(train_loader)}")
print(f"Length of Valid Loader: {len(valid_loader)}")

Length of Train Loader: 57
Length of Valid Loader: 21


In [23]:
# Sanity check forward pass
loader = DataLoader(dataset_train, batch_size=4, shuffle=False,
                          collate_fn=dataset_train.collate_fn)
model.eval()  # disables dropout; switch back with model.train() before training
with torch.no_grad():
  # Take exactly one batch. Unlike a for/break loop, next() raises on an empty
  # loader, so the check cannot pass without ever running the assertions.
  sample = next(iter(loader))
  # dict_to_device presumably moves every tensor in the dict to `device` --
  # confirm against toolkit.utils.
  outputs = model(**dict_to_device(sample, device=device))
  loss, logits = outputs
  assert logits.size(-1) == 2, "Last Dimension not equal to classes"
  assert logits.size(0) == sample["labels"].size(0), "Batch dimension of logits does not match labels"