I have chosen the NLP problem from here: https://drive.google.com/drive/folders/1oxnjzFQXIll5OUX7kaCyrE8FIKdFdeuV

The requirements are not precise so i will just note down my assumptions at the top here and also call them out as part of the comments in code:

1. What should be the **synthetically created** input and output of the model ?

- **Input** : What should be the input to the model ? 

	The doc says - "(input should be) ... the architecture of a neural network created in PyTorch.This includes detailed information about its layers, configurations, and parameters." 

	There are several options which can be considered here:

	a) serialise the model and use the serialised string as input
	b) use the ouput of `__str__()` method of the model
	c) use a library like https://pypi.org/project/torch-summary/ to create it.

	It will be the toughest for the model to learn from (a) above . (b) & (c) should be comparable.

	For simplicity i have gone with (b) - using `model.__str__()` for input.
 
-  **Output**: What should be the output of the model ?

	A couple of options here are: 

	a) use a large language model to generate the output (openAI api/self-hosted LLAMA)
	b) write a simple function on my own.
		
	For simplicity, I have gone with (b)

2. Which **model** should I use ?
	
	The doc says a "seq2seq" model. The term seq2seq is most often used in the context of a RNN (encoder + decoder) based architecture, with or without attention. Although some people also use it in the context of transformer based architecture, but that is rare.
	
	I have gone with RNN based encoder/decoder arch with attention and adapted my model from here: https://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html
    
    The choice of seq2seq model means that it typically performs well with input sizes of about 30-50 characters. The sizes I have taken are longer but the model performs reasonably well.

In [5]:
"""
All imports at the top
"""

from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, RandomSampler


from io import open
import unicodedata
import re
import random
from time import time
import math
import numpy as np
import json


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [6]:
"""
A sample of the synthetically generated input I will be using
"""

sample = nn.Sequential(nn.Linear(2,10), nn.Linear(10,25), nn.Conv1d(25, 3, 1))
sample.__str__()

'Sequential(\n  (0): Linear(in_features=2, out_features=10, bias=True)\n  (1): Linear(in_features=10, out_features=25, bias=True)\n  (2): Conv1d(25, 3, kernel_size=(1,), stride=(1,))\n)'

In [7]:
"""
Helper functions to create synthetic data
"""

