<a href="https://colab.research.google.com/github/shraghvi28/genomic-prediction-using-multi-omics-data-in-plants/blob/branch1/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import re
import unicodedata
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import random
from collections import defaultdict

# Pick the compute device: use CUDA when PyTorch can see a GPU, else the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("Using device:", device)

Using device: cpu


In [None]:
# Mount Google Drive when running on Colab. The `google.colab` package only
# exists inside a Colab runtime, so outside Colab the mount is skipped instead
# of aborting a fresh "Restart & Run All" with an ImportError.
try:
    from google.colab import drive
except ImportError:
    drive = None
    print("google.colab not available; skipping Google Drive mount")
else:
    drive.mount('/content/drive')

In [None]:
# Read the whole tab-separated corpus into memory
# (replace 'tel.txt' with the path to your own file).
with open('tel.txt', encoding='utf-8') as f:
    lines = f.read().strip().split('\n')

# One list of tab-separated columns per line; only the first 50000 lines are kept.
pairs = [l.split('\t') for l in lines[:50000]]

# Text normalization
def normalize(s):
    """Lower-case a sentence, strip Latin accents and space out punctuation.

    Steps:
      1. Decompose to NFD and drop characters from the Combining Diacritical
         Marks block (U+0300-U+036F), so "café" becomes "cafe".
      2. Recompose to NFC.
      3. Surround ``? . ! , <`` with spaces so each becomes its own token.
      4. Collapse runs of spaces and apostrophes into a single space.
      5. Lower-case and trim.

    Only the Latin diacritics block is removed. Dropping every character of
    category ``Mn`` also deletes Telugu vowel signs and the virama, which are
    part of the spelling (e.g. "భాషలు" would become "భషలు").

    Args:
        s: raw sentence.

    Returns:
        The normalised sentence as a ``str``.
    """
    decomposed = unicodedata.normalize('NFD', s)
    # Remove accent marks only; combining marks of other scripts are kept.
    without_accents = ''.join(
        c for c in decomposed if not ('\u0300' <= c <= '\u036f')
    )
    s = unicodedata.normalize('NFC', without_accents)
    s = re.sub(r"([?.!,<])", r" \1 ", s)
    s = re.sub(r"[' ]+", " ", s)
    return s.lower().strip()

# Normalise the first two columns of every row in place; any further
# columns (e.g. the attribution text) are left untouched.
for pair in pairs:
    for col in (0, 1):
        pair[col] = normalize(pair[col])

print("Sample pairs:", random.sample(pairs, 3))

Sample pairs: [['she asked me how many languages i spoke .', 'నను ఎనన భషలు మటలడుతనన తను అడగంద', 'CC-BY 2.0 (France) Attribution: tatoeba.org #314190 (CK) & #7204349 (bharath)'], ['can you tell me where the nearest bus stop is ?', 'దగగరల వునన బస సటప ఎకకడ కంచం చపతవ', 'CC-BY 2.0 (France) Attribution: tatoeba.org #27269 (CK) & #7204372 (bharath)'], ['more coffee , please .', 'దయచస ఇంకంచం కఫ ఇవవర .', 'CC-BY 2.0 (France) Attribution: tatoeba.org #701897 (Eldad) & #4639482 (bharath)']]


In [None]:
class Lang:
    """Word <-> integer-id vocabulary.

    Ids 0, 1 and 2 are reserved for ``<pad>``, ``<sos>`` and ``<eos>``;
    every new word receives the next free id in order of first appearance.
    """

    def __init__(self):
        specials = ("<pad>", "<sos>", "<eos>")
        self.word2index = {token: idx for idx, token in enumerate(specials)}
        self.index2word = {idx: token for idx, token in enumerate(specials)}
        self.n_words = len(specials)

    def add_sentence(self, sentence):
        """Register every whitespace-separated word of ``sentence`` not seen before."""
        for token in sentence.split():
            if token in self.word2index:
                continue
            new_id = self.n_words
            self.word2index[token] = new_id
            self.index2word[new_id] = token
            self.n_words = new_id + 1

# Build one vocabulary per column: `tel` indexes column 0, `eng` column 1.
# NOTE(review): the printed sample pairs suggest column 0 holds the English
# sentence and column 1 the Telugu one, so these names look swapped relative
# to their contents -- confirm before relying on the names.
eng, tel = Lang(), Lang()
for pair in pairs:
    for vocab, column in ((tel, 0), (eng, 1)):
        vocab.add_sentence(pair[column])

In [None]:
class Encoder(nn.Module):
    """Embed source token ids and run them through a single-layer GRU.

    Args:
        input_dim: number of rows in the embedding table (source vocabulary size).
        emb_dim: embedding width.
        hid_dim: GRU hidden size.
        dropout: dropout probability applied to the embeddings. Defaults to
            0.5, the value that used to be hard-coded.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode a batch of token ids.

        Args:
            src: integer tensor of token ids, batch dimension first
                (the GRU is built with ``batch_first=True``).

        Returns:
            ``(outputs, hidden)`` exactly as returned by ``nn.GRU``: the
            per-step outputs and the final hidden state.
        """
        embedded = self.dropout(self.embedding(src))
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden


