In [None]:
# Install the Hugging Face packages into the kernel's environment.
# NOTE(review): versions are unpinned, so a fresh run may pull different releases — consider pinning.
%pip install transformers datasets evaluate

In [None]:
# Clone the SBFL (spectrum-based fault localization) repository into the working directory.
# NOTE(review): no commit/tag is pinned, so the installed code follows the default branch.
!git clone https://github.com/Suresoft-GLaDOS/SBFL
# Enter the clone so the relative paths below (requirements.txt, setup.py) resolve.
%cd SBFL
%pip install -r requirements.txt
# setuptools is installed explicitly because the next line runs setup.py directly.
%pip install setuptools
!python setup.py install
# Return to the parent directory; the later `import SBFL.sbfl...` lines resolve the
# package through the cloned `SBFL/` folder relative to this directory.
%cd ../

In [3]:
import SBFL.sbfl.utils as sbfl_utils
import SBFL.sbfl.base as sbfl_base
import matplotlib.pyplot as plt
from itertools import product
from pprint import pprint
from pathlib import Path
import tensorflow as tf
from tqdm import tqdm
import seaborn as sns
import pandas as pd
import numpy as np
import transformers
import collections
import pickle
import random
import shutil
import json
import glob
import time
import re
import os
import gc

In [None]:
# Root folder from which the pickled dataset is read.
# NOTE(review): assumes Google Drive is already mounted at /content/drive — no mount call is visible in these cells.
PATH_GOOGLE_DRIVE = '/content/drive/MyDrive/'
# Tokenizer of the pretrained CodeBERT checkpoint.
TOKENIZER         = transformers.AutoTokenizer.from_pretrained("microsoft/codebert-base")
# Pretrained CodeBERT encoder.
# NOTE(review): the model-builder functions load their own copy into a local variable of the
# same name, so this global instance is not the one that gets trained in the visible cells.
CODEBERT_BASE     = transformers.TFRobertaModel.from_pretrained("microsoft/codebert-base")
# Sequence length used for every model input layer.
# NOTE(review): RoBERTa-style checkpoints usually ship ~514 position embeddings — confirm that
# 1024-token inputs are valid for this checkpoint.
MAX_LEN_TOKEN     = 1024
# Batch size of the train/test tf.data pipelines built in the cross-validation loop.
BATCH_SIZE        = 1
# Fraction of the dataset held out as the test fold in each split.
TEST_SIZE         = 0.15
# NOTE(review): not read anywhere in the visible cells.
USE_TPU           = False

In [None]:
# Smoke test of the SBFL package on a toy 3-tests x 3-elements coverage matrix.
# NOTE(review): inside a notebook kernel `__name__` is "__main__", so this guard always passes.
if __name__ == "__main__":
    """
    X: coverage data
    y: test results
    """
    # One row per test, one column per program element; True means the test covers the element.
    X = np.array([
        [1,0,1], # coverage of test t0
        [1,0,1], # coverage of test t1
        [1,1,1]  # coverage of test t2
    ], dtype=bool)

    # NOTE(review): every entry is 0, so all three tests share the same outcome; the
    # per-test PASS/FAIL labels that used to sit on these lines disagreed with the values.
    # Presumably 0/False means FAIL (as in the upstream SBFL README) — confirm.
    y = np.array([
        0, # t0
        0, # t1
        0  # t2
    ], dtype=bool)

    """
    Calculate the suspiciousness scores
    """
    # Ochiai formula; fit_predict returns the per-element scores that are printed below.
    sbfl = sbfl_base.SBFL(formula='Ochiai')
    print(sbfl.fit_predict(X, y))

In [None]:
# Load the pre-tokenised dataset (a dict of equally long lists, one entry per sample).
# NOTE(security): pickle.load can execute arbitrary code; only open files you produced yourself.
dataset_path = Path(PATH_GOOGLE_DRIVE) / 'dataset_dict.pickle'
with open(dataset_path, "rb") as f:
    dataset_dict = pickle.load(f)

# Shuffle all columns with the same permutation. The seed is fixed so that the
# cross-validation folds derived from this order are reproducible across runs.
SHUFFLE_SEED = 42
dataset_dict = (
    pd.DataFrame(dataset_dict)
    .sample(frac=1, random_state=SHUFFLE_SEED)
    .to_dict('list')
)


# Model inputs, model targets, and the spectrum-based baseline data, split by key.
dataset_dict_x   = {key: dataset_dict[key] for key in dataset_dict if key in ['input_ids', 'attention_mask', 'attention_spectrum']}
dataset_dict_y   = {key: dataset_dict[key] for key in dataset_dict if key in ['start_token_idx', 'end_token_idx']}
dataset_spectrum = {key: dataset_dict[key] for key in dataset_dict if key in ['dct_weight_error_suspicious', 'lst_line_num_error']}


