In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
import math
import time
import torch.nn.functional as F

## Params

In [2]:
vocab_size = 10000
embedding_dim = 100
hidden_dim = 256
learning_rate = 0.001
batch_size = 64
num_epochs = 5
sequence_length = 10
nhead = 4
num_encoder_layers = 2
dim_feedforward = 512
file_path = 'C:\\Users\\josep\\Downloads\\LTR.txt'

In [3]:
def preprocess_text(text, sequence_length):
    """Preprocess the text into sequences of tokens."""
    # Tokenize text
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text).lower()
    tokens = text.split()
    
    # Create sequences
    sequences = []
    for i in range(len(tokens) - sequence_length):
        seq = tokens[i:i + sequence_length + 1]
        sequences.append(seq)
    
    return sequences

def build_vocab(sequences):
    """Build a vocabulary from the sequences."""
    all_tokens = [token for seq in sequences for token in seq]
    token_counts = Counter(all_tokens)
    vocab = {token: idx for idx, (token, _) in enumerate(token_counts.items(), 1)}
    vocab['<PAD>'] = 0  # Add padding token
    return vocab

def sequences_to_indices(sequences, vocab):
    """Convert sequences of tokens to sequences of indices."""
    return [[vocab[token] for token in seq] for seq in sequences]

def read_text_file(file_path):
    """Read text from a file."""
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text

In [4]:
class LanguageDataset(Dataset):
    
    def __init__(self, sequences):
        self.sequences = sequences
        
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.long)

In [5]:
# Read and preprocess the text file
text = read_text_file(file_path)
sequences = preprocess_text(text, sequence_length)
vocab = build_vocab(sequences)
indexed_sequences = sequences_to_indices(sequences, vocab)

# Adjust vocab_size according to the actual vocabulary size
vocab_size = len(vocab)

# Create the dataset and data loader
dataset = LanguageDataset(indexed_sequences)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

## Perceptron

In [6]:
class PerceptronLanguageModel(nn.Module):
    
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        
        super(PerceptronLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.fc1 = nn.Linear(embedding_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, x):
        
        embedded = self.embedding(x)
        batch_size, seq_len, _ = embedded.shape
        embedded = embedded.view(batch_size * seq_len, -1)  # Flatten the sequence dimension
        out = torch.relu(self.fc1(embedded))
        out = self.fc2(out)
        return out.view(batch_size, seq_len, -1)  # Reshape to (batch_size, sequence_length, vocab_size)

In [7]:
def train_model_perceptron(model, data_loader, criterion, optimizer, num_epochs, device):
    """Train the model."""
    model.to(device)
    total_training_time = 0.0
    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        total_loss = 0.0
        for batch in data_loader:
            batch = batch.to(device)
            optimizer.zero_grad()
            inputs = batch[:, :-1]
            targets = batch[:, 1:].flatten()  # Flatten targets for loss calculation
            
            outputs = model(inputs).reshape(-1, vocab_size)  # Reshape outputs for loss calculation
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        end_time = time.time()
        epoch_time = end_time - start_time
        total_training_time += epoch_time
        avg_loss = total_loss / len(data_loader)
        print(f'Epoch {epoch+1}, Average Loss: {avg_loss:.4f}, Time: {epoch_time:.2f}s')
    return total_training_time

In [8]:
def generate_sentence_perceptron(model, start_sequence, vocab, reverse_vocab, max_length=20):
    
    model.eval()  # Set the model to evaluation mode
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Convert the start_sequence to indices
    sequence = [vocab[word] for word in start_sequence]
    
    # Generate tokens one by one
    for _ in range(max_length - len(start_sequence)):
        inputs = torch.tensor([sequence], dtype=torch.long).to(device)
        with torch.no_grad():
            outputs = model(inputs)
        
        # Get the last token probabilities
        last_token_logits = outputs[0, -1, :]
        next_token_probs = F.softmax(last_token_logits, dim=-1)
        
        # Sample the next token
        next_token = torch.multinomial(next_token_probs, 1).item()
        
        # Add the token to the sequence
        sequence.append(next_token)
        
        # Stop if the end token is generated (optional)
        if reverse_vocab[next_token] == '<END>':
            break

    # Convert indices back to words
    generated_sequence = [reverse_vocab[idx] for idx in sequence]
    
    return ' '.join(generated_sequence)

## RNN

In [9]:
class RNNLanguageModel(nn.Module):
    """Elman RNN language model."""
    
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1):
        """
        Initialize the language model.

        Args:
            vocab_size (int): Size of the vocabulary.
            embedding_dim (int): Dimension of the embeddings.
            hidden_dim (int): Dimension of the hidden layer.
            num_layers (int): Number of recurrent layers.
        """
        super(RNNLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, x, hidden):
        """
        Perform a forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, sequence_length).
            hidden (torch.Tensor): Hidden state for RNN.

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, sequence_length, vocab_size).
            torch.Tensor: Updated hidden state.
        """
        embedded = self.embedding(x)
        rnn_out, hidden = self.rnn(embedded, hidden)
        output = self.fc(rnn_out)
        return output, hidden
    
    def init_hidden(self, batch_size):
        """
        Initialize hidden state.

        Args:
            batch_size (int): Batch size.

        Returns:
            torch.Tensor: Initial hidden state.
        """
        weight = next(self.parameters()).data
        hidden = weight.new(self.rnn.num_layers, batch_size, self.rnn.hidden_size).zero_()
        return hidden



