In [46]:
import json
from datasets import load_dataset
from transformers import (
    T5ForConditionalGeneration,
    T5Tokenizer,
    Trainer,
    TrainingArguments
)
import torch
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import re
import pandas as pd

In [47]:
df = pd.read_csv('flatten_dataset.csv')
df['cleaned_method'][0]



In [3]:
print(df)

                                         original_method  \
0      def parse_junit_reports(path_to_reports: str) ...   
1      def convert_junit_to_testcases(xml: JUnitXml |...   
2      def render_tests(testcases: list[TestCase]) ->...   
3      def deep_update(d: dict, u: dict) -> dict:  # ...   
4      def main() -> None:\n    recommended_settings ...   
...                                                  ...   
76935  def check_git_version():\n    # check git vers...   
76936  def sort_lines_mailmap(lines):\n    for n, lin...   
76937  def key(line):\n        # return lower case fi...   
76938  def generate_covered_files(top_dir):\n    for ...   
76939  def make_report(\n    test_args, source_dir='s...   

                                          cleaned_method  \
0      def parse_junit_reports(path_to_reports: str) ...   
1      def convert_junit_to_testcases(xml: JUnitXml |...   
2      def render_tests(testcases: list[TestCase]) ->...   
3      def deep_update(d: dict, u: dict

In [48]:
data_files = 'flatten_dataset.csv'
dataset = load_dataset("csv", split="train",data_files=data_files)  # Using a small subset for testing
print(dataset[0])
# 2. Preprocessing (Tokenization and Custom Masking)
# Load a pre-trained tokenizer
#tokenizer = T5Tokenizer.from_pretrained("t5-small")
tokenizer = AutoTokenizer.from_pretrained("../pretrained_codet5")
print(tokenizer.encode("<extra_id_0>"))

Found cached dataset csv (/home/miislam/.cache/huggingface/datasets/csv/default-f6e583c620e16a3a/0.0.0/eea64c71ca8b46dd3f537ed218fc9bf495d5707789152eb2764f5c78fa66d59d)


[1, 32099, 2]


In [33]:
print(dataset)

Dataset({
    features: ['original_method', 'cleaned_method', 'masked_method', 'condition_line', 'masked_method_token_count', 'condition_token_count', 'flattened_method'],
    num_rows: 76940
})


In [None]:
def convert_indentation_to_tabs(code):
    # Split the code into lines
    lines = code.split('\n')
    converted_lines = []

    for line in lines:
        # Determine the level of indentation (number of leading spaces)
        stripped_line = line.lstrip()
        indent_level = (len(line) - len(stripped_line)) // 4  # Assuming 4 spaces per indent level

        # Replace each indentation level with "<TAB>"
        tabbed_line = "<TAB>" * indent_level + stripped_line
        converted_lines.append(tabbed_line)

    # Join the lines with a space to match your required format
    return " ".join(converted_lines)
print(convert_indentation_to_tabs(df['cleaned_method'][0]))
for i in range(len(dataset)):
  dataset['cleaned_method'][i] = convert_indentation_to_tabs(dataset['cleaned_method'][i])



In [49]:
print(dataset)

Dataset({
    features: ['original_method', 'cleaned_method', 'masked_method', 'condition_line', 'masked_method_token_count', 'condition_token_count', 'flattened_method'],
    num_rows: 76940
})


In [50]:
import re
import random

def mask_if_statements(examples):
    # Regular expression pattern to match 'if' conditions from 'if' to the colon
    pattern = re.compile(r"if\s+(.*?):")

    masked_code = []

    for code in examples['cleaned_method']:
        # Find all 'if' conditions in the code
        matches = list(pattern.finditer(code))

        # If no 'if' conditions are found, add the original code to the masked list
        if not matches:
            masked_code.append(code)
            continue

        # Randomly select one 'if' condition to mask
        selected_match = random.choice(matches)

        # Replace the selected 'if' condition with '<fill-in>'
        start, end = selected_match.span()
        code_with_mask = code[:start] + '<extra_id_0>' + code[end:]
        masked_code.append(code_with_mask)

    # Return the modified dataset
    return {
        "input_text": masked_code,  # Masked code snippet
        "target_text": examples['condition_line']  # original unmasked code snippet
    }

# # Example usage with sample input
# examples = {
#     'input_method': [
#         """def example_function():
#     if x > 0:
#         print("Positive")
#     if y < 0:
#         print("Negative")
#     if z == 10:
#         print("Equal to ten")"""
#     ]
# }

# # Apply the function to mask one random 'if' condition
# masked_output = mask_if_statements(examples)
# print(masked_output)

In [51]:
# Apply masking to create input-output pairs for T5
masked_dataset = dataset.map(mask_if_statements, batched=True)
print(masked_dataset)

Map:   0%|          | 0/76940 [00:00<?, ? examples/s]

Dataset({
    features: ['original_method', 'cleaned_method', 'masked_method', 'condition_line', 'masked_method_token_count', 'condition_token_count', 'flattened_method', 'input_text', 'target_text'],
    num_rows: 76940
})


In [53]:
# Remove rows that contain non-string values in "target_text"
def filter_invalid_texts(example):
    return isinstance(example['target_text'], str)

# Apply the filter to the dataset
filtered_dataset = masked_dataset.filter(filter_invalid_texts)

# Now define the tokenization function again
def tokenize_function(examples):
    # Extract input and target texts
    input_texts = examples["input_text"]
    target_texts = examples["target_text"]

    # Ensure all target_texts are strings (filtering should have handled this)
    assert all(isinstance(text, str) for text in target_texts), "All elements in target_texts must be strings."

    # Tokenizing input texts
    model_inputs = tokenizer(
        input_texts, 
        padding="max_length", 
        truncation=True, 
        max_length=512
    )

    # Tokenizing target texts for labels
    labels = tokenizer(
        target_texts, 
        padding="max_length", 
        truncation=True, 
        max_length=512
    )["input_ids"]

    # Replace padding token id's with -100 for labels
    labels = [
        [(label if label != tokenizer.pad_token_id else -100) for label in label_list]
        for label_list in labels
    ]

    # Adding the labels to model_inputs
    model_inputs["labels"] = labels

    return model_inputs

# Tokenize the dataset using the corrected function
tokenized_datasets = filtered_dataset.map(tokenize_function, batched=True)

# Split into train and test sets for training
train_test_split = tokenized_datasets.train_test_split(test_size=0.2)
train_dataset = train_test_split['train']
test_dataset = train_test_split['test']
model = AutoModelForSeq2SeqLM.from_pretrained("../pretrained_codet5")


Loading cached processed dataset at /home/miislam/.cache/huggingface/datasets/csv/default-f6e583c620e16a3a/0.0.0/eea64c71ca8b46dd3f537ed218fc9bf495d5707789152eb2764f5c78fa66d59d/cache-ab287a2993843994.arrow


Map:   0%|          | 0/75419 [00:00<?, ? examples/s]

In [54]:
print(train_dataset,test_dataset)

Dataset({
    features: ['original_method', 'cleaned_method', 'masked_method', 'condition_line', 'masked_method_token_count', 'condition_token_count', 'flattened_method', 'input_text', 'target_text', 'input_ids', 'attention_mask', 'labels'],
    num_rows: 60335
}) Dataset({
    features: ['original_method', 'cleaned_method', 'masked_method', 'condition_line', 'masked_method_token_count', 'condition_token_count', 'flattened_method', 'input_text', 'target_text', 'input_ids', 'attention_mask', 'labels'],
    num_rows: 15084
})


In [58]:
# Set up training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=1,  # Reduced for testing
    weight_decay=0.01,
    save_total_limit=3,
)

In [64]:
from datasets import load_metric
# Load the metric you want to use (in this case, ROUGE)
metric = load_metric("rouge")

# Define the compute_metrics function
def compute_metrics(eval_pred):
    # Extract predictions and labels
    predictions, labels = eval_pred

    # Decode the predictions and labels to strings
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Use the metric to compute the scores
    result = metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)

    # Calculate the average ROUGE scores and return
    # You can also add additional metrics if desired
    result = {key: value.mid.fmeasure * 100 for key, value in result.items()}
    return result

In [65]:
# Initialize the Trainer with model, tokenizer, and datasets
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Start Training
trainer.train()



Epoch,Training Loss,Validation Loss


In [None]:
# Save the trained model
trainer.save_model("./fine_tuned_model")

In [None]:
# 4. Evaluation on test set
eval_results = trainer.evaluate()
print("Evaluation Results:", eval_results)

In [37]:
tokenizer = AutoTokenizer.from_pretrained("fine_tuned_model")

In [38]:
model = AutoModelForSeq2SeqLM.from_pretrained("fine_tuned_model")

In [39]:
# Example test input data (a list of Python methods with masked parts)
test_inputs = [
    """def factorial(n):
    <extra_id_0>
        return 1
    else:
        return n * factorial(n-1)""",
    """"def word_frequency(text):
    words = text.split()
    frequency = {}
    for word in words:
        word = word.lower()
        <fill-in>
            frequency[word] += 1
        else:
            frequency[word] = 1
    return frequency""",
    """"def primes_in_range(start, end):
    def is_prime(num):
        if num <= 1:
            return False
        for i in range(2, int(num**0.5) + 1):
            <fill-in>
                return False
        return True
    
    primes = [num for num in range(start, end + 1) if is_prime(num)]
    return primes""",
    """def put_blank_line(self, trace, count=1):
        count -= self.blank_line_count
        while count > ZERO:
            self.put(BLANK_LINE)  
            self.put(self.newline)  
            <fill-in>
                self.put(""blank(%s)"" % str(trace))  
            self.blank_line_count += 1
            count -= 1
        return self
        """,
        """
            def render(self, context):
            bits = []
            for node in self:
                <fill-in>
                    bit = self.render_node(node, context)
                else:
                    bit = node
                bits.append(force_unicode(bit))
            return mark_safe(u"".join(bits))
        """,
        """"def _cache_db_tables_iterator(tables, cache_alias, db_alias):
            no_tables = not tables
            cache_aliases = settings.CACHES if cache_alias is None else (cache_alias,)
            db_aliases = settings.DATABASES if db_alias is None else (db_alias,)
            for db_alias in db_aliases:
                if no_tables:
                    tables = connections[db_alias].introspection.table_names()
               <fill-in>
                    for cache_alias in cache_aliases:
                        yield cache_alias, db_alias, tables
         """,
         """def _cache_db_tables_iterator ( tables , cache_alias , db_alias ) : <TAB> no_tables = not tables <TAB> cache_aliases = settings . CACHES if cache_alias is None else ( cache_alias , ) <TAB> db_aliases = settings . DATABASES if db_alias is None else ( db_alias , ) <TAB> for db_alias in db_aliases : <TAB> <TAB> if no_tables : <TAB> <TAB> <TAB> tables = connections [ db_alias ] . introspection . table_names ( ) <TAB> <TAB> <fill-in> <TAB> <TAB> <TAB> for cache_alias in cache_aliases : <TAB> <TAB> <TAB> <TAB> yield cache_alias , db_alias , tables"""
         ,""""def bind(self, sock, path):
    # Bind the socket
    try:
        sock.bind(path)
    except OSError as e:
        <fill-in>
            self.skipTest(
                ""Pathname {0!a} is too long to serve as a AF_UNIX path"".format(path)
            )
        else:
            raise
    ""","""def render ( self , context ) : <TAB> bits = [ ] <TAB> for node in self : <TAB> <TAB> <fill-in> <TAB> <TAB> <TAB> bit = self . render_node ( node , context ) <TAB> <TAB> else : <TAB> <TAB> <TAB> bit = node <TAB> <TAB> bits . append ( force_unicode ( bit ) ) <TAB> return mark_safe ( u"" . join ( bits ) )""",
    """def _get_enclosing_context_level ( child_context , name ) : <TAB> <TAB> <TAB> if name in child_context . local_vars : <TAB> <TAB> return None <TAB> else : <TAB> <TAB> level = 0 <TAB> <TAB> for context in child_context . outer_contexts [ : : - 1 ] : <TAB> <TAB> <TAB> level += 1 <TAB> <TAB> <TAB> <fill-in> <TAB> <TAB> <TAB> <TAB> return level <TAB> return None""",
    """def calcPolygonRect ( pointArray ) : <TAB>  <TAB> <TAB> l , t , r , b = 10000000 , 10000000 , - 10000000 , - 10000000 <TAB> <TAB> <TAB> <TAB> <TAB> for n in pointArray : <TAB> <TAB> <fill-in> <TAB> <TAB> <TAB> l = n [ 0 ] <TAB> <TAB> if n [ 0 ] > r : <TAB> <TAB> <TAB> r = n [ 0 ] <TAB> <TAB> if n [ 1 ] < t : <TAB> <TAB> <TAB> t = n [ 1 ] <TAB> <TAB> if n [ 1 ] > b : <TAB> <TAB> <TAB> b = n [ 1 ] <TAB> return l , t , r , b"""
]


In [40]:
# Tokenize the test inputs
tokenized_inputs = tokenizer(test_inputs, padding=True, truncation=True, return_tensors="pt")

In [41]:
# Generate predictions
model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient calculation for inference
    output_sequences = model.generate(
        input_ids=tokenized_inputs.input_ids,
        attention_mask=tokenized_inputs.attention_mask,
        max_length=5000,  # Specify the maximum length of the generated output
        num_return_sequences=1,
        temperature=0.7  # Adjust temperature for randomness
    )


In [42]:
# Decode the generated sequences
for i, output in enumerate(output_sequences):
    decoded_text = tokenizer.decode(output, skip_special_tokens=True)
    print(f"Test Input {i + 1}:")
    print(f"Original Input: {test_inputs[i]}")
    print(f"Model Prediction: {decoded_text}")
    print("-" * 50)

Test Input 1:
Original Input: def factorial(n):
    <extra_id_0>
        return 1
    else:
        return n * factorial(n-1)
Model Prediction: if n == 0:
--------------------------------------------------
Test Input 2:
Original Input: "def word_frequency(text):
    words = text.split()
    frequency = {}
    for word in words:
        word = word.lower()
        <fill-in>
            frequency[word] += 1
        else:
            frequency[word] = 1
    return frequency
Model Prediction: if word in frequency:
--------------------------------------------------
Test Input 3:
Original Input: "def primes_in_range(start, end):
    def is_prime(num):
        if num <= 1:
            return False
        for i in range(2, int(num**0.5) + 1):
            <fill-in>
                return False
        return True
    
    primes = [num for num in range(start, end + 1) if is_prime(num)]
    return primes
Model Prediction: if num <= 1:
--------------------------------------------------
Test Inpu