In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install transformers
!pip install datasets
!pip install accelerate -U
!pip install transformers[torch]
!pip install tensorflow

In [9]:
from torch.utils.data import Dataset, DataLoader
from transformers import T5Tokenizer, T5EncoderModel, T5ForConditionalGeneration, GPT2LMHeadModel, AutoModel, AutoTokenizer, EncoderDecoderModel, AutoConfig,Trainer, TrainingArguments
import pandas as pd
import torch.nn as nn
from torch.utils.data import Dataset
import torch
from sklearn.model_selection import train_test_split
from datasets import load_dataset
from torch.nn import TransformerDecoder, CrossEntropyLoss
from torch.nn import functional as F
from typing import Union, Tuple, Optional
from transformers.utils import is_torch_fx_proxy
from transformers.modeling_outputs import Seq2SeqLMOutput


In [2]:
def load_dataset_from_csv(file_path, tokenizer, max_length=512):
    dataset = load_dataset('csv', data_files=file_path, split='train')
    dataset = dataset.filter(lambda example: all(value is not None for value in example.values()))
    dataset = dataset.train_test_split(test_size=0.2)

    def tokenize_function(examples):
        inputs = tokenizer(examples['lang1'], padding='max_length', max_length=max_length, truncation=True, return_tensors="pt")
        targets = tokenizer(examples['lang2'], padding='max_length', max_length=max_length, truncation=True, return_tensors="pt")
        attention_mask = inputs.attention_mask

        return {
            'input_ids': inputs.input_ids,
            'attention_mask': attention_mask,
            'labels': targets.input_ids}


    tokenized_datasets = dataset.map(tokenize_function, batched=True, remove_columns=dataset["train"].column_names)
    return tokenized_datasets

In [4]:
tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
file_path = "./drive/MyDrive/preprocessed.csv"
dataset = load_dataset_from_csv(file_path, tokenizer)

Map:   0%|          | 0/1133 [00:00<?, ? examples/s]

Map:   0%|          | 0/284 [00:00<?, ? examples/s]

In [99]:
import torch.nn.functional as F

class CodeTranslator(nn.Module):
    def __init__(self, encoder_model, vocab_size, d_model, nhead=4, num_layers=4):
        super(CodeTranslator, self).__init__()
        self.block_size = encoder_model.config.max_position_embeddings
        self.token_embedding_table = nn.Embedding(vocab_size, d_model)
        self.position_embedding_table = nn.Embedding(self.block_size, d_model)
        self.encoder_model = encoder_model
        self.decoder_layer = nn.TransformerDecoderLayer(d_model=d_model,
                                                        nhead=nhead)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
        self.lm_head = nn.Linear(in_features=d_model, out_features=vocab_size)
        self.softmax = nn.Softmax(-1)

    def forward(self, input_ids, labels=None, attention_mask=None):
        if attention_mask is None:
            encoder_outputs = self.encoder_model(input_ids=input_ids)
        else:
            encoder_outputs = self.encoder_model(input_ids=input_ids, attention_mask=attention_mask)

        B, T = input_ids.shape
        last_hidden_state = encoder_outputs[0]

        tok_emb = self.token_embedding_table(input_ids)

        pos_emb = self.position_embedding_table(torch.arange(T, device=input_ids.device))

        x = tok_emb + pos_emb

        x = self.decoder(
            tgt = x,
            memory = last_hidden_state,
        )

        logits = self.lm_head(x)

        loss = None
        if labels is not None:
            B, T, C = logits.shape
            loss_fct = CrossEntropyLoss(ignore_index=-100)
            logits = logits.view(B * T, C)
            labels = labels.view(B * T)
            loss = F.cross_entropy(logits, labels)
        return Seq2SeqLMOutput(
            loss=loss,
            logits=logits,
        )

    def generate(self, input_tokens, max_length=50):
        for _ in range(max_length):
            input_tokens_cond = input_tokens[:, -self.block_size:]

            logits = self(input_tokens_cond).logits

            logits = logits[:,-1,:]

            probs = F.softmax(logits, dim=-1)

            input_tokens_next = torch.multinomial(probs, num_samples=1)

            input_tokens = torch.cat((input_tokens, input_tokens_next), dim=1)
        return input_tokens


        output_tokens = output_ids[1:]
        return output_tokens

