In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, input_size, target_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.input_embed = nn.Embedding(input_size, d_model)
        self.target_embed = nn.Embedding(target_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=True)
        self.out = nn.Linear(d_model, target_size)

    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None):
        src = self.input_embed(src)
        src = self.positional_encoding(src)
        tgt = self.target_embed(tgt)
        tgt = self.positional_encoding(tgt)
        output = self.transformer(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        output = self.out(output)
        return output

def train_model(model, dataloader, criterion, optimizer, num_epochs=20, pad_idx=0):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for src_batch, tgt_batch in dataloader:
            src_batch = src_batch.to(next(model.parameters()).device)
            tgt_batch = tgt_batch.to(next(model.parameters()).device)
            
            tgt_input = tgt_batch[:, :-1]
            tgt_output = tgt_batch[:, 1:]
            
            src_key_padding_mask = (src_batch == pad_idx)
            tgt_key_padding_mask = (tgt_input == pad_idx)
            
            optimizer.zero_grad()
            output = model(src_batch, tgt_input, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        average_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}")

def generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx=0):
    model.eval()
    src = torch.tensor([src_input], dtype=torch.long).to(next(model.parameters()).device)
    tgt_input = [start_token_id]  # Start with the start token

    for i in range(max_length):
        tgt = torch.tensor([tgt_input], dtype=torch.long).to(next(model.parameters()).device)
        src_key_padding_mask = (src == pad_idx)
        tgt_key_padding_mask = (tgt == pad_idx)

        with torch.no_grad():
            output = model(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        
        next_token = output.argmax(-1)[:, -1].item()  # Take the most likely next token
        tgt_input.append(next_token)
        if next_token == end_token_id:  # Stop if the end token is generated
            break

    return tgt_input

def decode_sequence(sequence, vocab):
    return ' '.join([vocab[idx] for idx in sequence if idx not in (0, 1, 2)])  # Exclude padding, start, and end tokens

# Example Dataset
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_len=10):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src = self.src_sentences[idx]
        tgt = self.tgt_sentences[idx]

        src_ids = [self.src_vocab.get(token, self.src_vocab['<unk>']) for token in src.split()]
        tgt_ids = [self.tgt_vocab.get(token, self.tgt_vocab['<unk>']) for token in tgt.split()]

        # Pad sequences to max_len
        src_ids = src_ids[:self.max_len] + [self.src_vocab['<pad>']] * (self.max_len - len(src_ids))
        tgt_ids = tgt_ids[:self.max_len] + [self.tgt_vocab['<pad>']] * (self.max_len - len(tgt_ids))

        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)




In [7]:
# Hyperparameters
input_size = 16  # Updated to match the vocabulary size
target_size = 16  # Updated to match the vocabulary size
d_model = 512
nhead = 8
num_encoder_layers = 3
num_decoder_layers = 3
dim_feedforward = 2048
dropout = 0.1
num_epochs = 20
learning_rate = 0.0001
batch_size = 32
pad_idx = 0  # Define pad_idx before it's used

# Define your vocabularies
src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'I': 4, 'am': 5, 'a': 6, 'student': 7, 'You': 8, 'are': 9, 'teacher': 10, 'He': 11, 'is': 12, 'doctor': 13, 'She': 14, 'nurse': 15}
tgt_vocab = {0: '<pad>', 1: '<sos>', 2: '<eos>', 3: '<unk>', 4: 'Je', 5: 'suis', 6: 'un', 7: 'étudiant', 8: 'Vous', 9: 'êtes', 10: 'enseignant', 11: 'Il', 12: 'est', 13: 'médecin', 14: 'Elle', 15: 'infirmière'}

# Invert the target vocabulary to decode sequences
idx_to_tgt_vocab = {v: k for k, v in tgt_vocab.items()}

# Example sentences
src_sentences = ["I am a student", "You are a teacher", "He is a doctor", "She is a nurse"] * 250  # Replace with actual sentences
tgt_sentences = ["Je suis un étudiant", "Vous êtes un enseignant", "Il est un médecin", "Elle est une infirmière"] * 250  # Replace with actual sentences



