In [None]:
pip uninstall torch torchtext

In [None]:
pip install torch==2.0.1 torchtext==0.15.2


In [None]:
import torch
import torch.nn as nn
import torchtext
from torchtext.data import Field, BucketIterator
# Define the fields.
# batch_first=True makes each batch tensor [batch, seq_len]. The model cells
# rely on that layout: the LSTMs are built with batch_first=True and
# Seq2Seq.forward reads the batch size from src.shape[0] and the first target
# token from trg[:, 0].
SRC = Field(
    tokenize="spacy",
    tokenizer_language="en_core_web_sm",
    init_token="<sos>",
    eos_token="<eos>",
    lower=True,
    batch_first=True,
)
TRG = Field(
    tokenize="spacy",
    tokenizer_language="de_core_news_sm",
    init_token="<sos>",
    eos_token="<eos>",
    lower=True,
    batch_first=True,
)

# Load the dataset (using Multi30k for demonstration): English source (.en),
# German target (.de).
train_data, valid_data, test_data = torchtext.datasets.Multi30k.splits(
    exts=(".en", ".de"),
    fields=(SRC, TRG),
)

In [None]:
# Build vocab (from the training split only).
SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)

# Select the compute device before its first use, so this cell runs on a fresh
# kernel (the iterators below need it).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create iterators
BATCH_SIZE = 32
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device,
)

In [None]:
class Encoder(nn.Module):
    """Embed a source token sequence and encode it with a stacked LSTM.

    Args:
        input_dim: Number of rows in the embedding table (source vocab size).
        emb_dim: Embedding dimension.
        hidden_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and passed to
            the LSTM as its inter-layer dropout.
    """

    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode a batch of source sequences.

        Args:
            src: Integer tensor of token indices, [batch, src_len]
                (the LSTM is batch_first).

        Returns:
            Tuple ``(hidden, cell)`` of final LSTM states, each
            [n_layers, batch, hidden_dim]. The per-step outputs are discarded.
        """
        embedded = self.dropout(self.embedding(src))
        _, (hidden, cell) = self.rnn(embedded)
        return hidden, cell

In [None]:
class Decoder(nn.Module):
    """Single-step LSTM decoder that maps the previous token to vocab logits.

    Args:
        output_dim: Number of rows in the embedding table and size of the
            output layer (target vocab size).
        emb_dim: Embedding dimension.
        hidden_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, applied to the embeddings and passed to
            the LSTM as its inter-layer dropout.
    """

    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        Args:
            input: Integer tensor of current token indices, [batch].
            hidden: LSTM hidden state, [n_layers, batch, hidden_dim].
            cell: LSTM cell state, [n_layers, batch, hidden_dim].

        Returns:
            Tuple ``(prediction, hidden, cell)`` where ``prediction`` is
            [batch, output_dim] (unnormalized scores) and the states are the
            updated LSTM states.
        """
        # Add a length-1 sequence dimension: [batch] -> [batch, 1].
        input = input.unsqueeze(1)
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # Drop the sequence dimension again before projecting to the vocab.
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, cell

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing.

    Args:
        encoder: Module whose call returns ``(hidden, cell)`` for a source batch.
        decoder: Module exposing ``output_dim`` and whose call
            ``decoder(input, hidden, cell)`` returns ``(prediction, hidden, cell)``.
        device: Device on which the output tensor is allocated.
        trg_pad_idx: Target padding index (stored; not used by ``forward``).
        max_length: Stored; not used by ``forward``.
    """

    def __init__(self, encoder, decoder, device, trg_pad_idx, max_length=100):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.trg_pad_idx = trg_pad_idx
        self.max_length = max_length

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg.shape[1] - 1`` steps conditioned on ``src``.

        Args:
            src: Source token indices, [batch, src_len].
            trg: Target token indices, [batch, trg_len]; column 0 is used as
                the first decoder input (the <sos> token).
            teacher_forcing_ratio: Probability, drawn once per time step, of
                feeding the ground-truth token instead of the model's own
                prediction as the next decoder input.

        Returns:
            Tensor [batch, trg_len, output_dim] of decoder scores. Position 0
            along the time axis is never written and stays all zeros.
        """
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim

        # Tensor to hold the decoder outputs
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=self.device)

        # Encoder forward pass
        hidden, cell = self.encoder(src)

        # First input to the decoder is the <sos> token (start-of-sequence token)
        input = trg[:, 0]

        # Decoding
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output

            # Get the highest predicted token for the next time step
            top1 = output.argmax(1)

            # Decide if we are going to use teacher forcing or not
            use_teacher_forcing = torch.rand(1).item() < teacher_forcing_ratio
            input = trg[:, t] if use_teacher_forcing else top1

        return outputs

