In [35]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

BATCH_SIZE = 4

x_train = np.loadtxt("akshar_sequences//x_train.csv", delimiter=",", dtype=int)
y_train = np.loadtxt("akshar_sequences//y_train.csv", delimiter=",", dtype=int)
x_test = np.loadtxt("akshar_sequences//x_test.csv", delimiter=",", dtype=int)
y_test = np.loadtxt("akshar_sequences//y_test.csv", delimiter=",", dtype=int)
x_val = np.loadtxt("akshar_sequences//x_val.csv", delimiter=",", dtype=int)
y_val = np.loadtxt("akshar_sequences//y_val.csv", delimiter=",", dtype=int)


class SequenceDataset(torch.utils.data.Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y
    
    def __getitem__(self, index):
        x = torch.from_numpy(self.x[index]).long() 
        y = torch.from_numpy(self.y[index]).long() 
        return x, y
    
    def __len__(self):
        return len(self.x)

train_dataset = SequenceDataset(x_train, y_train)
val_dataset = SequenceDataset(x_val, y_val)
test_dataset = SequenceDataset(x_test, y_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [36]:
for x,y in train_loader:
    print(x.shape)
    print(y.shape)
    for seq in x:
        print(seq)
    for seq in y:
        print(seq)
    break

torch.Size([4, 28])
torch.Size([4, 28])
tensor([128,  10,   7,  24,   0,   0,  19,   8,  24,   0,   0,  13, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,  15,  14,   1,   0, 130, 130, 130, 130, 130, 130, 130, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,   2,  14,  13,  18,  14,  17,  19,  18, 130, 130, 130, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,  10,   0,  11,   9,   8, 130, 130, 130, 130, 130, 130, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,  48, 103,  73,  88,  62,  89,  73,  88,  28, 130, 130, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,  68, 101,  70,  88, 130, 130, 130, 130, 130, 130, 130, 130, 130,
        130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 130, 129])
tensor([128,  47, 

In [37]:
VOCAB_SIZE = 131
EMBEDDING_DIM = 64
HIDDEN_DIM = 64
EPOCHS = 10
NUM_LAYERS = 1
DROPOUT = 0.2
BIDIRECTIONAL = 1
CELL_TYPE = "RNN"
BEAM_SIZE = 5

import torch.nn as nn

class Encoder(nn.Module):

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout):
        super(Encoder, self).__init__()

        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.dropout_prob = dropout

        self.dropout = nn.Dropout(self.dropout_prob)
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)
        self.rnn = nn.RNN(self.embedding_dim, self.hidden_dim, self.num_layers, batch_first=True)

    def forward(self, x):
        # x has shape (batch_size, seq_len)

        # Calculate embedding
        embedding = self.embedding(x)

        # Pass embedding through RNN
        _, hidden = self.rnn(embedding)

        # Apply dropout to hidden state
        hidden = self.dropout(hidden)

        return hidden

class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout):
        super(Decoder, self).__init__()

        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.dropout_prob = dropout

        self.dropout = nn.Dropout(self.dropout_prob)
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)
        self.rnn = nn.RNN(self.embedding_dim, self.hidden_dim, self.num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_dim, self.vocab_size)

    def forward(self, x, hidden):
        # x has shape (batch_size, seq_len)

        # Calculate embedding
        embedding = self.embedding(x)

        # Pass embedding and hidden state through RNN
        output, hidden = self.rnn(embedding, hidden)

        # Apply dropout to output
        output = self.dropout(output)

        # Reshape output to (batch_size * seq_len, hidden_dim)
        output = output.reshape(-1, self.hidden_dim)

        # Pass output through fully-connected layer to get predictions
        preds = self.fc(output)

        # Reshape predictions to (batch_size, seq_len, vocab_size)
        preds = preds.reshape(x.shape[0], x.shape[1], -1)

        return preds, hidden
        

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target):
        # Encode source sequence
        encoder_hidden = self.encoder(source)

        # Initialize decoder hidden state with encoder final hidden state
        decoder_hidden = encoder_hidden

        # Initialize output tensor
        target_len = target.shape[1]
        batch_size = target.shape[0]

        vocab_size = self.decoder.vocab_size
        outputs = torch.zeros(batch_size, target_len, vocab_size).to(target.device)

        # Use teacher forcing
        input_token = target[:, 0] 
        for t in range(1, target_len):
            output, decoder_hidden = self.decoder(input_token.unsqueeze(1), decoder_hidden)
            outputs[:, t] = output.squeeze(1)
            input_token = target[:, t]

        return outputs

encoder = Encoder(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
decoder = Decoder(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)

source = torch.randint(low=0, high=VOCAB_SIZE, size=(BATCH_SIZE, 10))
target = torch.randint(low=0, high=VOCAB_SIZE, size=(BATCH_SIZE, 10))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2Seq(encoder, decoder, device)

output = model.forward(source,target)
          

In [38]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Seq2Seq(encoder, decoder, device=device).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    # Train
    model.train()
    train_loss = 0
    for src, tgt in train_loader:
        src = src.to(device)
        tgt = tgt.to(device)

        optimizer.zero_grad()

        output = model.forward(src, tgt)
        output_dim = output.shape[-1]

        # Flatten output and target tensors to calculate loss
        loss = criterion(output.view(-1, output_dim), tgt.view(-1))

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        optimizer.step()
        train_loss += loss.item()

    # Evaluate on validation set
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for src, tgt in val_loader:
            src = src.to(device)
            tgt = tgt.to(device)

            output= model.forward(src, tgt)
            output_dim = output.shape[-1]

            # Flatten output and target tensors to calculate loss
            loss = criterion(output.view(-1, output_dim), tgt.view(-1))

            val_loss += loss.item()

    # Print statistics for the epoch
    print("Epoch [{}/{}], Train Loss: {:.4f}, Val Loss: {:.4f}"
          .format(epoch+1, EPOCHS, train_loss/len(train_loader), val_loss/len(val_loader)))


Epoch [1/10], Train Loss: 1.1340, Val Loss: 1.0491
Epoch [2/10], Train Loss: 1.0718, Val Loss: 0.9161
Epoch [3/10], Train Loss: 1.0590, Val Loss: 0.9186