# Create Dataset and DataLoader
dataset = TranslationDataset(src_sentences, tgt_sentences, src_vocab, idx_to_tgt_vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, optimizer, and loss function
model = TransformerModel(len(src_vocab), len(idx_to_tgt_vocab), d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout).to('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# Train the model
train_model(model, dataloader, criterion, optimizer, num_epochs, pad_idx)

Epoch 1/20, Loss: 0.4094
Epoch 2/20, Loss: 0.0034
Epoch 3/20, Loss: 0.0016
Epoch 4/20, Loss: 0.0012
Epoch 5/20, Loss: 0.0010
Epoch 6/20, Loss: 0.0009
Epoch 7/20, Loss: 0.0008
Epoch 8/20, Loss: 0.0007
Epoch 9/20, Loss: 0.0006
Epoch 10/20, Loss: 0.0005
Epoch 11/20, Loss: 0.0005
Epoch 12/20, Loss: 0.0004
Epoch 13/20, Loss: 0.0004
Epoch 14/20, Loss: 0.0004
Epoch 15/20, Loss: 0.0003
Epoch 16/20, Loss: 0.0003
Epoch 17/20, Loss: 0.0003
Epoch 18/20, Loss: 0.0003
Epoch 19/20, Loss: 0.0003
Epoch 20/20, Loss: 0.0002


In [8]:
# Example usage
start_token_id = 1  # Start token ID, defined according to your vocabulary
end_token_id = 2    # End token ID, defined according to your vocabulary
max_length = 2     # Maximum length of the generated sequence

# Example source input with padding
src_input = [src_vocab[token] for token in "I am a student".split()] + [src_vocab['<pad>']] * (10 - len("I am a student".split()))

# Generate a sequence
generated_sequence = generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx)
print(generated_sequence)

# Decode the generated sequence
decoded_sequence = decode_sequence(generated_sequence, tgt_vocab)
print(decoded_sequence)


[1, 6, 7]
un étudiant


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, input_size, target_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.input_embed = nn.Embedding(input_size, d_model)
        self.target_embed = nn.Embedding(target_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=True)
        self.out = nn.Linear(d_model, target_size)

    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None):
        src = self.input_embed(src)
        src = self.positional_encoding(src)
        tgt = self.target_embed(tgt)
        tgt = self.positional_encoding(tgt)
        output = self.transformer(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        output = self.out(output)
        return output

def train_model(model, dataloader, criterion, optimizer, num_epochs=20, pad_idx=0):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for src_batch, tgt_batch in dataloader:
            src_batch = src_batch.to(next(model.parameters()).device)
            tgt_batch = tgt_batch.to(next(model.parameters()).device)
            
            tgt_input = tgt_batch[:, :-1]
            tgt_output = tgt_batch[:, 1:]
            
            src_key_padding_mask = (src_batch == pad_idx)
            tgt_key_padding_mask = (tgt_input == pad_idx)
            
            optimizer.zero_grad()
            output = model(src_batch, tgt_input, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        average_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}")

def generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx=0):
    model.eval()
    src = torch.tensor([src_input], dtype=torch.long).to(next(model.parameters()).device)
    tgt_input = [start_token_id]  # Start with the start token

    for i in range(max_length):  # Loop for max_length steps
        tgt = torch.tensor([tgt_input], dtype=torch.long).to(next(model.parameters()).device)
        src_key_padding_mask = (src == pad_idx)
        tgt_key_padding_mask = (tgt == pad_idx)

        with torch.no_grad():
            output = model(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        
        next_token = output.argmax(-1)[:, -1].item()  # Take the most likely next token
        tgt_input.append(next_token)
        if next_token == end_token_id:  # Stop if the end token is generated
            break

    return tgt_input

def decode_sequence(sequence, vocab):
    return ' '.join([vocab[idx] for idx in sequence if idx not in (0, 1, 2)])  # Exclude padding, start, and end tokens

# Define your vocabularies
src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'I': 4, 'am': 5, 'a': 6, 'student': 7, 'You': 8, 'are': 9, 'teacher': 10, 'He': 11, 'is': 12, 'doctor': 13, 'She': 14, 'nurse': 15}
tgt_vocab = {0: '<pad>', 1: '<sos>', 2: '<eos>', 3: '<unk>', 4: 'Je', 5: 'suis', 6: 'un', 7: 'étudiant', 8: 'Vous', 9: 'êtes', 10: 'enseignant', 11: 'Il', 12: 'est', 13: 'médecin', 14: 'Elle', 15: 'infirmière'}

# Invert the target vocabulary to decode sequences
idx_to_tgt_vocab = {v: k for k, v in tgt_vocab.items()}