# Creating tf.data.Dataset after parse all data (sanity check: display the first batch)
tf_dataset = tf.data.Dataset.from_tensor_slices((dataset_dict_x, dataset_dict_y)).batch(8)
next(iter(tf_dataset))

In [None]:
def create_model(num_spectrum_formulas):
    """Build and compile the CodeBERT + spectrum span-localisation model.

    The model takes three inputs of length ``MAX_LEN_TOKEN``: ``input_ids``
    and ``attention_mask`` (int32) and ``attention_spectrum`` (float32, one
    column per spectrum formula). The CodeBERT hidden states are concatenated
    with the spectrum features along the last axis, projected through a stack
    of dense layers, and turned into two softmax distributions over token
    positions named ``start_token_idx`` and ``end_token_idx``.

    Side effect: sets the global Keras dtype policy to ``mixed_float16``.

    Args:
        num_spectrum_formulas: size of the last axis of ``attention_spectrum``.

    Returns:
        A compiled ``tf.keras.Model`` trained with sparse categorical
        cross-entropy on both outputs.
    """
    # A layer captures the global dtype policy when it is constructed, so the
    # policy has to be active *before* any layer is created. Setting it after
    # the model was built left this model untouched while silently changing
    # every model created afterwards.
    tf.keras.mixed_precision.set_global_policy("mixed_float16")

    input_ids = tf.keras.layers.Input(shape=(MAX_LEN_TOKEN,), name='input_ids', dtype=tf.int32)
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN_TOKEN,), name='attention_mask', dtype=tf.int32)
    attention_spectrum = tf.keras.layers.Input(
        shape=(MAX_LEN_TOKEN, num_spectrum_formulas), name='attention_spectrum', dtype=tf.float32
    )

    # Fresh copy of the pretrained encoder; a local name that does not shadow
    # the module-level CODEBERT_BASE constant.
    codebert_encoder = transformers.TFRobertaModel.from_pretrained("microsoft/codebert-base")
    token_features = codebert_encoder([input_ids, attention_mask]).last_hidden_state

    # Append the spectrum scores to every token vector; the cast keeps both
    # operands in the same dtype regardless of the active precision policy.
    token_features = tf.keras.layers.Lambda(
        lambda x: tf.concat([x[0], tf.cast(x[1], x[0].dtype)], -1)
    )([token_features, attention_spectrum])

    # NOTE(review): these bias-free Dense layers have no activation in
    # between, so together they act as a single linear projection. Kept as-is
    # to preserve the architecture.
    for units in (1024, 512, 64, 16):
        token_features = tf.keras.layers.Dense(units, use_bias=False)(token_features)

    def position_distribution(output_name):
        """One logit per token, flattened and normalised over positions."""
        logits = tf.keras.layers.Dense(1, use_bias=False)(token_features)
        logits = tf.keras.layers.Flatten()(logits)
        # Softmax is computed in float32 for numerical stability under
        # mixed precision.
        return tf.keras.layers.Activation(
            tf.keras.activations.softmax, dtype="float32", name=output_name
        )(logits)

    start_probs = position_distribution("start_token_idx")
    end_probs = position_distribution("end_token_idx")

    model = tf.keras.Model(
        inputs=[input_ids, attention_mask, attention_spectrum],
        outputs=[start_probs, end_probs],
    )
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)
    model.compile(optimizer=optimizer, loss=[loss, loss])
    return model


# Build the spectrum-aware model once (two spectrum formulas) to inspect its architecture.
# NOTE(review): create_model() also switches the global Keras dtype policy to "mixed_float16",
# which stays in effect for every model built later in the notebook. The cross-validation loop
# further down builds its own models, so this instance only serves the summary — consider
# deleting it afterwards to free memory.
model = create_model(num_spectrum_formulas = 2)
model.summary()