NUM_ITEMS = 20000
def find_divisors(n):
    # Handle edge case for non-positive numbers
    if n <= 0:
        return []
    
    divisors = []
    for i in range(1, int(n**0.5) + 1):
        if n % i == 0:
            divisors.append(i)
            if i != n // i:  # Avoid duplicates for perfect squares
                divisors.append(n // i)
    
    return sorted(divisors)


def perfect_square_root(n):
    if n < 0:
        return None
    
    root = int(math.sqrt(n))
    
    if root * root == n:
        return root
    else:
        return None



def get_dims_helper(dims_left):
    divs = find_divisors(dims_left)
    for div in divs:
        n_2 = dims_left // div
        root = perfect_square_root(n_2)
        if root is not None:
            return div, root, root
    return dims_left, 1, 1

def get_out_dims_helper(dims_left, h):
    divs = find_divisors(dims_left)
    for div in divs:
        n_2 = dims_left // div
        root = perfect_square_root(n_2)
        if root is not None:
            k = h - root + 1
            if k > 0:
                return div, k
    return dims_left, h
        

class Reshape(nn.Module):
    def __init__(self, size):
        super().__init__()
        self.size = size
    
    def forward(self,x):
        return x.view(*self.size)

In [8]:
"""
Generate one synthetic sample.

I have made sure to create models that will "work". Random sizes for layer dimensions and layer placements
would still have been enough for learning the task at hand, but the incompatibility between layers 
would have meant they would have thrown runtime errors in practice.

"""

def get_one_sample():
    layer_options = [nn.Linear, nn.Conv2d]
    layers = []
    sizes = []
    num_layers = random.choice(range(0,6))
    dim_choices = range(1,10)
    curr = None
    for layer_idx in range(num_layers):
        layer_type = random.choice(layer_options)
        if layer_idx == 0:
            if layer_type == nn.Linear:
                dims = [random.choice(dim_choices) for _ in range(2) ]
                layers.append(nn.Linear(*dims))
                sizes.append((["N", dims[0]] , ["N", dims[1]]))
                curr = (layers[-1](torch.ones(32 * dims[0]).view(32, dims[0])))
            elif layer_type == nn.Conv2d:
                dims = [random.choice(dim_choices) for _ in range(3)]
                layers.append(nn.Conv2d(*dims))
                sizes.append((["N" , dims[0], "H", "W"], ["N", dims[1], f"H-{dims[2]}+1", f"W-{dims[2]}+1"]))
                curr = (layers[-1](torch.ones(32 * dims[0]* 64 * 64).view(32, dims[0], 64, 64)))
        else:
                size = curr.size()
                if layer_type == nn.Linear:
                    if isinstance(layers[-1] , nn.Linear):
                        dims = [size[1], random.choice(dim_choices)]
                        layers.append(nn.Linear(*dims))
                        sizes.append((["N", dims[0]], ["N", dims[1]]))
                        curr = (layers[-1](curr))
                    elif isinstance(layers[-1] , nn.Conv2d):
                        layers.append(Reshape([size[0], -1]))
                        curr = layers[-1](curr)
                        dims = [curr.size()[1], random.choice(dim_choices)]
                        layers.append(nn.Linear(*dims))
                        sizes.append((["N", dims[0]], ["N", dims[1]]))
                        curr = (layers[-1](curr))
                elif layer_type == nn.Conv2d:
                    if isinstance(layers[-1] , nn.Linear):
                        dims_left = curr.numel() // size[0]
                        c, h, w = get_dims_helper(dims_left) 
                        layers.append(Reshape([size[0], c, h, w] ))
                        curr = layers[-1](curr)
                        co, k = get_out_dims_helper(dims_left, h)
                        dims = [c, co, k]
                        layers.append(nn.Conv2d(*dims))
                        sizes.append((["N", c, h, w], ["N", co, h-k+1, h-k+1]))
                        curr = layers[-1](curr)
                    else:
                        continue
    in_dim = sizes[0][0] if len(sizes) > 0 else None
    out_dim = sizes[-1][1] if len(sizes) > 0 else None
    return nn.Sequential(*layers), in_dim, out_dim

In [9]:
"""
Simple description for a model
"""

def get_description(n,i,o):
    base =  f"this model has {n} layers."
    if i != None:
        i = i.__str__().replace("'", "")
        base = base + f"the input has shape {i}."
    if o != None:
        o = o.__str__().replace("'", "")
        base = base + f"the output has shape {o}"
    return base



In [10]:
"""
create dataset in memory
"""

def create_dataset(num_samples, report_after=4000):
    data_pairs = []
    for idx in range(num_samples):
        m, i ,o = get_one_sample()
        n = len(list(m.named_children()))
        data_pairs.append({"model": m.__str__(), "desc": get_description(n, i, o)})
        if idx % report_after == 0 and idx >= report_after:
            print(f"wrote {idx} items to disk ....")
    return data_pairs

dp = create_dataset(2000)
dp[:10]

[{'model': 'Sequential(\n  (0): Linear(in_features=3, out_features=7, bias=True)\n  (1): Reshape()\n  (2): Conv2d(7, 7, kernel_size=(1, 1), stride=(1, 1))\n  (3): Reshape()\n  (4): Linear(in_features=7, out_features=9, bias=True)\n  (5): Linear(in_features=9, out_features=6, bias=True)\n)',
  'desc': 'this model has 6 layers.the input has shape [N, 3].the output has shape [N, 6]'},
 {'model': 'Sequential(\n  (0): Conv2d(6, 3, kernel_size=(6, 6), stride=(1, 1))\n)',
  'desc': 'this model has 1 layers.the input has shape [N, 6, H, W].the output has shape [N, 3, H-6+1, W-6+1]'},
 {'model': 'Sequential(\n  (0): Linear(in_features=8, out_features=3, bias=True)\n  (1): Linear(in_features=3, out_features=7, bias=True)\n  (2): Linear(in_features=7, out_features=1, bias=True)\n  (3): Reshape()\n  (4): Conv2d(1, 1, kernel_size=(1, 1), stride=(1, 1))\n)',
  'desc': 'this model has 5 layers.the input has shape [N, 8].the output has shape [N, 1, 1, 1]'},
 {'model': 'Sequential()', 'desc': 'this mod

In [11]:
"""
write dataset to disk
"""

def write_dataset():
  data = create_dataset(NUM_ITEMS)
  json_data = json.dumps(data)
  json_data

  with open("simple_data.json", "w") as f:
    f.write(json_data)
  print(f"created a synthetic dataset with number of items: {len(data)} and saved to disk")
  print(f"sample dataset entry:\n input: {data[0]['model']}\n output: {data[0]['desc']}")

write_dataset()


wrote 4000 items to disk ....
wrote 8000 items to disk ....
wrote 12000 items to disk ....
wrote 16000 items to disk ....
created a synthetic dataset with number of items: 20000 and saved to disk
sample dataset entry:
 input: Sequential(
  (0): Conv2d(8, 8, kernel_size=(1, 1), stride=(1, 1))
)
 output: this model has 1 layers.the input has shape [N, 8, H, W].the output has shape [N, 8, H-1+1, W-1+1]


In [12]:
"""
class to create a Vocabulary
"""

SOS_token = 0
EOS_token = 1
PADDING_token = 2

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS", 2: "PADDING"}
        self.n_words = 2  # Count SOS and EOS

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            if word.isdigit():
              self.addInteger(word)
            else:
              self.addWord(word)

    def addInteger(self, word):
      for digit in word:
        self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

In [13]:
"""
data preprocessing helper: normalise input and output
"""

def normalizeString(s):
    s = re.sub(r"([.!?()=,])", r" \1 ", s)
#     s = re.sub(r"([():,=\n])", r"", s)
#     s = re.sub(r"  ", r" ", s)
    return s.lower().strip()

x = normalizeString("Sequential(\n  (0): Conv1d(7, 1, kernel_size=(3,), stride=(1,))\n  (1): Linear(in_features=8, out_features=1, bias=True)\n)")
x

'sequential ( \n   ( 0 ) : conv1d ( 7 ,  1 ,  kernel_size =  ( 3 ,  )  ,  stride =  ( 1 ,  )  ) \n   ( 1 ) : linear ( in_features = 8 ,  out_features = 1 ,  bias = true ) \n )'

In [14]:
"""
Helper function to read data from disk and create vocabularies
"""

def load_data():
    print("Reading lines...")

    # Read the file and split into lines
    lines = []
    with open('simple_data.json') as f:
      lines = json.load(f)

    pairs = [[normalizeString(item["model"]), normalizeString(item["desc"])] for item in lines]
    input_lang = Lang("model")
    output_lang = Lang("desc")

    return input_lang, output_lang, pairs

In [15]:
"""
read data from disk and create vocabs
"""

def prepareData():
    input_lang, output_lang, pairs = load_data()
    print("Read %s sentence pairs" % len(pairs))
    print("Counting words...")
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, pairs

input_lang, output_lang, pairs = prepareData()
random_pair = random.choice(pairs)
print(f"A random input/output pair : {random_pair}")

Reading lines...
Read 20000 sentence pairs
Counting words...
Counted words:
model 110
desc 53
A random input/output pair : ['sequential ( \n   ( 0 ) : linear ( in_features = 4 ,  out_features = 6 ,  bias = true ) \n   ( 1 ) : linear ( in_features = 6 ,  out_features = 8 ,  bias = true ) \n   ( 2 ) : reshape (  ) \n   ( 3 ) : conv2d ( 2 ,  2 ,  kernel_size =  ( 1 ,  1 )  ,  stride =  ( 1 ,  1 )  ) \n )', 'this model has 4 layers . the input has shape [n ,  4] . the output has shape [n ,  2 ,  2 ,  2]']


In [16]:
"""
Helper functions
"""

def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(1, -1)

def tensorsFromPair(pair):
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

In [17]:
"""
Empirically test max length of input and output
"""

def maxLength(l,lang):
  m = len(indexesFromSentence(lang, l[0]))
  for s in l:
    if len(indexesFromSentence(lang, s)) > m:
      m = len(indexesFromSentence(lang,s))
  return m

l1s = [item[0] for item in pairs]
l2s = [item[1] for item in pairs]
m1 = maxLength(l1s, input_lang)
m2 = maxLength(l2s, output_lang)
print(f"longest input sentence : {m1}.\nlongest output sentence {m2}.")

longest input sentence : 208.
longest output sentence 35.


In [18]:
"""
set input and output max length
"""

MAX_LENGTH_INPUT = 220
MAX_LENGTH_OUTPUT = 50

In [19]:
"""
Create a dataloader
"""
def helper(inputs, targets, l, r):
    input_ids = inputs[l :  r, :]
    target_ids = targets[l: r, :]
    data = TensorDataset(torch.LongTensor(input_ids).to(device),
                               torch.LongTensor(target_ids).to(device))
    sampler = RandomSampler(data)
    dataloader = DataLoader(data, sampler=sampler, batch_size=batch_size)
    return dataloader
    
def get_dataloader(batch_size, test_split = 0.2):
    input_lang, output_lang, pairs = prepareData()

    n = len(pairs)
    input_ids = np.full((n, MAX_LENGTH_INPUT), PADDING_token ,dtype=np.int32)
    target_ids = np.full((n, MAX_LENGTH_OUTPUT),  PADDING_token,  dtype=np.int32)

    for idx, (inp, tgt) in enumerate(pairs):
        inp_ids = indexesFromSentence(input_lang, inp)
        tgt_ids =  indexesFromSentence(output_lang, tgt)
        inp_ids.append(EOS_token)
        tgt_ids.append(EOS_token)
        input_ids[idx, :len(inp_ids)] = inp_ids
        target_ids[idx, :len(tgt_ids)] = tgt_ids
    
    test_size = int(test_split * n)
    train_size = n - test_size
    
    print(f"train size: {train_size}. test size: {test_size}")
    train_dataloader = helper(input_ids, target_ids, 0, train_size)
    test_dataloader = helper(input_ids, target_ids, train_size, n)
    
    
    return input_lang, output_lang, train_dataloader, test_dataloader

In [20]:
"""
Encoder RNN
"""
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, input):
        embedded = self.dropout(self.embedding(input))
        output, hidden = self.gru(embedded)
        return output, hidden

In [21]:
"""
Decoder RNN. The code is written so that we have the option of using teacher forcing at runtime.
In practice, teacher forcing works very well.
"""

class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(SOS_token)
        decoder_hidden = encoder_hidden
        decoder_outputs = []

        for i in range(MAX_LENGTH_OUTPUT):
            decoder_output, decoder_hidden  = self.forward_step(decoder_input, decoder_hidden)
            decoder_outputs.append(decoder_output)

            if target_tensor is not None:
                # Teacher forcing: Feed the target as the next input
                decoder_input = target_tensor[:, i].unsqueeze(1) # Teacher forcing
            else:
                # Without teacher forcing: use its own predictions as the next input
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()  # detach from history as input

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        return decoder_outputs, decoder_hidden, None # We return `None` for consistency in the training loop

    def forward_step(self, input, hidden):
        output = self.embedding(input)
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        output = self.out(output)
        return output, hidden

  
    
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))
        scores = scores.squeeze(2).unsqueeze(1)

        weights = F.softmax(scores, dim=-1)
        context = torch.bmm(weights, keys)

        return context, weights

