In [1]:
import math

In [2]:
from typing import Tuple

In [3]:
import torch
from torch import nn, Tensor

In [4]:
import torch.nn.functional as F

In [19]:
from torch.nn import *
#TransformerEncoder, TransformerEncoderLayer

In [6]:
from torch.utils.data import dataset

In [36]:
class PositionalEncoding(
    nn.Module
    ):
    
    def __init__(
        self,
        d_model: int,
        dropout: float = 0.1,
        max_len: int = 5000,
        ):
        
        super().__init__()
        self.dropout = nn.Dropout(p = dropout)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) 
            * (-math.log(10000.0)/d_model)
            )
        
        pe = torch.zeros(max_len, 1, d_model)
        
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe)
        
    def forward(
        self,
        x: Tensor
        ) -> Tensor:
        
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)
        

In [26]:
torch.arange(10).unsqueeze(1)

tensor([[0],
        [1],
        [2],
        [3],
        [4],
        [5],
        [6],
        [7],
        [8],
        [9]])

In [38]:
p = PositionalEncoding(
    100, 
    )

In [48]:
class TransformerModel(
    nn.Module,
    ):
    
    def __init__(
        self,
        ntoken: int,
        d_model: int,
        nhead: int, 
        d_hid: int,
        nlayers: int,
        dropout: float = 0.5,
        ):
        
        super().__init__()
        
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(
            d_model, 
            dropout,)
        encoder_layers = TransformerEncoderLayer(
            d_model,
            nhead,
            d_hid,
            dropout,)
        self.transformer_encoder = TransformerEncoder(
            encoder_layers,
            nlayers,
            )
        self.encoder = nn.Embedding(
            ntoken,
            d_model,
            )
        self.d_model = d_model
        self.decoder = nn.Linear(
            d_model,
            ntoken,
            )
        
        self.init_weights()
        
    def init_weights(
        self
        ) -> None:
        initrage = 0.1
        
        self.encoder.weight.data.uniform_(
            -initrage,
            initrage,
            )
        
        self.decoder.bias.data.zero_()
        
        self.decoder.weight.data.uniform_(
            -initrage, 
            initrage,
            )
        
    def forward(
        self,
        src: Tensor,
        src_mask: Tensor,
        ) -> Tensor:
        
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        
        output = self.transformer_encoder(src, src_mask)
        output = self.dropout(output)
        
        return output
    
    def generate_square_subsequent_mask(
        sz: int,
        ) -> Tensor:
        return torch.triu(
            torch.ones(sz, sz) * float('-inf'),
            diagonal = 1,
            )

In [49]:
m = TransformerModel(
    ntoken = 100,
    d_model = 100,
    nhead = 10, 
    d_hid = 256,
    nlayers = 128,
    )

# load the batch data

In [51]:
!pip install torchdata==0.4.1




[notice] A new release of pip available: 22.2.1 -> 22.2.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [53]:
!pip install torchtext

Collecting torchtext
  Downloading torchtext-0.13.1-cp310-cp310-win_amd64.whl (2.2 MB)
     ---------------------------------------- 2.2/2.2 MB 10.2 MB/s eta 0:00:00
Installing collected packages: torchtext
Successfully installed torchtext-0.13.1



[notice] A new release of pip available: 22.2.1 -> 22.2.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [54]:
from torchtext.datasets import WikiText2

In [55]:
from torchtext.data.utils import get_tokenizer

In [56]:
from torchtext.vocab import build_vocab_from_iterator

In [57]:
train_iter = WikiText2(split='train')

In [58]:
tokenizer = get_tokenizer('basic_english')

In [60]:
vocab =  build_vocab_from_iterator(
    map(tokenizer, train_iter),
    specials = ['<unk>'],                               
    )

In [61]:
vocab.set_default_index(vocab['<unk>'])

In [65]:
vocab['test']

1660

In [72]:
def data_process(
    raw_text_iter: dataset.IterableDataset,
    ) -> Tensor:
    data = [torch.tensor(
        vocab(tokenizer(item))
        )
        for item in raw_text_iter]
    return torch.cat(
        tuple(
            filter(lambda t: t.numel() > 0, data)
            )
        )

In [68]:
train_iter, val_iter, test_iter = WikiText2()

In [90]:
train_data = data_process(
    train_iter,
    )

In [76]:
val_data = data_process(val_iter)

In [77]:
test_data = data_process(test_iter)

In [78]:
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'cpu')

In [79]:
device

device(type='cuda')

In [88]:
def batchify(
    data: Tensor,
    bsz: int,
    ) -> Tensor:
    seq_len = data.size(0) // bsz
    data = data[:seq_len * bsz]
    data = data.view(bsz, seq_len).t().contiguous()
    return data.to(device)

In [91]:
batch_size = 20
eval_batch_size = 20
train_data = batchify(
    train_data, 
    batch_size,
    )

In [93]:
train_data.shape

torch.Size([102499, 20])

In [95]:
val_data = batchify(
    val_data,
    eval_batch_size,
    )

In [96]:
test_data = batchify(
    test_data,
    eval_batch_size,
    )

In [97]:
test_data.shape

torch.Size([12092, 20])

In [98]:
bptt = 35

In [99]:
def get_batch(
    source: Tensor,
    i: int,
    ) -> Tuple[Tensor, Tensor]:
    
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    
    target = source[i+1:i+seq_len].reshape(-1)
    
    return data, target

In [102]:
data, target = get_batch(test_data, 0)

In [103]:
data.shape

torch.Size([35, 20])

In [104]:
target.shape

torch.Size([680])

In [106]:
ntokens = len(vocab)
emsize = 200
d_hid = 200
nlayers = 2
nhead = 2
dropout = 0.2

In [107]:
model = TransformerModel(
    ntokens,
    emsize,
    nhead,
    d_hid,
    nlayers,
    dropout,
    ).to(device)

In [108]:
import copy
import time

In [109]:
criterion = nn.CrossEntropyLoss()

In [110]:
lr = 5.0

In [111]:
optimizer = torch.optim.SGD(
    model.parameters(),
    lr = lr,
    )

In [112]:
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer, 
    1.0,
    gamma=0.95,
    )

In [118]:
def train(
    model: nn.Module
    ) -> None:
    
    model.train()
    
    total_loss = 0
    log_interval = 200
    start_time = time.time()
    
    src_mask = model.generate_square_subsequent_mask(bptt).to(device)
    
    return None

In [116]:
best_val_loss = float('inf')
epochs = 3
best_model = None

In [119]:
for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    
    train(model)

TypeError: TransformerModel.generate_square_subsequent_mask() takes 1 positional argument but 2 were given