In [None]:
len_dataset = len(dataset_dict_x['input_ids'])
# Number of samples per test fold; never below 1 so the division below is safe
# even for very small datasets.
size_split = max(1, round(len_dataset * TEST_SIZE))
# Ceiling division: with round() a remainder smaller than half a fold was
# dropped, so the trailing samples never appeared in any test fold. The last
# fold may simply be shorter than the others.
num_split = -(-len_dataset // size_split)


# Tokenizer used later to locate the newline token id when mapping tokens to source lines.
NEW_TOKENIZER = transformers.AutoTokenizer.from_pretrained("mahdifar/codeflaws-tokenizer")


def spit_dct_to_train_test(dct, rng_slct):
    """Partition every list in ``dct`` into a train part and a test part.

    Items whose position is contained in ``rng_slct`` go to the test dict,
    all remaining items go to the train dict. Keys and item order are kept.

    Args:
        dct: mapping of key -> sequence of per-sample values.
        rng_slct: container of positions (e.g. a ``range``) selected for testing.

    Returns:
        Tuple ``(train_dct, test_dct)`` with the same keys as ``dct``.
    """
    train_dct = {}
    test_dct = {}
    for key, values in dct.items():
        kept, held_out = [], []
        for position, item in enumerate(values):
            bucket = held_out if position in rng_slct else kept
            bucket.append(item)
        train_dct[key] = kept
        test_dct[key] = held_out
    return train_dct, test_dct



def chk_top_k_prd_model(model, dataset, spectrum_prediction, lst_compare, tokenizer):
    """Score every test sample under each model and each spectrum formula.

    For each sample one dict is appended to ``lst_compare``. It maps

    * every model name to an effort score: the ranked (start, end) token
      candidates are visited best-first; a candidate that covers more than
      80% of the tokens of the true faulty line ends the walk, every earlier
      candidate adds at least 1 to the score;
    * every spectrum formula name to the rank of the first faulty line among
      that formula's suspiciousness scores, or to the largest line index of
      the sample when no rank can be determined.

    Args:
        model: mapping ``{model_name: (start_probs, end_probs)}`` holding the
            per-position prediction arrays of every model to compare. (The
            parameter keeps its historical singular name.)
        dataset: batched dataset of ``(x, y)`` dict pairs; ``x['input_ids']``,
            ``y['start_token_idx']`` and ``y['end_token_idx']`` are read.
        spectrum_prediction: dict with the per-sample lists
            ``'dct_weight_error_suspicious'`` ({formula: {line: score}}) and
            ``'lst_line_num_error'`` (faulty line numbers).
        lst_compare: list that receives one result dict per sample (mutated
            in place).
        tokenizer: tokenizer whose encoding of a newline holds the newline
            token id at index 1.

    Returns:
        None.
    """
    # Bug fix: the body used to read a *global* named `models` and ignored
    # this parameter, so it only worked when the notebook had leaked such a
    # variable into the kernel namespace.
    predictions_by_model = model
    hit_ratio = 0.8  # share of the true line's tokens a candidate must cover

    samples = dataset.unbatch()
    # Store in the np.array() to use with np.cumsum() & find line_num for each tokens
    arr_input_ids = np.asarray(list(samples.map(lambda x, y: x['input_ids'])))
    # Store in the np.array() to create range start_end
    true_arr_start_token_idxs = np.asarray(list(samples.map(lambda x, y: y['start_token_idx'])))
    true_arr_end_token_idxs = np.asarray(list(samples.map(lambda x, y: y['end_token_idx'])))
    true_arr_start_end_token_idxs = np.hstack(
        [true_arr_start_token_idxs[:, None], true_arr_end_token_idxs[:, None]]
    )

    # Loop-invariant work hoisted out of the per-sample loop: the newline
    # token id and the descending ranking of every model's predictions.
    end_line_token_id = tokenizer.encode('\n')[1]
    ranked_by_model = {
        model_name: (
            np.argsort(-np.asarray(start_probs), axis=1),
            np.argsort(-np.asarray(end_probs), axis=1),
        )
        for model_name, (start_probs, end_probs) in predictions_by_model.items()
    }

    for idx_1, rng_true in enumerate(true_arr_start_end_token_idxs):
        true_start, true_end = rng_true
        # +1 so that the end token itself is part of the range
        tmp_rng_true = np.arange(true_start, true_end + 1)

        # Line index of every token: number of newline tokens seen so far
        arr_lines_input_ids = (arr_input_ids[idx_1] == end_line_token_id).cumsum()
        dct_cnt_true_len_line = dict(zip(*np.unique(arr_lines_input_ids, return_counts=True)))

        # The line holding most of the true span's tokens is "the" faulty line
        true_lines_error = arr_lines_input_ids[tmp_rng_true]
        true_line_error = collections.Counter(true_lines_error).most_common(1)[0][0]
        dct_tmp_store_result = {}

        for model_name, (ranked_starts, ranked_ends) in ranked_by_model.items():
            num_pred_model_prediction = 0
            # Candidates are generated lazily: the walk stops at the first hit,
            # so later candidates never need to be materialised.
            for pred_start, pred_end in zip(ranked_starts[idx_1], ranked_ends[idx_1]):
                # An end before the start yields an empty range (a pure miss)
                pred_lines_error = arr_lines_input_ids[np.arange(pred_start, pred_end + 1)]
                dct_cnt_pred_len_line = dict(zip(*np.unique(pred_lines_error, return_counts=True)))

                # Fraction of each touched line that the candidate covers
                dct_pred_div_true = {
                    k: v / dct_cnt_true_len_line[k] for k, v in dct_cnt_pred_len_line.items()
                }
                sum_dct_pred_div_true_vals = sum(dct_pred_div_true.values())

                if dct_pred_div_true.get(true_line_error, 0) > hit_ratio:
                    num_pred_model_prediction += sum_dct_pred_div_true_vals / len(dct_pred_div_true)
                    break
                # A miss costs the lines it touched, but never less than one
                num_pred_model_prediction += max(sum_dct_pred_div_true_vals, 1)
            dct_tmp_store_result[model_name] = num_pred_model_prediction

        dct_weight_error_suspicious = spectrum_prediction['dct_weight_error_suspicious'][idx_1]
        arr_line_num_error = np.array(spectrum_prediction['lst_line_num_error'][idx_1])
        # `[0]` (or nothing at all) marks a sample without a known faulty line.
        # np.array_equal avoids the ambiguous truth value that `arr == [0]`
        # produces for arrays with zero or several elements.
        no_known_error = arr_line_num_error.size == 0 or np.array_equal(arr_line_num_error, [0])

        for formula in dct_weight_error_suspicious:
            line_scores = dct_weight_error_suspicious[formula]
            if no_known_error or len(line_scores) == 0 or arr_line_num_error[0] not in line_scores:
                # Set maximum line_number instead of "np.nan" (because we need to check all lines)
                dct_tmp_store_result[formula] = arr_lines_input_ids.max()
            else:
                rank_lines = pd.Series(line_scores).rank(method='average', ascending=False)
                dct_tmp_store_result[formula] = rank_lines[arr_line_num_error[0]]
        lst_compare.append(dct_tmp_store_result)



def _build_token_span_model(num_spectrum_formulas, use_spectrum):
    """Build and compile a CodeBERT-based start/end token localisation model.

    Shared implementation of ``create_only_nlp_model`` and
    ``create_nlp_plus_spectrum_model``; the two variants differ only in
    whether the spectrum features are concatenated to the CodeBERT output.

    Side effect: sets the global Keras dtype policy to ``mixed_float16``.

    Args:
        num_spectrum_formulas: size of the last axis of ``attention_spectrum``.
        use_spectrum: when True the spectrum features are appended to every
            token vector; when False the input exists but is not connected.

    Returns:
        A compiled ``tf.keras.Model`` with inputs ``input_ids``,
        ``attention_mask``, ``attention_spectrum`` and the softmax outputs
        ``start_token_idx`` and ``end_token_idx``.
    """
    # A layer captures the global dtype policy when it is constructed, so the
    # policy has to be set before the first layer is created.
    tf.keras.mixed_precision.set_global_policy("mixed_float16")

    input_ids = tf.keras.layers.Input(shape=(MAX_LEN_TOKEN,), name='input_ids', dtype=tf.int32)
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN_TOKEN,), name='attention_mask', dtype=tf.int32)
    # Declared for both variants so that the same dataset dict can feed either model.
    attention_spectrum = tf.keras.layers.Input(
        shape=(MAX_LEN_TOKEN, num_spectrum_formulas), name='attention_spectrum', dtype=tf.float32
    )

    # Fresh copy of the pretrained encoder for every model that is built.
    codebert_encoder = transformers.TFRobertaModel.from_pretrained("microsoft/codebert-base")
    token_features = codebert_encoder([input_ids, attention_mask]).last_hidden_state

    if use_spectrum:
        # The cast keeps both operands in the same dtype regardless of the
        # active precision policy.
        token_features = tf.keras.layers.Lambda(
            lambda x: tf.concat([x[0], tf.cast(x[1], x[0].dtype)], -1)
        )([token_features, attention_spectrum])

    # NOTE(review): these bias-free Dense layers have no activation in
    # between, so together they act as a single linear projection. Kept as-is
    # to preserve the architecture.
    for units in (1024, 512, 64, 16):
        token_features = tf.keras.layers.Dense(units, use_bias=False)(token_features)

    def position_distribution(output_name):
        """One logit per token, flattened and normalised over positions."""
        logits = tf.keras.layers.Dense(1, use_bias=False)(token_features)
        logits = tf.keras.layers.Flatten()(logits)
        # Softmax is computed in float32 for numerical stability under
        # mixed precision.
        return tf.keras.layers.Activation(
            tf.keras.activations.softmax, dtype="float32", name=output_name
        )(logits)

    start_probs = position_distribution("start_token_idx")
    end_probs = position_distribution("end_token_idx")

    model = tf.keras.Model(
        inputs=[input_ids, attention_mask, attention_spectrum],
        outputs=[start_probs, end_probs],
    )
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)
    model.compile(optimizer=optimizer, loss=[loss, loss])
    return model


