In [46]:
import string
import random
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np
import math
import torch.nn.functional as F


#### Prepare for Dataset

In [47]:
all_chars       = string.printable
n_chars         = len(all_chars)
file            = open('../Data/shakespeare.txt').read()
file_len        = len(file)

print('Length of file: {}'.format(file_len))
print('All possible characters: {}'.format(all_chars))
print('Number of all possible characters: {}'.format(n_chars))

Length of file: 1115394
All possible characters: 0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ 	

Number of all possible characters: 100


#### Choose a Device

In [48]:
# If there are GPUs, choose the first one for computing. Otherwise use CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)  
# If 'cuda:0' is printed, it means GPU is available.

cpu


#### Transformer Definition

In [49]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Initialize dimensions
        self.d_model = d_model# Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value
        
        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model)  # Key transformation
        self.W_v = nn.Linear(d_model, d_model)  # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation
        
    def scaled_dot_product_attention(self, Q, K, V):
        # Calculate attention scores
        scaled = torch.matmul(Q,K.T)/np.sqrt(self.seq_length)
        
        # Rrevent from attending to future characters
        mask = np.tril(np.ones(self.seq_length))
        mask[mask == 0] = -np.infty
        mask[mask == 1] = 0
        
        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(scaled + mask, dim=-1)
        
        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.reshape(batch_size,seq_length,d_model)
        # return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        # return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        qkv_layer = nn.Linear(self.seq_length, 3 * self.seq_length)
        return qkv_layer(x)
        
    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

# def scaled_dot_product(q, k, v, mask=None):
#     d_k = q.size()[-1]
#     scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
#     if mask is not None:
#         scaled = scaled.permute(1, 0, 2, 3) + mask
#         scaled = scaled.permute(1, 0, 2, 3)
#     attention = F.softmax(scaled, dim=-1)
#     values = torch.matmul(attention, v)
#     return values, attention

# class MultiHeadAttention(nn.Module):
#     def __init__(self, d_model, num_heads):
#         super().__init__()
#         self.d_model = d_model
#         self.num_heads = num_heads
#         self.head_dim = d_model // num_heads
#         self.qkv_layer = nn.Linear(d_model , 3 * d_model)
#         self.linear_layer = nn.Linear(d_model, d_model)
    
#     def forward(self, x, mask):
#         batch_size, sequence_length, d_model = x.size()
#         qkv = self.qkv_layer(x)
#         qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
#         qkv = qkv.permute(0, 2, 1, 3)
#         q, k, v = qkv.chunk(3, dim=-1)
#         values, attention = scaled_dot_product(q, k, v, mask)
#         values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, self.num_heads * self.head_dim)
#         out = self.linear_layer(values)
#         return out

In [50]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [51]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [52]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

In [53]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [54]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        print(self.encoder_embedding)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2).to(device)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3).to(device)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool().to(device)
        tgt_mask = (tgt_mask & nopeak_mask).to(device)
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output).to(device)
        return output

### Preprocess Data

In [55]:
char_to_index = {char: i for i, char in enumerate(all_chars)}
# index_to_char = {i: char for i, char in enumerate(all_chars)}

#Convert text to tensor of indices
def text_to_tensor(text):
    return torch.tensor([char_to_index[char] for char in text], 
                       )

### Instantiate Model

In [56]:
# Instantiate the Transformer model
d_model = 128
num_heads = 4
num_layers = 4
d_ff = 512
max_seq_length = 128
dropout = 0.1

model = Transformer(
    src_vocab_size=n_chars,
    tgt_vocab_size=n_chars,
    d_model=d_model,
    num_heads=num_heads,
    num_layers=num_layers,
    d_ff=d_ff,
    max_seq_length=max_seq_length,
    dropout=dropout
)
model.to(device)

Embedding(100, 128)


Transformer(
  (encoder_embedding): Embedding(100, 128)
  (decoder_embedding): Embedding(100, 128)
  (positional_encoding): PositionalEncoding()
  (encoder_layers): ModuleList(
    (0-3): 4 x EncoderLayer(
      (self_attn): MultiHeadAttention(
        (W_q): Linear(in_features=128, out_features=128, bias=True)
        (W_k): Linear(in_features=128, out_features=128, bias=True)
        (W_v): Linear(in_features=128, out_features=128, bias=True)
        (W_o): Linear(in_features=128, out_features=128, bias=True)
      )
      (feed_forward): PositionWiseFeedForward(
        (fc1): Linear(in_features=128, out_features=512, bias=True)
        (fc2): Linear(in_features=512, out_features=128, bias=True)
        (relu): ReLU()
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (decoder_layers): ModuleList(
    (0-3): 4 x DecoderLayer(
     

### Eval step

In [57]:
def eval_step(model, init_seq='W', predicted_len=100):
    # Initialize the hidden state, input, and the predicted sequence.
    init_input = text_to_tensor(init_seq).unsqueeze(0).to(device)
    predicted_seq = init_seq

    # Disable autograd to speed up the evaluation
    with torch.no_grad():
        # Use initial string to "build up" hidden state.
        for t in range(len(init_seq) - 1):
            output = model(init_input[:, :t+1], init_input[:, :t+1])  # Autoregressive prediction

        # Set current input as the last character of the initial string.
        input = init_input[:, -1]

        # Predict more characters after the initial string.
        for t in range(predicted_len):
            # Get the current output.
            output = model(input.unsqueeze(0), input.unsqueeze(0))

            # Sample from the output as a multinomial distribution.
            predicted_index = torch.multinomial(output.view(-1).exp(), 1)[0].item()

            # Add predicted character to the sequence and use it as next input.
            predicted_char = all_chars[predicted_index]
            predicted_seq += predicted_char

            # Use the predicted character to generate the input of the next round.
            input = text_to_tensor(predicted_char).unsqueeze(0).to(device)

    return predicted_seq


In [58]:
def get_random_seq():
    seq_len     = 128  # The length of an input sequence.
    start_index = random.randint(0, file_len - seq_len)
    end_index   = start_index + seq_len
    return file[start_index:end_index], file[start_index+1:end_index+1]

#### Training Procedure

In [59]:
# Define the loss function
criterion = nn.CrossEntropyLoss()

# Instantiate an optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 2
batch_size = 64

for epoch in range(num_epochs):
    for i in range(0, file_len - max_seq_length, batch_size):
        # Prepare batch
        input_text, target_text = get_random_seq()  # Shifted by one character for next character prediction
        input_tensor = text_to_tensor(input_text).to(device)
        target_tensor = text_to_tensor(target_text).to(device)
    
        # Forward pass
        optimizer.zero_grad()
        output = model(input_tensor.unsqueeze(0), target_tensor.unsqueeze(0))

        # Compute loss
        loss = criterion(output.view(-1, n_chars), target_tensor.view(-1))

        # Backward pass
        loss.backward()
        optimizer.step()

        # Print loss
        if (i // batch_size) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i//batch_size+1}/{(file_len-max_seq_length)//batch_size+1}], Loss: {loss.item()}')
            print('generated sequence: {}\n'.format(eval_step(model)))



TypeError: MultiHeadAttention.scaled_dot_product_attention() takes 4 positional arguments but 5 were given

#### Training Loss Curve

In [None]:
plt.xlabel('iters')
plt.ylabel('loss')
all_losses = [loss.item() for loss in all_losses]
plt.plot(np.array(all_losses))
plt.show()

#### Evaluation: A Sample of Generated Sequence

In [None]:
print(eval_step(net, predicted_len=600))