Motivation:

https://towardsdatascience.com/building-a-python-code-generator-4b476eec5804 (Building a Python Code Generator)

https://github.com/divyam96/English-to-Python-Converter/tree/main (Github implementation)

https://huggingface.co/docs/transformers/tasks/translation (HuggingFace Seq2Seq Translation)

In [None]:
#!wget "https://drive.google.com/u/0/uc?id=1rHb0FQ5z5ZpaY2HpyFGY6CeyDG0kTLoO&export=download" -O english_python_data.txt

In [1]:
!pip install transformers[torch]



In [2]:
import pandas as pd
import numpy as np

np.random.seed(0)

with open("english_python_data.txt", "r") as f:
    file_lines = f.readlines()

dps = []
current_question = None
current_solution = []

for line in file_lines:
    if line.startswith("#"):
        if current_question is not None:
            dps.append({"question": current_question, "solution": ''.join(current_solution)})
        current_question = line[1:].strip()
        current_solution = []
    else:
        current_solution.append(line)

if current_question is not None:
    dps.append({"question": current_question, "solution": ''.join(current_solution)})

python_problems_df = pd.DataFrame(dps)

msk = np.random.rand(len(python_problems_df)) < 0.8

train_df = python_problems_df[msk]
val_df = python_problems_df[~msk]

In [3]:
def augment_dataframe(train_df):
    augmented_rows = []

    for index, row in train_df.iterrows():
        solution = row["solution"]
        
        augmented_row_1 = row.copy()
        augmented_solution_1 = solution.replace("num1", "x").replace("num2", "y").replace("num3", "z") \
                                        .replace("num", "number") \
                                        .replace("largest", "highest").replace("smallest", "lowest") \
                                        .replace("l1", "list1").replace("l2", "list2") \
                                        .replace(" i ", " j ") \
                                        .replace("x", "num")
        if augmented_solution_1 != solution:
            augmented_row_1["solution"] = augmented_solution_1
            augmented_rows.append(augmented_row_1)

        augmented_row_2 = row.copy()
        augmented_solution_2 = solution.replace("num1", "first").replace("num2", "second").replace("num3", "third") \
                                        .replace(" i ", " k ") \
                                        .replace("num", "x")
        if augmented_solution_2 != solution:
            augmented_row_2["solution"] = augmented_solution_2
            augmented_rows.append(augmented_row_2)

        augmented_row_3 = row.copy()
        augmented_solution_3 = solution.replace("num1", "a").replace("num2", "b").replace("num2", "c")
        if augmented_solution_3 != solution:
            augmented_row_3["solution"] = augmented_solution_3
            augmented_rows.append(augmented_row_3)

        augmented_row_4 = row.copy()
        augmented_solution_4 = solution.replace("num1", "var1").replace("num2", "var2").replace("num2", "var3")
        if augmented_solution_4 != solution:
            augmented_row_4["solution"] = augmented_solution_4
            augmented_rows.append(augmented_row_4)

    augmented_train_df = pd.DataFrame(augmented_rows)
    return augmented_train_df

augmented_train_df = pd.concat([train_df, augment_dataframe(train_df)], ignore_index=True).reset_index()

count_before = len(train_df)
count_after = len(augmented_train_df)

print("Count before augmentation:", count_before)
print("Count after augmentation:", count_after)

Count before augmentation: 3970
Count after augmentation: 7023


In [None]:
train_df.iloc[10]["question"]

In [None]:
train_df.iloc[10]["solution"]

In [4]:
import shutil
import os

def clear_directory(directory):
    for root, dirs, files in os.walk(directory, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
    os.rmdir(directory)

directory_to_clear = "codet5_large_python_code_gen"

clear_directory(directory_to_clear)

FileNotFoundError: [Errno 2] No such file or directory: 'codet5_large_python_code_gen'

In [5]:
from transformers import TrainerCallback, EarlyStoppingCallback, AutoTokenizer, T5ForConditionalGeneration, AutoModelForSeq2SeqLM, DataCollatorForSeq2Seq, Seq2SeqTrainingArguments, Seq2SeqTrainer
import torch
from torch.utils.data import Dataset

if torch.cuda.is_available():
    num_gpus = torch.cuda.device_count()
    print(f"Training is using {num_gpus} GPU(s).")
else:
    print("Training is using CPU.")

class CustomDataset(Dataset):
    def __init__(self, data, tokenizer, max_length):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data.iloc[idx]
        inputs = item['question']
        targets = item['solution']
        
        encoding = self.tokenizer(
            inputs,
            text_target=targets,
            max_length=self.max_length,
            truncation=True,
            padding="max_length"
        )
        
        labels = encoding.pop("labels")
        encoding["labels"] = labels
        
        return encoding

# max_solution_length = max(len(i) for i in python_problems_df['question'] + ' ' + python_problems_df['solution'])
max_solution_length = 64

checkpoint = "Salesforce/codet5-large" # "t5-small" # 
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)

# model = T5ForConditionalGeneration.from_pretrained(checkpoint) # for codet5

