In [1]:
import torch
import matplotlib.pyplot as plt
%matplotlib inline
import torch.nn.functional as F

In [4]:
def read_lines(path):
    """Return the lines of the text file at `path`, without line terminators."""
    # The context manager closes the handle as soon as the read finishes,
    # instead of leaving an open file object for the garbage collector.
    with open(path) as fh:
        return fh.read().splitlines()


# One entry per line in each split file.
test = read_lines('../MLP/data/test.txt')
dev = read_lines('../MLP/data/dev.txt')
words = read_lines('../MLP/data/Training.txt')
len(words)

30000

In [21]:
# letter to number mapping
# id 0 is reserved for '.', every other character gets ids 1..N in sorted order.

# '.' is removed from the character set before enumerating: otherwise a literal
# '.' in the data would first receive a positive id that is then overwritten
# with 0 below, leaving a hole in the id range so that len(itos) would be
# smaller than the largest id in use.
chars = sorted(set(''.join(words)) - {'.'})
stoi = {s:i+1 for i,s in enumerate(chars)}
stoi['.'] = 0
itos = {i:s for s,i in stoi.items()}  # inverse mapping: id -> character
vocab_size = len(itos)

In [24]:
block = 8  # context length: how many preceding token ids form one input row

def dataset(words):
    """Turn an iterable of words into next-character prediction examples.

    Every word is extended with a trailing '.', and a window of `block`
    token ids (starting as all zeros) is slid across it one character at
    a time.  Characters are converted to ids through the module-level
    `stoi` mapping.

    Returns:
        (X, Y): X has one context window per row; Y holds the id of the
        character that follows the corresponding window.  The shapes of
        both tensors are printed before returning.
    """
    contexts, targets = [], []

    for word in words:
        window = [0] * block
        for target in (stoi[ch] for ch in word + '.'):
            contexts.append(window)
            targets.append(target)
            # slide the window: drop the oldest id, append the newest one
            window = [*window[1:], target]

    X, Y = torch.tensor(contexts), torch.tensor(targets)
    print(X.shape,Y.shape)
    return X,Y

In [25]:
# Encode each split into (context, target) tensors.
Xtr, Ytr = dataset(words)
Xdev, Ydev = dataset(dev)
Xte, Yte = dataset(test)

# Sanity check: decode the first 20 training examples back into text.
for context_ids, target_id in zip(Xtr[:20].tolist(), Ytr[:20].tolist()):
    decoded = ''.join(itos[i] for i in context_ids)
    print(decoded, '-->', itos[target_id])

torch.Size([313130, 8]) torch.Size([313130])
torch.Size([104449, 8]) torch.Size([104449])
torch.Size([104449, 8]) torch.Size([104449])
........ --> u
.......u --> n
......un --> a
.....una --> r
....unar --> r
...unarr --> a
..unarra --> i
.unarrai --> g
unarraig --> n
narraign --> e
arraigne --> d
rraigned --> .
........ --> c
.......c --> i
......ci --> r
.....cir --> s
....cirs --> o
...cirso --> t
..cirsot --> o
.cirsoto --> m


In [29]:
class Linear:
    """Affine layer computing `x @ weight`, plus `bias` when one is configured."""

    def __init__(self, fan_in, fan_out, bias=True):
        # Gaussian weights divided by sqrt(fan_in).
        scale = fan_in**0.5
        self.weight = torch.randn((fan_in, fan_out)) / scale
        # The bias starts at zero; it is None when the layer is bias-free.
        self.bias = None
        if bias:
            self.bias = torch.zeros(fan_out)

    def __call__(self, x):
        result = x @ self.weight
        if self.bias is not None:
            result += self.bias
        # Keep the most recent activation on the instance for later inspection.
        self.out = result
        return result

    def parameters(self):
        """Return the tensors of this layer: the weight, then the bias if present."""
        params = [self.weight]
        if self.bias is not None:
            params.append(self.bias)
        return params


class BatchNorm1d:
    """Batch normalisation over the last (feature) dimension.

    In training mode the input is normalised with statistics computed over
    every dimension except the last, and exponential moving averages of those
    statistics are updated.  When `training` is False the stored running
    statistics are used instead and are left unchanged.

    Args:
        dim: size of the feature (last) dimension.
        eps: constant added to the variance before the square root.
        momentum: weight given to the current batch when updating the
            running statistics.
    """

    def __init__(self, dim, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True

        # trainable scale and shift
        self.gamma = torch.ones(dim)
        self.beta = torch.zeros(dim)

        # running statistics, updated without gradient tracking
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)

    def __call__(self, x):
        """Normalise `x` and return `gamma * xhat + beta`.

        Raises:
            ValueError: in training mode, if `x` has fewer than 2 dimensions.
        """
        if self.training:
            if x.ndim < 2:
                raise ValueError(
                    f'BatchNorm1d expects an input with at least 2 dimensions, got {x.ndim}'
                )
            # Reduce over every leading dimension so (N, C), (N, T, C) and
            # higher-rank inputs are all handled; previously any rank other
            # than 2 or 3 left xmean/xvar unassigned.
            reduce_dims = tuple(range(x.ndim - 1))
            xmean = x.mean(reduce_dims, keepdim=True)  # batch mean
            xvar = x.var(reduce_dims, keepdim=True)    # batch variance
        else:
            xmean = self.running_mean
            xvar = self.running_var

        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
        self.out = self.gamma * xhat + self.beta

        if self.training:
            with torch.no_grad():
                # reshape(-1) flattens the keepdim statistics back to (dim,)
                self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean.reshape(-1)
                self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar.reshape(-1)

        return self.out

    def parameters(self):
        """Return the trainable tensors: [gamma, beta]."""
        return [self.gamma, self.beta]


