Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as f
import math
import copy

RUN_EXAMPLES = True

  from .autonotebook import tqdm as notebook_tqdm


## Helper Functions

In [4]:
def positional_encoding(dim,max_len=1000):
    arr_pe = torch.zeros(max_len,dim)
    #Creates an array for the positions of rows
    arr_pos = torch.arange(0,max_len).unsqueeze(1)
    #Sine and cosine are applied an alternating dims so half are generated(Sin and cos apply to same subset basically to have the different outputs)
    div_term = torch.exp(torch.arange(0, dim, 2) * -(math.log(10000.0) / dim))
    #basically multipleis the 0-max len values by the div term which is unique per dim
    temp = arr_pos*div_term
    arr_pe[:,0::2]= torch.sin(temp)
    arr_pe[:,1::2]= torch.cos(temp)
    return(arr_pe.unsqueeze(0))

def scaled_dot_product_attention(q,k,v,mask=None,dropout=None):
    dim = q.size(-1)
    q_k = torch.matmul(q,k.transpose(-2, -1))
    q_k_scale = q_k * math.sqrt(dim)
    #can add a mask here
    if mask is not None:
        q_k_scale = q_k_scale.masked_fill(mask == 0, -1e9)

    q_k_scale = f.softmax(q_k_scale,dim=1)
    if dropout is not None:
        q_k_scale = dropout(q_k_scale)
    attn = torch.matmul(q_k_scale,v)
    return(attn)


In [12]:

def show_example(fn, args=[]):
    if __name__ == "__main__" and RUN_EXAMPLES:
        return fn(*args)
def subsequent_mask(size):
    "Mask out subsequent positions."
    attn_shape = (1, size, size)
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
        torch.uint8
    )
    return subsequent_mask == 0

def run_tests():
    for _ in range(10):
        inference_test()
def clones(component,N):
    return(nn.ModuleList([copy.deepcopy(component)for _ in range(0,N)]))


class Embeddings(nn.Module):
    def __init__(self, d_model, vocab):
        super(Embeddings, self).__init__()
        self.lut = nn.Embedding(vocab, d_model)
        self.d_model = d_model

    def forward(self, x):
        return self.lut(x) * math.sqrt(self.d_model)

class PositionwiseFeedForward(nn.Module):
    "Implements FFN equation."

    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.w_2(self.dropout(self.w_1(x).relu()))

class PositionalEncoding(nn.Module):
    "Implement the PE function."

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

class MultiHeadAttention(nn.Module):
    def __init__(self,h,dim,dropout=0.1):
        super().__init__()
        assert dim % h == 0
        # We assume d_v always equals d_k
        self.d_k = dim // h
        self.h = h
        self.linears = clones(nn.Linear(dim, dim), 4)
        self.attn = None
        self.l_q = nn.Linear(dim,dim)
        self.l_k = nn.Linear(dim,dim)
        self.l_v = nn.Linear(dim,dim)
        self.sdpa = scaled_dot_product_attention
        self.dropout = nn.Dropout(p =dropout)

    def forward(self,q,k,v,mask=None):
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = q.size(0)
        # q = self.l_q(q)
        # k = self.l_k(k)
        # v = self.l_v(v)
        print(q.shape)

        # 1) Do all the linear projections in batch from d_model => h x d_k
        query, key, value = [
            lin(x).view(nbatches, -1, self.h, self.d_k)#.transpose(1, 2)
            for lin, x in zip(self.linears, (q, k, v))
        ]
        print(query.shape,key.shape,value.shape)

        query, key, value = [
            lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
            for lin, x in zip(self.linears, (q, k, v))
        ]
        print(query.shape,key.shape,value.shape)

        # 2) Apply attention on all the projected vectors in batch.
        x = self.sdpa(
            query, key, value, mask=mask, dropout=self.dropout
        )

        # 3) "Concat" using a view and apply a final linear.
        x = (
            x.transpose(1, 2)
            .contiguous()
            .view(nbatches, -1, self.h * self.d_k)
        )
        del query
        del key
        del value
        return self.linears[-1](x)