"""
Decoder RNN with attention
"""      
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = BahdanauAttention(hidden_size)
        self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(SOS_token)
        decoder_hidden = encoder_hidden
        decoder_outputs = []
        attentions = []

        for i in range(MAX_LENGTH_OUTPUT):
            decoder_output, decoder_hidden, attn_weights = self.forward_step(
                decoder_input, decoder_hidden, encoder_outputs
            )
            decoder_outputs.append(decoder_output)
            attentions.append(attn_weights)

            if target_tensor is not None:
                # Teacher forcing: Feed the target as the next input
                decoder_input = target_tensor[:, i].unsqueeze(1) # Teacher forcing
            else:
                # Without teacher forcing: use its own predictions as the next input
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()  # detach from history as input

        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)
        attentions = torch.cat(attentions, dim=1)

        return decoder_outputs, decoder_hidden, attentions


    def forward_step(self, input, hidden, encoder_outputs):
        embedded =  self.dropout(self.embedding(input))

        query = hidden.permute(1, 0, 2)
        context, attn_weights = self.attention(query, encoder_outputs)
        input_gru = torch.cat((embedded, context), dim=2)

        output, hidden = self.gru(input_gru, hidden)
        output = self.out(output)

        return output, hidden, attn_weights



In [22]:
"""
train one epoch
"""