# Example Dataset
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_len=10):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src = self.src_sentences[idx]
        tgt = self.tgt_sentences[idx]

        src_ids = [self.src_vocab.get(token, self.src_vocab['<unk>']) for token in src.split()]
        tgt_ids = [self.tgt_vocab.get(token, self.tgt_vocab['<unk>']) for token in tgt.split()]

        # Pad sequences to max_len
        src_ids = src_ids[:self.max_len] + [self.src_vocab['<pad>']] * (self.max_len - len(src_ids))
        tgt_ids = tgt_ids[:self.max_len] + [self.tgt_vocab['<pad>']] * (self.max_len - len(tgt_ids))

        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)


# Hyperparameters
input_size = 16  # Updated to match the vocabulary size
target_size = 16  # Updated to match the vocabulary size
d_model = 512
nhead = 8
num_encoder_layers = 3
num_decoder_layers = 3
dim_feedforward = 2048
dropout = 0.1
num_epochs = 20
learning_rate = 0.0001
batch_size = 32
pad_idx = 0  # Define pad_idx before it's used

# Example sentences
src_sentences = ["I am a student", "You are a teacher", "He is a doctor", "She is a nurse"] * 250  # Replace with actual sentences
tgt_sentences = ["Je suis un étudiant", "Vous êtes un enseignant", "Il est un médecin", "Elle est une infirmière"] * 250  # Replace with actual sentences

# Create Dataset and DataLoader
dataset = TranslationDataset(src_sentences, tgt_sentences, src_vocab, idx_to_tgt_vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, optimizer, and loss function
model = TransformerModel(len(src_vocab), len(idx_to_tgt_vocab), d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout).to('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# Train the model
train_model(model, dataloader, criterion, optimizer, num_epochs, pad_idx)


Epoch 1/20, Loss: 0.4239
Epoch 2/20, Loss: 0.0033
Epoch 3/20, Loss: 0.0016
Epoch 4/20, Loss: 0.0012
Epoch 5/20, Loss: 0.0010
Epoch 6/20, Loss: 0.0008
Epoch 7/20, Loss: 0.0007
Epoch 8/20, Loss: 0.0006
Epoch 9/20, Loss: 0.0006
Epoch 10/20, Loss: 0.0005
Epoch 11/20, Loss: 0.0004
Epoch 12/20, Loss: 0.0004
Epoch 13/20, Loss: 0.0004
Epoch 14/20, Loss: 0.0003
Epoch 15/20, Loss: 0.0003
Epoch 16/20, Loss: 0.0003
Epoch 17/20, Loss: 0.0003
Epoch 18/20, Loss: 0.0002
Epoch 19/20, Loss: 0.0002
Epoch 20/20, Loss: 0.0002


In [10]:
# Example usage
start_token_id = 1  # Start token ID, defined according to your vocabulary
end_token_id = 2    # End token ID, defined according to your vocabulary
max_length = 3     # Maximum length of the generated sequence

# Example source input with padding
src_input = [src_vocab[token] for token in "I am a student".split()] + [src_vocab['<pad>']] * (10 - len("I am a student".split()))
print(f"src_input: {src_input}")

# Generate a sequence
generated_sequence = generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx)
print(f"generated_sequence: {generated_sequence}")

# Decode the generated sequence
decoded_sequence = decode_sequence(generated_sequence, tgt_vocab)
print(f"decoded_sequence: {decoded_sequence}")


src_input: [4, 5, 6, 7, 0, 0, 0, 0, 0, 0]
generated_sequence: [1, 6, 7, 6]
decoded_sequence: un étudiant un


In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, input_size, target_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.input_embed = nn.Embedding(input_size, d_model)
        self.target_embed = nn.Embedding(target_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=True)
        self.out = nn.Linear(d_model, target_size)

    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None):
        src = self.input_embed(src)
        src = self.positional_encoding(src)
        tgt = self.target_embed(tgt)
        tgt = self.positional_encoding(tgt)
        output = self.transformer(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        output = self.out(output)
        return output

def train_model(model, dataloader, criterion, optimizer, num_epochs=20, pad_idx=0):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for src_batch, tgt_batch in dataloader:
            src_batch = src_batch.to(next(model.parameters()).device)
            tgt_batch = tgt_batch.to(next(model.parameters()).device)
            
            tgt_input = tgt_batch[:, :-1]
            tgt_output = tgt_batch[:, 1:]
            
            src_key_padding_mask = (src_batch == pad_idx)
            tgt_key_padding_mask = (tgt_input == pad_idx)
            
            optimizer.zero_grad()
            output = model(src_batch, tgt_input, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        average_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}")

def generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx=0):
    model.eval()
    src = torch.tensor([src_input], dtype=torch.long).to(next(model.parameters()).device)
    tgt_input = [start_token_id]  # Start with the start token

    for i in range(max_length):  # Loop for max_length steps
        tgt = torch.tensor([tgt_input], dtype=torch.long).to(next(model.parameters()).device)
        src_key_padding_mask = (src == pad_idx)
        tgt_key_padding_mask = (tgt == pad_idx)

        with torch.no_grad():
            output = model(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        
        next_token = output.argmax(-1)[:, -1].item()  # Take the most likely next token
        tgt_input.append(next_token)
        if next_token == end_token_id:  # Stop if the end token is generated
            break

    return tgt_input

def decode_sequence(sequence, vocab):
    return ' '.join([vocab[idx] for idx in sequence if idx not in (0, 1, 2)])  # Exclude padding, start, and end tokens

# Define your vocabularies
src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'I': 4, 'am': 5, 'a': 6, 'student': 7, 'You': 8, 'are': 9, 'teacher': 10, 'He': 11, 'is': 12, 'doctor': 13, 'She': 14, 'nurse': 15}
tgt_vocab = {0: '<pad>', 1: '<sos>', 2: '<eos>', 3: '<unk>', 4: 'Je', 5: 'suis', 6: 'un', 7: 'étudiant', 8: 'Vous', 9: 'êtes', 10: 'enseignant', 11: 'Il', 12: 'est', 13: 'médecin', 14: 'Elle', 15: 'infirmière'}