class SublayerConnection(nn.Module):
    """
    A residual connection followed by a layer norm.
    Note for code simplicity the norm is first as opposed to last.
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        "Apply residual connection to any sublayer with the same size."
        return x + self.dropout(sublayer(self.norm(x)))

class LayerNorm(nn.Module):
    "Construct a layernorm module (See citation for details)."

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

# Architecture
Consists only of the transformer block of the original transformer

In [13]:
#Embed, positional encoding, decoder n layers, Linear, softmax
class GPT(nn.Module):
    def __init__(self,decoder,embedding,N,dim):
        super().__init__()
        self.decoders = clones(decoder,N)
        self.embed = embedding
        #self.pe = positional_encoding
        self.out_layer = nn.Linear(dim,dim)
    
    def forward(self,x,mask):
        x = self.embed(x)# + self.pe
        for decode in self.decoders:
            x = decode(x,mask)
        x = self.out_layer(x)
        x = f.softmax(x)
        return(x)

class DecoderLayer(nn.Module):
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()
        self.size = size
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sublayer = clones(SublayerConnection(size, dropout), 3)

    def forward(self, x, tgt_mask):
        "Follow Figure 1 (right) for connections."
        x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
        return self.sublayer[2](x, self.feed_forward)


In [14]:
def make_model(
    src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1
):
    "Helper: Construct a model from hyperparameters."
    c = copy.deepcopy
    attn = MultiHeadAttention(h, d_model)
    ff = PositionwiseFeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    model = GPT(
        DecoderLayer(d_model, c(attn), c(ff), dropout),
        nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
        N,
        d_model,
    )

    # This was important from their code.
    # Initialize parameters with Glorot / fan_avg.
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    return model

def inference_test():
    test_model = make_model(11, 11, 2)
    test_model.eval()
    src = torch.LongTensor([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]])
    src_mask = torch.ones(1, 1, 10)

    memory = test_model.forward(src, src_mask)
    ys = torch.zeros(1, 1).type_as(src)
    print(memory)

    for i in range(9):
        out = test_model.decode(
            memory, src_mask, ys, subsequent_mask(ys.size(1)).type_as(src.data)
        )
        prob = test_model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.data[0]
        ys = torch.cat(
            [ys, torch.empty(1, 1).type_as(src.data).fill_(next_word)], dim=1
        )

    print("Example Untrained Model Prediction:", ys)




show_example(run_tests)

torch.Size([1, 10, 512])
torch.Size([1, 10, 8, 64]) torch.Size([1, 10, 8, 64]) torch.Size([1, 10, 8, 64])
torch.Size([1, 8, 10, 64]) torch.Size([1, 8, 10, 64]) torch.Size([1, 8, 10, 64])
torch.Size([1, 10, 512])
torch.Size([1, 10, 8, 64]) torch.Size([1, 10, 8, 64]) torch.Size([1, 10, 8, 64])
torch.Size([1, 8, 10, 64]) torch.Size([1, 8, 10, 64]) torch.Size([1, 8, 10, 64])
tensor([[[1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         ...,
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.],
         [1., 1., 1.,  ..., 1., 1., 1.]]], grad_fn=<SoftmaxBackward0>)


  x = f.softmax(x)


AttributeError: 'GPT' object has no attribute 'decode'

In [15]:


class GPTTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout=0.1):
        super(GPTTransformer, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.pos_encoding = PositionalEncoding(hidden_size, dropout)

        self.layers = nn.ModuleList([GPTTransformerLayer(hidden_size, num_heads, dropout) for _ in range(num_layers)])
        self.output_layer = nn.Linear(hidden_size, input_size)
        
    def forward(self, input_ids, input_mask=None):
        # Perform embedding and add positional encoding
        embedded = self.embedding(input_ids)
        embedded = self.pos_encoding(embedded)

        # Pass the input through the layers of the transformer
        for layer in self.layers:
            embedded = layer(embedded, input_mask)

        # Apply the output layer and return the result
        logits = self.output_layer(embedded)
        return logits

class GPTTransformerLayer(nn.Module):
    def __init__(self, hidden_size, num_heads, dropout=0.1):
        super(GPTTransformerLayer, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.dropout = dropout

        self.attention = MultiHeadAttention(hidden_size, num_heads, dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(hidden_size, 4 * hidden_size),
            nn.ReLU(),
            nn.Linear(4 * hidden_size, hidden_size)
        )
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, input, input_mask=None):
        # Apply attention
        attention_output = self.attention(input, input, input, input_mask)
        attention_output = self.dropout1(attention_output)
        attention_output = attention_output + input

        # Apply feed-forward layer
        feed_forward_output = self.feed_forward(attention_output)
        feed_forward_output = self.dropout2(feed_forward_output)
        feed_forward_output = feed_forward_output + attention_output

        return feed_forward_output

In [None]:
import torch
import torch.nn as nn

class GPT(nn.Module):
    def __init__(self, vocab_size, hidden_size, num_layers, num_heads, dropout):
        super(GPT, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.dropout = dropout

        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.transformer = nn.Transformer(hidden_size, num_heads, num_layers, dropout)
        self.output_linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.transformer(x, mask)
        x = self.output_linear(x)
        return x

# Create a GPT model with a vocabulary size of 50, hidden size of 32,
# 2 layers, 4 heads, and a dropout rate of 0.1
gpt = GPT(50, 32, 2, 4, 0.1)


In [None]:
import torch
import torch.optim as optim

# Set the device to run on: GPU or CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Set the number of epochs and the learning rate
num_epochs = 10
learning_rate = 1e-3

# Set the criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(gpt.parameters(), lr=learning_rate)

# Move the model to the specified device
gpt.to(device)

# Loop over the number of epochs
for epoch in range(num_epochs):
    # Set the model to training mode
    gpt.train()

    # Loop over the training data
    for x, y in train_data:
        # Move the input and label tensors to the correct device
        x = x.to(device)
        y = y.to(device)

        # Forward pass
        output = gpt(x, mask)
        loss = criterion(output.view(-1, gpt.vocab_size), y.view(-1))

        # Zero the gradients
        optimizer.zero_grad()

        # Backward pass
        loss.backward()

        # Update the parameters
        optimizer.step()

    # Print the loss for each epoch
    print('Epoch: {}, Loss: {:.4f}'.format(epoch+1, loss.item()))

# Set the model to evaluation mode
gpt.eval()

# Evaluate the model on the test data
with torch.no_grad():
    correct = 0
    total = 0
    for x, y in test_data:
        # Move the input and label tensors to the correct device
        x = x.to(device)
        y = y.to(device)

        # Forward pass
        output = gpt(x, mask)

        # Get the predictions
        _, predicted = torch.max(output.data, 1)

        # Update the number of correct and total predictions
        total += y.size(0)
        correct += (predicted == y).sum().item()

    # Print the accuracy
    print('Accuracy: {:.2f}%'.format(100 * correct / total))


In [None]:
import torch
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, num_tokens, embedding_dim, num_heads, hidden_dim, num_layers):
        super(Transformer, self).__init__()
        
        self.num_tokens = num_tokens
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Embedding layer
        self.embedding = nn.Embedding(num_tokens, embedding_dim)
        
        # Positional encoding
        self.positional_encoding = nn.Embedding(num_tokens, embedding_dim)
        
        # Multi-head attention layers
        self.attention_layers = nn.ModuleList([nn.MultiheadAttention(embedding_dim, num_heads) for _ in range(num_layers)])
        
        # Fully connected layers
        self.fc_layers = nn.ModuleList([nn.Linear(embedding_dim, hidden_dim), nn.Linear(hidden_dim, num_tokens)])
        
    def forward(self, input_tokens, input_lengths):
        # Add positional encoding
        positions = torch.arange(input_tokens.size(1), device=input_tokens.device).unsqueeze(0).repeat(input_tokens.size(0), 1)
        input_tokens = input_tokens + self.positional_encoding(positions)
        
        # Embed input tokens
        embedded = self.embedding(input_tokens)
        
        # Pass through multi-head attention layers
        for attention_layer in self.attention_layers:
            embedded = attention_layer(embedded, embedded, embedded, input_lengths)
        
        # Pass through fully connected layers
        hidden = self.fc_layers[0](embedded)
        output = self.fc_layers[1](hidden)
        
        return output

model = Transformer(num_tokens=10000, embedding_dim=512, num_heads=8, hidden_dim=1024, num_layers=6)
output = model(input_tokens, input_lengths)