def train_epoch(dataloader, encoder, decoder, encoder_optimizer,
          decoder_optimizer, criterion, epoch, print_every = 100 ):

    total_loss = 0
    t = time()
    idx = 0
    for data in dataloader:

        input_tensor, target_tensor = data

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

        loss = criterion(
            decoder_outputs.view(-1, decoder_outputs.size(-1)),
            target_tensor.view(-1)
        )
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        total_loss += loss.item()

        if(idx % print_every == 0 and idx != 0):
          print(f"batches : {idx-print_every} - {idx}  of epoch {epoch} took time: {time() - t}.")
          t = time()
        idx = idx+1

    return total_loss / len(dataloader)

In [23]:
"""
helper functions
"""

def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

def timeSince(since, percent):
    now = time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

In [38]:
"""
define function for complete training
"""

def train(train_dataloader, encoder, decoder, n_epochs, learning_rate=0.001,
               print_every=100, plot_every=100, save_file_suffix = ""):
    start = time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
    criterion = nn.NLLLoss()

    for epoch in range(1, n_epochs + 1):
        loss = train_epoch(train_dataloader, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, epoch = epoch)
        print_loss_total += loss
        plot_loss_total += loss

        if epoch % print_every == 0:
            torch.save(encoder.state_dict(), f"./encoder-{save_file_suffix}-{epoch}")
            torch.save(decoder.state_dict(), f"./decoder-{save_file_suffix}-{epoch}")
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('EPOCH %d done. %s (%d %d%%) avearage loss: %.4f' % (epoch, timeSince(start, epoch / n_epochs),
                                        epoch, epoch / n_epochs * 100, print_loss_avg))

        if epoch % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0