# Invert the target vocabulary to decode sequences
idx_to_tgt_vocab = {v: k for k, v in tgt_vocab.items()}

# Example Dataset
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_len=10):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src = self.src_sentences[idx]
        tgt = self.tgt_sentences[idx]

        src_ids = [self.src_vocab.get(token, self.src_vocab['<unk>']) for token in src.split()]
        tgt_ids = [self.tgt_vocab.get(token, self.tgt_vocab['<unk>']) for token in tgt.split()]

        # Pad sequences to max_len
        src_ids = src_ids[:self.max_len] + [self.src_vocab['<pad>']] * (self.max_len - len(src_ids))
        tgt_ids = tgt_ids[:self.max_len] + [self.tgt_vocab['<pad>']] * (self.max_len - len(tgt_ids))

        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)

# Example usage
start_token_id = 1  # Start token ID, defined according to your vocabulary
end_token_id = 2    # End token ID, defined according to your vocabulary
max_length = 10     # Maximum length of the generated sequence

# Hyperparameters
input_size = 16  # Updated to match the vocabulary size
target_size = 16  # Updated to match the vocabulary size
d_model = 512
nhead = 8
num_encoder_layers = 3
num_decoder_layers = 3
dim_feedforward = 2048
dropout = 0.1
num_epochs = 20
learning_rate = 0.0001
batch_size = 32
pad_idx = 0  # Define pad_idx before it's used

# Example sentences
src_sentences = ["I am a student", "You are a teacher", "He is a doctor", "She is a nurse"] * 250  # Replace with actual sentences
tgt_sentences = ["Je suis un étudiant", "Vous êtes un enseignant", "Il est un médecin", "Elle est une infirmière"] * 250  # Replace with actual sentences

