<a href="https://colab.research.google.com/github/theantigone/Fine-Tuning-CodeT5/blob/master/Copy_of_CodeT5_CTransl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Hugging Face - Fine-Tuning CodeT5 for If-Condition Prediction (AI4SE Focus)

# This notebook demonstrates how to fine-tune the CodeT5 model using Hugging Face Transformers
# for a Software Engineering task: predicting the masked if-condition of a Python function.

# ------------------------
# 1. Install Required Libraries
# ------------------------
!pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cu124
!pip install transformers datasets evaluate

Looking in indexes: https://download.pytorch.org/whl/cu124
Collecting torch==2.5.1
  Downloading https://download.pytorch.org/whl/cu124/torch-2.5.1%2Bcu124-cp311-cp311-linux_x86_64.whl (908.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m908.3/908.3 MB[0m [31m801.0 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting torchvision==0.20.1
  Downloading https://download.pytorch.org/whl/cu124/torchvision-0.20.1%2Bcu124-cp311-cp311-linux_x86_64.whl (7.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.3/7.3 MB[0m [31m68.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torchaudio==2.5.1
  Downloading https://download.pytorch.org/whl/cu124/torchaudio-2.5.1%2Bcu124-cp311-cp311-linux_x86_64.whl (3.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.5.1)
  Downloading https://download.pytorch.org/whl/cu124/nvidia_cuda_nvrtc_cu

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
import re
import datasets
from datasets import Dataset

# Lift the per-column width limit (None means no limit) so long code strings
# are shown in full instead of being truncated.
pd.options.display.max_colwidth = None

# Optionally, let the display use the full width of the terminal or notebook.
pd.options.display.width = None

In [4]:
# ------------------------------------------------------------------------
# 2. Load Dataset (fine-tuning CSV splits stored on the shared Google Drive)
# ------------------------------------------------------------------------
# NOTE(review): load_dataset is not used by any cell visible here; kept in case a later cell needs it.
from datasets import load_dataset

# All three splits live in the same folder, so the location is defined once.
DATASET_DIR = "/content/drive/Shareddrives/CSCI_420/assignment2/datasets"

# The masking step below reads the 'cleaned_method' and 'target_block' columns of these frames.
train_df = pd.read_csv(f"{DATASET_DIR}/ft_train.csv")
val_df = pd.read_csv(f"{DATASET_DIR}/ft_valid.csv")
test_df = pd.read_csv(f"{DATASET_DIR}/ft_test.csv")

In [5]:
def flatten_code(code):
    """
    Collapse the code onto a single line.

    Every run of whitespace (spaces, tabs, newlines) becomes one space, and
    leading/trailing whitespace is dropped.
    """
    pieces = code.split()
    return " ".join(pieces)

In [6]:
def generate_pattern(target):
    """
    Generate a regex pattern that matches the target block
    in a whitespace-independent manner.

    The target is split on whitespace and the escaped tokens are joined with
    ``\\s*``, so ``if foo ( a ) :`` matches ``if foo(a):``.

    The pattern is additionally guarded at its edges: when the first token
    starts with a word character, the match may not be preceded by a word
    character (so ``if x :`` no longer matches inside ``elif x:``), and when
    the last token ends with a word character, the match may not be followed
    by one (so ``if ok`` does not match ``if okay``).

    Args:
        target: The target block text (e.g. a tokenized if-condition).

    Returns:
        A regex pattern string. An empty or whitespace-only target yields ''.
    """
    # Tokenize by splitting on any whitespace (leading/trailing whitespace is irrelevant).
    tokens = re.findall(r'\S+', target.strip())
    if not tokens:
        return ''

    # Edge guards keep the match from starting or ending in the middle of an identifier.
    leading_guard = r'(?<!\w)' if re.match(r'\w', tokens[0]) else ''
    trailing_guard = r'(?!\w)' if re.match(r'\w', tokens[-1][-1]) else ''

    # If the last token is a colon, remove it from tokens and add a literal colon in the pattern.
    if tokens[-1] == ":":
        tokens = tokens[:-1]
        # Join tokens with optional whitespace in between and force a colon at the end.
        pattern = r'\s*'.join(map(re.escape, tokens)) + r'\s*:'
    else:
        pattern = r'\s*'.join(map(re.escape, tokens))

    return leading_guard + pattern + trailing_guard