def create_only_nlp_model(num_spectrum_formulas = 2):
    """Build the baseline model that localises the fault from CodeBERT features only.

    The ``attention_spectrum`` input is still part of the model signature but
    is not connected to the outputs.

    Args:
        num_spectrum_formulas: width of the (unused) ``attention_spectrum``
            input; it must match the dataset fed to the model. Defaults to 2,
            the previously hard-coded value.

    Returns:
        A compiled ``tf.keras.Model``.
    """
    return _build_token_span_model(num_spectrum_formulas, use_spectrum=False)


def create_nlp_plus_spectrum_model(num_spectrum_formulas = 2):
    """Build the model that combines CodeBERT features with spectrum scores.

    Args:
        num_spectrum_formulas: number of spectrum formulas, i.e. the size of
            the last axis of the ``attention_spectrum`` input.

    Returns:
        A compiled ``tf.keras.Model``.
    """
    return _build_token_span_model(num_spectrum_formulas, use_spectrum=True)



NUM_EPOCHS = 4  # training epochs per fold and per model


def _train_and_predict(build_model, train_dataset, test_dataset, epochs=NUM_EPOCHS):
    """Train a freshly built model and return its predictions on ``test_dataset``.

    Args:
        build_model: zero-argument callable returning a compiled Keras model.
        train_dataset: batched dataset used for fitting.
        test_dataset: batched dataset used as validation data during fitting
            and as the prediction input afterwards.
        epochs: number of training epochs.

    Returns:
        Whatever ``model.predict(test_dataset)`` returns for the built model.
    """
    fold_model = build_model()
    # NOTE(review): the test fold doubles as validation data; it is only
    # monitored here and does not influence training.
    fold_model.fit(train_dataset, epochs=epochs, validation_data=test_dataset)
    predictions = fold_model.predict(test_dataset)
    # Drop the last reference *before* collecting: the previous order
    # (collect first, del afterwards) could not free the model in that pass.
    del fold_model
    tf.keras.backend.clear_session()
    gc.collect()
    return predictions