train_dataset = CustomDataset(augmented_train_df, tokenizer, max_solution_length)
val_dataset = CustomDataset(val_df, tokenizer, max_solution_length)

data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=checkpoint)

training_args = Seq2SeqTrainingArguments(
    output_dir=directory_to_clear,
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    weight_decay=0.01,
    save_total_limit=3,
    num_train_epochs=3,
    predict_with_generate=True,
    fp16=True,
    push_to_hub=False,
    # load_best_model_at_end=True,
    # metric_for_best_model="val_loss",
    # greater_is_better=False,
    # save_strategy="epoch"
)

class LabelSmoothingCrossEntropyLoss(torch.nn.Module):
    def __init__(self, smoothing=0.1, reduction='mean', ignore_index=-100):
        super(LabelSmoothingCrossEntropyLoss, self).__init__()
        self.smoothing = smoothing
        self.reduction = reduction
        self.ignore_index = ignore_index

    def forward(self, input, target):
        log_prob = torch.nn.functional.log_softmax(input, dim=-1)
        nll_loss = -log_prob.gather(dim=-1, index=target.unsqueeze(1))
        nll_loss = nll_loss.squeeze(1)

        if self.ignore_index is not None:
            pad_mask = target.eq(self.ignore_index)
            nll_loss.masked_fill_(pad_mask, 0.0)

        smooth_loss = -log_prob.mean(dim=-1)

        if self.reduction == 'sum':
            smooth_loss = smooth_loss.sum()
            nll_loss = nll_loss.sum()
        elif self.reduction == 'mean':
            smooth_loss = smooth_loss.mean()
            nll_loss = nll_loss.mean()

        loss = (1.0 - self.smoothing) * nll_loss + self.smoothing * smooth_loss
        return loss

def custom_compute_loss(model, inputs, labels, **kwargs):
    outputs = model(**inputs, labels=labels)
    logits = outputs.logits
    labels = inputs.pop("labels")
    loss_fn = LabelSmoothingCrossEntropyLoss(smoothing=0.1) # torch.nn.CrossEntropyLoss()
    loss = loss_fn(logits.view(-1, logits.shape[-1]), labels.view(-1))
    return loss

training_args.loss_fn = custom_compute_loss

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

trainer.train()

predictions = trainer.predict(val_dataset, max_new_tokens=64)

2024-02-16 14:09:51.703599: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-02-16 14:09:51.744347: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-02-16 14:09:51.744389: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-02-16 14:09:51.745457: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-02-16 14:09:51.752181: I tensorflow/core/platform/cpu_feature_guar

Training is using 1 GPU(s).


Epoch,Training Loss,Validation Loss
1,No log,0.873991
2,2.443000,0.729998
3,0.712200,0.696153


In [14]:
def print_example(index, val_dataset, predictions, tokenizer, val_df):

    example = val_dataset[index]
    
    question = tokenizer.decode(example["input_ids"], skip_special_tokens=True)
    solution = tokenizer.decode(example["labels"], skip_special_tokens=True)
    prediction = tokenizer.decode(predictions.predictions[index], skip_special_tokens=True)

    print("Question:")
    print(question)
    print("\nTrue Solution:")
    print(solution)
    print("\nPredicted Solution:")
    print(prediction)

example_index = 7

print_example(example_index, val_dataset, predictions, tokenizer, val_df)

Question:
Write a function to calculate simple interest, given p, r, t

True Solution:
def simp_int(p, r, t):
    interest = (p*r*t)/100
    return interest




Predicted Solution:
def simple_interest(p, r, t):
    return (p*r*t)/100




In [7]:
import torch

def generate_solution(user_text, trainer, tokenizer, max_length=64):
    input_ids = tokenizer.encode(user_text, max_length=max_length, truncation=True, return_tensors="pt")
    input_ids = input_ids.to(trainer.args.device)

    output_ids = trainer.model.generate(input_ids, max_length=max_length, num_beams=4, early_stopping=True)

    generated_solution = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    
    return generated_solution

user_text = "write a function to print sum of two number"
generated_solution = generate_solution(user_text, trainer, tokenizer)
print("Generated Solution:")
print(generated_solution)

Generated Solution:
def sum_two_numbers(num, y):
    sum = num + y
    print(f'Sum: {sum}')




In [8]:
user_text = "print sum of two number"
generated_solution = generate_solution(user_text, trainer, tokenizer)
print("Generated Solution:")
print(generated_solution)

Generated Solution:
a = 1.5
b = 6.3
sum = a + b
print(f'Sum: {sum}')




In [None]:
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-large")
model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-large")

text = "def greet(user): print(f'hello <extra_id_0>!')"
input_ids = tokenizer(text, return_tensors="pt").input_ids

# simply generate a single sequence
generated_ids = model.generate(input_ids, max_length=8)
print(tokenizer.decode(generated_ids[0], skip_special_tokens=True))

text = "Write a python function to greet an user"
input_ids = tokenizer(text, return_tensors="pt").input_ids

# simply generate a single sequence
generated_ids = model.generate(input_ids, max_length=8)
print(tokenizer.decode(generated_ids[0], skip_special_tokens=True))