In [45]:
!pip uninstall -y torch torchtext torchvision
!pip install torch==2.0.1 torchvision==0.15.2 torchtext==0.15.2


Found existing installation: torch 2.0.1
Uninstalling torch-2.0.1:
  Successfully uninstalled torch-2.0.1
Found existing installation: torchtext 0.15.2
Uninstalling torchtext-0.15.2:
  Successfully uninstalled torchtext-0.15.2
Found existing installation: torchvision 0.15.2
Uninstalling torchvision-0.15.2:
  Successfully uninstalled torchvision-0.15.2
Collecting torch==2.0.1
  Using cached torch-2.0.1-cp310-cp310-manylinux1_x86_64.whl.metadata (24 kB)
Collecting torchvision==0.15.2
  Using cached torchvision-0.15.2-cp310-cp310-manylinux1_x86_64.whl.metadata (11 kB)
Collecting torchtext==0.15.2
  Using cached torchtext-0.15.2-cp310-cp310-manylinux1_x86_64.whl.metadata (7.4 kB)
Using cached torch-2.0.1-cp310-cp310-manylinux1_x86_64.whl (619.9 MB)
Using cached torchvision-0.15.2-cp310-cp310-manylinux1_x86_64.whl (6.0 MB)
Using cached torchtext-0.15.2-cp310-cp310-manylinux1_x86_64.whl (2.0 MB)
Installing collected packages: torch, torchvision, torchtext
[31mERROR: pip's dependency resolve

In [46]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import time
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchtext.vocab import build_vocab_from_iterator
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import time
import torch.utils.data as data
import os



In [47]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

In [48]:
# Load dataset
file_path = "/kaggle/input/spoc-train/spoc-train.tsv"  # Update this if needed
df = pd.read_csv(file_path, sep='\t')


In [49]:
# Remove NaN values
df = df.dropna(subset=['text', 'code'])

In [50]:
# Tokenizer (Basic)
def tokenize(text):
    return text.lower().split()

In [51]:
# Build vocabulary
class Vocab:
    def __init__(self, texts, min_freq=1):
        self.word2idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.idx2word = ["<PAD>", "<SOS>", "<EOS>", "<UNK>"]
        self.freqs = {}
        
        for text in texts:
            for word in tokenize(text):
                self.freqs[word] = self.freqs.get(word, 0) + 1
                
                if word not in self.word2idx and self.freqs[word] >= min_freq:
                    self.word2idx[word] = len(self.idx2word)
                    self.idx2word.append(word)
        
    def encode(self, text):
        return [self.word2idx.get(word, self.word2idx["<UNK>"]) for word in tokenize(text)] + [self.word2idx["<EOS>"]]
    
    def decode(self, tokens):
        return " ".join([self.idx2word[token] for token in tokens if token > 2])

In [52]:
# Prepare vocabularies
source_vocab = Vocab(df['text'].tolist())
target_vocab = Vocab(df['code'].tolist())

In [53]:
# Dataset class
class CodeDataset(Dataset):
    def __init__(self, df, source_vocab, target_vocab):
        self.df = df
        self.source_vocab = source_vocab
        self.target_vocab = target_vocab
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        source = self.source_vocab.encode(self.df.iloc[idx]['text'])
        target = self.target_vocab.encode(self.df.iloc[idx]['code'])
        return torch.tensor(source), torch.tensor(target)

In [54]:
# Collate function
def collate_fn(batch):
    sources, targets = zip(*batch)
    sources = pad_sequence(sources, padding_value=0, batch_first=True)
    targets = pad_sequence(targets, padding_value=0, batch_first=True)
    return sources, targets

In [55]:
# Create DataLoader
dataset = CodeDataset(df, source_vocab, target_vocab)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)


In [56]:
# Transformer Model
class TransformerSeq2Seq(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim=256, n_heads=8, ff_dim=512, num_layers=3, dropout=0.1):
        super().__init__()
        
        self.encoder_embedding = nn.Embedding(input_dim, emb_dim)
        self.decoder_embedding = nn.Embedding(output_dim, emb_dim)
        
        self.transformer = nn.Transformer(
            d_model=emb_dim, nhead=n_heads, num_encoder_layers=num_layers,
            num_decoder_layers=num_layers, dim_feedforward=ff_dim, dropout=dropout
        )
        
        self.fc_out = nn.Linear(emb_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, src, tgt):
        src = self.encoder_embedding(src).permute(1, 0, 2)
        tgt = self.decoder_embedding(tgt).permute(1, 0, 2)
        
        output = self.transformer(src, tgt)
        output = self.fc_out(output).permute(1, 0, 2)
        
        return output

