In [1]:
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
from collections import Counter
import random

In [2]:
# Reading text file
with open('input_text.txt', 'r') as f:
    text = f.read()

In [3]:
def preprocess(text):
    text = text.lower()
    words = text.split()
    words.append('<eos>')  # Add end-of-sequence token at the end of the text
    
    word_counts = Counter(words)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    vocab_to_int = {word: ii for ii, word in enumerate(sorted_vocab, 1)}
    vocab_to_int['<eos>'] = len(vocab_to_int) + 1  # Add <eos> token
    
    int_to_vocab = {ii: word for word, ii in vocab_to_int.items()}
    
    encoded = [vocab_to_int[word] for word in words]
    return encoded, vocab_to_int, int_to_vocab

In [4]:
# Now preprocess the text again
encoded_text, vocab_to_int, int_to_vocab = preprocess(text)
vocab_size = len(vocab_to_int) + 1

In [5]:
def create_batches(encoded, sequence_length, batch_size):
    total_length = len(encoded)
    n_batches = total_length // (batch_size * sequence_length)
    encoded = encoded[:n_batches * batch_size * sequence_length]
    input_data = np.array(encoded)
    target_data = np.roll(input_data, -1)
    
    inputs = input_data.reshape((batch_size, -1))
    targets = target_data.reshape((batch_size, -1))
    
    return inputs, targets

In [6]:
# Hyperparameters
sequence_length = 10
batch_size = 4

inputs, targets = create_batches(encoded_text, sequence_length, batch_size)

In [7]:
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout=0.2):
        super(TransformerModel, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size, dropout)
        
        self.transformer = nn.Transformer(d_model=embed_size, 
                                          nhead=num_heads, 
                                          num_encoder_layers=num_layers, 
                                          num_decoder_layers=num_layers, 
                                          dim_feedforward=hidden_size, 
                                          dropout=dropout, batch_first=True)
        
        self.fc = nn.Linear(embed_size, vocab_size)

    def forward(self, src, tgt):
        src = self.embedding(src) * np.sqrt(src.size(1))
        src = self.positional_encoding(src)
        
        tgt = self.embedding(tgt) * np.sqrt(tgt.size(1))
        tgt = self.positional_encoding(tgt)
        
        out = self.transformer(src, tgt)
        out = self.fc(out)
        return out

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


In [8]:
# Hyperparameters for the model
vocab_size = len(vocab_to_int) + 1
embed_size = 256
hidden_size = 512
num_heads = 8
num_layers = 2
learning_rate = 0.001
epochs = 20

# Instantiate model, loss function and optimizer
model = TransformerModel(vocab_size, embed_size, num_heads, hidden_size, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
model.train()
for epoch in range(epochs):
    for i in range(0, inputs.shape[1], sequence_length):
        # Prepare inputs and targets
        input_batch = torch.tensor(inputs[:, i:i + sequence_length], dtype=torch.long)
        target_batch = torch.tensor(targets[:, i:i + sequence_length], dtype=torch.long)
        
        # Reset gradients
        optimizer.zero_grad()
        
        # Forward pass
        output = model(input_batch, target_batch[:, :-1])
        loss = criterion(output.reshape(-1, vocab_size), target_batch[:, 1:].reshape(-1))
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        if i % (sequence_length * 10) == 0:
            print(f'Epoch [{epoch + 1}/{epochs}], Step [{i}/{inputs.shape[1]}], Loss: {loss.item():.4f}')


Epoch [1/20], Step [0/20], Loss: 3.8986
Epoch [2/20], Step [0/20], Loss: 2.9302
Epoch [3/20], Step [0/20], Loss: 2.6717
Epoch [4/20], Step [0/20], Loss: 2.3375
Epoch [5/20], Step [0/20], Loss: 2.1031
Epoch [6/20], Step [0/20], Loss: 1.8486
Epoch [7/20], Step [0/20], Loss: 1.5706
Epoch [8/20], Step [0/20], Loss: 1.3836
Epoch [9/20], Step [0/20], Loss: 1.1733
Epoch [10/20], Step [0/20], Loss: 1.0094
Epoch [11/20], Step [0/20], Loss: 0.9424
Epoch [12/20], Step [0/20], Loss: 0.7804
Epoch [13/20], Step [0/20], Loss: 0.6759
Epoch [14/20], Step [0/20], Loss: 0.5936
Epoch [15/20], Step [0/20], Loss: 0.5737
Epoch [16/20], Step [0/20], Loss: 0.4886
Epoch [17/20], Step [0/20], Loss: 0.4456
Epoch [18/20], Step [0/20], Loss: 0.3968
Epoch [19/20], Step [0/20], Loss: 0.3552
Epoch [20/20], Step [0/20], Loss: 0.3694


In [9]:
def generate_text(model, prime, vocab_to_int, int_to_vocab, max_len=100, top_k=5):
    model.eval()
    input_sequence = [vocab_to_int[word] for word in prime.lower().split()]
    input_sequence = torch.tensor([input_sequence], dtype=torch.long)
    
    generated_sequence = input_sequence.tolist()[0]
    
    for _ in range(max_len):
        tgt_input = torch.tensor([generated_sequence], dtype=torch.long)
        
        with torch.no_grad():
            output = model(input_sequence, tgt_input)
        
        p = torch.nn.functional.softmax(output[:, -1, :], dim=1).data
        p, top_token = p.topk(top_k)
        top_token = top_token.squeeze().cpu().numpy()
        p = p.squeeze().cpu().numpy()
        
        next_token = np.random.choice(top_token, p=p/p.sum())
        generated_sequence.append(next_token)
        
        if next_token == vocab_to_int['<eos>']:
            break
    
    generated_words = [int_to_vocab[token] for token in generated_sequence]
    return ' '.join(generated_words)

In [11]:

# Example of generating text after training
generated_text = generate_text(model, 
                               prime='The bird', 
                               vocab_to_int=vocab_to_int, 
                               int_to_vocab=int_to_vocab, 
                               max_len=100,
                               top_k=2)
print(generated_text)


the bird the bird the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the bird flew over the tree. the bird flew over the tree. the bird flew over the tree. the bird flew over the bird flew over the tree. the bird liked to play near the tree. the bird flew over the bird flew over the bird flew over the bird flew over the bird
