In [1]:
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
import nltk
import torch
# Make sure the 'punkt' tokenizer data is available locally
nltk.download('punkt')

# Read the tab-separated training split into a DataFrame
file_path = r"/kaggle/input/spoc-db/spoc-train-train.tsv"  # Change to your dataset's path
df = pd.read_csv(file_path, sep="\t")

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
# Blank out missing pseudocode *before* casting to str; casting first would
# turn NaN into the literal string "nan", which then becomes a real token.
df["text"] = df["text"].fillna("").astype(str)
df["text_tokens"] = df["text"].apply(word_tokenize)

# Tokenize the C++ side once (missing code is treated as an empty line) and
# wrap every sequence in <start>/<end> markers.
df["code_tokens"] = (
    df["code"]
    .fillna("")
    .astype(str)
    .apply(lambda source: ["<start>"] + word_tokenize(source) + ["<end>"])
)

# Right-pad both columns with "<pad>" up to the longest sequence in either
# column, so every stored row has the same length.
max_len = max(df["text_tokens"].apply(len).max(), df["code_tokens"].apply(len).max())
df["text_tokens"] = df["text_tokens"].apply(lambda tokens: tokens + ["<pad>"] * (max_len - len(tokens)))
df["code_tokens"] = df["code_tokens"].apply(lambda tokens: tokens + ["<pad>"] * (max_len - len(tokens)))

# Persist the padded token lists; lists are written as their Python repr.
output_file = "tokenized_spoc.csv"
df[["text_tokens", "code_tokens"]].to_csv(output_file, index=False)

print(f"Tokenized data saved to {output_file}")

Tokenized data saved to tokenized_spoc.csv


In [3]:
import json

# Reserved tokens occupy the first four ids so their indices never move
vocab = {"<unk>": 0, "<pad>": 1, "<start>": 2, "<end>": 3}

# Every other token gets the next free id, in first-seen order
# (all of text_tokens is scanned before code_tokens)
for column_name in ("text_tokens", "code_tokens"):
    for token_list in df[column_name]:
        for tok in token_list:
            # setdefault leaves known tokens alone and assigns len(vocab) to new ones
            vocab.setdefault(tok, len(vocab))

# Write the token -> id mapping to disk as JSON
vocab_file = "tokenizer_vocab.json"
with open(vocab_file, "w") as f:
    json.dump(vocab, f, indent=4)

print(f"Vocabulary saved to {vocab_file}")

Vocabulary saved to tokenizer_vocab.json


In [4]:
import ast  # imported here because this cell runs before the cell that imports ast

with open("tokenizer_vocab.json", "r") as f:
    vocab = json.load(f)

# Load tokenized data
df = pd.read_csv("/kaggle/working/tokenized_spoc.csv")

# The CSV stores each token list as its Python repr. Parse it with
# ast.literal_eval, which only accepts literals, instead of eval, which would
# execute arbitrary expressions found in the file.
for column in ("text_tokens", "code_tokens"):
    df[column] = df[column].apply(ast.literal_eval)

# Convert tokens to id sequences; tokens missing from the vocabulary map to <unk>
unk_id = vocab["<unk>"]
df["text_sequences"] = df["text_tokens"].apply(lambda tokens: [vocab.get(token, unk_id) for token in tokens])
df["code_sequences"] = df["code_tokens"].apply(lambda tokens: [vocab.get(token, unk_id) for token in tokens])

# Save sequences to CSV
output_file = "sequences.csv"
df[["text_sequences", "code_sequences"]].to_csv(output_file, index=False)

print(f"Tokenized sequences saved to {output_file}")

Tokenized sequences saved to sequences.csv


In [5]:
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import torch
import ast
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm

class DataLoad(Dataset):
    """Dataset of (text, code) id sequences read from a CSV file.

    The CSV must contain the columns ``text_sequences`` and ``code_sequences``,
    each cell holding the literal representation of a list of integers.
    """

    def __init__(self, file_path):
        """Read ``file_path`` and parse both sequence columns into Python lists."""
        frame = pd.read_csv(file_path)
        self.inputs = list(map(ast.literal_eval, frame['text_sequences']))
        self.outputs = list(map(ast.literal_eval, frame['code_sequences']))

    def __len__(self):
        """Number of (text, code) pairs."""
        return len(self.inputs)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` as two int64 tensors: (text ids, code ids)."""
        source_ids, target_ids = self.inputs[idx], self.outputs[idx]
        return (
            torch.tensor(source_ids, dtype=torch.long),
            torch.tensor(target_ids, dtype=torch.long),
        )

def Add_Pad(batch, padding_value=1):
    """Collate (input, output) tensor pairs into two right-padded batch tensors.

    Args:
        batch: iterable of ``(input_tensor, output_tensor)`` pairs of 1-D tensors.
        padding_value: id used to fill short sequences. Defaults to 1, the
            ``<pad>`` index of the vocabulary built in this notebook (and the
            index the training loss ignores). The previous hard-coded 0 is the
            ``<unk>`` index, so padded positions would have counted as real
            tokens in the loss.

    Returns:
        ``(inputs, outputs)``, each of shape (batch, longest sequence in batch).
    """
    inputs, outputs = zip(*batch)
    inputs = pad_sequence(inputs, batch_first=True, padding_value=padding_value)
    outputs = pad_sequence(outputs, batch_first=True, padding_value=padding_value)
    return inputs, outputs

# Load dataset and dataloader
dataset = DataLoad('/kaggle/working/sequences.csv')
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, collate_fn=Add_Pad)

# Smoke test: pull exactly one batch to confirm the pipeline works end to end.
# (The unused `iter(dataloader)` handle was dropped; the loop makes its own iterator.)
for batch in tqdm(dataloader, desc="Loading Batches"):
    features, labels = batch  # Get a batch of data
    break  # Remove this if you want to iterate over all batches
else:
    # Reached only when the loader produced no batch at all
    raise RuntimeError("DataLoader yielded no batches; the dataset is empty")

print("Batch loaded successfully!")


Loading Batches:   0%|          | 0/3846 [00:00<?, ?it/s]

Batch loaded successfully!





In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

# Transformer Hyperparameters
class Config:
    """Hyperparameters shared by the model, training and inference cells."""

    # --- sizes ---
    vocab_size = 12388   # Adjust based on vocabulary.json
    max_length = 100     # Adjust based on sequence length
    embed_dim = 256

    # --- transformer shape ---
    num_heads = 8
    num_layers = 2
    feedforward_dim = 512

    # --- regularisation ---
    dropout = 0.1

    # Prefer the GPU when one is visible, otherwise run on the CPU
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


config = Config()

# Positional Encoding
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch-first embedding tensor.

    Args:
        embed_dim: size of the last dimension of the tensors passed to ``forward``.
        max_len: number of positions precomputed; longer inputs are rejected.
    """

    def __init__(self, embed_dim, max_len=100):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))

        pe = torch.zeros(max_len, embed_dim)
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd embed_dim there is one fewer cosine column than sine columns,
        # so trim div_term to match (a no-op when embed_dim is even).
        pe[:, 1::2] = torch.cos(position * div_term[: embed_dim // 2])

        # Register as a buffer so the table follows module.to(device) / .cuda().
        # persistent=False keeps it out of state_dict, so checkpoint keys do
        # not change. Shape: (1, max_len, embed_dim)
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Raises:
            ValueError: if the sequence dimension of ``x`` exceeds ``max_len``.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"Sequence length {seq_len} exceeds the positional encoding table "
                f"(max_len={self.pe.size(1)})"
            )
        # .to() is a no-op when the buffer already lives on x's device
        return x + self.pe[:, :seq_len].to(x.device)

# Transformer Model
class PseudoCodeTransformer(nn.Module):
    """Encoder-decoder transformer mapping source token ids to target-token logits.

    A single embedding table is shared by source and target. All sizes come
    from the ``config`` object passed to the constructor; ``forward`` no longer
    reads the module-level ``config``, so an instance built from a different
    config scales its embeddings and places its masks correctly.
    """

    def __init__(self, config):
        super().__init__()
        self.embedding = nn.Embedding(config.vocab_size, config.embed_dim)
        self.positional_encoding = PositionalEncoding(config.embed_dim, config.max_length)

        self.transformer = nn.Transformer(
            d_model=config.embed_dim,
            nhead=config.num_heads,
            num_encoder_layers=config.num_layers,
            num_decoder_layers=config.num_layers,
            dim_feedforward=config.feedforward_dim,
            dropout=config.dropout
        )

        self.fc_out = nn.Linear(config.embed_dim, config.vocab_size)
        # NOTE(review): this dropout layer is created but never applied in forward().
        self.dropout = nn.Dropout(config.dropout)

    def generate_square_subsequent_mask(self, sz, device=None):
        """Return an (sz, sz) float mask: -inf strictly above the diagonal, 0 elsewhere.

        Args:
            sz: sequence length.
            device: where to create the mask; defaults to the device holding
                the embedding weights.
        """
        if device is None:
            device = self.embedding.weight.device
        return torch.triu(torch.full((sz, sz), float('-inf'), device=device), diagonal=1)

    def forward(self, src, tgt):
        """Compute logits for every target position.

        Args:
            src: (batch, src_len) integer token ids.
            tgt: (batch, tgt_len) integer token ids.

        Returns:
            Tensor of shape (batch, tgt_len, vocab_size).
        """
        # Scale by sqrt(d_model), read from the embedding itself rather than a global
        scale = math.sqrt(self.embedding.embedding_dim)
        src_emb = self.positional_encoding(self.embedding(src) * scale)
        tgt_emb = self.positional_encoding(self.embedding(tgt) * scale)

        # NOTE(review): the encoder is also given a causal mask, so a source
        # position cannot attend to later source tokens. Kept as-is so existing
        # trained weights behave the same; confirm whether this is intended.
        # NOTE(review): no key-padding masks are passed, so <pad> positions
        # take part in attention.
        src_mask = self.generate_square_subsequent_mask(src.size(1), device=src.device)
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1), device=tgt.device)

        # nn.Transformer is sequence-first here (batch_first is left at its default)
        out = self.transformer(src_emb.permute(1, 0, 2), tgt_emb.permute(1, 0, 2),
                               src_mask=src_mask, tgt_mask=tgt_mask)

        out = self.fc_out(out.permute(1, 0, 2))  # Convert back to batch-first
        return out

# Build the network from the shared hyperparameters, then place it on the configured device
model = PseudoCodeTransformer(config)
model = model.to(config.device)
print("Model initialized successfully!")




Model initialized successfully!


In [7]:
def translate(model, pseudocode_tokens, vocab, device, max_length=50):
    """Greedily decode code tokens for a tokenized pseudocode line.

    Args:
        model: callable ``model(src_ids, tgt_ids)`` returning logits of shape
            (batch, tgt_len, vocab). Its ``eval()`` is called first, and the
            model is left in eval mode afterwards.
        pseudocode_tokens: iterable of source tokens (strings).
        vocab: token -> id mapping containing ``<unk>``, ``<start>`` and ``<end>``.
        device: device on which the input tensors are created.
        max_length: maximum number of tokens to generate.

    Returns:
        The generated tokens joined by single spaces. The ``<start>`` marker
        and the terminating ``<end>`` marker are not part of the result.
        An empty input yields an empty string.
    """
    model.eval()

    # Convert pseudocode tokens to numerical indices
    unk_id = vocab["<unk>"]
    end_id = vocab["<end>"]
    input_ids = [vocab.get(token, unk_id) for token in pseudocode_tokens]
    if not input_ids:
        # Nothing to translate; an empty source sequence cannot be encoded
        return ""
    input_tensor = torch.tensor(input_ids, dtype=torch.long, device=device).unsqueeze(0)  # Add batch dimension

    # Start token for generation
    output_ids = [vocab["<start>"]]

    with torch.no_grad():
        for _ in range(max_length):
            output_tensor = torch.tensor(output_ids, dtype=torch.long, device=device).unsqueeze(0)

            # Logits for every target position; only the last one is new
            predictions = model(input_tensor, output_tensor)
            next_token_id = predictions[0, -1].argmax(dim=-1).item()

            # Stop at <end> without appending it, so the sentinel never
            # appears in the returned code
            if next_token_id == end_id:
                break
            output_ids.append(next_token_id)

    # Convert token indices back to words, skipping the leading <start>
    id_to_token = {idx: token for token, idx in vocab.items()}
    return " ".join(id_to_token.get(idx, "<unk>") for idx in output_ids[1:])


In [8]:
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
import os
import json

# Load vocabulary
with open("tokenizer_vocab.json", "r") as f:
    vocab = json.load(f)

# Fail early with a readable message: ids run from 0 to len(vocab) - 1, and any
# id >= config.vocab_size would index past the embedding table during training.
if len(vocab) > config.vocab_size:
    raise ValueError(
        f"Vocabulary has {len(vocab)} entries but config.vocab_size is {config.vocab_size}"
    )

# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move model to device
model.to(device)

# Loss Function & Optimizer
pad_id = vocab["<pad>"]  # read from the vocabulary instead of hard-coding 1
criterion = nn.CrossEntropyLoss(ignore_index=pad_id)  # Ignore padding token
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-3)

# Create directory to save models
os.makedirs("checkpoints", exist_ok=True)

# Training Loop
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0

    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for batch in progress_bar:
        src, tgt = batch
        src, tgt = src.to(device), tgt.to(device)  # Move batch to GPU

        # Teacher forcing: the decoder sees positions [0, T-1) and is trained
        # to predict positions [1, T). Targets are padded, so the position
        # dropped from the input is simply the last one, not necessarily <end>.
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]

        optimizer.zero_grad()
        output = model(src, tgt_input)

        # reshape() copes with non-contiguous tensors; the class count is
        # taken from the logits themselves
        loss = criterion(output.reshape(-1, output.size(-1)), tgt_output.reshape(-1))
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        progress_bar.set_postfix(loss=loss.item())

    avg_loss = epoch_loss / len(dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

# Save the entire model after all epochs
# NOTE(review): this pickles the whole module, so loading needs the same class
# definitions and, on recent PyTorch, torch.load(..., weights_only=False).
# Consider saving model.state_dict() once consumers of this file can be updated.
final_model_path = "checkpoints/PseudoToCode_transformer.pth"
torch.save(model, final_model_path)
print(f"Model saved: {final_model_path}")

Using device: cuda


Epoch 1/5: 100%|██████████| 3846/3846 [04:12<00:00, 15.26it/s, loss=0.071]


Epoch [1/5], Loss: 0.9334


Epoch 2/5: 100%|██████████| 3846/3846 [04:11<00:00, 15.27it/s, loss=0.71]


Epoch [2/5], Loss: 0.4669


Epoch 3/5: 100%|██████████| 3846/3846 [04:11<00:00, 15.31it/s, loss=0.202]


Epoch [3/5], Loss: 0.3663


Epoch 4/5: 100%|██████████| 3846/3846 [04:10<00:00, 15.38it/s, loss=0.817]


Epoch [4/5], Loss: 0.3151


Epoch 5/5: 100%|██████████| 3846/3846 [04:10<00:00, 15.36it/s, loss=0.691]


Epoch [5/5], Loss: 0.2820
Model saved: checkpoints/PseudoToCode_transformer.pth


In [9]:
example_pseudocode = "if x is greater than 10 then print x"
# Tokenize exactly as the training data was tokenized (nltk word_tokenize);
# a plain str.split() would leave punctuation glued to words and produce
# tokens the vocabulary has never seen.
tokenized_input = word_tokenize(example_pseudocode)
translated_code = translate(model, tokenized_input, vocab, device)
print(f"Final Example Prediction (Pseudocode to Code): {translated_code}\n")

Final Example Prediction (Pseudocode to Code): if ( x > 10 10 10 10 10 10 10 ) cout < < < x > 10 < x < x < endl ; <end>