In [57]:
# Model Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TransformerSeq2Seq(len(source_vocab.idx2word), len(target_vocab.idx2word)).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.0005)


In [58]:
# Mixed Precision Training
scaler = torch.cuda.amp.GradScaler()

In [59]:
# Training Loop
def train(model, dataloader, criterion, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        start_time = time.time()
        epoch_loss = 0
        
        for src, tgt in tqdm(dataloader):
            src, tgt = src.to(device), tgt.to(device)
            optimizer.zero_grad()
            
            with torch.cuda.amp.autocast():
                output = model(src, tgt[:, :-1])
                output = output.reshape(-1, output.shape[-1])
                tgt = tgt[:, 1:].reshape(-1)
                loss = criterion(output, tgt)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            
            epoch_loss += loss.item()
        
        end_time = time.time()
        print(f"Epoch {epoch+1}, Loss: {epoch_loss/len(dataloader):.4f}, Time: {end_time - start_time:.2f}s")

In [60]:
# Train model
train(model, dataloader, criterion, optimizer, epochs=10)

100%|██████████| 6758/6758 [04:16<00:00, 26.34it/s]


Epoch 1, Loss: 1.3929, Time: 256.56s


100%|██████████| 6758/6758 [04:14<00:00, 26.53it/s]


Epoch 2, Loss: 0.7776, Time: 254.76s


100%|██████████| 6758/6758 [04:19<00:00, 26.07it/s]


Epoch 3, Loss: 0.6013, Time: 259.26s


100%|██████████| 6758/6758 [04:18<00:00, 26.17it/s]


Epoch 4, Loss: 0.5029, Time: 258.27s


100%|██████████| 6758/6758 [04:16<00:00, 26.34it/s]


Epoch 5, Loss: 0.4417, Time: 256.55s


100%|██████████| 6758/6758 [04:19<00:00, 26.02it/s]


Epoch 6, Loss: 0.4070, Time: 259.71s


100%|██████████| 6758/6758 [04:15<00:00, 26.42it/s]


Epoch 8, Loss: 0.3834, Time: 255.84s


100%|██████████| 6758/6758 [04:19<00:00, 26.05it/s]


Epoch 9, Loss: 0.3771, Time: 259.46s


100%|██████████| 6758/6758 [04:19<00:00, 26.06it/s]

Epoch 10, Loss: 0.3681, Time: 259.29s





In [61]:
# Save model
model_path = "transformer_seq2seq.pth"
torch.save(model.state_dict(), model_path)
print("Model saved!")

Model saved!


In [62]:
# Optimized Code Generation (Inference) Function
def generate_code(model, input_text, source_vocab, target_vocab, max_length=50):
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        # Encode input text to token IDs
        src_tokens = torch.tensor([source_vocab.encode(input_text)], device=device)

        # Start decoding with <SOS> token
        tgt_tokens = torch.tensor([[target_vocab.word2idx["<SOS>"]]], device=device)

        for _ in range(max_length):
            # Get model output
            output = model(src_tokens, tgt_tokens)
            
            # Get the last token prediction
            next_token = output[:, -1, :].argmax(dim=-1).item()

            # Append next token to sequence
            tgt_tokens = torch.cat((tgt_tokens, torch.tensor([[next_token]], device=device)), dim=1)

            # Stop decoding if <EOS> is generated
            if next_token == target_vocab.word2idx["<EOS>"]:
                break

        # Convert token IDs back to code string
        generated_code = target_vocab.decode(tgt_tokens.squeeze().tolist())
    
    return generated_code

In [67]:
# Test model with a sample input
sample_input = "create integer flag with flag = 1 "
predicted_output = generate_code(model, sample_input, source_vocab, target_vocab)
print(f"Input: {sample_input}\nGenerated Code:\n{predicted_output}")

Input: create integer flag with flag = 1 
Generated Code:

