In [1]:
import json

# Load the step-definition records. JSON text is UTF-8 by specification, so the
# encoding is pinned explicitly instead of relying on the platform default
# (which is not UTF-8 on every OS).
with open("step_definitions.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Preview the first three records to check the fields each one carries.
data[:3]

[{'AttributeText': 'Given("Returns a Task")',
  'MethodSignature': 'ReturnsATask()',
  'MethodBody': '{\n            throw new NotSupportedException("should be mocked");\n        }',
  'SourceFile': '/tmp/repos_56e52f16-5434-4b1d-a1e1-219bacac2163/reqnroll_Reqnroll/Tests/Reqnroll.RuntimeTests/StepExecutionTests.cs'},
 {'AttributeText': 'Then("SpecificBindingRegistryTests")',
  'MethodSignature': 'Transform(string val)',
  'MethodBody': '{\n                return 42;\n            }',
  'SourceFile': '/tmp/repos_56e52f16-5434-4b1d-a1e1-219bacac2163/reqnroll_Reqnroll/Tests/Reqnroll.RuntimeTests/RuntimeBindingRegistryBuilderTests.cs'},
 {'AttributeText': 'Then("SpecificBindingRegistryTests")',
  'MethodSignature': 'Transform(string val)',
  'MethodBody': '{\n                return 24;\n            }',
  'SourceFile': '/tmp/repos_56e52f16-5434-4b1d-a1e1-219bacac2163/reqnroll_Reqnroll/Tests/Reqnroll.RuntimeTests/RuntimeBindingRegistryBuilderTests.cs'}]

In [2]:
!pip install transformers datasets scikit-learn evaluate --quiet
import Levenshtein
import json
import torch
import numpy as np
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
import evaluate
import os
# Turn off the tokenizers library's own parallelism and ask the CUDA caching
# allocator to use expandable segments.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# JSON is UTF-8 by specification; pin the encoding rather than depending on the
# platform default.
with open("step_definitions.json", "r", encoding="utf-8") as f:
    raw_data = json.load(f)

# Each training pair maps the step attribute text to the method signature
# followed by its body on the next line.
pairs = [
    {
        "input": item["AttributeText"],
        "output": item["MethodSignature"] + "\n" + item["MethodBody"],
    }
    for item in raw_data
]

# 90/10 train/eval split with a fixed seed so the split is reproducible.
train_data, eval_data = train_test_split(pairs, test_size=0.1, random_state=42)
train_dataset = Dataset.from_list(train_data)
eval_dataset = Dataset.from_list(eval_data)

model_name = "Salesforce/codet5-small"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_function(example):
    """Tokenize one input/output pair into encoder inputs and decoder labels.

    The "input" text is truncated/padded to 128 tokens and the "output" text to
    256 tokens. Padding positions in the labels are replaced with -100 (the
    value conventionally ignored by the loss).

    Args:
        example: Mapping with "input" and "output" text entries.

    Returns:
        The tokenizer encoding of the input text with an added "labels" list.
    """
    encoded = tokenizer(example["input"], max_length=128, truncation=True, padding="max_length")
    target = tokenizer(example["output"], max_length=256, truncation=True, padding="max_length")

    pad_id = tokenizer.pad_token_id
    encoded["labels"] = [-100 if token_id == pad_id else token_id for token_id in target["input_ids"]]
    return encoded

# Tokenize both splits record by record (tokenize_function handles one example
# at a time, hence batched=False).
tokenized_train, tokenized_eval = (
    split.map(tokenize_function, batched=False)
    for split in (train_dataset, eval_dataset)
)

# Pretrained seq2seq checkpoint that will be fine-tuned.
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# Text-overlap metrics used by compute_metrics.
bleu, rouge, exact_match_metric = (
    evaluate.load(metric_id) for metric_id in ("bleu", "rouge", "exact_match")
)

def postprocess(preds, labels):
    """Decode predictions and labels into stripped strings for text metrics.

    Args:
        preds: Model predictions. May be a tuple (only its first element is
            used), a 3-D array (reduced to token ids with argmax over the last
            axis), or an array of token ids.
        labels: Array of label token ids in which -100 marks ignored positions.

    Returns:
        A ``(decoded_preds, decoded_labels)`` pair of lists of strings with
        special tokens removed and surrounding whitespace stripped.
    """
    if isinstance(preds, tuple):
        preds = preds[0]

    if preds.ndim == 3:
        preds = np.argmax(preds, axis=-1)

    pad_id = tokenizer.pad_token_id
    # -100 is not a valid token id and cannot be decoded. Labels always carry
    # it for ignored positions; prediction id arrays can carry it too when
    # they were padded with -100, so both are mapped back to the pad token.
    preds = np.where(preds == -100, pad_id, preds)
    labels = np.where(labels == -100, pad_id, labels)

    decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    decoded_preds = [pred.strip() for pred in decoded_preds]
    decoded_labels = [label.strip() for label in decoded_labels]

    return decoded_preds, decoded_labels

def compute_levenshtein(preds, refs):
    """Average Levenshtein (edit) distance between paired predictions and references.

    Args:
        preds: Sequence of predicted strings.
        refs: Sequence of reference strings, paired positionally with ``preds``.

    Returns:
        ``{"levenshtein_avg": <mean distance>}``. When there are no pairs to
        compare the average is reported as 0.0 instead of raising
        ``ZeroDivisionError``.
    """
    distances = [Levenshtein.distance(p, r) for p, r in zip(preds, refs)]
    if not distances:
        # Nothing to average: avoid dividing by zero on an empty batch.
        return {"levenshtein_avg": 0.0}
    return {"levenshtein_avg": sum(distances) / len(distances)}

def compute_metrics(eval_preds):
    """Compute BLEU, ROUGE-L, exact match and average edit distance.

    Args:
        eval_preds: A ``(predictions, labels)`` pair as handed over by the
            trainer during evaluation.

    Returns:
        A single dict merging every BLEU output field, the "rougeL" score, the
        exact-match result and the average Levenshtein distance.
    """
    raw_preds, raw_labels = eval_preds

    # Release cached GPU memory before the CPU-side decoding and scoring.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    pred_texts, label_texts = postprocess(raw_preds, raw_labels)

    # BLEU takes a list of references per prediction.
    bleu_scores = bleu.compute(
        predictions=pred_texts,
        references=[[text] for text in label_texts],
    )
    rouge_scores = rouge.compute(predictions=pred_texts, references=label_texts, use_stemmer=True)
    exact_scores = exact_match_metric.compute(predictions=pred_texts, references=label_texts)
    edit_scores = compute_levenshtein(pred_texts, label_texts)

    return {
        **bleu_scores,
        "rougeL": rouge_scores["rougeL"],
        **exact_scores,
        **edit_scores,
    }

# Evaluate and checkpoint once per epoch; the checkpoint with the lowest
# eval_loss is reloaded when training finishes.
training_args = TrainingArguments(
    output_dir="./model_output",
    per_device_train_batch_size=24,
    per_device_eval_batch_size=24,
    eval_accumulation_steps=1,
    eval_strategy="epoch",
    save_strategy="epoch",
    num_train_epochs=4,
    gradient_accumulation_steps=1,
    dataloader_num_workers=6,
    fp16=True,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    report_to="none",
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss"
)

# Evaluate on at most the first 250 held-out examples. The size is clamped so
# that an eval split smaller than 250 rows does not make select() fail.
eval_subset_size = min(250, len(tokenized_eval))
small_tokenized_eval = tokenized_eval.select(range(eval_subset_size))

# NOTE(review): with the plain Trainer, compute_metrics receives model outputs
# that postprocess() reduces with argmax, so BLEU/ROUGE/exact match are scored
# on those argmax tokens rather than on text produced by generate() — confirm
# that this is the intended evaluation.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=small_tokenized_eval,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

trainer.train()

# Persist the model held by the trainer together with its tokenizer so the
# directory can be loaded with from_pretrained().
best_model_dir = "./best_model"
trainer.save_model(best_model_dir)
tokenizer.save_pretrained(best_model_dir)

Map:   0%|          | 0/14383 [00:00<?, ? examples/s]

Map:   0%|          | 0/1599 [00:00<?, ? examples/s]

  trainer = Trainer(
Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch,Training Loss,Validation Loss,Bleu,Precisions,Brevity Penalty,Length Ratio,Translation Length,Reference Length,Rougel,Exact Match,Levenshtein Avg
1,1.5056,1.458105,0.457376,"[0.728021978021978, 0.5142455482661669, 0.38925143953934743, 0.30029498525073745]",1.0,1.022568,10920,10679,0.495031,0.056,69.496
2,1.4506,1.282816,0.502829,"[0.7503178928247048, 0.5569702602230483, 0.4372978116079924, 0.34980506822612084]",1.0,1.030995,11010,10679,0.526083,0.072,62.924
3,1.2577,1.220771,0.516126,"[0.754355872528663, 0.5690403620578184, 0.4521130755412688, 0.36564345889416094]",1.0,1.037269,11077,10679,0.541097,0.112,60.748
4,1.2773,1.197983,0.520685,"[0.7566123188405797, 0.5714550509731232, 0.45721062618595826, 0.3718172983479106]",1.0,1.033805,11040,10679,0.545097,0.108,59.992


There were missing keys in the checkpoint model loaded: ['encoder.embed_tokens.weight', 'decoder.embed_tokens.weight', 'lm_head.weight'].


('./best_model/tokenizer_config.json',
 './best_model/special_tokens_map.json',
 './best_model/vocab.json',
 './best_model/merges.txt',
 './best_model/added_tokens.json',
 './best_model/tokenizer.json')

In [4]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Reload the fine-tuned model and tokenizer from the saved directory.
tokenizer = AutoTokenizer.from_pretrained(best_model_dir)
model = AutoModelForSeq2SeqLM.from_pretrained(best_model_dir)

# NOTE(review): the training inputs were AttributeText strings such as
# 'Given("Returns a Task")', while these prompts use plain step phrasing —
# confirm this input format is intended.
test_inputs = [
    "Given I log in as user",
    "When I log in as user 'user123'"
]

# Same input tokenization settings as used for training.
inputs = tokenizer(test_inputs, max_length=128, truncation=True, padding="max_length", return_tensors="pt")

# Beam search with 5 beams, up to 256 tokens.
generation_options = dict(max_length=256, num_beams=5, early_stopping=True)
outputs = model.generate(
    input_ids=inputs["input_ids"],
    attention_mask=inputs["attention_mask"],
    **generation_options,
)

predicted_texts = tokenizer.batch_decode(outputs, skip_special_tokens=True)

# Show each prompt next to the code generated for it.
for step_text, prediction in zip(test_inputs, predicted_texts):
    print(f"Input: {step_text}")
    print(f"Generated output:\n{prediction}\n")


Input: Given I log in as user
Generated output:
GivenILogInAsUser()
{
            loginPage.Login();
        }

Input: When I log in as user 'user123'
Generated output:
WhenILogInAsUser123()
{
            loginPage.Login(user123);
        }