# Create Dataset and DataLoader
dataset = TranslationDataset(src_sentences, tgt_sentences, src_vocab, idx_to_tgt_vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, optimizer, and loss function
model = TransformerModel(len(src_vocab), len(idx_to_tgt_vocab), d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout).to('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# Train the model
train_model(model, dataloader, criterion, optimizer, num_epochs, pad_idx)

# Example source input with padding
src_input = [src_vocab[token] for token in "I am a student".split()] + [src_vocab['<pad>']] * (10 - len("I am a student".split()))
print(f"src_input: {src_input}")

# Generate a sequence
generated_sequence = generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx)
print(f"generated_sequence: {generated_sequence}")

# Decode the generated sequence
decoded_sequence = decode_sequence(generated_sequence, tgt_vocab)
print(f"decoded_sequence: {decoded_sequence}")


Epoch 1/20, Loss: 0.3709
Epoch 2/20, Loss: 0.0029
Epoch 3/20, Loss: 0.0014
Epoch 4/20, Loss: 0.0011
Epoch 5/20, Loss: 0.0009
Epoch 6/20, Loss: 0.0008
Epoch 7/20, Loss: 0.0007
Epoch 8/20, Loss: 0.0006
Epoch 9/20, Loss: 0.0006
Epoch 10/20, Loss: 0.0005
Epoch 11/20, Loss: 0.0004
Epoch 12/20, Loss: 0.0004
Epoch 13/20, Loss: 0.0004
Epoch 14/20, Loss: 0.0003
Epoch 15/20, Loss: 0.0003
Epoch 16/20, Loss: 0.0003
Epoch 17/20, Loss: 0.0003
Epoch 18/20, Loss: 0.0003
Epoch 19/20, Loss: 0.0002
Epoch 20/20, Loss: 0.0002
src_input: [4, 5, 6, 7, 0, 0, 0, 0, 0, 0]
generated_sequence: [1, 5, 6, 7, 6, 7, 6, 7, 6, 7, 6]
decoded_sequence: suis un étudiant un étudiant un étudiant un étudiant un


In [12]:
# Example usage
start_token_id = 1  # Start token ID, defined according to your vocabulary
end_token_id = 2    # End token ID, defined according to your vocabulary
max_length = 12     # Maximum length of the generated sequence


# Example source input with padding
src_input = [src_vocab[token] for token in "I am a student".split()] + [src_vocab['<pad>']] * (10 - len("I am a student".split()))
print(f"src_input: {src_input}")

# Generate a sequence
generated_sequence = generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx)
print(f"generated_sequence: {generated_sequence}")

# Decode the generated sequence
decoded_sequence = decode_sequence(generated_sequence, tgt_vocab)
print(f"decoded_sequence: {decoded_sequence}")

src_input: [4, 5, 6, 7, 0, 0, 0, 0, 0, 0]
generated_sequence: [1, 5, 6, 7, 6, 7, 6, 7, 6, 7, 6, 7, 6]
decoded_sequence: suis un étudiant un étudiant un étudiant un étudiant un étudiant un


In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

class TransformerModel(nn.Module):
    def __init__(self, input_size, target_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout):
        super(TransformerModel, self).__init__()
        self.input_embed = nn.Embedding(input_size, d_model)
        self.target_embed = nn.Embedding(target_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout, batch_first=True)
        self.out = nn.Linear(d_model, target_size)

    def forward(self, src, tgt, src_key_padding_mask=None, tgt_key_padding_mask=None):
        src = self.input_embed(src)
        src = self.positional_encoding(src)
        tgt = self.target_embed(tgt)
        tgt = self.positional_encoding(tgt)
        output = self.transformer(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        output = self.out(output)
        return output

def train_model(model, dataloader, criterion, optimizer, num_epochs=20, pad_idx=0):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for src_batch, tgt_batch in dataloader:
            src_batch = src_batch.to(next(model.parameters()).device)
            tgt_batch = tgt_batch.to(next(model.parameters()).device)
            
            tgt_input = tgt_batch[:, :-1]
            tgt_output = tgt_batch[:, 1:]
            
            src_key_padding_mask = (src_batch == pad_idx)
            tgt_key_padding_mask = (tgt_input == pad_idx)
            
            optimizer.zero_grad()
            output = model(src_batch, tgt_input, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
            output = output.reshape(-1, output.shape[-1])
            tgt_output = tgt_output.reshape(-1)
            loss = criterion(output, tgt_output)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        average_loss = total_loss / len(dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}")

def generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx=0):
    model.eval()
    src = torch.tensor([src_input], dtype=torch.long).to(next(model.parameters()).device)
    tgt_input = [start_token_id]  # Start with the start token

    for i in range(max_length):  # Loop for max_length steps
        tgt = torch.tensor([tgt_input], dtype=torch.long).to(next(model.parameters()).device)
        src_key_padding_mask = (src == pad_idx)
        tgt_key_padding_mask = (tgt == pad_idx)

        with torch.no_grad():
            output = model(src, tgt, src_key_padding_mask=src_key_padding_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        
        next_token = output.argmax(-1)[:, -1].item()  # Take the most likely next token
        tgt_input.append(next_token)
        if next_token == end_token_id:  # Stop if the end token is generated
            break

    return tgt_input

def decode_sequence(sequence, vocab):
    return ' '.join([vocab[idx] for idx in sequence if idx not in (0, 1, 2)])  # Exclude padding, start, and end tokens

# Define your vocabularies
src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3, 'I': 4, 'am': 5, 'a': 6, 'student': 7, 'You': 8, 'are': 9, 'teacher': 10, 'He': 11, 'is': 12, 'doctor': 13, 'She': 14, 'nurse': 15}
tgt_vocab = {0: '<pad>', 1: '<sos>', 2: '<eos>', 3: '<unk>', 4: 'Je', 5: 'suis', 6: 'un', 7: 'étudiant', 8: 'Vous', 9: 'êtes', 10: 'enseignant', 11: 'Il', 12: 'est', 13: 'médecin', 14: 'Elle', 15: 'infirmière'}

# Invert the target vocabulary to decode sequences
idx_to_tgt_vocab = {v: k for k, v in tgt_vocab.items()}

# Example Dataset
class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, src_vocab, tgt_vocab, max_len=10):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src = self.src_sentences[idx]
        tgt = self.tgt_sentences[idx]

        src_ids = [self.src_vocab.get(token, self.src_vocab['<unk>']) for token in src.split()]
        tgt_ids = [self.tgt_vocab.get(token, self.tgt_vocab['<unk>']) for token in tgt.split()]

        # Pad sequences to max_len
        src_ids = src_ids[:self.max_len] + [self.src_vocab['<pad>']] * (self.max_len - len(src_ids))
        tgt_ids = tgt_ids[:self.max_len] + [self.tgt_vocab['<pad>']] * (self.max_len - len(tgt_ids))

        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)