In [39]:
"""
actual training
"""


hidden_size = 128
batch_size = 32

input_lang, output_lang, train_dataloader, test_loader = get_dataloader(batch_size)

encoder = EncoderRNN(input_lang.n_words, hidden_size).to(device)
decoder = AttnDecoderRNN(hidden_size, output_lang.n_words).to(device)

train(train_dataloader, encoder, decoder, 10 , print_every=1, plot_every=1, save_file_suffix="trial")

Reading lines...
Read 20000 sentence pairs
Counting words...
Counted words:
model 110
desc 53
train size: 16000. test size: 4000
batches : 0 - 100  of epoch 1 took time: 8.473941802978516.
batches : 100 - 200  of epoch 1 took time: 8.34061861038208.
batches : 200 - 300  of epoch 1 took time: 8.379926919937134.
batches : 300 - 400  of epoch 1 took time: 8.469292402267456.
EPOCH 1 done. 0m 42s (- 6m 18s) (1 10%) avearage loss: 0.2971
batches : 0 - 100  of epoch 2 took time: 8.423267841339111.
batches : 100 - 200  of epoch 2 took time: 8.37239670753479.
batches : 200 - 300  of epoch 2 took time: 8.450244903564453.
batches : 300 - 400  of epoch 2 took time: 8.290489196777344.
EPOCH 2 done. 1m 23s (- 5m 35s) (2 20%) avearage loss: 0.0562
batches : 0 - 100  of epoch 3 took time: 8.436162233352661.
batches : 100 - 200  of epoch 3 took time: 8.402474164962769.
batches : 200 - 300  of epoch 3 took time: 8.35907769203186.
batches : 300 - 400  of epoch 3 took time: 8.267791748046875.
EPOCH 3 done

In [40]:
"""
helpers to load trained models
"""

import os.path


def load_model(model, path):
    model.load_state_dict(torch.load(path))
    model.eval()
    return model

def get_trained_models(save_path_encoder, save_path_decoder):
    if os.path.isfile(save_path_encoder) and os.path.isfile(save_path_decoder):
        e = EncoderRNN(input_lang.n_words, hidden_size).to(device)
        d = AttnDecoderRNN(hidden_size, output_lang.n_words).to(device)
        e = load_model(e, save_path_encoder)
        d = load_model(d, save_path_decoder)
        return e, d
    print(f"pre trained weigths do not exist at given path")
    return None, None

In [41]:
"""
helper to evaluate a single translation
"""

def evaluate(encoder, decoder, sentence, input_lang, output_lang):
    with torch.no_grad():
        
        input_ids = np.full((1, MAX_LENGTH_INPUT),  PADDING_token,  dtype=np.int32)
        inputs = indexesFromSentence(input_lang, sentence)
        inputs.append(EOS_token)
        input_ids[0, : len(inputs)] = inputs
        
        input_tensor = torch.tensor(input_ids).to(device)

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, decoder_hidden, decoder_attn = decoder(encoder_outputs, encoder_hidden)

        _, topi = decoder_outputs.topk(1)
        decoded_ids = topi.squeeze()

        decoded_words = []
        for idx in decoded_ids:
            if idx.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            decoded_words.append(output_lang.index2word[idx.item()])
    return decoded_words, decoder_attn