In [None]:
# Hyperparameters
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
EMBEDDING_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 2
DROPOUT = 0.5
TRG_PAD_IDX = TRG.vocab.stoi['<pad>']
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the encoder, decoder, and seq2seq model
encoder = Encoder(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT).to(device)
decoder = Decoder(OUTPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT).to(device)
model = Seq2Seq(encoder, decoder, device, TRG_PAD_IDX).to(device)

# Optimizer and loss function (padding positions are excluded from the loss)
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

# Training loop
n_epochs = 10
for epoch in range(n_epochs):
    model.train()
    epoch_loss = 0

    for batch in train_iterator:
        src = batch.src.to(device)
        trg = batch.trg.to(device)

        optimizer.zero_grad()

        # Forward pass: output is [batch, trg_len, output_dim]
        output = model(src, trg)

        # Seq2Seq.forward starts decoding at t=1, so output[:, 0] is never
        # written (all zeros) and trg[:, 0] is the <sos> token. Drop that step
        # from both tensors so it does not contribute a constant, gradient-free
        # term to the loss. The slices are non-contiguous, hence reshape
        # rather than view.
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        # Compute the loss
        loss = criterion(output, trg)
        loss.backward()

        # Update the parameters
        optimizer.step()

        epoch_loss += loss.item()

    # Mean per-batch loss for the epoch
    print(f"Epoch {epoch + 1}, Loss: {epoch_loss / len(train_iterator):.4f}")


In [None]:
def translate_sentence(sentence, model, src_vocab, trg_vocab, device, max_length=100):
    """Greedily translate one source sentence with a seq2seq model.

    Args:
        sentence: Source text as a string (split on whitespace) or an already
            tokenized sequence of strings. Tokens are lower-cased.
        model: Object exposing ``eval()``, ``encoder(src)`` returning
            ``(hidden, cell)`` and ``decoder(input, hidden, cell)`` returning
            ``(prediction, hidden, cell)``. It is switched to eval mode and
            left there.
        src_vocab: Source vocabulary exposing a ``stoi`` token-to-index mapping.
        trg_vocab: Target vocabulary exposing ``stoi`` and an ``itos``
            index-to-token sequence; must contain ``<sos>`` and ``<eos>``.
        device: Device on which the input tensors are created.
        max_length: Maximum number of decoding steps.

    Returns:
        List of predicted target tokens without the leading ``<sos>``. The
        final ``<eos>`` is included when it is predicted within ``max_length``
        steps.

    Raises:
        ValueError: If the sentence yields no tokens at all.
        KeyError: If a source token is missing from ``src_vocab`` and the
            vocabulary has no ``<unk>`` entry to fall back on.
    """
    model.eval()

    # Tokenize (unless the caller already did) and normalize case.
    raw_tokens = sentence.split() if isinstance(sentence, str) else list(sentence)
    tokens = [token.lower() for token in raw_tokens]

    src_stoi = src_vocab.stoi
    # Wrap the sentence in the sequence markers when the source vocabulary
    # defines them, so the encoder input has the same shape as sequences built
    # with init_token/eos_token during training.
    if "<sos>" in src_stoi:
        tokens.insert(0, "<sos>")
    if "<eos>" in src_stoi:
        tokens.append("<eos>")
    if not tokens:
        raise ValueError("sentence produced no tokens to translate")

    # Use .get() rather than indexing: indexing a defaultdict-backed stoi
    # inserts every unknown token into the vocabulary as a side effect.
    unk_idx = src_stoi.get("<unk>")
    src_indices = []
    for token in tokens:
        idx = src_stoi.get(token, unk_idx)
        if idx is None:
            raise KeyError(token)
        src_indices.append(idx)
    src_tensor = torch.tensor(src_indices, dtype=torch.long, device=device).unsqueeze(0)

    # Special-token indices are loop invariants; look them up once.
    sos_idx = trg_vocab.stoi['<sos>']
    eos_idx = trg_vocab.stoi['<eos>']

    # Start with the <sos> token
    trg_indices = [sos_idx]

    with torch.no_grad():
        # Get encoder hidden and cell states
        hidden, cell = model.encoder(src_tensor)

        # Decode greedily, feeding back the previous prediction each step
        for _ in range(max_length):
            trg_tensor = torch.tensor([trg_indices[-1]], dtype=torch.long, device=device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

            pred_token = output.argmax(1).item()
            trg_indices.append(pred_token)

            # Stop if <eos> is predicted
            if pred_token == eos_idx:
                break

    # Convert indices back to tokens, excluding the leading <sos>
    return [trg_vocab.itos[idx] for idx in trg_indices[1:]]