# Example usage
start_token_id = 1  # Start token ID, defined according to your vocabulary
end_token_id = 2    # End token ID, defined according to your vocabulary
max_length = 10     # Maximum length of the generated sequence

# Hyperparameters
input_size = 16  # Updated to match the vocabulary size
target_size = 16  # Updated to match the vocabulary size
d_model = 512
nhead = 8
num_encoder_layers = 3
num_decoder_layers = 3
dim_feedforward = 2048
dropout = 0.1
num_epochs = 20
learning_rate = 0.0001
batch_size = 32
pad_idx = 0  # Define pad_idx before it's used

# Example sentences
src_sentences = ["I am a student", "You are a teacher", "He is a doctor", "She is a nurse"] * 250  # Replace with actual sentences
tgt_sentences = ["Je suis un étudiant", "Vous êtes un enseignant", "Il est un médecin", "Elle est une infirmière"] * 250  # Replace with actual sentences

# Create Dataset and DataLoader
dataset = TranslationDataset(src_sentences, tgt_sentences, src_vocab, idx_to_tgt_vocab)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model, optimizer, and loss function
model = TransformerModel(len(src_vocab), len(idx_to_tgt_vocab), d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout).to('cuda' if torch.cuda.is_available() else 'cpu')
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# Train the model
train_model(model, dataloader, criterion, optimizer, num_epochs, pad_idx)

# Example source input with padding
src_input = [src_vocab[token] for token in "I am a student".split()] + [src_vocab['<pad>']] * (10 - len("I am a student".split()))
print(f"src_input: {src_input}")

# Generate a sequence
generated_sequence = generate_sequence_greedy(model, src_input, start_token_id, max_length, end_token_id, pad_idx)
print(f"generated_sequence: {generated_sequence}")

# Decode the generated sequence
decoded_sequence = decode_sequence(generated_sequence, tgt_vocab)
print(f"decoded_sequence: {decoded_sequence}")


Epoch 1/20, Loss: 0.4620
Epoch 2/20, Loss: 0.0032
Epoch 3/20, Loss: 0.0014
Epoch 4/20, Loss: 0.0011
Epoch 5/20, Loss: 0.0009
Epoch 6/20, Loss: 0.0008
Epoch 7/20, Loss: 0.0007
Epoch 8/20, Loss: 0.0006
Epoch 9/20, Loss: 0.0005
Epoch 10/20, Loss: 0.0005
Epoch 11/20, Loss: 0.0004
Epoch 12/20, Loss: 0.0004
Epoch 13/20, Loss: 0.0004
Epoch 14/20, Loss: 0.0003
Epoch 15/20, Loss: 0.0003
Epoch 16/20, Loss: 0.0003
Epoch 17/20, Loss: 0.0003
Epoch 18/20, Loss: 0.0003
Epoch 19/20, Loss: 0.0002
Epoch 20/20, Loss: 0.0002
src_input: [4, 5, 6, 7, 0, 0, 0, 0, 0, 0]
generated_sequence: [1, 6, 7, 6, 7, 6, 7, 6, 7, 6, 7]
decoded_sequence: un étudiant un étudiant un étudiant un étudiant un étudiant