In [7]:
def mask_if_conditions(df, df_name="dataset"):
    """
    Build masked training examples by replacing each row's target block with "<mask>:".

    For every row, the function code ('cleaned_method') is flattened to a single
    line and the first occurrence of the target block ('target_block') is
    replaced by "<mask>:". Rows are skipped, with a printed message, when the
    target or the function code is missing, empty, or not a string, or when the
    target cannot be found in the flattened code.

    Args:
        df: DataFrame with 'cleaned_method' and 'target_block' columns.
        df_name: Label used in the printed skip/warning messages.

    Returns:
        DataFrame with columns 'masked_input', 'target' and 'original_function'.
        The columns are present even when no row could be masked.
    """
    output_columns = ['masked_input', 'target', 'original_function']
    masked_data = []

    for idx, row in df.iterrows():
        function_code = row['cleaned_method']
        target_if_condition = row['target_block']

        if not isinstance(target_if_condition, str) or not target_if_condition.strip():
            print(f"[{df_name} row {idx}] Skipped: Empty or invalid target_block")
            continue

        # A missing CSV cell is read as NaN (a float); flattening it would raise, so skip it.
        if not isinstance(function_code, str) or not function_code.strip():
            print(f"[{df_name} row {idx}] Skipped: Empty or invalid cleaned_method")
            continue

        # Raw target condition (kept as the prediction target)
        raw_condition = target_if_condition.strip()
        # Flatten the function code so that it becomes a single-line string
        flattened_func = flatten_code(function_code)
        # Generate a regex pattern from the target condition that is flexible with whitespace differences.
        pattern = generate_pattern(raw_condition)

        # Replace the first occurrence of the target block with "<mask>:"
        masked_func, count = re.subn(pattern, "<mask>:", flattened_func, count=1)

        if count == 0:
            print(f"[{df_name} row {idx}] Warning: Condition not found or not replaced")
            continue

        masked_data.append({
            'masked_input': masked_func,
            'target': raw_condition,
            'original_function': function_code
        })

    # Passing the columns explicitly keeps the schema stable when masked_data is empty.
    return pd.DataFrame(masked_data, columns=output_columns)

In [8]:
# Mask each split in turn (train, then val, then test); df_name only labels the
# skip/warning messages printed while masking.
masked_train_df, masked_val_df, masked_test_df = (
    mask_if_conditions(split_frame, df_name=split_label)
    for split_frame, split_label in (
        (train_df, "train"),
        (val_df, "val"),
        (test_df, "test"),
    )
)

In [9]:
# Preview the first masked training rows: masked input, target condition and original function.
masked_train_df.head()