def train_model_RNN(model, data_loader, criterion, optimizer, num_epochs, device):
    """Train the model."""
    model.to(device)
    total_training_time = 0.0
    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        total_loss = 0.0
        for batch in data_loader:
            batch = batch.to(device)
            optimizer.zero_grad()
            inputs = batch[:, :-1]
            targets = batch[:, 1:].contiguous().view(-1)  # Flatten targets for loss calculation
            
            hidden = model.init_hidden(inputs.size(0))  # Initialize hidden state
            hidden = hidden.to(device)
            
            outputs, hidden = model(inputs, hidden)
            outputs = outputs.contiguous().view(-1, vocab_size)  # Reshape outputs for loss calculation
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        end_time = time.time()
        epoch_time = end_time - start_time
        total_training_time += epoch_time
        avg_loss = total_loss / len(data_loader)
        print(f'Epoch {epoch+1}, Average Loss: {avg_loss:.4f}')

    return total_training_time

def generate_sentence_RNN(model, start_sequence, vocab, reverse_vocab, max_length=20):
    
    model.eval()  # Set the model to evaluation mode
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Convert the start_sequence to indices
    sequence = [vocab[word] for word in start_sequence]
    
    # Initialize hidden state
    hidden = model.init_hidden(1)
    hidden = hidden.to(device)
    
    # Generate tokens one by one
    for _ in range(max_length - len(start_sequence)):
        inputs = torch.tensor([sequence], dtype=torch.long).to(device)
        with torch.no_grad():
            outputs, hidden = model(inputs, hidden)
        
        # Get the last token probabilities
        last_token_logits = outputs[0, -1, :]
        next_token_probs = torch.softmax(last_token_logits, dim=-1)
        
        # Sample the next token
        next_token = torch.multinomial(next_token_probs, 1).item()
        
        # Add the token to the sequence
        sequence.append(next_token)
        
        # Stop if the end token is generated (optional)
        if reverse_vocab[next_token] == '<END>':
            break

    # Convert indices back to words
    generated_sequence = [reverse_vocab[idx] for idx in sequence]
    
    return ' '.join(generated_sequence)

## Transformer

In [10]:
# Positional Encoding class
class PositionalEncoding(nn.Module):
    def __init__(self, embedding_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, embedding_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x.transpose(0, 1)  # Transformer expects (sequence_length, batch_size, embedding_dim)
        x = x + self.pe[:x.size(0), :]
        return x.transpose(0, 1)  # Revert to (batch_size, sequence_length, embedding_dim)

# Transformer Language Model class
class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, nhead, num_encoder_layers, dim_feedforward, max_seq_length):
        super(TransformerLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, max_seq_length)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, nhead, dim_feedforward)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_encoder_layers)
        self.fc = nn.Linear(embedding_dim, vocab_size)
        self.embedding_dim = embedding_dim
        
    def forward(self, x, src_key_padding_mask=None):
        x = self.embedding(x) * math.sqrt(self.embedding_dim)
        x = self.positional_encoding(x)
        x = x.transpose(0, 1)  # Transformer expects (sequence_length, batch_size, embedding_dim)
        output = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)
        output = output.transpose(0, 1)
        output = self.fc(output)
        return output