In [42]:
"""
get a taste of the translations
"""

def evaluateRandomly(encoder, decoder, n=10):
    for i in range(n):
        pair = random.choice(pairs)
        print('>', pair[0])
        print('=', pair[1])
        output_words, _ = evaluate(encoder, decoder, pair[0], input_lang, output_lang)
        output_sentence = ' '.join(output_words)
        print('<', output_sentence)
        print('')

In [43]:
def print_key():
    print(f"KEY:\n> INPUT\n= TARGET\n< MACHINE TRANSLATION\n\n----------------\n")

encoder.eval()
decoder.eval()

print_key()
# e, d = get_trained_models("encoder-trial-10", "decoder-trial-10")
# evaluateRandomly(e, d)
# UNCOMMENT TO TEST WITH TRAINED MODEL DIRECTLY
evaluateRandomly(encoder, decoder)

KEY:
> INPUT
= TARGET
< MACHINE TRANSLATION

----------------

> sequential ( 
   ( 0 ) : linear ( in_features = 1 ,  out_features = 8 ,  bias = true ) 
   ( 1 ) : linear ( in_features = 8 ,  out_features = 7 ,  bias = true ) 
 )
= this model has 2 layers . the input has shape [n ,  1] . the output has shape [n ,  7]
< this model has 2 layers . the input has shape [n ,  1] . the output has shape [n ,  7] <EOS>

> sequential ( 
   ( 0 ) : conv2d ( 8 ,  2 ,  kernel_size =  ( 2 ,  2 )  ,  stride =  ( 1 ,  1 )  ) 
   ( 1 ) : reshape (  ) 
   ( 2 ) : linear ( in_features = 7938 ,  out_features = 3 ,  bias = true ) 
 )
= this model has 3 layers . the input has shape [n ,  8 ,  h ,  w] . the output has shape [n ,  3]
< this model has 3 layers . the input has shape [n ,  8 ,  h ,  w] . the output has shape [n ,  3] <EOS>

> sequential ( 
   ( 0 ) : conv2d ( 6 ,  5 ,  kernel_size =  ( 4 ,  4 )  ,  stride =  ( 1 ,  1 )  ) 
 )
= this model has 1 layers . the input has shape [n ,  6 ,  h ,  w] . t

In [44]:
"""
helper to get tensor translation of a single input. will be used to calculate metrics 
"""

def get_prediction(source, encoder, decoder, input_lang):
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, source)

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, decoder_hidden, decoder_attn = decoder(encoder_outputs, encoder_hidden)
        
        _, topi = decoder_outputs.topk(1)
        decoded_ids = topi.squeeze(-1)
    return decoded_ids

source, target = random.choice(pairs)
predicted = get_prediction(source, encoder, decoder, input_lang)
print(f"predicted : {predicted}")

predicted : tensor([[ 2,  3,  4, 24,  6,  7,  8,  9,  4, 10, 11, 12, 13, 14, 12, 13, 15, 12,
         13, 16,  7,  8, 17,  4, 10, 11, 12, 13, 31, 12, 13, 51, 12, 13, 52,  1,
          2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2]],
       device='cuda:0')


In [45]:
"""
accuracy metric definition
"""

def create_ngrams(x , n):
    return x.unfold(1,n,1)

def accuracy(predicted, target, ngram=1):
    predicted_ngrams = create_ngrams(predicted, ngram)
    target_ngrams = create_ngrams(target, ngram)
    matches = (predicted_ngrams == target_ngrams).all(dim=2)
    num = matches.sum()
    denom = matches.numel()
    #print(f"matches : {num} , denom : {denom}")
    return num / denom

In [46]:
"""
generic metric calculator. f is the metric function
"""
def metric(f, encoder, decoder):
    pair = random.choice(pairs)
    target_ids = np.full((1, MAX_LENGTH_OUTPUT),  PADDING_token,  dtype=np.int32)
    target = indexesFromSentence(output_lang, pair[1])
    target.append(EOS_token)
    target_ids[0, : len(target)] = target
    
    predicted = get_prediction(pair[0], encoder, decoder, input_lang)
    
    return f(predicted, torch.tensor(target_ids).to(device))