Unnamed: 0,masked_input,target,original_function
0,"def _resolve_lib_imported_symbols(self, lib, imported_libs, generic_refs): """"""Resolve the imported symbols in a library."""""" for symbol in lib.elf.imported_symbols: imported_lib = self._find_exported_symbol(symbol, imported_libs) if not imported_lib: lib.unresolved_symbols.add(symbol) else: lib.linked_symbols[symbol] = imported_lib <mask>: ref_lib = generic_refs.refs.get(imported_lib.path) if not ref_lib or not symbol in ref_lib.exported_symbols: lib.imported_ext_symbols[imported_lib].add(symbol)",if generic_refs :,"def _resolve_lib_imported_symbols(self, lib, imported_libs, generic_refs):\n """"""Resolve the imported symbols in a library.""""""\n for symbol in lib.elf.imported_symbols:\n imported_lib = self._find_exported_symbol(symbol, imported_libs)\n if not imported_lib:\n lib.unresolved_symbols.add(symbol)\n else:\n lib.linked_symbols[symbol] = imported_lib\n if generic_refs:\n ref_lib = generic_refs.refs.get(imported_lib.path)\n if not ref_lib or not symbol in ref_lib.exported_symbols:\n lib.imported_ext_symbols[imported_lib].add(symbol)\n"
1,"def make_docs_directory(output_dir, name): if not isdir(pjoin(output_dir, name)): subprocess.run([""mkdir"", pjoin(output_dir, name)], stdout=subprocess.PIPE) for i in range(10): <mask>: subprocess.run( [""mkdir"", pjoin(output_dir, name, str(i))], stdout=subprocess.PIPE )","if not isdir ( pjoin ( output_dir , name , str ( i ) ) ) :","def make_docs_directory(output_dir, name):\n if not isdir(pjoin(output_dir, name)):\n subprocess.run([""mkdir"", pjoin(output_dir, name)], stdout=subprocess.PIPE)\n for i in range(10):\n if not isdir(pjoin(output_dir, name, str(i))):\n subprocess.run(\n [""mkdir"", pjoin(output_dir, name, str(i))], stdout=subprocess.PIPE\n )\n"
2,"def assert_results(self, results, activities, msg=""""): activity_ids = [] extra_context = [] for result in results: if hasattr(result, ""serialization_id""): activity_ids.append(result.serialization_id) else: activity_ids.append(result) <mask>: extra_context.append(result.extra_context) compare_lists(activity_ids, [a.serialization_id for a in activities], msg) if extra_context: self.assertEquals([a.extra_context for a in activities], extra_context)","if hasattr ( result , ""extra_context"" ) :","def assert_results(self, results, activities, msg=""""):\n activity_ids = []\n extra_context = []\n for result in results:\n if hasattr(result, ""serialization_id""):\n activity_ids.append(result.serialization_id)\n else:\n activity_ids.append(result)\n if hasattr(result, ""extra_context""):\n extra_context.append(result.extra_context)\n compare_lists(activity_ids, [a.serialization_id for a in activities], msg)\n if extra_context:\n self.assertEquals([a.extra_context for a in activities], extra_context)\n"
3,"def for_file(cls, filename: str, modname: str) -> ""ModuleAnalyzer"": if (""file"", filename) in cls.cache: return cls.cache[""file"", filename] try: with tokenize.open(filename) as f: obj = cls(f, modname, filename, decoded=True) cls.cache[""file"", filename] = obj except Exception as err: <mask>: obj = cls.cache[""file"", filename] = cls.for_egg(filename, modname) else: raise PycodeError(""error opening %r"" % filename, err) from err return obj","if "".egg"" + path . sep in filename :","def for_file(cls, filename: str, modname: str) -> ""ModuleAnalyzer"":\n if (""file"", filename) in cls.cache:\n return cls.cache[""file"", filename]\n try:\n with tokenize.open(filename) as f:\n obj = cls(f, modname, filename, decoded=True)\n cls.cache[""file"", filename] = obj\n except Exception as err:\n if "".egg"" + path.sep in filename:\n obj = cls.cache[""file"", filename] = cls.for_egg(filename, modname)\n else:\n raise PycodeError(""error opening %r"" % filename, err) from err\n return obj\n"
4,"def merge_dicts(source: Dict, destination: Dict) -> Dict: for key, value in source.items(): <mask>: # get node or create one node = destination.setdefault(key, {}) merge_dicts(value, node) else: destination[key] = value return destination","if isinstance ( value , dict ) :","def merge_dicts(source: Dict, destination: Dict) -> Dict:\n for key, value in source.items():\n if isinstance(value, dict):\n # get node or create one\n node = destination.setdefault(key, {})\n merge_dicts(value, node)\n else:\n destination[key] = value\n return destination\n"


In [10]:
# =======================
# Test Helper Function
# =======================
def test_mask_presence(masked_df, df_name="dataset"):
    """
    Verify that each 'masked_input' in the DataFrame contains '<mask>'.
    Raises an AssertionError if any row fails the test.
    """
    missing_mask = masked_df[~masked_df['masked_input'].str.contains("<mask>")]
    if not missing_mask.empty:
        error_msg = f"Test failed: Some rows in {df_name} do not contain '<mask>':\n{missing_mask}"
        raise AssertionError(error_msg)
    else:
        print(f"All rows in {df_name} have '<mask>' in the masked_input.")

In [11]:
# =======================
# Run Test Cases to Check Mask Presence
# =======================
# Each call raises an AssertionError (stopping the cell) if any row of that split
# lacks '<mask>' in its masked_input; otherwise it prints a confirmation line.
test_mask_presence(masked_train_df, df_name="train")
test_mask_presence(masked_val_df, df_name="val")
test_mask_presence(masked_test_df, df_name="test")

All rows in train have '<mask>' in the masked_input.
All rows in val have '<mask>' in the masked_input.
All rows in test have '<mask>' in the masked_input.


In [12]:
# Collect the train rows whose 'masked_input' does not contain the "<mask>" placeholder.
df_no_mask_train = masked_train_df[~masked_train_df["masked_input"].str.contains("<mask>")]

# Display the complete rows of the masked dataframe that do not have <mask>
# (the label names the column that is actually checked: 'masked_input').
print("Rows with no <mask> in the 'masked_input' column:")
df_no_mask_train

Rows with no <mask> in the 'masked_method' column:


Unnamed: 0,masked_input,target,original_function


In [13]:
# Collect the validation rows whose 'masked_input' does not contain the "<mask>" placeholder.
df_no_mask_val = masked_val_df[~masked_val_df["masked_input"].str.contains("<mask>")]

# Display the complete rows of the masked dataframe that do not have <mask>
# (the label names the column that is actually checked: 'masked_input').
print("Rows with no <mask> in the 'masked_input' column:")
df_no_mask_val

Rows with no <mask> in the 'masked_method' column:


Unnamed: 0,masked_input,target,original_function


In [14]:
# Collect the test rows whose 'masked_input' does not contain the "<mask>" placeholder.
df_no_mask_test = masked_test_df[~masked_test_df["masked_input"].str.contains("<mask>")]

# Display the complete rows of the masked dataframe that do not have <mask>
# (the label names the column that is actually checked: 'masked_input').
print("Rows with no <mask> in the 'masked_input' column:")
df_no_mask_test

Rows with no <mask> in the 'masked_method' column:


Unnamed: 0,masked_input,target,original_function


✅ The following cell loads a pre-trained model and tokenizer from Hugging Face using the checkpoint name (e.g., "Salesforce/codet5-small").


*   The tokenizer knows how to convert text into tokens that the model can process.

*   It also handles things like padding, truncation, special tokens, etc.

*   It comes with a fixed vocabulary learned during pretraining, which we can nevertheless expand if needed, as shown below.

In [15]:
# ------------------------------------------------------------------------
# 3. Load Pre-trained Model & Tokenizer
# ------------------------------------------------------------------------
from transformers import T5ForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
from datasets import DatasetDict
from transformers import TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

# Checkpoint name used for both the model weights and the tokenizer files.
model_checkpoint = "Salesforce/codet5-small"

model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

tokenizer = RobertaTokenizer.from_pretrained(model_checkpoint)
# Adds one extra token to the tokenizer's vocabulary.
# NOTE(review): the masking step in this notebook inserts "<mask>", not "<IF-STMT>" — confirm this extra token is needed.
tokenizer.add_tokens(["<IF-STMT>"]) #Imagine we need an extra token. This line adds the extra token to the vocabulary

# The embedding matrix must be resized to the new vocabulary size after adding tokens.
model.resize_token_embeddings(len(tokenizer))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.57k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/242M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.48k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/703k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/294k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/12.5k [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Embedding(32101, 512)

⚠️⚠️⚠️ If you add new tokens like this, you must also resize the model’s embedding layer: model.resize_token_embeddings(len(tokenizer))

Otherwise, the model won’t know what to do with the new token IDs!


In [16]:
# Convert the three masked pandas frames into Hugging Face Dataset objects.
# NOTE(review): this rebinds train_df/val_df/test_df (previously the raw pandas frames
# read from CSV), so re-running the masking cell after this one will presumably fail.
train_df, val_df, test_df = map(
    Dataset.from_dict,
    (masked_train_df, masked_val_df, masked_test_df),
)

# Group the splits under the names used by the tokenization and training cells.
my_dataset_dict = datasets.DatasetDict(
    {'train': train_df, 'validation': val_df, 'test': test_df}
)

In [17]:
# Show the splits with their feature names and row counts.
my_dataset_dict

DatasetDict({
    train: Dataset({
        features: ['masked_input', 'target', 'original_function'],
        num_rows: 50000
    })
    validation: Dataset({
        features: ['masked_input', 'target', 'original_function'],
        num_rows: 5000
    })
    test: Dataset({
        features: ['masked_input', 'target', 'original_function'],
        num_rows: 5000
    })
})

In [18]:
# ------------------------------------------------------------------------------------------------
# 4. We prepare now the fine-tuning dataset using the tokenizer we preloaded
# ------------------------------------------------------------------------------------------------

def preprocess_function(examples):
    """
    Tokenize a batch of examples for sequence-to-sequence fine-tuning.

    The 'masked_input' texts become the encoder inputs and the 'target' texts
    become the labels. Both are truncated/padded to 256 tokens. Padding
    positions in the labels are replaced by -100 so that the loss ignores them;
    without this the model is also trained to predict the padding.

    Args:
        examples: Batch mapping with 'masked_input' and 'target' lists of strings.

    Returns:
        The tokenizer output for the inputs, with an added 'labels' entry.
    """
    inputs = examples["masked_input"]
    targets = examples["target"]
    model_inputs = tokenizer(inputs, max_length=256, truncation=True, padding="max_length")
    labels = tokenizer(targets, max_length=256, truncation=True, padding="max_length")

    # -100 is the label value that the cross-entropy loss skips.
    pad_token_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        [token_id if token_id != pad_token_id else -100 for token_id in label_ids]
        for label_ids in labels["input_ids"]
    ]
    return model_inputs

# Apply preprocess_function to every split. With batched=True the function is called
# on batches of examples, which is why it indexes whole columns of `examples`.
tokenized_datasets = my_dataset_dict.map(preprocess_function, batched=True)

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

Map:   0%|          | 0/5000 [00:00<?, ? examples/s]

In [19]:
# ------------------------------------------------------------------------
# 5. Define Training Arguments and Trainer
# ------------------------------------------------------------------------


# Evaluate and checkpoint once per epoch; the checkpoint with the lowest validation
# loss is reloaded when training ends.
training_args = TrainingArguments(
    output_dir="./codet5-finetuned",
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    learning_rate=5e-5,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    num_train_epochs=5,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    save_total_limit=2,
    logging_steps=100,
    push_to_hub=False,
)


# `processing_class` replaces the deprecated `tokenizer` argument of Trainer.
# Training stops early when the validation loss has not improved for 2 evaluations.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    processing_class=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

  trainer = Trainer(


In [None]:
# ------------------------
# 6. Train the Model
# ------------------------
trainer.train()

# ------------------------
# 7. Evaluate on Test Set
# ------------------------
metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Evaluation Metrics:", metrics)

# ------------------------
# 8. Try the Model on a Masked Example
# ------------------------
# Use a real masked test example so the input has the same form as the training data
# (flattened function with a "<mask>:" placeholder).
input_code = my_dataset_dict["test"][0]["masked_input"]
inputs = tokenizer(input_code, return_tensors="pt", padding=True, truncation=True, max_length=256)
# After training the model may live on the GPU; the input tensors must be on the same device.
inputs = inputs.to(model.device)
outputs = model.generate(**inputs, max_length=256)
print("Predicted if-condition:\n", tokenizer.decode(outputs[0], skip_special_tokens=True))

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mqhoang[0m ([33mqhoang-william-mary[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch,Training Loss,Validation Loss