class Tanh:
    """Element-wise hyperbolic tangent activation; holds no parameters."""

    def __call__(self, x):
        activated = torch.tanh(x)
        # Keep the most recent activation on the instance for later inspection.
        self.out = activated
        return activated

    def parameters(self):
        """Return an empty list: this layer has nothing to train."""
        return list()


class Embedding:
    """Lookup table that maps integer indices to rows of a weight matrix."""

    def __init__(self, num_embeddings, embedding_dim):
        # One Gaussian-initialised row per index.
        table_shape = (num_embeddings, embedding_dim)
        self.weight = torch.randn(table_shape)

    def __call__(self, IX):
        # Indexing with IX selects one row of the table per index.
        rows = self.weight[IX]
        self.out = rows
        return rows

    def parameters(self):
        """Return the single tensor of this layer: the embedding table."""
        return [self.weight]


class FlattenConsecutive:
    """Merge every `n` consecutive positions of a (B, T, C) tensor into one.

    The result has shape (B, T // n, C * n); when that middle dimension is 1
    it is squeezed away, giving (B, C * n).
    """

    def __init__(self, n):
        self.n = n

    def __call__(self, x):
        batch, steps, channels = x.shape
        grouped = x.view(batch, steps // self.n, channels * self.n)
        # Drop the position axis once everything has been merged into one group.
        if grouped.shape[1] == 1:
            grouped = grouped.squeeze(1)
        self.out = grouped
        return grouped

    def parameters(self):
        """Return an empty list: reshaping has nothing to train."""
        return list()


class Sequential:
    """Container that applies a list of layers one after another."""

    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        # Feed the output of each layer into the next one, in list order.
        activation = x
        for module in self.layers:
            activation = module(activation)
        self.out = activation
        return activation

    def parameters(self):
        """Return the parameters of all layers as one flat list, in layer order."""
        collected = []
        for module in self.layers:
            collected.extend(module.parameters())
        return collected

In [30]:
# Seed the global RNG so the initial weights (and, when cells are run in order,
# the minibatch sampling that follows) are repeatable on Restart & Run All.
torch.manual_seed(42)

n_embd = 24 # the dimensionality of the character embedding vectors
n_hidden = 128 # the number of neurons in the hidden layer of the MLP

# Each FlattenConsecutive(2) merges adjacent pairs of positions, so three of
# them reduce the 8-character context to a single vector before the output layer.
model = Sequential([
      Embedding(vocab_size, n_embd),
      FlattenConsecutive(2), Linear(n_embd * 2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
      FlattenConsecutive(2), Linear(n_hidden*2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
      FlattenConsecutive(2), Linear(n_hidden*2, n_hidden, bias=False), BatchNorm1d(n_hidden), Tanh(),
      Linear(n_hidden, vocab_size),
    ])

# parameter init
with torch.no_grad():
    model.layers[-1].weight *= 0.1 # last layer make less confident

parameters = model.parameters()
print(sum(p.nelement() for p in parameters)) # number of parameters in total
# The layer tensors are created without gradient tracking; enable it here.
for p in parameters:
    p.requires_grad = True

76579


In [None]:
max_steps = 200000
batch_size = 32
lossi = []  # log10 of the minibatch loss at every step

for i in range(max_steps):

    # minibatch construct: sample batch_size row indices with replacement
    ix = torch.randint(0, Xtr.shape[0], (batch_size,))
    Xb, Yb = Xtr[ix], Ytr[ix] # batch X,Y

    # forward pass
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb) # loss function

    # backward pass: clear stale gradients, then backpropagate
    for p in parameters:
        p.grad = None
    loss.backward()

    # update: simple SGD
    lr = 0.1 if i < 150000 else 0.01 # step learning rate decay
    # Update in place under no_grad instead of going through `.data`, which
    # bypasses autograd's in-place safety checks; the arithmetic is unchanged.
    with torch.no_grad():
        for p in parameters:
            p -= lr * p.grad

    # track stats
    if i % 10000 == 0: # print every once in a while
        print(f'{i:7d}/{max_steps:7d}: {loss.item():.4f}')
    lossi.append(loss.log10().item())

      0/ 200000: 3.2802
