In [1]:
import os
import pickle
from tqdm import tqdm, trange
from dataclasses import dataclass
import pandas as pd
import json
from dataclasses import dataclass
from typing import Optional
from drl_patches.sparse_autoencoders.get_vectorizer import load_tfidf_vectorizer
from drl_patches.sparse_autoencoders.getting_experiment_config import (
    load_training_indexes,
)
import torch
from drl_patches.logger import logger
from drl_patches.sparse_autoencoders.classical_data_mining import get_metrics
from drl_patches.sparse_autoencoders.utils import read_jsonl_file, set_seed
from drl_patches.sparse_autoencoders.vulnerability_detection_features import (
    get_diff_data, get_most_important_features, get_vuln_safe_data
)
set_seed(42)



[2m2025-04-21 23:56:09[0m [[32m[1minfo     [0m] [1mGetting device.               [0m [36mdevice[0m=[35mcpu[0m
[2m2025-04-21 23:56:09[0m [[32m[1minfo     [0m] [1mDevice                        [0m [36mdevice[0m=[35mcpu[0m
[2m2025-04-21 23:56:09[0m [[32m[1minfo     [0m] [1mGetting device.               [0m [36mdevice[0m=[35mcpu[0m


In [2]:
@dataclass
class Dataset:
    """Pair of file locations describing one benchmark dataset.

    Attributes:
        dataset_path: path of the CSV file that holds the dataset rows.
        training_idx_path: path of the JSON file that holds the training-split indexes.
    """

    dataset_path: str
    training_idx_path: str


# Benchmarks used throughout the notebook (all paths are relative).
GBUG_DATASET = Dataset(
    "../artifacts/gbug-java.csv",
    "../artifacts/gbug-java_train_indexes.json",
)

DEFECT_DATASET = Dataset(
    "../artifacts/defects4j.csv",
    "../artifacts/defects4j_train_indexes.json",
)

HUMAN_DATASET = Dataset(
    "../artifacts/humaneval.csv",
    "../artifacts/humaneval_train_indexes.json",
)

In [3]:

def get_testing_dataset(dataset: Dataset):
    """
    Load a dataset CSV and split it into training and testing rows.

    The training rows are the ones whose index labels are listed in the JSON
    file at ``dataset.training_idx_path``; every other row of the CSV at
    ``dataset.dataset_path`` ends up in the testing split.

    Returns:
        A ``(train_df, test_df)`` tuple of DataFrames.
    """
    full_df = pd.read_csv(dataset.dataset_path)

    with open(dataset.training_idx_path, "r") as idx_file:
        train_labels = json.load(idx_file)

    train_part = full_df.loc[train_labels]
    # Whatever was not selected for training is the testing split.
    test_part = full_df.drop(train_part.index)
    return train_part, test_part



In [4]:
@dataclass
class SAE_ACTIVATIONS:
    """File locations and settings for one sparse-autoencoder classifier experiment.

    Attributes:
        feature_diff_path: JSONL file handed to ``get_diff_data``.
        model_path: pickled classifier file.
        layer: layer number (presumably the model layer the features come from —
            the configured paths use ``layer2``; confirm against the training code).
        top_k: number of most important features to keep.
        dataset: dataset whose training indexes define the train/test split.
        *_feature_safe_path / *_feature_vuln_path: per-benchmark JSONL files
            handed to ``get_vuln_safe_data``.
    """

    # Training artefacts and selection settings
    feature_diff_path: str
    model_path: str
    layer: int
    top_k: int
    dataset: Dataset

    # GBUG feature files
    gbug_feature_safe_path: str
    gbug_feature_vuln_path: str

    # Defects4J feature files
    defects_feature_safe_path: str
    defects_feature_vuln_path: str

    # HumanEval feature files
    humaneval_feature_safe_path: str
    humaneval_feature_vuln_path: str

# GPT-2 / layer 2 configuration whose classifier file lives under the GBUG run directory.
# Keyword arguments follow the field order of SAE_ACTIVATIONS.
SAE_ACTIVATIONS_GBUG_GPT2 = SAE_ACTIVATIONS(
    feature_diff_path="../gpt2_gbug-java/layer2/feature_importance_diff.jsonl",
    model_path="../gpt2_gbug-java/layer2/random_forest_k_25.pt",
    layer=2,
    top_k=25,
    dataset=GBUG_DATASET,
    gbug_feature_safe_path="../gpt2_gbug-java/layer2/feature_importance_safe.jsonl",
    gbug_feature_vuln_path="../gpt2_gbug-java/layer2/feature_importance_vuln.jsonl",
    defects_feature_safe_path="../gpt2_defects4j/layer2/feature_importance_safe.jsonl",
    defects_feature_vuln_path="../gpt2_defects4j/layer2/feature_importance_vuln.jsonl",
    humaneval_feature_safe_path="../gpt2_humaneval/layer2/feature_importance_safe.jsonl",
    humaneval_feature_vuln_path="../gpt2_humaneval/layer2/feature_importance_vuln.jsonl",
)

# Configuration used by the cells that follow.
OUR_CONFIG = SAE_ACTIVATIONS_GBUG_GPT2

In [5]:
# Index labels of the training split of the configured dataset.
with open(OUR_CONFIG.dataset.training_idx_path, "r") as idx_file:
    training_indices = json.load(idx_file)

# Every path is read through OUR_CONFIG (previously the diff file was taken from
# SAE_ACTIVATIONS_GBUG_GPT2 directly), so pointing OUR_CONFIG at another
# configuration cannot silently mix artefacts of two experiments.
train_df_diff = get_diff_data(OUR_CONFIG.feature_diff_path)

# Feature selection consumed by evaluate_clf and by the column filters below.
top_k = get_most_important_features(train_df_diff, n=OUR_CONFIG.top_k)

100%|██████████| 24576/24576 [00:31<00:00, 777.73it/s] 


In [6]:
# NOTE(review): pickle.load can execute arbitrary code stored in the file —
# only point model_path at classifier files you produced or trust.
with open(OUR_CONFIG.model_path, "rb") as model_file:
    clf = pickle.load(model_file)

# Original (GBUG): keep only the second element of the returned pair as the test frame.
_, df_test = get_vuln_safe_data(
    OUR_CONFIG.gbug_feature_vuln_path,
    OUR_CONFIG.gbug_feature_safe_path,
    training_indices,
)

100%|██████████| 24576/24576 [01:31<00:00, 269.20it/s]


In [7]:
def evaluate_clf(clf, test_df, feature_columns=None):
    """
    Evaluate a fitted binary classifier on a labelled test DataFrame.

    Args:
        clf: object exposing ``predict(X)``; it receives a float32
            ``torch.Tensor`` built from the selected feature columns.
        test_df: DataFrame holding the feature columns and a ``"vuln"`` label
            column (1 = positive class, 0 = negative class).
        feature_columns: columns fed to the classifier. When ``None`` the
            notebook-level ``top_k`` selection is used, exactly as before.

    Returns:
        dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and the
        confusion-matrix counts ``tp``, ``fp``, ``fn``, ``tn`` (plain ``int``).
        Every ratio falls back to 0 when its denominator is 0, which includes
        the accuracy of an empty test set.

    Raises:
        ValueError: if the classifier returns a different number of
            predictions than there are rows in ``test_df``.
    """
    # Fall back to the global selection so existing two-argument calls keep working.
    if feature_columns is None:
        feature_columns = top_k
    if hasattr(feature_columns, "tolist"):
        feature_columns = feature_columns.tolist()
    columns = list(feature_columns)

    X_test = torch.tensor(test_df[columns].values, dtype=torch.float32)
    y_true = test_df["vuln"].tolist()
    y_pred = list(clf.predict(X_test))

    if len(y_pred) != len(y_true):
        raise ValueError(
            f"Classifier returned {len(y_pred)} predictions for {len(y_true)} rows."
        )

    # Count in plain Python so the results are ints, not numpy scalars.
    tp = fp = tn = fn = 0
    for predicted, actual in zip(y_pred, y_true):
        if predicted == 1 and actual == 1:
            tp += 1
        elif predicted == 1 and actual == 0:
            fp += 1
        elif predicted == 0 and actual == 0:
            tn += 1
        elif predicted == 0 and actual == 1:
            fn += 1

    total = tp + fp + fn + tn
    # Calculate the accuracy (0 for an empty test set instead of dividing by zero)
    accuracy = (tp + tn) / total if total > 0 else 0
    # Calculate the precision
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    # Calculate the recall
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    # Calculate the F1 score
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "tp": tp,
        "fp": fp,
        "fn": fn,
        "tn": tn
    }

In [8]:
# Score the loaded classifier on the GBUG test frame (`df_test`) built in the previous cell.
results = evaluate_clf(clf, df_test)
results

{'accuracy': 0.7,
 'precision': 0.6764705882352942,
 'recall': 0.7666666666666667,
 'f1': 0.71875,
 'tp': 23,
 'fp': 11,
 'fn': 7,
 'tn': 19}

In [9]:
# Print the loaded estimator followed by its complete parameter dictionary.
print("Model details:", clf, sep="\n")
print("Model parameters:", clf.get_params(), sep="\n")

Model details:
GridSearchCV(cv=5, estimator=RandomForestClassifier(), n_jobs=2,
             param_grid={'max_depth': [None, 50], 'max_features': ['sqrt'],
                         'min_samples_leaf': [1, 2],
                         'min_samples_split': [2, 5],
                         'n_estimators': [100, 300]},
             verbose=2)
Model parameters:
{'cv': 5, 'error_score': nan, 'estimator__bootstrap': True, 'estimator__ccp_alpha': 0.0, 'estimator__class_weight': None, 'estimator__criterion': 'gini', 'estimator__max_depth': None, 'estimator__max_features': 'sqrt', 'estimator__max_leaf_nodes': None, 'estimator__max_samples': None, 'estimator__min_impurity_decrease': 0.0, 'estimator__min_samples_leaf': 1, 'estimator__min_samples_split': 2, 'estimator__min_weight_fraction_leaf': 0.0, 'estimator__monotonic_cst': None, 'estimator__n_estimators': 100, 'estimator__n_jobs': None, 'estimator__oob_score': False, 'estimator__random_state': None, 'estimator__verbose': 0, 'estimator__warm_st

In [None]:
# Defects4J: transfer evaluation of the classifier loaded above.
# The split uses Defects4J's own training indexes, as the baseline cells further
# down do. `training_indices` holds the indexes of OUR_CONFIG.dataset (GBUG) and
# refers to rows of a different dataset.
# NOTE(review): numbers produced with the previous GBUG-index split are not
# comparable with the ones from this cell.
with open(DEFECT_DATASET.training_idx_path, "r") as idx_file:
    defects_training_indices = json.load(idx_file)

_, df_test = get_vuln_safe_data(
    OUR_CONFIG.defects_feature_vuln_path,
    OUR_CONFIG.defects_feature_safe_path,
    defects_training_indices,
)
# filter columns based on top_k
df_test_filtered = df_test[
    top_k.tolist() + ["vuln"]
]

results = evaluate_clf(clf, df_test_filtered)
results

 39%|███▉      | 9602/24576 [00:14<00:43, 343.32it/s] 

In [None]:
# HumanEval: transfer evaluation of the classifier loaded above.
# The split uses HumanEval's own training indexes, as the baseline cells further
# down do. `training_indices` holds the indexes of OUR_CONFIG.dataset (GBUG) and
# refers to rows of a different dataset.
# NOTE(review): numbers produced with the previous GBUG-index split are not
# comparable with the ones from this cell.
with open(HUMAN_DATASET.training_idx_path, "r") as idx_file:
    humaneval_training_indices = json.load(idx_file)

_, df_test = get_vuln_safe_data(
    OUR_CONFIG.humaneval_feature_vuln_path,
    OUR_CONFIG.humaneval_feature_safe_path,
    humaneval_training_indices,
)
# filter columns based on top_k
df_test_filtered = df_test[
    top_k.tolist() + ["vuln"]
]

results = evaluate_clf(clf, df_test_filtered)
results

# Baseline Transferability

In [None]:
@dataclass
class BaselineClassifier:
    """Description of one pickled baseline classifier.

    Attributes:
        path: pickled classifier file.
        tfidf_vectorizer_path: file of the TF-IDF vectorizer used with the classifier.
        base_dataset: dataset associated with the classifier (presumably the one it
            was trained on — confirm against the training script).
        input_size: width of the feature vectors given to the classifier.
    """

    path: str
    tfidf_vectorizer_path: str
    base_dataset: Dataset
    input_size: int

# Settings shared by every baseline below: the same vectorizer file and feature width.
BASELINE_VECTORIZER_PATH = "../artifacts/vectorizer.pkl"
BASELINE_INPUT_SIZE = 5000

# --- KNN baselines -----------------------------------------------------------
DEFECTS4J_KNN_BASELINE = BaselineClassifier(
    path="../ole/defects4j_knn_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=DEFECT_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)
GBUG_KNN_BASELINE = BaselineClassifier(
    path="../ole/gbug_knn_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=GBUG_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)
HUMANEVAL_KNN_BASELINE = BaselineClassifier(
    path="../ole/human_knn_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=HUMAN_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)

# --- Random-forest baselines -------------------------------------------------
DEFECTS4J_RF_BASELINE = BaselineClassifier(
    path="../ole/defects4j_random_forest_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=DEFECT_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)
GBUG_RF_BASELINE = BaselineClassifier(
    path="../ole/gbug_random_forest_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=GBUG_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)
HUMANEVAL_RF_BASELINE = BaselineClassifier(
    path="../ole/human_random_forest_k_5000.pt",
    tfidf_vectorizer_path=BASELINE_VECTORIZER_PATH,
    base_dataset=HUMAN_DATASET,
    input_size=BASELINE_INPUT_SIZE,
)

def load_baseline_classifier(classifier: BaselineClassifier):
    """Unpickle and return the classifier stored at ``classifier.path``.

    NOTE(review): unpickling can execute arbitrary code — only load files you trust.
    """
    with open(classifier.path, "rb") as model_file:
        return pickle.load(model_file)
@dataclass
class Results:
    """Classification metrics of one evaluation run."""

    # Field order matters: positional construction follows it.
    precision: float
    recall: float
    accuracy: float
    f1: float


@dataclass
class TransferabilityPerformance:
    """F1 shift (in percent) of one classifier on each transfer target.

    A field stays ``None`` for targets that were not evaluated (for example the
    dataset the classifier itself comes from).
    """

    # The stored values are results of calculate_f1_shift, i.e. floats, not ints.
    on_defects: Optional[float] = None
    on_humaneval: Optional[float] = None
    on_gbug: Optional[float] = None

def calculate_f1_shift(
    results: Results,
    baseline_results: Results,
) -> float:
    """
    Calculate the shift in performance as the percentage change of the F1 score.

    Args:
        results: metrics to compare (only ``f1`` is read).
        baseline_results: reference metrics (only ``f1`` is read).

    Returns:
        ``(results.f1 - baseline_results.f1) / baseline_results.f1 * 100``.
        A relative change is undefined for a baseline F1 of 0, so ``nan`` is
        returned in that case instead of dividing by zero.
    """
    baseline_f1 = baseline_results.f1
    if baseline_f1 == 0:
        return float("nan")

    return (results.f1 - baseline_f1) / baseline_f1 * 100

def test_baseline_classifier(clf, 
                             vectorizer, 
                             df, 
                             train_indexes,
                             before_func_col="func_before", 
                             after_func_col="func_after",
                             input_size=5000) -> Results:
    """
    Test the baseline classifier on the test dataset.

    Rows whose index labels are in ``train_indexes`` are dropped. Every remaining
    row contributes two samples, in this order: the ``before_func_col`` text
    labelled 1 and the ``after_func_col`` text labelled 0.

    Args:
        clf: object exposing ``predict(X)``; it receives a list of 1-D float32
            tensors, one per sample.
        vectorizer: object whose ``transform(texts)`` result has ``toarray()``.
        df: DataFrame with the two text columns. It is not modified.
        train_indexes: index labels of the training rows to exclude.
        before_func_col: column holding the samples labelled 1.
        after_func_col: column holding the samples labelled 0.
        input_size: width every feature vector is zero-padded or truncated to.

    Returns:
        ``Results`` filled with the values returned by ``get_metrics``.

    Raises:
        ValueError: if no rows remain after dropping ``train_indexes``.
    """
    import numpy as np  # local import: numpy is not part of the notebook's import cell

    def _vectorize(texts):
        """Return a dense (len(texts), input_size) matrix, zero-padded or truncated."""
        # One batched call replaces the per-row transform; assumes the vectorizer
        # handles each document independently (true for scikit-learn's
        # TfidfVectorizer) — TODO confirm if another vectorizer is plugged in.
        dense = np.asarray(vectorizer.transform(texts).toarray())
        missing = input_size - dense.shape[1]
        if missing > 0:
            # The previous `array + [0] * n` padding raised a broadcast error here.
            dense = np.pad(dense, ((0, 0), (0, missing)))
        return dense[:, :input_size]

    # Drop the training rows first: only held-out rows are vectorized and the
    # caller's DataFrame no longer receives helper columns.
    df_test = df.drop(train_indexes)
    if df_test.empty:
        raise ValueError("No test rows left after dropping the training indexes.")

    before = _vectorize(df_test[before_func_col].tolist())
    after = _vectorize(df_test[after_func_col].tolist())

    # Interleave the samples (before, after, before, after, ...) in one allocation
    # instead of growing a DataFrame with pd.concat inside a loop.
    n_rows = len(df_test)
    features = np.empty((2 * n_rows, input_size), dtype=np.float32)
    features[0::2] = before
    features[1::2] = after

    X_test = list(torch.from_numpy(features))
    y_test = pd.Series([1, 0] * n_rows, name="vuln")

    # Get the prediction
    y_pred = clf.predict(X_test)
    precision, recall, accuracy, f1 = get_metrics(y_pred, y_test)
    logger.info(
        "Classification report:",
        precision=precision,
        recall=recall,
        accuracy=accuracy,
        f1=f1,
    )
    return Results(
        precision=precision,
        recall=recall,
        accuracy=accuracy,
        f1=f1,
    )
    



In [None]:

def _gbug_knn_on(target: Dataset) -> Results:
    """Run the GBUG KNN baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(GBUG_KNN_BASELINE),
        load_tfidf_vectorizer(GBUG_KNN_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on GBUG itself, then the two transfer targets.
results_gbug_knn = _gbug_knn_on(GBUG_DATASET)
results_gbug_knn_defects = _gbug_knn_on(DEFECT_DATASET)
results_gbug_knn_human = _gbug_knn_on(HUMAN_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_knn_gbug_on_defects = calculate_f1_shift(results_gbug_knn_defects, results_gbug_knn)
transf_knn_gbug_on_human = calculate_f1_shift(results_gbug_knn_human, results_gbug_knn)

transf_knn_gbug = TransferabilityPerformance(
    on_defects=transf_knn_gbug_on_defects,
    on_humaneval=transf_knn_gbug_on_human,
)

In [None]:
### Random Forest from Gbug
def _gbug_rf_on(target: Dataset) -> Results:
    """Run the GBUG random-forest baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(GBUG_RF_BASELINE),
        load_tfidf_vectorizer(GBUG_RF_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on GBUG itself, then the two transfer targets.
results_gbug_rf = _gbug_rf_on(GBUG_DATASET)
results_gbug_rf_defects = _gbug_rf_on(DEFECT_DATASET)
# Fix: this run used to load GBUG_KNN_BASELINE, so the "RF on HumanEval" number
# was really the KNN result.
results_gbug_rf_human = _gbug_rf_on(HUMAN_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_rf_gbug_on_defects = calculate_f1_shift(results_gbug_rf_defects, results_gbug_rf)
transf_rf_gbug_on_human = calculate_f1_shift(results_gbug_rf_human, results_gbug_rf)

transf_rf_gbug = TransferabilityPerformance(
    on_defects=transf_rf_gbug_on_defects,
    on_humaneval=transf_rf_gbug_on_human,
)

In [None]:
def _defects_knn_on(target: Dataset) -> Results:
    """Run the Defects4J KNN baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(DEFECTS4J_KNN_BASELINE),
        load_tfidf_vectorizer(DEFECTS4J_KNN_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on Defects4J itself, then the two transfer targets.
results_defects_knn = _defects_knn_on(DEFECT_DATASET)
results_defects_knn_gbug = _defects_knn_on(GBUG_DATASET)
results_defects_knn_human = _defects_knn_on(HUMAN_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_knn_defects_on_gbug = calculate_f1_shift(results_defects_knn_gbug, results_defects_knn)
transf_knn_defects_on_human = calculate_f1_shift(results_defects_knn_human, results_defects_knn)

transf_knn_defects = TransferabilityPerformance(
    on_gbug=transf_knn_defects_on_gbug,
    on_humaneval=transf_knn_defects_on_human,
)


In [None]:
def _defects_rf_on(target: Dataset) -> Results:
    """Run the Defects4J random-forest baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(DEFECTS4J_RF_BASELINE),
        load_tfidf_vectorizer(DEFECTS4J_RF_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on Defects4J itself, then the two transfer targets.
results_defects_rf = _defects_rf_on(DEFECT_DATASET)
results_defects_rf_gbug = _defects_rf_on(GBUG_DATASET)
results_defects_rf_human = _defects_rf_on(HUMAN_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_rf_defects_on_gbug = calculate_f1_shift(results_defects_rf_gbug, results_defects_rf)
transf_rf_defects_on_human = calculate_f1_shift(results_defects_rf_human, results_defects_rf)

transf_rf_defects = TransferabilityPerformance(
    on_gbug=transf_rf_defects_on_gbug,
    on_humaneval=transf_rf_defects_on_human,
)

In [None]:
def _humaneval_knn_on(target: Dataset) -> Results:
    """Run the HumanEval KNN baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(HUMANEVAL_KNN_BASELINE),
        load_tfidf_vectorizer(HUMANEVAL_KNN_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on HumanEval itself, then the two transfer targets.
results_humaneval_knn = _humaneval_knn_on(HUMAN_DATASET)
results_humaneval_knn_gbug = _humaneval_knn_on(GBUG_DATASET)
results_humaneval_knn_defects = _humaneval_knn_on(DEFECT_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_knn_humaneval_on_gbug = calculate_f1_shift(results_humaneval_knn_gbug, results_humaneval_knn)
transf_knn_humaneval_on_defects = calculate_f1_shift(results_humaneval_knn_defects, results_humaneval_knn)

transf_knn_humaneval = TransferabilityPerformance(
    on_defects=transf_knn_humaneval_on_defects,
    on_gbug=transf_knn_humaneval_on_gbug,
)


In [None]:
def _humaneval_rf_on(target: Dataset) -> Results:
    """Run the HumanEval random-forest baseline on the rows of ``target`` outside its training indexes."""
    return test_baseline_classifier(
        load_baseline_classifier(HUMANEVAL_RF_BASELINE),
        load_tfidf_vectorizer(HUMANEVAL_RF_BASELINE.tfidf_vectorizer_path),
        pd.read_csv(target.dataset_path),
        load_training_indexes(target.training_idx_path),
    )


# Reference run on HumanEval itself, then the two transfer targets.
results_humaneval_rf = _humaneval_rf_on(HUMAN_DATASET)
results_humaneval_rf_gbug = _humaneval_rf_on(GBUG_DATASET)
results_humaneval_rf_defects = _humaneval_rf_on(DEFECT_DATASET)

# F1 change of each transfer run relative to the reference run.
transf_rf_humaneval_on_gbug = calculate_f1_shift(results_humaneval_rf_gbug, results_humaneval_rf)
transf_rf_humaneval_on_defects = calculate_f1_shift(results_humaneval_rf_defects, results_humaneval_rf)

transf_rf_humaneval = TransferabilityPerformance(
    on_defects=transf_rf_humaneval_on_defects,
    on_gbug=transf_rf_humaneval_on_gbug,
)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

sns.set(style="whitegrid")

# Labels and values (one entry per source → target pair, same order in all lists)
labels = [
    "Gbug → Defects4J", "Gbug → HumanEval",
    "Defects4J → Gbug", "Defects4J → HumanEval",
    "HumanEval → Gbug", "HumanEval → Defects4J",
]

# Split values for KNN and RF
values_knn = [
    transf_knn_gbug.on_defects, transf_knn_gbug.on_humaneval,
    transf_knn_defects.on_gbug, transf_knn_defects.on_humaneval,
    transf_knn_humaneval.on_gbug, transf_knn_humaneval.on_defects,
]

values_rf = [
    transf_rf_gbug.on_defects, transf_rf_gbug.on_humaneval,
    transf_rf_defects.on_gbug, transf_rf_defects.on_humaneval,
    transf_rf_humaneval.on_gbug, transf_rf_humaneval.on_defects,
]

x = np.arange(len(labels))  # label locations
width = 0.4  # thickness of the bars

# Keep the ±100 % range by default, but widen it when a shift is larger so that
# no bar (or its label) gets clipped. Non-finite values are ignored here.
finite_shifts = [
    abs(value)
    for value in values_knn + values_rf
    if value is not None and np.isfinite(value)
]
x_limit = max(100, 1.15 * max(finite_shifts, default=0))

fig, ax = plt.subplots(figsize=(12, 7))
bars1 = ax.barh(x - width/2, values_knn, width, label='KNN', edgecolor='black')
bars2 = ax.barh(x + width/2, values_rf, width, label='Random Forest', edgecolor='black')

# Aesthetics
ax.set_title("Transferability Performance of KNN and RF Classifiers", fontsize=16, pad=15)
ax.set_xlabel("F1 Shift (%)", fontsize=12)
ax.set_yticks(x)
ax.set_yticklabels(labels, fontsize=11)
ax.set_xlim(-x_limit, x_limit)
ax.axvline(0, color='gray', linewidth=0.8, linestyle='--')
ax.legend(loc='lower right', frameon=True, fontsize=11)

# Annotate values on bars. The loop uses its own name so that `width`
# (the bar thickness above) is not overwritten.
for bars in [bars1, bars2]:
    for bar in bars:
        shift = bar.get_width()
        ax.annotate(f'{shift:.1f}',
                    xy=(shift, bar.get_y() + bar.get_height() / 2),
                    xytext=(5 if shift >= 0 else -45, 0),
                    textcoords="offset points",
                    ha='left' if shift >= 0 else 'right',
                    va='center',
                    fontsize=9,
                    color='black')

plt.tight_layout()
plt.show()

# Save the figure
fig.savefig("transferability_performance_pretty.png", dpi=300)