lst_compare_different_dataset = []
for i_ns in range(num_split):

    # Select range for split dataset to train & test
    rng_slct = range(i_ns*size_split, (i_ns+1)*size_split)

    # Split dict_dataset to train and test base index of range_select
    train_dataset_dict_x, test_dataset_dict_x = spit_dct_to_train_test(dataset_dict_x, rng_slct)
    train_dataset_dict_y, test_dataset_dict_y = spit_dct_to_train_test(dataset_dict_y, rng_slct)

    # Get test_split of dataset_spectrum to compare with predict of model
    _, test_dataset_spectrum = spit_dct_to_train_test(dataset_spectrum, rng_slct)

    # Creating tf.data.Dataset of train & test
    train_tf_dataset = tf.data.Dataset.from_tensor_slices((train_dataset_dict_x, train_dataset_dict_y)).batch(BATCH_SIZE)
    test_tf_dataset = tf.data.Dataset.from_tensor_slices((test_dataset_dict_x, test_dataset_dict_y)).batch(BATCH_SIZE)

    # Train both variants on the same fold, one after the other, so that only
    # one model occupies accelerator memory at a time.
    nlp_plus_spectrum_model_predict = _train_and_predict(
        lambda: create_nlp_plus_spectrum_model(num_spectrum_formulas=2),
        train_tf_dataset,
        test_tf_dataset,
    )
    only_nlp_model_predict = _train_and_predict(
        create_only_nlp_model,
        train_tf_dataset,
        test_tf_dataset,
    )

    models = {
        'only_nlp_model'  : only_nlp_model_predict,
        'nlp_plus_spectrum_model' : nlp_plus_spectrum_model_predict,
    }

    # Predict The result by test_tf_dataset and find top_k
    chk_top_k_prd_model(models, dataset = test_tf_dataset,
                        spectrum_prediction = test_dataset_spectrum,
                        lst_compare = lst_compare_different_dataset,
                        tokenizer = NEW_TOKENIZER)