In [None]:
class Decoder(nn.Module):
    """Single-step GRU decoder with attention over the encoder outputs.

    Args:
        output_dim: target vocabulary size (embedding rows and logits width).
        emb_dim: embedding width.
        hid_dim: GRU hidden size; the attention context has the same width,
            so the GRU input is ``emb_dim + hid_dim`` wide.
        dropout: dropout probability applied to the embeddings. Defaults to
            0.5, the value that used to be hard-coded.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        self.attention = Attention(hid_dim)

    def forward(self, x, hidden, encoder_outputs):
        """Decode one target step.

        Args:
            x: 1-D tensor of token ids, one per batch element (a length-1
                time axis is added below).
            hidden: GRU hidden state; its leading dimension is squeezed away
                before attention, so it is expected to be 1 (single layer).
            encoder_outputs: encoder outputs, batch first, as consumed by
                ``torch.bmm`` with the attention weights.

        Returns:
            ``(logits, hidden)``: vocabulary logits for this step and the
            updated GRU hidden state.
        """
        x = x.unsqueeze(1)
        embedded = self.dropout(self.embedding(x))
        # Attention weights over source positions, with a length-1 axis added for bmm.
        a = self.attention(hidden.squeeze(0), encoder_outputs).unsqueeze(1)
        # Weighted sum of encoder outputs = context vector for this step.
        weighted = torch.bmm(a, encoder_outputs)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.rnn(rnn_input, hidden)
        return self.fc_out(output.squeeze(1)), hidden

In [None]:
class Attention(nn.Module):
    """Additive attention: score each encoder step against the decoder state.

    The decoder state is concatenated with every encoder output, passed
    through a linear layer and tanh, and projected to a scalar score per
    step; the scores are soft-maxed over the source positions.
    """

    def __init__(self, hid_dim):
        super().__init__()
        self.attn = nn.Linear(hid_dim * 2, hid_dim)
        self.v = nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over dimension 1 of ``encoder_outputs``.

        ``hidden`` is broadcast along a new axis 1 so it can be concatenated
        with ``encoder_outputs`` on the last axis; each row of the result
        sums to 1.
        """
        steps = encoder_outputs.size(1)
        # Same decoder state paired with every source position.
        tiled_state = hidden.unsqueeze(1).expand(-1, steps, -1)
        features = torch.cat([tiled_state, encoder_outputs], dim=2)
        scores = self.v(self.attn(features).tanh()).squeeze(2)
        return scores.softmax(dim=1)

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes with full teacher forcing.

    Args:
        encoder: module whose ``forward(src)`` returns ``(encoder_outputs, hidden)``.
        decoder: module whose ``forward(token, hidden, encoder_outputs)``
            returns ``(logits, hidden)`` and which exposes ``fc_out.out_features``
            (the target vocabulary size).
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        """Run the encoder once, then decode ``trg`` step by step.

        Args:
            src: source batch, batch dimension first.
            trg: target batch, batch dimension first; ``trg[:, 0]`` is the
                first decoder input.

        Returns:
            Tensor of shape ``(batch, trg_len, trg_vocab_size)``. Position 0
            is left as zeros; position ``t`` holds the logits produced from
            the ground-truth token ``trg[:, t - 1]``.
        """
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.fc_out.out_features

        # Allocate on the input's device instead of a notebook-global `device`,
        # so the module works wherever the batch lives.
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=src.device)
        encoder_outputs, hidden = self.encoder(src)

        decoder_input = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[:, t] = output
            decoder_input = trg[:, t]  # Teacher forcing: always feed the ground truth

        return outputs

In [None]:
# Hyperparameters: vocabulary sizes come from the two Lang objects.
INPUT_DIM, OUTPUT_DIM = tel.n_words, eng.n_words
EMB_DIM, HID_DIM = 256, 512
EPOCHS, BATCH_SIZE = 25, 64

# Build encoder and decoder, then wrap them. Module.to() moves parameters in
# place, so moving `model` also places `enc` and `dec` on `device`.
enc = Encoder(INPUT_DIM, EMB_DIM, HID_DIM)
dec = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM)
model = Seq2Seq(enc, dec).to(device)

# Adam with default settings; targets equal to 0 (<pad>) are excluded from the loss.
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=0)

In [None]:
# Convert sentences to tensors
def tensorize(lang, sentences):
    """Convert sentences to one padded, batch-first tensor of token ids.

    Each sentence is split on whitespace, mapped through ``lang.word2index``,
    wrapped in ``<sos>`` (1) and ``<eos>`` (2), and right-padded with
    ``<pad>`` (0) to the length of the longest sentence.

    Args:
        lang: object exposing a ``word2index`` dict (word -> int id).
        sentences: iterable of already-normalised sentence strings.

    Returns:
        ``LongTensor`` of shape ``(len(sentences), max_len)``. The batch
        dimension is first so that source and target tensors share dim 0
        (required by ``TensorDataset``) and match the ``batch_first=True``
        GRUs; without ``batch_first`` the result is ``(max_len, N)`` and two
        languages with different maximum lengths cannot be paired.
    """
    pad_id, sos_id, eos_id = 0, 1, 2
    tensors = []
    for s in sentences:
        # Words missing from the vocabulary fall back to the <pad> id.
        tokens = [lang.word2index.get(word, pad_id) for word in s.split()]
        tensors.append(torch.LongTensor([sos_id] + tokens + [eos_id]))
    return torch.nn.utils.rnn.pad_sequence(
        tensors, batch_first=True, padding_value=pad_id
    )

# Tensorise column 0 with the `tel` vocabulary and column 1 with `eng`,
# then pair them sample-by-sample and serve shuffled mini-batches.
src_tensors, trg_tensors = (
    tensorize(vocab, [pair[column] for pair in pairs])
    for column, vocab in enumerate((tel, eng))
)
dataset = torch.utils.data.TensorDataset(src_tensors, trg_tensors)
train_loader = DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE)

AssertionError: Size mismatch between tensors