In [100]:
encoder_model = AutoModel.from_pretrained("microsoft/codebert-base")
hidden_size = encoder_model.config.hidden_size
vocab_size = tokenizer.vocab_size
d_model = 768
decoder_start_token_id = tokenizer.pad_token_id

model = CodeTranslator(encoder_model=encoder_model,
                       vocab_size=vocab_size,
                       d_model=d_model,)
model = model.to('cuda')

In [None]:
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/code_translator",
    num_train_epochs=50,
    per_device_train_batch_size=32,
    save_steps=600,
    save_total_limit=2,
    prediction_loss_only=False,
    logging_steps=100,
    evaluation_strategy="steps",
    eval_steps=200,
    logging_dir="./logs",
    logging_first_step=True,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    learning_rate=5e-5,
    lr_scheduler_type="linear",
    warmup_steps=0,
    gradient_accumulation_steps=1,
    logging_strategy="steps",
)



trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset['train'],
    eval_dataset=dataset['test'],
)

trainer.train()

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


Step,Training Loss,Validation Loss
200,3.8072,3.806203
400,3.7096,3.76675
600,3.6825,3.73162
800,3.6264,3.675152
1000,3.5706,3.657036
1200,3.5351,3.657521
1400,3.5013,3.646331
1600,3.4756,3.644694


Checkpoint destination directory /content/drive/MyDrive/code_translator/checkpoint-600 already exists and is non-empty. Saving will proceed but saved results may be invalid.
Checkpoint destination directory /content/drive/MyDrive/code_translator/checkpoint-1200 already exists and is non-empty. Saving will proceed but saved results may be invalid.


In [102]:
java_code = """
class MyClass {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}
"""

In [103]:
def generate_translation(model, input_text, tokenizer, max_length=50):
    model.eval()
    input_ids = tokenizer.encode(input_text, return_tensors="pt")
    input_ids = input_ids.to('cuda')

    # Generate translation
    with torch.no_grad():
        output_ids = model.generate(input_ids, max_length=max_length)

    # Decode the generated output
    output_text = tokenizer.decode(output_ids[0][1:], skip_special_tokens=True)

    return output_text

python_code = generate_translation(model, java_code, tokenizer)
print("Generated Python code:\n", python_code)

Generated Python code:
 
class MyClass {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}
  rotateN10 
 BST) range ==() n print alternateseandIS, image Node None next in(
 


In [104]:
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [None]:
%tensorboard serve --logdir logs

In [109]:
dataset = load_dataset('csv', data_files=file_path, split='train')
dataset = dataset.filter(lambda example: all(value is not None for value in example.values()))
dataset = dataset.train_test_split(test_size=0.2)

DatasetDict({
    train: Dataset({
        features: ['query', 'lang1', 'lang2'],
        num_rows: 1133
    })
    test: Dataset({
        features: ['query', 'lang1', 'lang2'],
        num_rows: 284
    })
})

In [119]:
df = dataset['test'].to_pandas()

In [139]:
reference_dataset_java = []  # List of Java code strings
reference_dataset_python = []  # List of Python code strings

In [140]:
for index, row in df.iterrows():
    reference_dataset_java.append(row['lang1'])
    reference_dataset_python.append(row['lang2'])

In [141]:
reference_dataset_java = reference_dataset_java[:10]
reference_dataset_python = reference_dataset_python[:10]


In [142]:
import nltk
from nltk.translate.bleu_score import corpus_bleu
import tokenize
import io

def tokenize_python_code(code):
    return [token.string for token in tokenize.tokenize(io.BytesIO(code.encode('utf-8')).readline)]

In [None]:
py_gen = []
for _ in range(10):
    out = generate_translation(model,java_code[:50], tokenizer)
    py_gen.append(out)

In [None]:
reference_dataset_tokenized = [[tokenize_python_code(code) for code in refs] for refs in reference_dataset_python]
generated_texts_tokenized = [tokenize_python_code(code) for code in py_gen]

bleu_score = corpus_bleu(reference_dataset_tokenized, generated_texts_tokenized)
print("BLEU Score:", bleu_score)
