In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import random

# Define the encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden


# Define the attention mechanism
class Attention(nn.Module):
    def __init__(self, hid_dim):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        src_len = encoder_outputs.shape[0]
        hidden = hidden[-1].unsqueeze(0).repeat(src_len, 1, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        return nn.functional.softmax(attention, dim=0)

# Define the decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super(Decoder, self).__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim + hid_dim, hid_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(emb_dim + hid_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        # print("enter decoder forward")
        input = input.unsqueeze(0)  # input shape: [1, batch_size]
        embedded = self.dropout(self.embedding(input))  # embedded shape: [1, batch_size, emb_dim]
        # print("finished embeded")
        # print("hidden shape", hidden.shape)
        # print("encoder_output shape", encoder_outputs.shape)

        a = self.attention(hidden, encoder_outputs)  # a shape: [src_len, batch_size]
        # print("finished attention")
        a = a.unsqueeze(1)  # a shape: [src_len, 1, batch_size]
        encoder_outputs = encoder_outputs.permute(1, 0, 2)  # encoder_outputs shape: [batch_size, src_len, hid_dim]

        weighted = torch.bmm(a.permute(2, 1, 0), encoder_outputs)  # weighted shape: [batch_size, 1, hid_dim]
        weighted = weighted.permute(1, 0, 2)  # weighted shape: [1, batch_size, hid_dim]
        rnn_input = torch.cat((embedded, weighted), dim=2)  # rnn_input shape: [1, batch_size, emb_dim + hid_dim]

        output, hidden = self.rnn(rnn_input, hidden)  # output shape: [1, batch_size, hid_dim]
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)

        prediction = self.fc_out(torch.cat((output, weighted, embedded), dim=1))  # prediction shape: [batch_size, output_dim]
        return prediction, hidden

# Define the Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # print("enter forward")
        trg_len = trg.shape[0]
        batch_size = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        # print("done hidden")
        input = trg[0, :]
        for t in range(1, trg_len):
            # print("before decoder")
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            # print("finished decoder")
            outputs[t] = output
            top1 = output.argmax(1)
            input = trg[t] if random.random() < teacher_forcing_ratio else top1
        return outputs






In [14]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device, vocab, tokenizer):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.vocab = vocab  # Add vocab as a class attribute
        self.tokenizer = tokenizer

    def forward(self, src, trg=None, teacher_forcing_ratio=0.5, mode='train'):
        trg_len = trg.shape[0] if trg is not None else 100  # Set a maximum length for inference
        batch_size = src.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        
        encoder_outputs, hidden = self.encoder(src)
        input = trg[0, :] if trg is not None else torch.zeros(batch_size, dtype=torch.long).to(self.device)  # Start tokens or zero for inference
        
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[t] = output
            top1 = output.argmax(1)
            
            if mode == 'train':
                input = trg[t] if random.random() < teacher_forcing_ratio else top1
            else:
                input = top1

            # If at inference and reach <eos> token, break early
            if mode != 'train' and (input == 2).all():  # Assuming 2 is the <eos> token
                break
        
        return outputs

    def train_model(self, dataloader, optimizer, criterion, clip, vocab):
        self.train()
        epoch_loss = 0

        for i, (src, trg) in enumerate(dataloader):
            src = torch.nn.utils.rnn.pad_sequence(src, padding_value=vocab["<pad>"], batch_first=False).to(self.device)
            trg = torch.nn.utils.rnn.pad_sequence(trg, padding_value=vocab["<pad>"], batch_first=False).to(self.device)

            optimizer.zero_grad()

            output = self(src, trg)

            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)

            loss = criterion(output, trg)

            loss.backward()

            torch.nn.utils.clip_grad_norm_(self.parameters(), clip)

            optimizer.step()

            epoch_loss += loss.item()

        return epoch_loss / len(dataloader)

    def sample(self, sentence):
        self.eval()
        tokenizer = self.tokenizer
        tokens = tokenizer(sentence)
        token_indices = [self.vocab[token] for token in tokens]
        src_tensor = torch.tensor(token_indices).unsqueeze(1).to(self.device)  # Shape: [src_len, 1]

        with torch.no_grad():
            outputs = self.forward(src_tensor, mode='eval')
        
        output_indices = outputs.argmax(-1).squeeze().tolist()
        output_tokens = [self.vocab.get_itos()[index] for index in output_indices if index != self.vocab["<pad>"]]

        # Remove everything after the <eos> token
        if '<eos>' in output_tokens:
            output_tokens = output_tokens[:output_tokens.index('<eos>')]

        return ' '.join(output_tokens)

# Example usage
# Assuming model, vocab, and device are already defined


In [15]:
with open('50_idioms.txt') as f:
    idioms = f.read()
    idiomatic_sentences = idioms.split("\n")
    
with open('50_translated_idiom.txt') as f:
    translated = f.read()
    plain_sentences = translated.split("\n")
    
print(idiomatic_sentences)
print(plain_sentences)
idiomatic_sentences = idiomatic_sentences[0:-1]
print(len(idiomatic_sentences), len(plain_sentences))

