In [None]:
!pip install sentencepiece --upgrade



In [1]:
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence
import sentencepiece as spm
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
import numpy as np


In [2]:

# Parameters
BATCH_SIZE = 64
EMBED_SIZE = 256
HIDDEN_SIZE = 512
NUM_LAYERS = 1
LR = 0.001
VOCAB_SIZE = 8000       # subword vocab size for each language
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [3]:

# 1. Load corpus
data = pd.read_csv('sentences_with_audio_names.csv')  # columns: 'src','tgt', optional prosody cols

# 2. Prepare and train SentencePiece models (once)
#    Extract columns to plain text files for reliability, then train.
with open('src.txt', 'w', encoding='utf-8') as f_src, open('tgt.txt', 'w', encoding='utf-8') as f_tgt:
    for s, t in zip(data['src'], data['tgt']):
        f_src.write(s.replace('\n',' ').strip() + '\n')
        f_tgt.write(t.replace('\n',' ').strip() + '\n')


#    Train SentencePiece on the extracted files
#    This generates src_spm.model / src_spm.vocab and tgt_spm.model / tgt_spm.vocab
spm.SentencePieceTrainer.Train(
    f"--input=src.txt --model_prefix=src_spm --vocab_size={VOCAB_SIZE} "
    "--model_type=bpe --unk_id=0 --pad_id=1 --bos_id=2 --eos_id=3"
)
spm.SentencePieceTrainer.Train(
    f"--input=tgt.txt --model_prefix=tgt_spm --vocab_size={VOCAB_SIZE} "
    "--model_type=bpe --unk_id=0 --pad_id=1 --bos_id=2 --eos_id=3"
)

# 3. Load SP models
src_sp = spm.SentencePieceProcessor(model_file='src_spm.model')
tgt_sp = spm.SentencePieceProcessor(model_file='tgt_spm.model')
SRC_VOCAB = src_sp.get_piece_size()
TGT_VOCAB = tgt_sp.get_piece_size()


In [4]:
data.head()

Unnamed: 0,filename,src,tgt
0,LJ001-0001,"Printing, in the only sense with which we are ...",ఎగ్జిబిషన్‌లో ప్రాతినిధ్యం వహించే అన్ని కళలు మ...
1,LJ001-0002,in being comparatively modern.,తులనాత్మకంగా ఆధునికంగా ఉండటంలో.
2,LJ001-0003,For although the Chinese took impressions from...,"ఎందుకంటే, నెదర్లాండ్స్‌లోని చెక్కలను కొట్టేవార..."
3,LJ001-0004,"produced the block books, which were the immed...",నిజమైన ముద్రిత పుస్తకానికి తక్షణ పూర్వీకులు అయ...
4,LJ001-0005,the invention of movable metal letters in the ...,పదిహేనవ శతాబ్దం మధ్యలో కదిలే లోహ అక్షరాల ఆవిష్...


In [21]:
def add_special(ids, sos_id, eos_id):
    return [sos_id] + ids + [eos_id]

# Dataset class
class MTDataset(Dataset):
    def __init__(self, df, src_sp, tgt_sp, prosody_npz_path, prosody_dim=19):
        self.src = df['src'].tolist()
        self.tgt = df['tgt'].tolist()
        self.filenames = df['filename'].tolist()
        self.src_sp = src_sp
        self.tgt_sp = tgt_sp
        self.src_sos, self.src_eos = src_sp.bos_id(), src_sp.eos_id()
        self.tgt_sos, self.tgt_eos = tgt_sp.bos_id(), tgt_sp.eos_id()

        # Load prosody data from .npz file
        self.prosody_data = dict(np.load(prosody_npz_path))  # Assuming prosody features are in a .npz file
        self.prosody_dim = prosody_dim

    def __len__(self):
        return len(self.src)

    def encode(self, text, sp):
        return sp.encode(text, out_type=int)

    def __getitem__(self, idx):
        src_ids = add_special(self.encode(self.src[idx], self.src_sp), self.src_sos, self.src_eos)
        tgt_ids = add_special(self.encode(self.tgt[idx], self.tgt_sp), self.tgt_sos, self.tgt_eos)
        filename = self.filenames[idx]

        # Check if the filename exists in the prosody data
        # print(self.prosody_data)
        if filename in self.prosody_data.keys():
            # print(filename)
            pros = self.prosody_data[filename].astype(np.float32)
            pros = pros.mean(axis=1)  # Take the mean over dim=1 to make it a fixed-size vector
        else:
            # If filename not found, generate a random prosody vector of size 19
            # print("Generating random prosody vector")
            pros = np.random.rand(self.prosody_dim).astype(np.float32)

        return torch.tensor(src_ids), torch.tensor(tgt_ids), torch.tensor(pros)