# Training function
def train_model_transformer(model, data_loader, criterion, optimizer, num_epochs, device):
    model.to(device)
    total_training_time = 0.0
    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()
        total_loss = 0.0
        for batch in data_loader:
            batch = batch.to(device)
            optimizer.zero_grad()
            inputs = batch[:, :-1]
            targets = batch[:, 1:].contiguous().view(-1)
            
            src_key_padding_mask = (inputs == 0)  # Correct mask shape (batch_size, sequence_length)
            outputs = model(inputs, src_key_padding_mask=src_key_padding_mask)
            outputs = outputs.view(-1, vocab_size)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        end_time = time.time()
        epoch_time = end_time - start_time
        total_training_time += epoch_time
        avg_loss = total_loss / len(data_loader)
        print(f'Epoch {epoch+1}, Average Loss: {avg_loss:.4f}')


    return total_training_time


# Generate sentence function
def generate_sentence_transformer(model, start_sequence, vocab, reverse_vocab, max_length=20):
    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    sequence = [vocab[word] for word in start_sequence]
    input_tensor = torch.tensor([sequence], dtype=torch.long).to(device)
    generated_sequence = sequence.copy()
    
    for _ in range(max_length - len(start_sequence)):
        src_key_padding_mask = (input_tensor == 0)
        with torch.no_grad():
            outputs = model(input_tensor, src_key_padding_mask=src_key_padding_mask)
        
        next_token_logits = outputs[0, -1, :]
        next_token_probs = torch.softmax(next_token_logits, dim=-1)
        next_token = torch.multinomial(next_token_probs, 1).item()
        
        if next_token == vocab['<PAD>']:
            break
        
        generated_sequence.append(next_token)
        input_tensor = torch.tensor([generated_sequence[-11:]], dtype=torch.long).to(device)
    
    generated_words = [reverse_vocab[idx] for idx in generated_sequence]
    return ' '.join(generated_words)


## Comparison

In [11]:
def calculate_perplexity(model, data_loader, criterion, device):
    model.eval()
    model.to(device)
    total_loss = 0.0
    total_tokens = 0

    with torch.no_grad():
        for batch in data_loader:
            batch = batch.to(device)
            inputs = batch[:, :-1]
            targets = batch[:, 1:].contiguous().view(-1)

            if isinstance(model, (RNNLanguageModel)):
                hidden = model.init_hidden(inputs.size(0)).to(device)
                outputs, hidden = model(inputs, hidden)
                outputs = outputs.contiguous().view(-1, vocab_size)
            elif isinstance(model, (PerceptronLanguageModel)):
                outputs = model(inputs)
                outputs = outputs.contiguous().view(-1, vocab_size)
            else:
                src_key_padding_mask = (inputs == 0)  # Assumes 0 is the padding index
                outputs = model(inputs, src_key_padding_mask=src_key_padding_mask)
                outputs = outputs.contiguous().view(-1, vocab_size)
            

            loss = criterion(outputs, targets)
            total_loss += loss.item() * targets.size(0)
            total_tokens += targets.size(0)

        average_loss = total_loss / total_tokens
        perplexity = torch.exp(torch.tensor(average_loss)).item()
    return perplexity

## Train & Execution

In [12]:
# Determine the device to be used (CPU or GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Init models
perceptron_model = PerceptronLanguageModel(vocab_size, embedding_dim, hidden_dim)
rnn_model = RNNLanguageModel(vocab_size, embedding_dim, hidden_dim)
transformer_model = TransformerLanguageModel(vocab_size, embedding_dim, nhead, num_encoder_layers, dim_feedforward, sequence_length + 1)


criterion = nn.CrossEntropyLoss()
perceptron_optimizer = optim.Adam(perceptron_model.parameters(), lr=learning_rate)
rnn_optimizer = optim.Adam(rnn_model.parameters(), lr=learning_rate)
transformer_optimizer = optim.Adam(transformer_model.parameters(), lr=learning_rate)


# Train models
print("Training perceptron model...")
perceptron_training_time = train_model_perceptron(perceptron_model, data_loader, criterion, perceptron_optimizer, num_epochs, device)
perceptron_perplexity = calculate_perplexity(perceptron_model, data_loader, criterion, device)
print(f"Perceptron Model - Training Time: {perceptron_training_time:.2f}s, Perplexity: {perceptron_perplexity:.2f}")

print("Training RNN model...")
rnn_training_time = train_model_RNN(rnn_model, data_loader, criterion, rnn_optimizer, num_epochs, device)
rnn_perplexity = calculate_perplexity(rnn_model, data_loader, criterion, device)
print(f"RNN Model - Training Time: {rnn_training_time:.2f}s, Perplexity: {rnn_perplexity:.2f}")

print("Training Transformer model...")
trans_training_time = train_model_transformer(transformer_model, data_loader, criterion, transformer_optimizer, num_epochs, device)
trans_perplexity = calculate_perplexity(transformer_model, data_loader, criterion, device)
print(f"RNN Model - Training Time: {trans_training_time:.2f}s, Perplexity: {trans_perplexity:.2f}")