x = metric(accuracy, encoder, decoder)
print(f"metric for a single random item {x.item()}/1.0")

metric for a single random item 0.3799999952316284/1.0


In [47]:
"""
actual metric calculation with a stroed model
"""

def metric(f, encoder, decoder):
    acc = 0.0
    num = 0 
    for data in test_loader:
        input_tensor, target_tensor = data
        batch_size = input_tensor.size(0)
        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden)
        decoder_outputs = ((decoder_outputs.topk(1)[1]).squeeze(-1))
        t_acc = f(decoder_outputs, target_tensor)
        acc = ( (t_acc * batch_size) + (acc * num) ) / (num + batch_size)
        num = num + batch_size
    return acc
        
e, d = get_trained_models("encoder-trial-10", "decoder-trial-10")

m = metric(accuracy, e, d)
print(f"Accuracy metric: {m.item()/1.0}")

Accuracy metric: 0.9999993443489075


**This completes my implementation. I will list some improvements that I am thinking of here for completeness:**

1. I can include more type of layers. Right now I am using Linear and Conv2d layers. It will be easy to add ReLU and MaxPool layers. I'll have to spend some time for adding other types of layers if I wish to maintain "shape compatibility" between layers.

2. I am just using Sequential layers right now. I could use generic Module derived classes. Again, that will be work in the synthetic data generation segment

3. Can increase num layers in the models. Right now it's upto 6 layers. This will increase MAX_LENGTH for input so I'll have to see how the RNN handles that.

4. Can add more metrics based on precision and recall. (something similar to BLEU or ROGUE). May also think about "PADDING" matches to have a lower weight for metric calculations

In [None]:
"""
OLD SIMPLE IMPLEMENTATION OF SYNTHETIC DATA GENERATION. NOT USED NOW.
"""


"""
simple_synthetic_data_generator generates a single synthetic input

for simplicity, I have used only Linear and Conv1d layers as part of my Input.
"""

def get_shape_helper(model, stage):
  if isinstance(model, nn.Linear):
    if stage == "input":
      return f"(b, {model.in_features})"
    elif stage == "output":
      return f"(b, {model.out_features})"
  if isinstance(model, nn.Conv1d):
    if stage == "input":
      return f"(b, {model.in_channels}, l)"
    elif stage == "output":
      return f"(b, {model.out_channels}, l)"
  return NotImplementedError()


def simple_synthetic_data_generator(model):
  children = list(model.named_children())
  desc = f"this model has {len(children)} layers."
  if(len(children) > 0):
    desc = desc + f" the input has shape {get_shape_helper(children[0][1], 'input')} and the output has shape {get_shape_helper(children[-1][1], 'output')}"
  return desc

"""
test to see input for non empty model
"""

def non_empty_model_test():
  non_empty_model = nn.Sequential(nn.Linear(12,30), nn.Linear(30,1))
  print(simple_synthetic_data_generator(non_empty_model))
non_empty_model_test()


"""
test to see input for empty model
"""

def empty_model_test():
  empty_model = nn.Sequential()
  print(simple_synthetic_data_generator(empty_model))
empty_model_test()

"""
Assume that each layer has 0-9 input and output number of nodes / kernels etc.
"""

def get_random_sizes_for_layers(dims):
  return np.random.randint(0, 10, dims)

"""
Create a synthetic dataset.

The random selection of layers here means that the created synthetic model may throw runtime errors. 
However, that is not a problem for the task at hand. In fact, the random selection of layers and dims in the synthetic 
dataset means our trained seq2seq model learns a harder task as there is less "structure" in our synthetic models. 
"""

NUM_ITEMS = 20000

def create_dataset_old(num_items):
  layer_options = [(nn.Linear, 2), (nn.Conv1d, 3)] # use linear and conv1d layers
  len_layer_optins = len(layer_options)
  data_pairs = []
  for item in range(num_items):
    num_layers = np.random.randint(0,6) # assume 0-2 layers only
    layers = []
    for _ in range(num_layers):
      layer_tuple = layer_options[np.random.randint(0,len(layer_options))] # pick layer type
      layer = layer_tuple[0]
      dims = layer_tuple[1]
      layers.append(layer(*get_random_sizes_for_layers(dims))) # add layer to architecture
    model = nn.Sequential(*layers)
    data_pairs.append({"model": model.__str__(), "desc": simple_synthetic_data_generator(model)})
  return data_pairs