# Collate function
def collate_fn(batch):
    src_batch, tgt_batch, pros_batch = zip(*batch)
    src_pad = pad_sequence(src_batch, padding_value=src_sp.pad_id(), batch_first=True)
    tgt_pad = pad_sequence(tgt_batch, padding_value=tgt_sp.pad_id(), batch_first=True)
    pros = torch.stack(pros_batch)
    pros = pros.unsqueeze(1).expand(-1, src_pad.size(1), -1)
    return src_pad.to(DEVICE), tgt_pad.to(DEVICE), pros.to(DEVICE)

In [22]:
# Prepare dataset and dataloaders
dataset = MTDataset(data, src_sp, tgt_sp, prosody_npz_path='prosody_features.npz')
train_size = int(0.98 * len(dataset))
test_size = len(dataset) - train_size
train_data, test_data = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False, collate_fn=collate_fn)

In [23]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, prosody_dim=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.input_dim = embed_size + prosody_dim  # Concatenating prosody features with embeddings
        self.gru = nn.GRU(self.input_dim, hidden_size, num_layers, batch_first=True)

    def forward(self, x, prosody):
        if prosody is None:
            raise ValueError("Prosody tensor should never be None")

        embedded = self.embedding(x)

        # Ensure prosody tensor matches sequence length and concatenate
        prosody = prosody[:, :embedded.size(1), :]  # Ensure prosody has the same length as the sequence

        # Concatenate prosody features with embeddings
        embedded = torch.cat((embedded, prosody), dim=2)

        # Pass through GRU
        outputs, hidden = self.gru(embedded)
        return outputs, hidden


# Decoder
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.gru = nn.GRU(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc_out = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        output, hidden = self.gru(embedded, hidden)
        pred = self.fc_out(output.squeeze(1))
        return pred, hidden

# Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt, prosody=None, teacher_forcing_ratio=0.5):
        batch_size, tgt_len = tgt.size()
        outputs = torch.zeros(batch_size, tgt_len, TGT_VOCAB).to(DEVICE)
        _, hidden = self.encoder(src, prosody)
        input = tgt[:, 0]
        for t in range(1, tgt_len):
            out, hidden = self.decoder(input, hidden)
            outputs[:, t, :] = out
            input = tgt[:, t] if torch.rand(1).item() < teacher_forcing_ratio else out.argmax(1)
        return outputs




In [30]:
def evaluate_bleu(model, dataloader):
    model.eval()
    refs, hyps = [], []

    # print("Sample translations:\n")
    with torch.no_grad():
        for i, (src, tgt, pros) in enumerate(dataloader):
            _, hidden = model.encoder(src, pros)
            input = torch.tensor([tgt_sp.bos_id()] * src.size(0)).to(DEVICE)
            outputs = []
            for _ in range(50):
                out, hidden = model.decoder(input, hidden)
                input = out.argmax(1)
                outputs.append(input)
            outputs = torch.stack(outputs, dim=1).cpu().tolist()
            tgt = tgt.cpu().tolist()
            for ref, hyp in zip(tgt, outputs):
                ref_tokens = [t for t in ref[1:] if t not in [tgt_sp.pad_id(), tgt_sp.eos_id()]]
                hyp_tokens = [t for t in hyp if t not in [tgt_sp.pad_id(), tgt_sp.eos_id()]]
                refs.append([ref_tokens])
                hyps.append(hyp_tokens)
                if i < 5:
                    ref_text = tgt_sp.decode(ref_tokens)
                    hyp_text = tgt_sp.decode(hyp_tokens)
    bleu = corpus_bleu(refs, hyps, smoothing_function=SmoothingFunction().method4)
    print(f"BLEU Score: {bleu:.4f}")

In [26]:
def train(model, dataloader, optimizer, criterion, epochs=10):
    model.to(DEVICE)
    for ep in range(epochs):
        model.train()
        total = 0
        for src_batch, tgt_batch, pros in dataloader:
            optimizer.zero_grad()
            preds = model(src_batch, tgt_batch, pros)
            out = preds[:, 1:].reshape(-1, preds.size(-1))
            tgt = tgt_batch[:, 1:].reshape(-1)
            loss = criterion(out, tgt)
            loss.backward()
            optimizer.step()
            total += loss.item()
        print(f"Epoch {ep+1}, Loss: {total/len(dataloader):.4f}")

encoder = Encoder(SRC_VOCAB, EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS, prosody_dim=19)
decoder = Decoder(TGT_VOCAB, EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS)
model = Seq2Seq(encoder, decoder).to(DEVICE)

# Loss and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=tgt_sp.pad_id())
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

if __name__ == '__main__':
    train(model, train_loader, optimizer, criterion)



Epoch 1, Loss: 7.3427
Epoch 2, Loss: 6.7586
Epoch 3, Loss: 6.3134
Epoch 4, Loss: 5.9158
Epoch 5, Loss: 5.6191
Epoch 6, Loss: 5.3255
Epoch 7, Loss: 5.0978
Epoch 8, Loss: 4.8639
Epoch 9, Loss: 4.6325
Epoch 10, Loss: 4.4278


In [31]:
evaluate_bleu(model, test_loader)


BLEU Score: 0.2394
