In [None]:
# Install the Python packages imported by this notebook with pip, then the
# Graphviz system package with apt.
# NOTE(review): versions are unpinned, and scikit-learn / pandas (imported in
# later cells) are not listed here — presumably preinstalled on Colab; confirm.
!pip install javalang networkx matplotlib pydot graphviz transformers torch
!apt-get install -y graphviz

Collecting javalang
  Downloading javalang-0.13.0-py3-none-any.whl.metadata (805 bytes)
Downloading javalang-0.13.0-py3-none-any.whl (22 kB)
Installing collected packages: javalang
Successfully installed javalang-0.13.0
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
graphviz is already the newest version (2.42.2-6ubuntu0.1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [None]:
import os
import javalang
import networkx as nx
import matplotlib.pyplot as plt
from graphviz import Digraph
from transformers import RobertaTokenizer, RobertaModel
import torch
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


In [None]:
import zipfile
from google.colab import files

# Ask for the dataset archive through the Colab upload widget.
uploaded = files.upload()

# Unpack the archive into the ./Promisedataset directory.
with zipfile.ZipFile("Promisedataset-main.zip", "r") as archive:
    archive.extractall("Promisedataset")


Saving Promisedataset-main.zip to Promisedataset-main.zip


In [None]:
import os

# Show where the notebook is running so the relative paths below make sense.
print("Current working directory:", os.getcwd())

# Walk the extracted archive and print every directory and file it contains.
# The loop variables are deliberately not called `files`: that name is bound to
# the `google.colab.files` module by the upload cell earlier in the notebook,
# and rebinding it here would silently replace the module with a list.
extracted_dir = 'Promisedataset'
for root, dir_names, file_names in os.walk(extracted_dir):
    for name in dir_names:
        print("Directory:", os.path.join(root, name))
    for name in file_names:
        print("File:", os.path.join(root, name))

Current working directory: /content
Directory: Promisedataset/Promisedataset-main
File: Promisedataset/Zoom.lnk
File: Promisedataset/Promisedataset-main/ant.rar
File: Promisedataset/Promisedataset-main/Jedit.rar
File: Promisedataset/Promisedataset-main/synapse.rar
File: Promisedataset/Promisedataset-main/poi 3.13 src.rar
File: Promisedataset/Promisedataset-main/poi-1.5.0 src.rar
File: Promisedataset/Promisedataset-main/xalan.rar
File: Promisedataset/Promisedataset-main/lucene.rar
File: Promisedataset/Promisedataset-main/log4j.rar


In [None]:
# Unrar if necessary
!apt-get install unrar

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [None]:
import os
import subprocess

def extract_rar_files(directory):
    """Extract every RAR archive found under ``directory``.

    Each ``<name>.rar`` is unpacked with the external ``unrar`` tool into a
    sibling directory called ``<name>``, which is created when missing.

    Args:
        directory: Root directory that is searched recursively.

    Returns:
        None. Archives that ``unrar`` reports as failed are listed on stdout.

    Raises:
        FileNotFoundError: If the ``unrar`` executable is not installed.
    """
    for root, _dirs, file_names in os.walk(directory):
        for file_name in file_names:
            # Compare the extension case-insensitively so `.RAR` is not skipped.
            if not file_name.lower().endswith(".rar"):
                continue
            archive_path = os.path.join(root, file_name)
            output_dir = os.path.join(root, os.path.splitext(file_name)[0])
            os.makedirs(output_dir, exist_ok=True)
            # `-o+` overwrites files left by an earlier run instead of prompting,
            # which keeps the cell re-runnable. The trailing separator marks the
            # last argument unambiguously as the destination directory.
            result = subprocess.run(
                ['unrar', 'x', '-o+', archive_path, os.path.join(output_dir, '')]
            )
            if result.returncode != 0:
                print(f"unrar failed for {archive_path} (exit code {result.returncode})")

extract_rar_files('Promisedataset/Promisedataset-main')

In [None]:
import os

# Maps a release directory name (e.g. "ant-1.5") to the Java files found below it.
project_files = {}

def list_java_files(directory, versions=('ant-1.5', 'ant-1.6', 'ant-1.7')):
    """Collect ``.java`` files per release directory into ``project_files``.

    A file belongs to a release when one component of its directory path is
    exactly one of ``versions``. Grouping by that component (rather than by the
    innermost directory name) keeps all files of one release under a single key
    instead of scattering them over keys such as ``src`` or ``tar``.

    Args:
        directory: Root directory that is searched recursively.
        versions: Release directory names to include.

    Returns:
        None. Results are appended to the module-level ``project_files`` dict.
    """
    wanted = set(versions)
    for root, _dirs, file_names in os.walk(directory):
        components = os.path.normpath(root).split(os.sep)
        project_name = next((part for part in components if part in wanted), None)
        if project_name is None:
            continue
        java_files = [
            os.path.join(root, file_name)
            for file_name in file_names
            if file_name.endswith(".java")
        ]
        project_files.setdefault(project_name, []).extend(java_files)

# Update the directory path to your Promisedataset directory
list_java_files('Promisedataset/Promisedataset-main')

# Summarise the collected files. Only a short preview of each group is printed
# so the cell output stays readable; the total is always shown.
# The loop variable is not named `files` so that the `google.colab.files`
# module imported earlier in the notebook is not overwritten.
MAX_FILES_SHOWN = 5
for project, java_files in project_files.items():
    if not java_files:  # Only display projects with Java files
        continue
    print(f"Project: {project}")
    for java_file in java_files[:MAX_FILES_SHOWN]:
        print(f"  File: {java_file}")
    hidden_count = len(java_files) - MAX_FILES_SHOWN
    if hidden_count > 0:
        print(f"  ... and {hidden_count} more")
    print(f"Total Java Files in {project}: {len(java_files)}\n")


Project: src
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.6/apache-ant-1.6.0/src/etc/testcases/core/loaderref/src/Task1.java
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/tests/antunit/taskdefs/optional/junit/src/ExampleTest.java
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/core/loaderref/src/Task1.java
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/taskdefs/rmic/src/AntTimestamp.java
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/taskdefs/rmic/src/RemoteTimestampImpl.java
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/taskdefs/rmic/src/RemoteTimestamp.java
Total Java Files in src: 6

Project: tar
  File: Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/main/org/apache/tools/tar/TarUtils.java
  File: Promisedataset/Promisedataset-m

In [None]:
import os
import glob

# One Ant release per split: 1.5 for training, 1.6 for validation, 1.7 for testing.
train_path = "Promisedataset/Promisedataset-main/ant/ant/ant-1.5/"
validation_path = "Promisedataset/Promisedataset-main/ant/ant/ant-1.6/"
test_path = "Promisedataset/Promisedataset-main/ant/ant/ant-1.7/"


def get_java_files(directory):
    """Return the paths of all ``.java`` files found recursively under ``directory``."""
    pattern = os.path.join(directory, "**/*.java")
    matches = glob.glob(pattern, recursive=True)
    return matches


# Gather the source files of each split.
train_files, validation_files, test_files = (
    get_java_files(split_dir)
    for split_dir in (train_path, validation_path, test_path)
)

print(f"Loaded {len(train_files)} training files.")
print(f"Loaded {len(validation_files)} validation files.")
print(f"Loaded {len(test_files)} test files.")


Loaded 727 training files.
Loaded 906 validation files.
Loaded 1891 test files.


### **CFG Analysis**


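**Explanation of the code**

**Graph generation (`generate_simulated_cfg`)**

- Parses the Java source code using `javalang`.
- Constructs a directed graph with `networkx` in which every parsed syntax-tree node becomes a graph node and every parent–child relation becomes an edge. This graph serves as a simulated control-flow graph (CFG).

**Embedding generation (`extract_paths_from_cfg`, `generate_embeddings_for_paths`)**

- Uses the pre-trained CodeBERT model to create an embedding for each path through the graph.
- The labels of the nodes on a path are joined into one sequence, which is tokenized, passed through the model, and mean-pooled over its tokens.

**Aggregating the embeddings (`aggregate_embeddings`)**

- Computes the mean of all path embeddings to represent the file.

**Saving embeddings**

- Embeddings for all test files are stored in a NumPy array (`cfg_vectors_np`) and saved as `cfg_test_set_vectors.npy`. Note: the cells shown below keep the arrays in memory as `X_train` / `X_test`; a save step does not appear in them.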

In [None]:
# Load the pre-trained CodeBERT tokenizer and encoder from the Hugging Face Hub.
# The checkpoint name is kept in one place so tokenizer and model cannot drift apart.
CODEBERT_CHECKPOINT = "microsoft/codebert-base"
tokenizer = RobertaTokenizer.from_pretrained(CODEBERT_CHECKPOINT)
model = RobertaModel.from_pretrained(CODEBERT_CHECKPOINT)

# The model is only used for inference, so put it in evaluation mode.
# The trailing semicolon stops the notebook from echoing the whole module tree.
model.eval();


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

RobertaModel(
  (embeddings): RobertaEmbeddings(
    (word_embeddings): Embedding(50265, 768, padding_idx=1)
    (position_embeddings): Embedding(514, 768, padding_idx=1)
    (token_type_embeddings): Embedding(1, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): RobertaEncoder(
    (layer): ModuleList(
      (0-11): 12 x RobertaLayer(
        (attention): RobertaAttention(
          (self): RobertaSdpaSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): RobertaSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (dr

In [None]:
def generate_simulated_cfg(ast):
    """Build a directed graph mirroring the parent/child structure of a javalang tree.

    Every ``javalang.ast.Node`` reachable from ``ast`` becomes a graph node whose
    ``label`` attribute is the node's class name; an edge runs from each node to
    each of its child nodes. Children are taken from ``node.children``, either
    directly or from the elements of a list-valued child. Node ids are integers
    assigned in depth-first preorder, starting at 0.

    Args:
        ast: Root of the parsed tree. Anything that is not a javalang node
            produces an empty graph.

    Returns:
        A ``networkx.DiGraph``.
    """
    cfg = nx.DiGraph()
    if not isinstance(ast, javalang.ast.Node):
        return cfg

    next_id = 0
    # Explicit work stack of (node, id of its parent) instead of recursion.
    pending = [(ast, None)]
    while pending:
        node, parent_id = pending.pop()
        current_id = next_id
        next_id += 1

        cfg.add_node(current_id, label=node.__class__.__name__)
        if parent_id is not None:
            cfg.add_edge(parent_id, current_id)

        child_nodes = []
        for child in node.children:
            candidates = child if isinstance(child, list) else [child]
            child_nodes.extend(
                candidate
                for candidate in candidates
                if isinstance(candidate, javalang.ast.Node)
            )
        # Push in reverse so the first child is popped first, which reproduces
        # the preorder numbering of a recursive traversal.
        pending.extend((child_node, current_id) for child_node in reversed(child_nodes))

    return cfg

def extract_paths_from_cfg(cfg, cutoff=10):
    """Enumerate the simple paths between every ordered pair of distinct nodes.

    Args:
        cfg: Graph to search (a ``networkx`` graph).
        cutoff: Maximum path length handed to ``nx.all_simple_paths``. Defaults
            to 10, the value that was previously hard-coded.

    Returns:
        A list of paths, each a list of node ids, ordered by start node and then
        by end node in the graph's node order.

    Note:
        One search is started per ordered node pair, so the number of searches
        grows quadratically with the number of nodes.
    """
    paths = []
    nodes = list(cfg.nodes())
    for start_node in nodes:
        for end_node in nodes:
            if start_node == end_node:
                continue
            try:
                paths.extend(
                    nx.all_simple_paths(cfg, source=start_node, target=end_node, cutoff=cutoff)
                )
            except nx.NetworkXNoPath:
                # Kept as a safety net; pairs without a path normally just yield nothing.
                continue
    return paths

def path_to_sequence(cfg, path):
    """Join the ``label`` attribute of every node on ``path`` into one space-separated string."""
    return ' '.join(cfg.nodes[node_id]['label'] for node_id in path)

def generate_embeddings_for_paths(paths, cfg):
    """Embed every graph path with the module-level ``tokenizer`` and ``model``.

    Each path is turned into a label sequence by ``path_to_sequence``; the
    sequence is tokenized (truncated to 512 tokens), run through the model
    without gradient tracking, and the last hidden state is averaged over the
    token dimension.

    Many paths share the same label sequence, so the embedding of each distinct
    sequence is computed once and reused. The returned list still has one entry
    per non-empty path, in path order.

    Args:
        paths: Iterable of paths (lists of node ids of ``cfg``).
        cfg: Graph whose nodes carry a ``label`` attribute.

    Returns:
        List of embedding tensors. Paths whose sequence is blank or whose
        embedding raised an error are skipped (errors are printed).
    """
    embeddings = []
    cache = {}  # label sequence -> embedding already computed in this call
    for path in paths:
        sequence = path_to_sequence(cfg, path)
        if not sequence.strip():
            continue
        if sequence in cache:
            embeddings.append(cache[sequence])
            continue
        try:
            inputs = tokenizer(sequence, return_tensors='pt', truncation=True, max_length=512)
            with torch.no_grad():
                outputs = model(**inputs)
            embedding = outputs.last_hidden_state.mean(dim=1).squeeze()
        except Exception as e:
            # Best effort: report the failing sequence and carry on with the rest.
            print(f"Error generating embedding for sequence: {sequence}\nError: {e}")
            continue
        cache[sequence] = embedding
        embeddings.append(embedding)
    return embeddings

def aggregate_embeddings(embeddings):
    """Average a list of equally-shaped tensors element-wise.

    Returns:
        The mean tensor, or ``None`` when ``embeddings`` is empty.
    """
    if not embeddings:
        return None
    stacked = torch.stack(embeddings)
    return stacked.mean(dim=0)


In [None]:
import numpy as np

def process_java_files(file_paths, df, filename_column):
    """Turn Java source files into one CFG-path embedding vector per file.

    Each file is parsed with javalang, converted to a graph by
    ``generate_simulated_cfg``, its paths are embedded by
    ``generate_embeddings_for_paths`` and the path embeddings are averaged by
    ``aggregate_embeddings``. Files that cannot be read or parsed, or that
    yield no graph nodes, no paths or no embeddings, are reported and skipped.

    Vectors and file names are collected in two parallel lists, so row ``i`` of
    the returned array always belongs to ``processed_files[i]`` — also when the
    same path occurs more than once in ``file_paths``.

    Args:
        file_paths: Iterable of paths to ``.java`` files.
        df: Returned unchanged as the third element of the result.
        filename_column: Not used here; kept so existing calls keep working.

    Returns:
        ``(embedding_array, processed_files, df)``.
    """
    vectors = []
    processed_files = []

    for file_path in file_paths:
        print(f"Processing {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                code = file.read()
            tokens = list(javalang.tokenizer.tokenize(code))
            ast = javalang.parser.Parser(tokens).parse()
        except Exception as e:
            # Unreadable or unparsable sources are skipped, not fatal.
            print(f"Error parsing {file_path}: {e}")
            continue

        cfg = generate_simulated_cfg(ast)
        if cfg.number_of_nodes() == 0:
            print(f"CFG has no nodes for {file_path}")
            continue

        paths = extract_paths_from_cfg(cfg)
        if not paths:
            print(f"No paths extracted from CFG for {file_path}")
            continue

        path_embeddings = generate_embeddings_for_paths(paths, cfg)
        file_embedding = aggregate_embeddings(path_embeddings)
        if file_embedding is None:
            print(f"No embeddings generated for {file_path}")
            continue

        vectors.append(file_embedding.numpy())
        processed_files.append(file_path)
        print(f"Generated embedding for {file_path}")

    # Convert embeddings to NumPy array
    embedding_array = np.array(vectors)
    return embedding_array, processed_files, df


In [None]:
def get_defect_labels(processed_files, df, filename_column):
    """Look up the ``bug`` value for each source file by its class name.

    The class name of a file is its base name without extension. It is compared
    for equality with the last component of each entry in
    ``df[filename_column]`` (components separated by ``.``, ``/`` or ``\\``; a
    trailing ``.java`` is ignored). Exact comparison avoids the false hits of a
    substring search, where e.g. ``Ant`` would also match ``AntTask``.

    Args:
        processed_files: Iterable of source file paths.
        df: DataFrame with a name column and a ``bug`` column.
        filename_column: Name of the column holding the class names.

    Returns:
        List of integer labels, one per file, in input order. Files without a
        matching row, or whose ``bug`` value is missing, get label 0. When
        several rows share the class name, the first one is used.
    """
    # Local import: this function is defined in a cell that comes before the
    # notebook's `import pandas as pd` cell.
    import pandas as pd

    def simple_class_name(value):
        text = str(value)
        if text.endswith('.java'):
            text = text[:-len('.java')]
        return text.replace('/', '.').replace('\\', '.').rsplit('.', 1)[-1]

    # Normalise the name column once instead of once per file.
    known_names = df[filename_column].fillna('').map(simple_class_name)

    labels = []
    for file_path in processed_files:
        class_name = os.path.splitext(os.path.basename(file_path))[0]
        label_row = df[known_names == class_name]
        if label_row.empty:
            labels.append(0)  # Default label if no match is found
            continue
        bug_value = label_row['bug'].values[0]
        # Default to 0 if the value is NaN
        labels.append(0 if pd.isna(bug_value) else int(bug_value))
    return labels


In [None]:
# Upload the CSV files for ANT versions 1.5, 1.6, and 1.7
# Importing `files` again here rebinds the name to the Colab module in case an
# earlier cell reused `files` as an ordinary variable.
from google.colab import files
uploaded = files.upload()


Saving ant_1.7.csv to ant_1.7.csv
Saving ant-1.5.csv to ant-1.5.csv
Saving ant-1.6-Unified.csv to ant-1.6-Unified.csv


In [None]:
import pandas as pd


# Read the three per-release label tables that were uploaded above.
ant_15_df = pd.read_csv('ant-1.5.csv')
ant_16_df = pd.read_csv('ant-1.6-Unified.csv')
ant_17_df = pd.read_csv('ant_1.7.csv')

# Keep only the class-name column and the `bug` column.
# Note the spelling: `Name` in the 1.5 and 1.6 tables, `name` in the 1.7 table.
ant_15_relevant = ant_15_df.loc[:, ['Name', 'bug']]
ant_16_relevant = ant_16_df.loc[:, ['Name', 'bug']]
ant_17_relevant = ant_17_df.loc[:, ['name', 'bug']]


In [None]:
import javalang

# Tokenize one Java source file and report whether that worked
def tokenize_source_code(file_path):
    """Read ``file_path`` as UTF-8 and tokenize it with javalang.

    Returns:
        ``(tokens, True)`` on success; ``([], False)`` when reading or
        tokenizing raises, in which case the error is printed.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as source:
            code = source.read()
        token_list = list(javalang.tokenizer.tokenize(code))
    except Exception as e:
        print(f"Error tokenizing {file_path}: {e}")
        return [], False
    return token_list, True

# Function to select the small ("simple") Java files
def select_java_files(file_paths, max_tokens=50):
    """Keep the files that tokenize successfully and have fewer than ``max_tokens`` tokens.

    Args:
        file_paths: Sequence of paths to ``.java`` files.
        max_tokens: Exclusive upper bound on the token count. Defaults to 50,
            the limit this notebook has been using.

    Returns:
        List of the selected paths, in input order. A one-line summary is printed.
    """
    simple_files = []
    for file_path in file_paths:
        tokens, valid = tokenize_source_code(file_path)
        if valid and len(tokens) < max_tokens:
            simple_files.append(file_path)
    print(f"Selected {len(simple_files)} simple files out of {len(file_paths)} total files.")
    return simple_files


In [None]:
# Keep only the small files of the training (ant-1.5) and test (ant-1.7) releases.
train_selected_files = select_java_files(train_files)
test_selected_files = select_java_files(test_files)

# CFG-path embeddings, one row per successfully processed file.
X_train, train_processed_files, ant_15_relevant = process_java_files(train_selected_files, ant_15_relevant, 'Name')
X_test, test_processed_files, ant_17_relevant = process_java_files(test_selected_files, ant_17_relevant, 'name')

# Defect labels in the same row order as the embedding matrices. Later cells
# read `y_train` / `y_test`, so they have to be created here for the notebook
# to run top to bottom.
y_train = np.array(get_defect_labels(train_processed_files, ant_15_relevant, 'Name'))
y_test = np.array(get_defect_labels(test_processed_files, ant_17_relevant, 'name'))

assert len(y_train) == len(train_processed_files), "training labels and files are out of step"
assert len(y_test) == len(test_processed_files), "test labels and files are out of step"



Error tokenizing Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/fixcrlf/expected/Junk8.java: Could not process token at "", line 13: 
Error tokenizing Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/fixcrlf/input/Junk9.java: Could not process token at "", line 13: 
Selected 64 simple files out of 727 total files.
Error tokenizing Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/taskdefs/fixcrlf/expected/Junk8.java: Could not process token at "", line 13: 
Error tokenizing Promisedataset/Promisedataset-main/ant/ant/ant-1.7/apache-ant-1.7.0/src/etc/testcases/taskdefs/fixcrlf/input/Junk9.java: Could not process token at "", line 13: 
Selected 153 simple files out of 1891 total files.
Processing Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/main/org/apache/tools/mail/ErrorInQuitException.java
Generated embedding for Promisedataset

### **AST + CFG Analysis**

In [None]:
import numpy as np

# Parse one Java file into a javalang syntax tree
def generate_ast(file_path):
    """Read ``file_path`` as UTF-8 and parse it with javalang.

    Returns:
        The parsed tree, or ``None`` when reading, tokenizing or parsing raises
        (the error is printed).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as source:
            code = source.read()
        token_list = list(javalang.tokenizer.tokenize(code))
        return javalang.parser.Parser(token_list).parse()
    except Exception as e:
        print(f"Error generating AST for {file_path}: {e}")
        return None

# Function to get root-to-leaf paths in the AST
def root_to_leaf_paths(node):
    """List every root-to-leaf path of a javalang tree as class-name sequences.

    A node's children are the ``javalang.ast.Node`` instances found in
    ``node.children`` — directly, or inside (possibly nested) lists and tuples.
    Looking inside the containers matters: child declarations, statements and
    arguments are stored in lists, and ignoring them would end almost every
    traversal at the root.

    Args:
        node: Root node of the tree.

    Returns:
        List of paths; each path is the list of node class names from the root
        down to one leaf (a node without child nodes).
    """
    def child_nodes(container):
        # Yield nodes in order, descending into nested lists/tuples.
        for item in container:
            if isinstance(item, javalang.ast.Node):
                yield item
            elif isinstance(item, (list, tuple)):
                yield from child_nodes(item)

    paths = []

    def traverse(current_node, current_path):
        current_path.append(current_node.__class__.__name__)
        children = list(child_nodes(current_node.children))
        if not children:
            # Leaf reached: store a copy, because current_path keeps changing.
            paths.append(list(current_path))
        for child in children:
            traverse(child, current_path)
        current_path.pop()

    traverse(node, [])
    return paths

# Function to generate embeddings for AST traversal paths
def generate_ast_embeddings(ast):
    """Embed a syntax tree as the mean embedding of its root-to-leaf paths.

    Each path from ``root_to_leaf_paths`` is joined into a space-separated
    sequence, tokenized (truncated to 512 tokens), run through the module-level
    ``model`` without gradient tracking and averaged over the token dimension.

    Different leaves often produce the same class-name sequence, so each
    distinct sequence is encoded once and its vector reused. Every path still
    contributes one vector to the mean.

    Args:
        ast: Root node of a parsed tree.

    Returns:
        A NumPy vector, or ``None`` when no path could be embedded.
    """
    paths = root_to_leaf_paths(ast)
    embeddings = []
    cache = {}  # sequence -> vector already computed in this call
    for path in paths:
        sequence = " ".join(path)
        if not sequence.strip():
            continue
        if sequence in cache:
            embeddings.append(cache[sequence])
            continue
        try:
            inputs = tokenizer(sequence, return_tensors="pt", truncation=True, max_length=512)
            with torch.no_grad():
                outputs = model(**inputs)
            embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
        except Exception as e:
            # Best effort: report the failing sequence and carry on with the rest.
            print(f"Error generating embedding for sequence {sequence}: {e}")
            continue
        cache[sequence] = embedding
        embeddings.append(embedding)
    return np.mean(embeddings, axis=0) if embeddings else None


In [None]:
def process_ast_files(file_paths, df, filename_column):
    """Compute an AST embedding for every file that can be parsed and embedded.

    Args:
        file_paths: Iterable of paths to ``.java`` files.
        df: Not used by this function.
        filename_column: Not used by this function.

    Returns:
        Dict mapping each successfully processed file path to its embedding, in
        processing order. Files that fail at any stage are reported and left out.
    """
    embeddings = {}
    for file_path in file_paths:
        print(f"Processing AST for {file_path}")
        try:
            ast = generate_ast(file_path)
            if not ast:
                print(f"Skipping {file_path} due to AST generation error.")
                continue

            embedding = generate_ast_embeddings(ast)
            if embedding is None:
                print(f"No AST embedding generated for {file_path}")
                continue

            embeddings[file_path] = embedding
            print(f"Generated AST embedding for {file_path}")
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue

    return embeddings


In [None]:
# Inputs: `train_processed_files` / `test_processed_files` are the files that the
# CFG stage processed successfully; the AST stage runs on exactly those files so
# both feature sets describe the same samples.

def _ast_features(file_paths, df, filename_column):
    """Return (feature matrix, labels, files) for the AST embeddings of ``file_paths``.

    ``process_ast_files`` returns a dict keyed by file path; the rows of the
    matrix and the labels follow the order of that dict.
    """
    embeddings_by_file = process_ast_files(file_paths, df, filename_column)
    kept_files = list(embeddings_by_file)
    features = np.array([embeddings_by_file[path] for path in kept_files])
    labels = np.array(get_defect_labels(kept_files, df, filename_column))
    return features, labels, kept_files

# Process AST embeddings for training set
X_ast_train, y_ast_train, ast_train_files = _ast_features(train_processed_files, ant_15_relevant, 'Name')

# Process AST embeddings for test set (the ant-1.7 files, not the training files)
X_ast_test, y_ast_test, ast_test_files = _ast_features(test_processed_files, ant_17_relevant, 'name')

print(f"AST Training set size: {X_ast_train.shape[0]} samples")
print(f"AST Test set size: {X_ast_test.shape[0]} samples")


Processing AST for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src4/test/Outer.java
Generated AST embedding for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src4/test/Outer.java
Processing AST for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src5/B.java
Generated AST embedding for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src5/B.java
Processing AST for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src5/A.java
Generated AST embedding for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcases/taskdefs/optional/depend/src5/A.java
Processing AST for Promisedataset/Promisedataset-main/ant/ant/ant-1.5/apache-ant-1.5.2/src/etc/testcase

In [None]:
# Random forest on the AST embeddings only.
# random_state is fixed so repeated runs build the same forest;
# class_weight='balanced' is passed to re-weight the classes.
clf_ast = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')

# Train the classifier on AST embeddings
clf_ast.fit(X_ast_train, y_ast_train)

# Predict on AST test set
y_ast_test_pred = clf_ast.predict(X_ast_test)

# Evaluate: accuracy plus precision / recall / F1 computed with
# average='weighted'; zero_division=0 is passed to each of the three scorers.
print("AST Test Set Evaluation:")
print(f"Accuracy: {accuracy_score(y_ast_test, y_ast_test_pred):.4f}")
print(f"Precision (weighted): {precision_score(y_ast_test, y_ast_test_pred, average='weighted', zero_division=0):.4f}")
print(f"Recall (weighted): {recall_score(y_ast_test, y_ast_test_pred, average='weighted', zero_division=0):.4f}")
print(f"F1 Score (weighted): {f1_score(y_ast_test, y_ast_test_pred, average='weighted', zero_division=0):.4f}")


AST Test Set Evaluation:
Accuracy: 0.9788
Precision (weighted): 0.9580
Recall (weighted): 0.9788
F1 Score (weighted): 0.9683


In [None]:
def get_defect_labels_for_files(file_paths, df, filename_column):
    """Return the defect label of each file as a ``{file_path: label}`` dict.

    This is the dict-shaped counterpart of ``get_defect_labels`` and delegates
    to it, so both functions always apply the same matching and default rules.

    Args:
        file_paths: Iterable of source file paths.
        df: DataFrame with a name column and a ``bug`` column.
        filename_column: Name of the column holding the class names.

    Returns:
        Dict mapping every path in ``file_paths`` to its integer label.
    """
    # Materialise first: the paths are iterated twice (lookup, then zip).
    paths = list(file_paths)
    return dict(zip(paths, get_defect_labels(paths, df, filename_column)))


In [None]:
# Sanity checks before the two feature sets are concatenated: the CFG and AST
# matrices must have the same number of rows and identical labels per split.
# Expects X_train / X_test, y_train / y_test and their *_ast_* counterparts to exist.

# Training split: row counts
print(
    "Number of training samples in X_train and X_ast_train is the same."
    if X_train.shape[0] == X_ast_train.shape[0]
    else "Number of training samples in X_train and X_ast_train is different."
)

# Training split: labels
print(
    "Training labels are the same for CFG and AST embeddings."
    if np.array_equal(y_train, y_ast_train)
    else "Warning: Training labels differ between CFG and AST embeddings."
)

# Test split: row counts
print(
    "Number of test samples in X_test and X_ast_test is the same."
    if X_test.shape[0] == X_ast_test.shape[0]
    else "Number of test samples in X_test and X_ast_test is different."
)

# Test split: labels
print(
    "Test labels are the same for CFG and AST embeddings."
    if np.array_equal(y_test, y_ast_test)
    else "Warning: Test labels differ between CFG and AST embeddings."
)


Number of training samples in X_train and X_ast_train is the same.
Training labels are the same for CFG and AST embeddings.
Number of test samples in X_test and X_ast_test is the same.
Test labels are the same for CFG and AST embeddings.


In [None]:
# Training features: CFG embedding and AST embedding of each file side by side.
X_train_combined = np.concatenate([X_train, X_ast_train], axis=1)
y_train_combined = y_train  # Labels are the same

# Test features, built the same way.
X_test_combined = np.concatenate([X_test, X_ast_test], axis=1)
y_test_combined = y_test  # Labels are the same


In [None]:
# Random forest on the concatenated CFG + AST features, configured with the
# same arguments as the AST-only classifier (`clf_ast`) so the two runs are comparable.
clf_combined = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')

# Train the classifier
# (as the last expression of the cell, the fitted estimator is also displayed)
clf_combined.fit(X_train_combined, y_train_combined)


In [None]:
# Predict on the test set
y_combined_test_pred = clf_combined.predict(X_test_combined)

# Evaluate with the same metrics as the AST-only run: accuracy plus
# precision / recall / F1 computed with average='weighted' and zero_division=0.
print("Combined Embeddings Test Set Evaluation:")
print(f"Accuracy: {accuracy_score(y_test_combined, y_combined_test_pred):.4f}")
print(f"Precision (weighted): {precision_score(y_test_combined, y_combined_test_pred, average='weighted', zero_division=0):.4f}")
print(f"Recall (weighted): {recall_score(y_test_combined, y_combined_test_pred, average='weighted', zero_division=0):.4f}")
print(f"F1 Score (weighted): {f1_score(y_test_combined, y_combined_test_pred, average='weighted', zero_division=0):.4f}")


Combined Embeddings Test Set Evaluation:
Accuracy: 0.9752
Precision (weighted): 0.9579
Recall (weighted): 0.9752
F1 Score (weighted): 0.9665