['just in case', 'a sorry sight', 'rule of thumb', 'carpe diem', 'salad days', 'off the record', 'thank goodness', 'big bucks', 'dog days', 'wet behind the ears', 'just deserts', 'an arm and a leg', 'never mind', 'bricks and mortar', 'close call', 'a sight for sore eyes', 'open warfare', 'pin money', 'third time lucky', 'race against time', 'rain or shine', 'hold on a second', 'next to nothing', 'cheek by jowl', 'black and blue', 'dead wood', 'stranger things have happened', 'pigs might fly', 'poetry in motion', 'dressed up to the nines', 'way around', 'no strings attached', 'a shot in the arm', 'below the salt', 'heart of gold', 'in the pipeline', 'golden age', 'in limbo', 'baptism of fire', 'beyond words', 'on the table', 'in a bad shape', 'on the horns of a dilemma', 'under pressure', 'dead right', 'give me five', 'in the heat of the moment', 'fit as a fiddle', 'charity begins at home', 'syrup of figs', '']
['As a precaution', 'Something pitiful or disappointing to see', 'A general 

In [17]:
from tqdm import tqdm
# Helper function to tokenize and build vocabulary
def yield_tokens(data_iter, tokenizer):
    for text in data_iter:
        yield tokenizer(text)

tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

# Example idiomatic and plain sentence pairs
# idiomatic_sentences = [
#     "Break a leg",
#     "Once in a blue moon",
#     "Spill the beans"
# ]
# plain_sentences = [
#     "Good luck",
#     "Very rarely",
#     "Reveal a secret"
# ]

vocab = build_vocab_from_iterator(yield_tokens(idiomatic_sentences + plain_sentences, tokenizer), specials=["<unk>", "<pad>", "<sos>", "<eos>"])
vocab.set_default_index(vocab["<unk>"])

# Convert sentences to tensors
def sentence_to_tensor(sentence, vocab, tokenizer):
    tokens = tokenizer(sentence)
    indexes = [vocab[token] for token in tokens]
    return torch.tensor([vocab["<sos>"]] + indexes + [vocab["<eos>"]])

# Parameters
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
BATCH_SIZE = 2
N_EPOCHS = 10
CLIP = 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

attn = Attention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn)

model = Seq2Seq(enc, dec, device, vocab, tokenizer).to(device)

optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])

# Create DataLoader
class TranslationDataset(torch.utils.data.Dataset):
    def __init__(self, src_sentences, trg_sentences, vocab, tokenizer):
        self.src_sentences = src_sentences
        self.trg_sentences = trg_sentences
        self.vocab = vocab
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self, idx):
        src_tensor = sentence_to_tensor(self.src_sentences[idx], self.vocab, self.tokenizer)
        trg_tensor = sentence_to_tensor(self.trg_sentences[idx], self.vocab, self.tokenizer)
        return src_tensor, trg_tensor

dataset = TranslationDataset(idiomatic_sentences, plain_sentences, vocab, tokenizer)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

# Training function
def train(model, dataloader, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for i, (src, trg) in enumerate(dataloader):
        src = torch.nn.utils.rnn.pad_sequence(src, padding_value=vocab["<pad>"], batch_first=False).to(device)
        trg = torch.nn.utils.rnn.pad_sequence(trg, padding_value=vocab["<pad>"], batch_first=False).to(device)
        # print('Source Sentence:')
        # for idx in src.transpose(0, 1):
        #     print(' '.join([vocab.get_itos()[i.item()] for i in idx if i != vocab["<pad>"]]))

        # # Print target sentence
        # print('Target Sentence:')
        # for idx in trg.transpose(0, 1):
        #     print(' '.join([vocab.get_itos()[i.item()] for i in idx if i != vocab["<pad>"]]))


        optimizer.zero_grad()

        output = model(src, trg)

        output_dim = output.shape[-1]
        # for b in range(output.size(1)):  # Iterate over batch dimension
        #     pred_indices = output[:, b, :].argmax(dim=-1)
        #     pred_sentence = ' '.join([vocab.get_itos()[idx.item()] for idx in pred_indices])
        #     print("Predicted sentence for batch item", b, ":", pred_sentence)


        output = output[1:].view(-1, output_dim)

        trg = trg[1:].view(-1)

        loss = criterion(output, trg)

        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

        # Convert predicted indices to tokens

    return epoch_loss / len(dataloader)

# Start training
for epoch in tqdm(range(41)):
    # print("start of epoch", epoch, "========================================")
    train_loss = train(model, dataloader, optimizer, criterion, CLIP)
    if epoch %10 == 0:
        print(f'Epoch: {epoch+1:02}, Train Loss: {train_loss:.3f}')
        pass



  2%|▏         | 1/41 [00:00<00:11,  3.39it/s]

Epoch: 01, Train Loss: 4.747


 27%|██▋       | 11/41 [00:03<00:08,  3.55it/s]

Epoch: 11, Train Loss: 0.385


 51%|█████     | 21/41 [00:05<00:05,  3.59it/s]

Epoch: 21, Train Loss: 0.002


 76%|███████▌  | 31/41 [00:08<00:02,  3.62it/s]

Epoch: 31, Train Loss: 0.001


100%|██████████| 41/41 [00:11<00:00,  3.53it/s]

Epoch: 41, Train Loss: 0.000





In [21]:
sentence = idiomatic_sentences[49]
generated_sentence = model.sample(sentence)
print("Generated Sentence:", generated_sentence)


Generated Sentence: <unk> Laxative