Training perceptron model...
Epoch 1, Average Loss: 5.0603, Time: 48.55s
Epoch 2, Average Loss: 4.6168, Time: 50.75s
Epoch 3, Average Loss: 4.5062, Time: 50.35s
Epoch 4, Average Loss: 4.4577, Time: 49.20s
Epoch 5, Average Loss: 4.4326, Time: 51.67s
Perceptron Model - Training Time: 250.51s, Perplexity: 81.18
Training RNN model...
Epoch 1, Average Loss: 4.8669
Epoch 2, Average Loss: 3.9678
Epoch 3, Average Loss: 3.6253
Epoch 4, Average Loss: 3.4213
Epoch 5, Average Loss: 3.2835
RNN Model - Training Time: 307.96s, Perplexity: 23.52
Training Transformer model...


  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)


Epoch 1, Average Loss: 3.5218
Epoch 2, Average Loss: 2.3410
Epoch 3, Average Loss: 1.5958
Epoch 4, Average Loss: 1.1692
Epoch 5, Average Loss: 0.9625
RNN Model - Training Time: 720.08s, Perplexity: 1.84


In [13]:
start_sequence = ['the', 'hobbit', 'went']

generated_sentence_perceptron = generate_sentence_perceptron(perceptron_model, start_sequence, vocab, {v: k for k, v in vocab.items()})
print(generated_sentence_perceptron)

generated_sentence_RNN = generate_sentence_RNN(rnn_model, start_sequence, vocab, {v: k for k, v in vocab.items()})
print(generated_sentence_RNN)

generated_sentence = generate_sentence_transformer(transformer_model, start_sequence, vocab, {v: k for k, v in vocab.items()})
print(generated_sentence)

the hobbit went down at the gaffer did this hour after all of the great so i wish to welcome
the hobbit went forward on their way swept up and turned from gandalf forgive them his eyes and the fire
the hobbit went went went went went went went went the bad went straight went on riding on from the


## PreTrained Transformer Model

In [14]:
from transformers import GPT2Tokenizer, GPT2LMHeadModel, Trainer, TrainingArguments
import torch

# Cargar el archivo de texto
with open(file_path, 'r', encoding='utf-8') as file:
    text = file.read()

# Inicializar el tokenizador y el modelo
tokenizerPre = GPT2Tokenizer.from_pretrained('gpt2')
modelPre = GPT2LMHeadModel.from_pretrained('gpt2')

# Tokenizar el texto
inputs = tokenizerPre(text, return_tensors='pt', max_length=512, truncation=True)
input_ids = inputs['input_ids']

# Crear etiquetas (labels) que son los mismos input_ids
labels = input_ids.clone()

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, input_ids, labels):
        self.input_ids = input_ids
        self.labels = labels

    def __getitem__(self, idx):
        return {'input_ids': self.input_ids[idx], 'labels': self.labels[idx]}

    def __len__(self):
        return len(self.input_ids)

# Crear el conjunto de datos
dataset = CustomDataset(input_ids, labels)

# Configurar los argumentos del entrenamiento
training_args = TrainingArguments(
    output_dir='./results',
    overwrite_output_dir=True,
    num_train_epochs=3,
    per_device_train_batch_size=1,
    save_steps=10_000,
    save_total_limit=2,
)

# Inicializar el entrenador
trainer = Trainer(
    model=modelPre,
    args=training_args,
    train_dataset=dataset,
)

# Entrenar el modelo
trainer.train()

# Generar texto
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
modelPre.to(device)

prompt = "The Hobbit went"
inputs = tokenizerPre(prompt, return_tensors='pt')
inputs = {key: value.to(device) for key, value in inputs.items()}

outputs = modelPre.generate(inputs['input_ids'], max_length=100, num_return_sequences=1)

print(tokenizerPre.decode(outputs[0], skip_special_tokens=True))

  0%|          | 0/3 [00:00<?, ?it/s]

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


{'train_runtime': 0.8214, 'train_samples_per_second': 3.652, 'train_steps_per_second': 3.652, 'train_loss': 2.997549374898275, 'epoch': 3.0}
The Hobbit went on to be the first to-bearer of the Fellowship of the Ring, and the first to be the first to-bearer of the Fellowship of the Ring.

The Fellowship of the Ring was the first to be the first to be the first to-be-be-first to-be-first to-be-first to-be-be-be-be-first to-be-be-first to-be-first to-be-first
