# Logistic Regression for Code-Quality Metrics

This notebook demonstrates the use of a machine-learning prediction model (__Logistic Regression__) to establish whether a correlation exists between *textual-similarity metrics* and the desired behavior (functional correctness) of LLM-generated scripts. The dataset chosen for this research is [HumanEval+](https://github.com/evalplus/evalplus/releases/tag/v0.1.0), a set of 2.9 million LLM-generated Python scripts.

Note: The first two steps of the experimental protocol, functionality testing and textual-metric measurement, are not meant to be executed in this notebook because they take a long time to run. Instead, we use the results obtained during the research on this subject. Extract the downloaded .zip file into the parent directory of the project root (i.e., next to the project folder, not inside it): https://filesender.renater.fr/?s=download&token=e2d853f8-a321-4f7a-834e-003eb0ce6356


## Experimental Protocol
The first step is to test each AI-generated script against the set of tests that accompany the canonical, human-written implementation of the corresponding HumanEval task:

In [7]:
import os
import json
import subprocess

from pathlib import Path

def find_project_root(marker='requirements.txt'):
    path = Path().resolve()
    for parent in [path] + list(path.parents):
        if (parent / marker).exists():
            return parent
    raise FileNotFoundError(f"Could not find {marker} in any parent directories")

project_root_parent = find_project_root().parent

exp_data_path = os.path.join(project_root_parent, 'metric_exp_data')

code_path = os.path.join(exp_data_path, 'code')
results_path = os.path.join(exp_data_path, 'exp_results')

def get_functionality_test_path(dataset_name):

    target_path = os.path.join(results_path, dataset_name, 'functionality_tests')
    return target_path


def get_ai_code_path(dataset_name):
    target_path = os.path.join(code_path, dataset_name)
    return target_path


def get_metric_results_path(dataset_name):
    target_path = os.path.join(results_path, dataset_name, 'metrics_calc')
    return target_path


def get_logreg_results_path(dataset_name):
    target_path = os.path.join(results_path, dataset_name, 'metrics_logreg')
    return target_path

'''
Auxiliary functions used in the functionality testing of AI-generated code
'''

def custom_sort_key(s):
    # A sorting key used to sort strings in a length-lexicographic order (length and alphabetical order)
    return len(s), s


def code_cleanup(script, remove_assert=False, remove_exit=False):
    # Function that removes any unnecessary components of a given script (comments & tests), leaving only the code lines

    # Removing the test component of HumanEval implementation following 'METADATA' information
    if 'METADATA' in script:
        script = script.split('METADATA', 1)[0]
    elif 'def check(candidate)' in script:
        script = script.split('def check(candidate)', 1)[0]

    script_lines = script.splitlines()

    multi_line_comment = False
    comment_index = []
    assert_index = []
    empty_line_index = []
    exit_line_index = []

    for index, line in enumerate(script_lines):

        # Indexing any assert statement
        if remove_assert and line.startswith('assert'):
            assert_index.append(index)
            continue

        if remove_exit and 'exit(' in line:
            exit_line_index.append(index)
            continue

        if not multi_line_comment:
            if '#' in line:
                # Indexing single-line comments
                if line.strip()[0] == '#':
                    comment_index.append(index)
                # Removing comment component of the line
                else:
                    cleaned_up_line = line.split('#', 1)[0]
                    script_lines[index] = cleaned_up_line
                continue

            # Indexing the first line of multi-line comments
            if '"""' in line or "'''" in line:
                comment_index.append(index)
                if line.count('"""') == 1 or line.count("'''") == 1:
                    multi_line_comment = True
                continue

        # Adding indexes for multi-line comments
        if multi_line_comment and ('"""' not in line and "'''" not in line):
            comment_index.append(index)
            continue

        # Indexing the last line of multi-line comments
        if multi_line_comment and ('"""' in line or "'''" in line):
            multi_line_comment = False
            comment_index.append(index)
            continue

        # Indexing new lines and blank lines
        if len(line) == 0 or line.isspace():
            empty_line_index.append(index)
            continue

    # Merging indexes for comments, empty lines, assert statements and exit calls
    for indexes in (empty_line_index, assert_index, exit_line_index):
        comment_index.extend(indexes)

    # Removing all the unnecessary parts of code
    for index in sorted(comment_index, reverse=True):
        del script_lines[index]

    # Concatenating the list of script lines
    clean_script = '\n'.join(script_lines)
    return clean_script


def extract_checker(script):
    # Function that extracts the test component of HumanEval implementations

    # Extracting the 'checker' part of the HumanEval implementation
    extracted_checker = script.split('def check(', 1)[1]
    res = 'def check(' + extracted_checker

    list_lines = res.split('\n')

    del_index = []

    # Indexing empty lines, comments and useless asserts
    for index, line in enumerate(list_lines):
        if (len(line) == 0
                or line.isspace()
                or '#' in line
                or 'assert True' in line):
            del_index.append(index)

    for index in reversed(del_index):
        del list_lines[index]

    res = '\n'.join(list_lines)
    return res

'''
Main function that tests the correct functionality of all AI-scripts
'''
def test_impl_functionality(target_dataset):
    """
    Function that takes the AI-generated implementations and tests their correct functionality against the tests from
    the HumanEval implementation

    The result is saved locally in json files
    """
    ai_code_path = get_ai_code_path(target_dataset)
    humaneval_baseline_path = os.path.join(code_path, 'humaneval_baseline')
    funct_test_path_prefix = get_functionality_test_path(target_dataset)

    list_humaneval_scripts = sorted(os.listdir(humaneval_baseline_path))

    exp_continuation_started = False

    test_file_write_counter = 50

    # Experiment-resumption mechanism
    if os.path.exists(funct_test_path_prefix):
        test_file_exists = True

        # Obtaining the starting point of exp-resumption
        list_models = sorted(os.listdir(funct_test_path_prefix))
        last_tested_model = list_models[-1]
        last_model_path = os.path.join(funct_test_path_prefix, last_tested_model)

        list_model_temperatures = sorted(os.listdir(last_model_path))
        last_tested_temperature = list_model_temperatures[-1]
        tasks_folder_path = os.path.join(last_model_path, last_tested_temperature)

        list_tested_tasks = sorted(os.listdir(tasks_folder_path), key=custom_sort_key)
        last_task_name = list_tested_tasks[-1]
        last_task_path = os.path.join(tasks_folder_path, last_task_name)
        with open(last_task_path, 'r') as f:
            dict_test = json.load(f)

        if dict_test.get('test_complete'):
            # The last task was fully tested: resume from the first script of the next task
            task_starting_index = len(list_tested_tasks)
            script_starting_index = 0
        else:
            # The last task was interrupted: resume it after the scripts that were already tested
            # (task 145 is skipped, so the index is derived from the file count rather than the task number)
            task_starting_index = len(list_tested_tasks) - 1
            script_starting_index = len(dict_test.keys()) - 1

        model_name_and_temp = f'{last_tested_model}_{last_tested_temperature}'
        list_models = sorted(os.listdir(ai_code_path))
        model_temp_starting_index = list_models.index(model_name_and_temp)
    
    else:
        test_file_exists = False
        script_starting_index = task_starting_index = model_temp_starting_index = 0

    list_models = sorted(os.listdir(ai_code_path))
    for model_index in range(model_temp_starting_index, len(list_models)):
        model_name_and_temp = list_models[model_index]
        model_path = os.path.join(ai_code_path, model_name_and_temp)

        print(f'Testing model: {model_name_and_temp}')

        list_tasks = sorted(os.listdir(model_path), key=custom_sort_key)

        for task_index in range(task_starting_index, len(list_tasks)):
            # Skipping Task_145 due to lack of AI-code that accomplishes the said task
            if task_index < 145:
                task_number = task_index
            else:
                task_number = task_index + 1

            task_name = f'HumanEval_{task_number}'
            # Splitting on '_temp' keeps model names that contain underscores intact
            model_name = model_name_and_temp.split('_temp')[0]
            model_temp = model_name_and_temp[-8:]

            print(f'Testing task: {task_name}')

            test_file_path = os.path.join(funct_test_path_prefix, model_name, model_temp, f'{task_name}.json')
            if os.path.exists(test_file_path):
                with open(test_file_path, 'r') as f:
                    dict_test = json.load(f)
            else:
                dict_test = {'test_complete': False}

            # os.path.dirname is portable across path separators, unlike splitting on '/'
            test_folder_path = os.path.dirname(test_file_path)
            os.makedirs(test_folder_path, exist_ok=True)

            # Recovering the HumanEval per-task functionality tests
            humaneval_file_name = list_humaneval_scripts[task_index]
            humaneval_file_path = os.path.join(humaneval_baseline_path, humaneval_file_name)

            with open(humaneval_file_path, 'r') as f:
                humaneval_content = f.read()

            checker = extract_checker(humaneval_content)

            generated_scripts_path = os.path.join(model_path, task_name)
            list_generated_scripts = sorted(os.listdir(generated_scripts_path), key=custom_sort_key)

            for script_index in range(script_starting_index, len(list_generated_scripts)):
                # Cleaning and merging the LLM-generated script with the HumanEval functionality tests
                script_name = list_generated_scripts[script_index]
                script_path = os.path.join(generated_scripts_path, script_name)
                with open(script_path, 'r') as f:
                    script_content = f.read()
                cleaned_script = code_cleanup(script_content, remove_exit=True)

                merged_code = cleaned_script + '\n\n' + checker

                dict_test[script_name] = {}

                # Executing the merged script in a separate subprocess and storing the result of the functionality test
                try:
                    subprocess.run(
                        ['python', '-c', merged_code],
                        stderr=subprocess.PIPE,
                        timeout=2,
                        check=True
                    )

                    dict_test[script_name]['successful'] = True

                except subprocess.TimeoutExpired:
                    dict_test[script_name]['successful'] = False
                    dict_test[script_name]['error_type'] = 'TimeOut'

                except subprocess.CalledProcessError as e:
                    dict_test[script_name]['successful'] = False

                    error_name_and_message = e.stderr.decode().split('\n')[-2]

                    if 'AssertionError' in error_name_and_message:
                        dict_test[script_name]['error_type'] = 'AssertionError'

                    elif ':' in error_name_and_message:
                        error_name = error_name_and_message.split(':')[0]
                        error_message = error_name_and_message.split(':')[1].strip()
                        dict_test[script_name]['error_type'] = error_name
                        dict_test[script_name]['error_message'] = error_message

                    else:
                        dict_test[script_name]['error_type'] = error_name_and_message

                # Writing the results in a json file every 50 iterations
                test_file_write_counter -= 1
                if not test_file_write_counter:
                    test_file_write_counter = 50
                    with open(test_file_path, 'w') as f:
                        json.dump(dict_test, f, indent=2)

            dict_test['test_complete'] = True

            with open(test_file_path, 'w') as f:
                json.dump(dict_test, f, indent=2)

            # Experiment resumption mechanism (i.e., reinitializing the starting index after re-launching the exp)
            if test_file_exists and not exp_continuation_started:
                script_starting_index = 0
        if test_file_exists and not exp_continuation_started:
            task_starting_index = 0
            exp_continuation_started = True
            

'''
The result of the main function is stored locally in json files
'''
func_test_path = get_functionality_test_path('ai_code')
funct_test_path = os.path.join(func_test_path, 'chatgpt', 'temp_0.8', 'HumanEval_1.json')

with open(funct_test_path, 'r') as file:
    funct_test_dict = json.load(file)

'''
Snippet of the obtained functionality test results
'''
for (key, value) in list(funct_test_dict.items())[1:8]:
    print(f'Script: {key}\n  Result: {value}\n')

Script: 0.py
  Result: {'successful': True}

Script: 1.py
  Result: {'successful': True}

Script: 2.py
  Result: {'successful': False, 'error_type': 'AssertionError'}

Script: 3.py
  Result: {'successful': False, 'error_type': 'AssertionError'}

Script: 4.py
  Result: {'successful': True}

Script: 5.py
  Result: {'successful': False, 'error_type': 'AssertionError'}

Script: 6.py
  Result: {'successful': False, 'error_type': 'NameError', 'error_message': "name 'List' is not defined. Did you mean"}
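
The per-task JSON files can be condensed into a pass rate and a tally of error types. The following is a minimal sketch, assuming the dictionary layout shown above (a `test_complete` flag followed by one entry per script). The helper name `summarize_functionality_tests` and the sample dictionary are illustrative and not part of the original experiment code.

```python
from collections import Counter


def summarize_functionality_tests(test_results):
    """Return the pass rate and a tally of error types for one task's results."""
    # 'test_complete' is a bookkeeping flag, not a script entry, so it is skipped
    script_results = {name: result for name, result in test_results.items()
                      if name != 'test_complete'}

    if not script_results:
        return 0.0, Counter()

    nb_passed = sum(1 for result in script_results.values() if result['successful'])
    error_counts = Counter(result.get('error_type', 'Unknown')
                           for result in script_results.values()
                           if not result['successful'])

    return nb_passed / len(script_results), error_counts


# Sample mirroring the seven entries printed above
sample_results = {
    'test_complete': True,
    '0.py': {'successful': True},
    '1.py': {'successful': True},
    '2.py': {'successful': False, 'error_type': 'AssertionError'},
    '3.py': {'successful': False, 'error_type': 'AssertionError'},
    '4.py': {'successful': True},
    '5.py': {'successful': False, 'error_type': 'AssertionError'},
    '6.py': {'successful': False, 'error_type': 'NameError'},
}

pass_rate, error_counts = summarize_functionality_tests(sample_results)
print(f'Pass rate: {pass_rate:.2f}')
print(f'Error types: {dict(error_counts)}')
```

Applied to a real file, the same helper could be called on `funct_test_dict` as loaded in the cell above.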



The next step is to measure each script's __metric__ score against the __canonical implementation__ of HumanEval and save it in a *csv* file along with the script's __pass/fail__ label:
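
To give an intuition of what a textual-similarity score measures, here is a simplified, dependency-free sketch of a ROUGE-L style F-score based on the longest common subsequence of whitespace-separated tokens. This is an illustrative stand-in only: the notebook itself uses the ROUGE implementation loaded through the `evaluate` library, whose tokenization and options may differ, so the values produced here should not be expected to match the stored results. The function names `lcs_length` and `rouge_l_f1` are hypothetical.

```python
def lcs_length(a, b):
    """Length of the longest common subsequence of two token lists (dynamic programming)."""
    previous = [0] * (len(b) + 1)
    for token_a in a:
        current = [0]
        for j, token_b in enumerate(b, start=1):
            if token_a == token_b:
                current.append(previous[j - 1] + 1)
            else:
                current.append(max(previous[j], current[j - 1]))
        previous = current
    return previous[-1]


def rouge_l_f1(reference, candidate):
    """Simplified ROUGE-L F-score between a reference script and a candidate script."""
    ref_tokens = reference.split()
    cand_tokens = candidate.split()
    if not ref_tokens or not cand_tokens:
        return 0.0

    lcs = lcs_length(ref_tokens, cand_tokens)
    if lcs == 0:
        return 0.0

    precision = lcs / len(cand_tokens)
    recall = lcs / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


# Toy comparison: same tokens, different order
print(rouge_l_f1('return a + b', 'return b + a'))
```

The toy comparison illustrates the limitation that motivates this study: two functionally equivalent expressions can receive a textual score well below 1.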

In [8]:
from enum import Enum
import signal
import pandas as pd
from codebleu import calc_codebleu

import io
import contextlib

stderr = io.StringIO()
with contextlib.redirect_stderr(stderr):
    # Suppress the warning about the missing installation of a deep-learning framework
    import evaluate as ev

class Metric(Enum):
    bleu = 0
    codebleu = 1
    rouge = 2
    meteor = 3
    chrf = 4


# noinspection PyUnusedLocal
def timeout_handler(signum, frame):
    # Custom TimeOut exception used in the 'calculate_metric()' function (CodeBLEU computation)
    raise TimeoutError('Execution timeout!')


# Initializing TimeOut exception
signal.signal(signal.SIGALRM, timeout_handler)


def calculate_metric(metric, baseline, generated_script, metric_calc=None):
    """
    Function that measures the LLM-script score of a given metric against the HumanEval implementation

    :param metric: integer that represents the desired metric to be used
    :param baseline: HumanEval script
    :param generated_script: LLM-generated script
    :param metric_calc: preloaded metric module
    :return: metric score
    """
    metric_name = Metric(metric).name

    score = {}

    if not generated_script:
        if metric != 1:
            return 0
        else:
            return {"codebleu": 0.0,
                    "ngram_match_score": 0.0,
                    "weighted_ngram_match_score": 0.0,
                    "syntax_match_score": 0.0,
                    "dataflow_match_score": 0.0}

    if metric == 1:
        metric_complete = False
        signal.alarm(2)
        while not metric_complete:
            try:
                score = calc_codebleu(predictions=[generated_script], references=[baseline], lang='python')
                signal.alarm(0)
                metric_complete = True
            except TimeoutError:
                print('Timeout Error')
                signal.alarm(2)

    else:
        if metric == 2:
            results = metric_calc.compute(predictions=[generated_script], references=[baseline], rouge_types=['rougeL'])
        else:
            results = metric_calc.compute(predictions=[generated_script], references=[baseline])

        if metric == 2:
            score = results['rougeL']
        elif metric == 4:
            score = results['score'] / 100
        else:
            score = results[metric_name]
    return score


def list_non_hidden_files(directory):
    # Function that returns the list of visible files from a given directory
    return [f for f in os.listdir(directory) if not f.startswith('.')]


def metric_measurement(target_dataset):
    """
    Function that iterates over the LLM-generated scripts and measures the metric score for all the studied metrics
    :return: writes a csv file with the obtained score as well as pass/fail label for each AI-script
    """
    metric_folder_path = get_metric_results_path(target_dataset)
    ai_code_path = get_ai_code_path(target_dataset)
    humaneval_baseline_path = os.path.join(code_path, 'humaneval_baseline')
    functionality_test_path = get_functionality_test_path(target_dataset)

    list_models_and_temps = sorted(os.listdir(ai_code_path))
    list_humaneval_scripts = sorted(os.listdir(humaneval_baseline_path))

    # Experiment-resumption mechanism
    if not os.path.exists(metric_folder_path):
        os.mkdir(metric_folder_path)
        metric_file_exists = False
        script_starting_index = model_and_temp_starting_index = task_starting_index = metric_starting_index = 0

    else:
        # Obtaining the starting point of exp-resumption
        metric_file_exists = True

        metric_starting_index = len(os.listdir(metric_folder_path))-1
        last_tested_metric = Metric(metric_starting_index).name
        metric_folder_name = f'{last_tested_metric}_tasks'
        last_tested_metric_path = os.path.join(metric_folder_path, metric_folder_name)
        list_tested_tasks = sorted(list_non_hidden_files(last_tested_metric_path), key=custom_sort_key)
        task_starting_index = len(list_tested_tasks)-1

        task_csv_name = list_tested_tasks[0]
        current_task_path = os.path.join(last_tested_metric_path, task_csv_name)
        task_metric_df = pd.read_csv(current_task_path)

        if 'complete' in list_tested_tasks[0]:
            # Skipping to the next task if current task was complete in the previous exp
            task_starting_index += 1

            if task_starting_index == 163:
                metric_starting_index += 1
                task_starting_index = 0

                if metric_starting_index == 5:
                    print('Metric measurement complete')
                    # Returning instead of calling exit(0), which would shut down the notebook kernel
                    return

            model_and_temp_starting_index = 0
            script_starting_index = 0

        else:
            last_row = task_metric_df.tail(1)
            last_row_series = last_row.iloc[0]
            last_model_and_temp = last_row_series['model&temp']
            last_script = last_row_series['script']

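            # str.strip('.csv') removes a set of characters, not a suffix; splitext drops the extension safely
            task_name = os.path.splitext(task_csv_name)[0]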
            model_and_temp_starting_index = list_models_and_temps.index(last_model_and_temp)
            current_model_and_temp_path = os.path.join(ai_code_path, last_model_and_temp, task_name)

            list_scripts = sorted(os.listdir(current_model_and_temp_path), key=custom_sort_key)
            script_starting_index = list_scripts.index(last_script) + 1

            if script_starting_index == len(list_scripts):
                model_and_temp_starting_index += 1
                script_starting_index = 0

    exp_continuation_started = False

    for metric_index in range(metric_starting_index, 5):
        metric_name = Metric(metric_index).name
        print(f'Analyzing metric: {metric_name}')

        target_folder_name = f'{metric_name}_tasks'
        current_metric_path = os.path.join(metric_folder_path, target_folder_name)
        if not os.path.exists(current_metric_path):
            os.mkdir(current_metric_path)

        # Preloading metric module for all metrics except CodeBLEU
        if metric_index != 1:
            metric_calc = ev.load(metric_name)
        else:
            metric_calc = None

        for task_index in range(task_starting_index, 163):
            if task_index < 145:
                task_number = task_index
            else:
                task_number = task_index + 1

            task_name = f'HumanEval_{task_number}'
            print(f'Analyzing task: {task_name}')
            task_csv_name = task_name + '.csv'
            task_csv_path = os.path.join(current_metric_path, task_csv_name)

            if os.path.exists(task_csv_path):
                task_metric_df = pd.read_csv(task_csv_path)
                task_metric = task_metric_df.to_dict('records')
            else:
                task_metric = []

            # Obtaining the HumanEval implementation as a comparison baseline
            target_humaneval = list_humaneval_scripts[task_index]
            target_humaneval_path = os.path.join(humaneval_baseline_path, target_humaneval)
            with open(target_humaneval_path, 'r') as f:
                humaneval_content = f.read()
            humaneval_script = code_cleanup(humaneval_content)

            nb_of_models_and_temps = len(list_models_and_temps)

            for model_and_temp_index in range(model_and_temp_starting_index, nb_of_models_and_temps):
                target_model_and_temp = list_models_and_temps[model_and_temp_index]
                print(f'Analyzing model and temp: {target_model_and_temp}')

                target_model_and_temp_path = os.path.join(ai_code_path, target_model_and_temp)

                target_task_path = os.path.join(target_model_and_temp_path, task_name)
                task_scripts = sorted(os.listdir(target_task_path), key=custom_sort_key)

                model_name = target_model_and_temp.split('_temp')[0]
                model_temp = target_model_and_temp[-8:]

                # Loading the functionality-test results for the current model/temp/task (used for the pass/fail label)
                target_functionality_test = os.path.join(functionality_test_path, model_name, model_temp,
                                                         f'{task_name}.json')
                with open(target_functionality_test, 'r') as f:
                    funct_test_results = json.load(f)

                file_write_counter = 100
                for script_index in range(script_starting_index, len(task_scripts)):
                    # Extracting and cleaning the LLM-generated script
                    script_name = f'{script_index}.py'
                    target_script_path = os.path.join(target_model_and_temp_path, task_name, script_name)
                    with open(target_script_path, 'r') as f:
                        script_content = f.read()
                    cleaned_script = code_cleanup(script_content)

                    script_test_pass = funct_test_results[script_name]['successful']

                    # Measuring the metric score of the current script
                    score = calculate_metric(metric_index, humaneval_script, cleaned_script, metric_calc)
                    dict_entry = {'model&temp': target_model_and_temp,
                                  'script': script_name,
                                  'pass': script_test_pass}

                    if metric_index != 1:
                        dict_entry.update({'score': score})

                    else:
                        entry_addition = {'codebleu': score['codebleu'],
                                          'ngram_match_score': score['ngram_match_score'],
                                          'weighted_ngram_match_score': score['weighted_ngram_match_score'],
                                          'syntax_match_score': score['syntax_match_score'],
                                          'dataflow_match_score': score['dataflow_match_score']}
                        dict_entry.update(entry_addition)
                    task_metric.append(dict_entry)

                    # Writing the results in a csv file every 100 iterations
                    file_write_counter -= 1
                    if not file_write_counter or script_index == len(task_scripts) - 1:
                        task_metric_df = pd.DataFrame.from_records(task_metric)
                        task_metric_df.to_csv(task_csv_path, index=False)
                        file_write_counter = 100

                # Experiment resumption mechanism (i.e., reinitializing the starting index after re-launching the exp)
                if metric_file_exists and not exp_continuation_started:
                    script_starting_index = 0

            if metric_file_exists and not exp_continuation_started:
                model_and_temp_starting_index = 0

            # Marking the resulting csv file as complete
            os.remove(task_csv_path)
            task_csv_name = f'{task_name}-complete.csv'
            # The completed file belongs in the same '<metric>_tasks' folder as the partial one
            task_csv_path = os.path.join(current_metric_path, task_csv_name)
            task_metric_df = pd.DataFrame.from_records(task_metric)
            task_metric_df.to_csv(task_csv_path, index=False)

        if metric_file_exists and not exp_continuation_started:
            task_starting_index = 0
            exp_continuation_started = True


'''
The metric scores are saved locally in .csv files
'''
metric_results_path = get_metric_results_path('ai_code')
example_metric_score_path = os.path.join(metric_results_path, 'bleu_tasks', 'HumanEval_1-complete.csv')
metric_results = pd.read_csv(example_metric_score_path)

print(metric_results.head(10))

         model&temp script   pass     score
0  chatgpt_temp_0.0   0.py   True  0.166768
1  chatgpt_temp_0.8   0.py   True  0.209111
2  chatgpt_temp_0.8   1.py   True  0.246753
3  chatgpt_temp_0.8   2.py  False  0.188520
4  chatgpt_temp_0.8   3.py  False  0.277207
5  chatgpt_temp_0.8   4.py   True  0.259093
6  chatgpt_temp_0.8   5.py  False  0.227023
7  chatgpt_temp_0.8   6.py  False  0.163480
8  chatgpt_temp_0.8   7.py   True  0.223758
9  chatgpt_temp_0.8   8.py  False  0.211294
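
Before fitting any model, a quick sanity check is to compare the mean score of passing and failing scripts. The sketch below does this with the standard library only, using the ten rows printed above. The helper name `mean_score_by_label` is illustrative, and ten rows are far too few to support any conclusion.

```python
from statistics import mean

# Rows copied from the output above: (model&temp, script, pass, score)
rows = [
    ('chatgpt_temp_0.0', '0.py', True, 0.166768),
    ('chatgpt_temp_0.8', '0.py', True, 0.209111),
    ('chatgpt_temp_0.8', '1.py', True, 0.246753),
    ('chatgpt_temp_0.8', '2.py', False, 0.188520),
    ('chatgpt_temp_0.8', '3.py', False, 0.277207),
    ('chatgpt_temp_0.8', '4.py', True, 0.259093),
    ('chatgpt_temp_0.8', '5.py', False, 0.227023),
    ('chatgpt_temp_0.8', '6.py', False, 0.163480),
    ('chatgpt_temp_0.8', '7.py', True, 0.223758),
    ('chatgpt_temp_0.8', '8.py', False, 0.211294),
]


def mean_score_by_label(rows):
    """Group the scores by pass/fail label and return the mean of each group."""
    scores = {True: [], False: []}
    for _model_and_temp, _script, passed, score in rows:
        scores[passed].append(score)
    # Labels without any row are omitted to avoid averaging an empty list
    return {label: mean(values) for label, values in scores.items() if values}


means = mean_score_by_label(rows)
print(f'Mean BLEU score (pass): {means[True]:.4f}')
print(f'Mean BLEU score (fail): {means[False]:.4f}')
```

On this small sample the two group means are close to each other, which is the kind of overlap that the logistic-regression step is meant to quantify over the full dataset.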


Once all the __metric__ scores are measured, we can train a *Logistic Regression* model on the __metric score__ and the __pass/fail__ label in order to establish whether a correlation exists between the two. To do so, we first run __100 iterations__ of training and prediction per __metric__, each on a different random train/test split (which gives us the scores for *precision*, *recall*, *f1* and *accuracy*), and then we compute the *average* and *variance* of the obtained evaluation scores.
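
The evaluation and aggregation steps described above can be sketched with the standard library alone. The example computes precision, recall and F1 for the *pass* class from a pair of label lists, then averages the per-iteration scores. The labels are made up for illustration, the function names are hypothetical, and the use of the population variance (`statistics.pvariance`) is an assumption rather than a statement about how the notebook computes it.

```python
from statistics import mean, pvariance


def pass_class_scores(y_test, y_pred):
    """Precision, recall and F1 for the 'pass' (True) class."""
    pairs = list(zip(y_test, y_pred))
    true_pos = sum(1 for truth, pred in pairs if truth and pred)
    false_pos = sum(1 for truth, pred in pairs if not truth and pred)
    false_neg = sum(1 for truth, pred in pairs if truth and not pred)

    precision = true_pos / (true_pos + false_pos) if true_pos + false_pos else 0.0
    recall = true_pos / (true_pos + false_neg) if true_pos + false_neg else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {'precision': precision, 'recall': recall, 'f1-score': f1}


def average_and_variance(per_iteration_scores):
    """Average and population variance of each score across iterations."""
    summary = {}
    for score_name in ('precision', 'recall', 'f1-score'):
        values = [scores[score_name] for scores in per_iteration_scores]
        summary[score_name] = {'average': mean(values), 'variance': pvariance(values)}
    return summary


# Two made-up iterations (hypothetical ground-truth and predicted labels)
iterations = [
    pass_class_scores([True, True, False, False], [True, False, False, True]),
    pass_class_scores([True, False, True, False], [True, False, True, True]),
]

summary = average_and_variance(iterations)
for score_name, stats in summary.items():
    print(f"{score_name}: average={stats['average']:.3f}, variance={stats['variance']:.4f}")
```

A low variance across the random splits indicates that the reported averages do not depend much on which scripts end up in the test set.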


In [9]:
import copy

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report


def run_logistic_regression(target_dataset, nb_iterations = 100):
    """
    Function that runs logistic regression over the LLM-generated script results (i.e., establish if a correlation
    exists between metric score and pass/fail label)
    :return: a json file with scores for precision, recall, f1, accuracy
    """
    metric_score_path = get_metric_results_path(target_dataset)

    logreg_results_path = get_logreg_results_path(target_dataset)
    logreg_iterations_path = os.path.join(logreg_results_path, 'iterations')

    os.makedirs(logreg_iterations_path, exist_ok=True)

    for item in sorted(os.listdir(metric_score_path)):
        logreg_dict = {'decision_boundary': 0.5}
        logreg_test_pred_dict = {}
        # Iterate over csv files with the metric score and the pass/fail label of LLM-generated scripts
        if '.csv' in item:
            metric_name = item

            metric_path = os.path.join(metric_score_path, metric_name)
            metric_df = pd.read_csv(metric_path)

            metric_name = metric_name.split('.')[0]
            logreg_file_name = f'{metric_name}_logreg_iterations.json'
            test_pred_file_name = f'{metric_name}_logreg_test_pred.json'
            logreg_file_path = os.path.join(logreg_iterations_path, logreg_file_name)
            test_pred_file_path = os.path.join(logreg_iterations_path, test_pred_file_name)

            print(f'Analyzing metric: {metric_name}')

            if 'codebleu' in metric_name:
                x = metric_df[['codebleu']]
            else:
                x = metric_df[['score']]
            y = metric_df['pass']

            # Run 100 iterations of logistic regression with different split of train/test datasets
            for i in range(nb_iterations):

                if i % 5 == 0:
                    print(f'Iteration {i+1}')

                x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25)

                logreg = LogisticRegression(random_state=16)
                logreg.fit(x_train, y_train)
                y_pred = logreg.predict(x_test)

                # Saving the classification evaluation results (precision, recall, f1, accuracy)
                logreg_results = classification_report(y_test, y_pred, target_names=['fail', 'pass'], output_dict=True)
                logreg_dict[f'iter_{i+1}'] = logreg_results

                # Saving the ground-truth labels (pass/fail) and the predicted labels
                logreg_test_pred_dict[f'iter_{i + 1}'] = {}
                logreg_test_pred_dict[f'iter_{i+1}']['y_test'] = y_test.tolist()
                logreg_test_pred_dict[f'iter_{i+1}']['y_pred'] = y_pred.tolist()

            with open(logreg_file_path, 'w') as f:
                json.dump(logreg_dict, f, indent=2)
            with open(test_pred_file_path, 'w') as f:
                json.dump(logreg_test_pred_dict, f)


def divide_by(input_dict, divide=100):
    # Function that divides the accumulated scores in place for the average calculation
    # ('support' counts are left untouched)
    for label in ('pass', 'fail', 'macro avg', 'weighted avg'):
        for score_name in ('precision', 'recall', 'f1-score'):
            input_dict[label][score_name] /= divide

    input_dict['accuracy'] /= divide


def logreg_average_variance(target_dataset, nb_iterations = 100):
    # Function that measures the average and variance values of the 100 logreg-iteration results
    logreg_results_path = get_logreg_results_path(target_dataset)
    logreg_iterations_path = os.path.join(logreg_results_path, 'iterations')

    metrics_template = {'precision': 0.0, 'recall': 0.0, 'f1-score': 0.0, 'support': 0}
    logreg_template = {'pass': copy.deepcopy(metrics_template),
                       'fail': copy.deepcopy(metrics_template),
                       'accuracy': 0.0,
                       'macro avg': copy.deepcopy(metrics_template),
                       'weighted avg': copy.deepcopy(metrics_template)}

    for file_name in sorted(os.listdir(logreg_iterations_path)):
        if 'iterations' in file_name:
            current_file_path = os.path.join(logreg_iterations_path, file_name)
            with open(current_file_path, 'r') as f:
                logreg_dict = json.load(f)

            metric_name = file_name.split('_')[0]
            logreg_file_name = f'{metric_name}_logreg_avg-var.json'
            # os.path.dirname is portable across path separators, unlike splitting on '/'
            avg_var_folder_path = os.path.dirname(logreg_iterations_path)
            logreg_file_path = os.path.join(avg_var_folder_path, logreg_file_name)

            logreg_avg_var = {
                "decision_boundary": 0.5,
                'average': copy.deepcopy(logreg_template),
                'variance': copy.deepcopy(logreg_template)}

            # Accumulating the per-iteration scores (the first key, 'decision_boundary', is skipped)
            for iteration in list(logreg_dict.keys())[1:]:
                for label in ('pass', 'fail', 'macro avg', 'weighted avg'):
                    for score_name in ('precision', 'recall', 'f1-score', 'support'):
                        logreg_avg_var['average'][label][score_name] += logreg_dict[iteration][label][score_name]

                logreg_avg_var['average']['accuracy'] += logreg_dict[iteration]['accuracy']

            divide_by(logreg_avg_var['average'], nb_iterations)

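            # Note: the 'variance' entries accumulate the absolute deviations from the average
            # (they are not squared and not divided by the number of iterations)
            for iteration in list(logreg_dict.keys())[1:]: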
                logreg_avg_var['variance']['pass']['precision'] += abs(logreg_avg_var['average']['pass']['precision'] -
                                                                       logreg_dict[iteration]['pass']['precision'])
                logreg_avg_var['variance']['pass']['recall'] += abs(logreg_avg_var['average']['pass']['recall'] -
                                                                    logreg_dict[iteration]['pass']['recall'])
                logreg_avg_var['variance']['pass']['f1-score'] += abs(logreg_avg_var['average']['pass']['f1-score'] -
                                                                      logreg_dict[iteration]['pass']['f1-score'])
                logreg_avg_var['variance']['pass']['support'] = logreg_avg_var['average']['pass']['support']

                logreg_avg_var['variance']['fail']['precision'] += abs(
                    logreg_avg_var['average']['fail']['precision'] -
                    logreg_dict[iteration]['fail']['precision'])
                logreg_avg_var['variance']['fail']['recall'] += abs(
                    logreg_avg_var['average']['fail']['recall'] -
                    logreg_dict[iteration]['fail']['recall'])
                logreg_avg_var['variance']['fail']['f1-score'] += abs(
                    logreg_avg_var['average']['fail']['f1-score'] -
                    logreg_dict[iteration]['fail']['f1-score'])
                logreg_avg_var['variance']['fail']['support'] = logreg_avg_var['average']['fail']['support']

                logreg_avg_var['variance']['accuracy'] += abs(logreg_avg_var ['average']['accuracy'] -
                                                              logreg_dict[iteration]['accuracy'])

                logreg_avg_var['variance']['macro avg']['precision'] += abs(
                    logreg_avg_var['average']['macro avg']['precision'] -
                    logreg_dict[iteration]['macro avg']['precision'])
                logreg_avg_var['variance']['macro avg']['recall'] += abs(
                    logreg_avg_var['average']['macro avg']['recall'] -
                    logreg_dict[iteration]['macro avg']['recall'])
                logreg_avg_var['variance']['macro avg']['f1-score'] += abs(
                    logreg_avg_var['average']['macro avg']['f1-score'] -
                    logreg_dict[iteration]['macro avg']['f1-score'])
                logreg_avg_var['variance']['macro avg']['support'] = logreg_avg_var['average']['macro avg']['support']

                logreg_avg_var['variance']['weighted avg']['precision'] += abs(
                    logreg_avg_var['average']['weighted avg']['precision'] -
                    logreg_dict[iteration]['weighted avg']['precision'])
                logreg_avg_var['variance']['weighted avg']['recall'] += abs(
                    logreg_avg_var['average']['weighted avg']['recall'] -
                    logreg_dict[iteration]['weighted avg']['recall'])
                logreg_avg_var['variance']['weighted avg']['f1-score'] += abs(
                    logreg_avg_var['average']['weighted avg']['f1-score'] -
                    logreg_dict[iteration]['weighted avg']['f1-score'])
                logreg_avg_var['variance']['weighted avg']['support'] = (
                    logreg_avg_var)['average']['weighted avg']['support']

            with open(logreg_file_path, 'w') as f:
                json.dump(logreg_avg_var, f, indent=2)


def metric_name_to_title(metric_name):
    # Function that returns the name of a metric used in the confusion matrix representation
    title = ''
    match metric_name:
        case 'bleu':
            title = 'BLEU'
        case 'codebleu':
            title = 'CodeBLEU'
        case 'rouge':
            title = 'ROUGE'
        case 'meteor':
            title = 'METEOR'
        case 'chrf':
            title = 'ChrF'
    return title


def format_logreg_results(logreg_dict):
    row_format = '{:<12} {:>10.2f} {:>10.2f} {:>10.2f} {:>10}'
    row_format_accuracy = '{:<33}  {:>10.2f} {:>10}'

    formated_rows = []

    for label in ['pass', 'fail']:
        row = row_format.format(
            label,
            logreg_dict[label]['precision'],
            logreg_dict[label]['recall'],
            logreg_dict[label]['f1-score'],
            int(logreg_dict[label]['support'])
        )
        formated_rows.append(row)

    accuracy_row = row_format_accuracy.format(
        'accuracy',
        logreg_dict['accuracy'],
        int(logreg_dict['pass']['support'] + logreg_dict['fail']['support'])
    )

    for avg_label in ['macro avg', 'weighted avg']:
        avg_row = row_format.format(
            avg_label,
            logreg_dict[avg_label]['precision'],
            logreg_dict[avg_label]['recall'],
            logreg_dict[avg_label]['f1-score'],
            int(logreg_dict[avg_label]['support'])
        )
        formated_rows.append(avg_row)
    formated_rows.insert(2, f'\n{accuracy_row}')
    return '\n'.join(formated_rows)


original_dataset = 'ai_code'
number_iterations = 50
run_logistic_regression(original_dataset, number_iterations)
logreg_average_variance(original_dataset, number_iterations)


Analyzing metric: bleu
Iteration 1


KeyboardInterrupt: 

### Evaluation metrics explanation

![image info](./images/metric_formulas/precision.png)

&NewLine;
Among the scripts predicted with a given label, the proportion whose prediction is correct.
The above formula is for predictions of 'Positive' (or '*Pass*' in our case).
However, the same formula also applies to the '*Fail*' prediction: TN / (TN + FN)
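
As a minimal sketch, precision for either label can be computed directly from the four confusion-matrix counts. The counts below are hypothetical values chosen for illustration, not results from this experiment, and the helper name `precision` is an assumption.

```python
def precision(true_count, false_count):
    """Share of the predictions of one label that are correct."""
    total = true_count + false_count
    return true_count / total if total else 0.0

# Hypothetical counts, for illustration only
tp, fp, tn, fn = 60, 40, 750, 150

pass_precision = precision(tp, fp)  # TP / (TP + FP)
fail_precision = precision(tn, fn)  # TN / (TN + FN)

print(f'Pass precision: {pass_precision:.2f}')
print(f'Fail precision: {fail_precision:.2f}')
```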

___
![image info](./images/metric_formulas/recall.png)

&NewLine;
Among the scripts that actually carry a given label, the proportion that were correctly predicted (e.g., how many actual *Pass* scripts were predicted as such).
___
![image info](./images/metric_formulas/f1-score.png)

&NewLine;
F1-score is the harmonic mean of *Precision* and *Recall*, so it is high only when both are high. This metric indicates the model's per-label performance.
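
The following sketch shows how the F1-score combines the two values. The precision and recall inputs are hypothetical and only illustrate the calculation.

```python
def f1_score(precision, recall):
    """Harmonic mean of precision and recall (0.0 when both are zero)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# Hypothetical per-label values, for illustration only
pass_f1 = f1_score(precision=0.60, recall=0.20)
fail_f1 = f1_score(precision=0.80, recall=0.95)

print(f'Pass F1: {pass_f1:.2f}')
print(f'Fail F1: {fail_f1:.2f}')
```

Because the harmonic mean is pulled toward the lower of the two inputs, a label with acceptable precision but poor recall still ends up with a low F1-score.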
___
![image info](./images/metric_formulas/accuracy.png)

&NewLine;
Overall proportion of correct predictions. It is important to note that this metric is only informative when the dataset is balanced (i.e., a similar number of *Pass* and *Fail* labels) and when correctly predicting both labels is equally important, which is not always the case.
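
To illustrate why accuracy can mislead on unbalanced data, the sketch below scores a trivial predictor that always answers *Fail*. The labels are synthetic, with a 77% fail rate chosen to mirror the proportion reported for this dataset.

```python
# Synthetic ground truth: 77 failing scripts and 23 passing ones
y_true = ['fail'] * 77 + ['pass'] * 23

# A trivial "model" that always predicts the majority label
y_pred = ['fail'] * len(y_true)

correct = sum(t == p for t, p in zip(y_true, y_pred))
accuracy = correct / len(y_true)

# Recall on the Pass label: share of actual Pass scripts predicted as Pass
pass_total = y_true.count('pass')
pass_found = sum(t == 'pass' and p == 'pass' for t, p in zip(y_true, y_pred))
pass_recall = pass_found / pass_total

print(f'Accuracy: {accuracy:.2f}')
print(f'Pass recall: {pass_recall:.2f}')
```

The predictor reaches 77% accuracy without identifying a single passing script, which is why the per-label metrics matter here.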
___
&NewLine;
&NewLine;

__Support__ = number of ground-truth *Pass/Fail* labels
___

&NewLine;
&NewLine;
#### Macro VS Weighted average:
- Macro is the standard average value of the precision, recall, and F1-score, without taking into account the number of samples in each label. It treats all labels equally.
- Weighted is the average of the precision, recall, and F1-score which is weighted by the number of samples in each label. It accounts for the imbalance in the number of samples per label.
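
The difference between the two averages can be sketched as follows. The per-label precision and support values are hypothetical.

```python
# Hypothetical per-label precision and support, for illustration only
per_label = {
    'pass': {'precision': 0.60, 'support': 230},
    'fail': {'precision': 0.80, 'support': 770},
}

values = [m['precision'] for m in per_label.values()]
supports = [m['support'] for m in per_label.values()]

# Macro: plain mean, every label counts equally
macro_avg = sum(values) / len(values)

# Weighted: each label contributes in proportion to its support
weighted_avg = sum(v * s for v, s in zip(values, supports)) / sum(supports)

print(f'Macro avg: {macro_avg:.3f}')
print(f'Weighted avg: {weighted_avg:.3f}')
```

With most of the support on the *Fail* label, the weighted average moves toward the *Fail* value, while the macro average stays halfway between the two labels.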

The following are the results of the *LogisticRegression* training/testing done previously in the notebook.

In [10]:
def display_logreg_avg_var(target_dataset, metric):
    metric_name = Metric(metric).name
    metric_title = metric_name_to_title(metric_name)

    logreg_results_path = get_logreg_results_path(target_dataset)
    logreg_avg_var_path = os.path.join(logreg_results_path, f'{metric_name}_logreg_avg-var.json')

    with open(logreg_avg_var_path, 'r') as f:
        logreg_dict = json.load(f)

    first_section = True

    print(f'Logistic Regression results for \"{metric_title}\" metric (average and variance):\n')
    for dict_key in list(logreg_dict.keys())[1:]:
        print(f'{dict_key}:')
        print(f"{' ':<12} {'precision':>10} {'recall':>10} {'f1-score':>10} {'support':>10}")
        print(format_logreg_results(logreg_dict[dict_key]))
        if first_section:
            print('-' * 60)
            first_section = False


# Textual metrics
bleu = 0
codebleu = 1
rouge = 2
meteor = 3
chrf = 4

display_logreg_avg_var(original_dataset, metric=bleu)

FileNotFoundError: [Errno 2] No such file or directory: '/home/zara/textual-metrics/metric_exp_data/exp_results/ai_code/metrics_logreg/bleu_logreg_avg-var.json'

In [None]:
display_logreg_avg_var(original_dataset, metric=codebleu)

In [None]:
display_logreg_avg_var(original_dataset, metric=rouge)

In [None]:
display_logreg_avg_var(original_dataset, metric=meteor)

In [None]:
display_logreg_avg_var(original_dataset, metric=chrf)

## Analysis of predictive models' efficiency
The results obtained above indicate that, whichever *textual-metric* is chosen, the predictive models struggle to predict the actual labels.

At first glance, the models seem to predict the *Fail* label correctly, with precision, recall and F1-score values ranging between 80% and 97%. However, this is potentially due to the highly __unbalanced__ nature of the dataset, which is visible in the difference between the *support* values of the two labels. Roughly 77% of AI-generated scripts do not pass the tests, either due to undesired behavior or execution errors.

The predictive models' struggle becomes more obvious when observing the results for predicting the *Pass* label:
- Poor *Precision* ~55%-65%: roughly 35%-45% of the *Pass* predictions are actually *Fail*
- Even worse *Recall* ~15%-25%: more than 3/4 of *Pass* scripts are labeled as *Fail*
- High variance: how well the trained models predict the *Pass* label varies strongly from one iteration to the next

___
## Confusion Matrix
A more visual way of representing the results of the predictive models is the *Confusion Matrix*, which displays the ratio of the correct/wrong predictions.
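
As a minimal sketch of what the matrix contains, the cells can be counted by hand. The helper `confusion_counts` and the labels below are hypothetical. The layout (rows for actual labels, columns for predicted labels) follows the same convention as `sklearn.metrics.confusion_matrix`.

```python
from collections import Counter

def confusion_counts(y_true, y_pred, labels=('fail', 'pass')):
    """Return a nested list: rows are actual labels, columns are predicted labels."""
    pairs = Counter(zip(y_true, y_pred))
    return [[pairs[(actual, predicted)] for predicted in labels] for actual in labels]

# Hypothetical labels, for illustration only
y_true = ['fail', 'fail', 'fail', 'pass', 'pass', 'fail']
y_pred = ['fail', 'fail', 'pass', 'fail', 'pass', 'fail']

matrix = confusion_counts(y_true, y_pred)

# Express every cell as a percentage of all predictions
total = sum(sum(row) for row in matrix)
percentages = [[100 * cell / total for cell in row] for row in matrix]

print(matrix)
```

With `fail` listed first, the top-left cell holds the True Negatives and the bottom-right cell the True Positives.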

In [None]:
import numpy as np
import pandas as pd  # used below to wrap the matrix for the heatmap
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics

def generate_confusion_matrix(dataset_name, nb_iterations=100, font_size=14):
    # Generate the confusion matrix based on the ground truth and predicted labels of pass/fail
    logreg_res_folder_path = os.path.join(results_path, dataset_name, 'metrics_logreg', 'iterations')
    matrix_folder_path = './images/confusion_matrix/'

    os.makedirs(matrix_folder_path, exist_ok=True)

    # Generate the confusion matrix per code-quality metric
    for logreg_result in sorted(os.listdir(logreg_res_folder_path)):
        if 'test_pred' in logreg_result:
            metric_name = logreg_result.split('_')[0]
            matrix_title = metric_name_to_title(metric_name)

            print(f'Generating confusion matrix for metric: {metric_name}')

            if 'v2' in logreg_result:
                file_name = f'{metric_name}_v2.png'
            else:
                file_name = f'{metric_name}.png'

            image_file_path = os.path.join(matrix_folder_path, file_name)

            test_pred_file_path = os.path.join(logreg_res_folder_path, logreg_result)
            with open(test_pred_file_path, 'r') as f:
                test_pred_dict = json.load(f)

            # Accumulate the confusion matrices of all iterations
            cumulative_confusion_matrix = np.zeros((2, 2), dtype=int)

            for iteration in range(nb_iterations):
                current_iteration = f'iter_{iteration+1}'
                y_test = test_pred_dict[current_iteration]['y_test']
                y_pred = test_pred_dict[current_iteration]['y_pred']

                current_confusion_matrix = metrics.confusion_matrix(y_test, y_pred)
                cumulative_confusion_matrix += current_confusion_matrix

            total_predictions = cumulative_confusion_matrix.sum()

            # Convert each cell to a percentage of all predictions
            # (equivalent to averaging over the iterations and normalizing by the per-iteration total)
            percentage_confusion_matrix = cumulative_confusion_matrix / total_predictions * 100

            class_names = ['fail', 'pass']
            fig, ax = plt.subplots(figsize=(8, 6))
            tick_marks = np.arange(len(class_names))
            plt.xticks(tick_marks, class_names, fontsize=font_size)
            plt.yticks(tick_marks, class_names, fontsize=font_size)

            # Create the heatmap
            sns.heatmap(pd.DataFrame(percentage_confusion_matrix), annot=True, cmap='YlGnBu', fmt='.2f',
                        xticklabels=class_names, yticklabels=class_names, ax=ax,
                        annot_kws={"size": font_size})
            ax.xaxis.set_label_position('top')
            plt.title(f'{matrix_title}', y=1.05, fontsize=font_size + 2)  # Adjust the title position
            plt.ylabel('Actual label', fontsize=font_size)
            plt.xlabel('Predicted label', fontsize=font_size)

            plt.tight_layout()  # Adjust layout to prevent clipping

            # Save the figure to disk, then close it to free memory
            fig.savefig(image_file_path, dpi=96)
            plt.close(fig)

"""
Note: To view the generated confusion matrices, open the next Markdown section for editing and then exit it, so that the IDE reloads the images.
"""
generate_confusion_matrix(original_dataset, nb_iterations=number_iterations)

![image info](./images/confusion_matrix/bleu.png) ![image info](./images/confusion_matrix/codebleu.png)
![image info](./images/confusion_matrix/rouge.png) ![image info](./images/confusion_matrix/meteor.png)
![image info](./images/confusion_matrix/chrf.png)

## Confusion matrix analysis

This way of representing the prediction results gives a more global overview of the trained models' efficiency, which points to a clear under-performance. The first noticeable aspect is the quantity of True Negatives (~75%), which is consistent with the previous statement about the unbalanced nature of the dataset. The second noticeable aspect is the quantity of False Negatives, meaning scripts that __pass__ the tests but are predicted as *Fail*. The percentage of False Negatives is roughly 15%-18%, which is on average 3 to 4 times the percentage of True Positives. The last noteworthy detail is the ratio between True Positives and False Positives, which indicates that about 1/3 of the scripts predicted as *Pass* actually __fail__ the tests.


Given these results, it is safe to assume that the training of the Logistic Regression models failed. This could be due to the highly unbalanced nature of the dataset -- 77% of scripts fail -- which can bias the predictive models toward the *Fail* label. However, the nature of the dataset may not be the only cause of the underwhelming results. Among the metrics that describe the predictive efficiency of the trained models, the *variance* for the *Pass* predictions is very high, meaning that the models' ability to predict the *Pass* label is highly unstable. This could indicate that there is no definite correlation between the textual-metric score and the script's test result.
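
For reference, the spread of a score across iterations can be quantified in more than one way. The sketch below contrasts the population variance with the mean absolute deviation, using hypothetical *Pass* recall values. It is only an illustration and not the exact computation performed by `logreg_average_variance`.

```python
import statistics

# Hypothetical Pass-recall values from five training iterations
pass_recall = [0.10, 0.25, 0.15, 0.30, 0.20]

mean = statistics.fmean(pass_recall)

# Population variance: mean of the squared deviations from the mean
variance = statistics.pvariance(pass_recall)

# Mean absolute deviation: mean of the absolute deviations from the mean
mean_abs_dev = sum(abs(x - mean) for x in pass_recall) / len(pass_recall)

print(f'mean={mean:.2f} variance={variance:.4f} mean_abs_dev={mean_abs_dev:.2f}')
```

Both measures grow when the per-iteration scores disagree, but they are on different scales, so values should only be compared within the same measure.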

___
## Textual-metrics and programming languages

Textual-metrics were initially created for qualifying machine-generated natural language (e.g., machine translation, automatic summarization). Although natural and programming languages share some similarities, a few key differences could make textual-metrics unsuitable for *code qualification*. While both have a predefined structure and syntactical rules, those of programming languages are usually stricter and require a more robust formulation. For example, a missing parenthesis or semicolon prevents a script from compiling, whereas in natural text it may simply change the meaning of a sentence without completely breaking it. Such minute details have very little impact on the textual difference between a reference and a prediction, which can lead to textual-metrics being too __lax__ when qualifying code.

*Example of an AI-generated script that contains natural text outside a comment which leads to a compilation error*

![image info](./images/ai_code_errors/python_error.PNG)
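
The effect can be sketched with a generic character-level similarity. The example uses `difflib.SequenceMatcher` from the standard library as a stand-in: it is not one of the textual-metrics studied here, and the two snippets are hypothetical.

```python
import difflib

reference = "def add(a, b):\n    return (a + b)\n"
prediction = "def add(a, b):\n    return (a + b\n"  # closing parenthesis removed

def compiles(source):
    """Return True when the source is syntactically valid Python."""
    try:
        compile(source, '<candidate>', 'exec')
        return True
    except SyntaxError:
        return False

# Character-level similarity between the two snippets (1.0 means identical)
similarity = difflib.SequenceMatcher(None, reference, prediction).ratio()

print(f'similarity: {similarity:.3f}')
print(f'reference compiles: {compiles(reference)}')
print(f'prediction compiles: {compiles(prediction)}')
```

A single missing character leaves the similarity close to 1.0 while making the prediction unusable.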

On the other hand, in contrast with natural text, programming languages give more freedom in the naming of variables and functions. While human developers choose meaningful and coherent names, the parser accepts any name as long as it respects a limited number of rules (e.g., no white spaces or special characters). Furthermore, the names of variables and functions have virtually no impact on the execution logic. Nonetheless, such differences between a prediction and a reference have a significant impact on the textual similarity. This aspect can lead to textual-similarity metrics being too __strict__ when qualifying code.

*Example of a functionally identical prediction and reference that differ only in the names of variables and functions*

![image info](./images/ai_code_errors/codebleu_example.PNG)
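
The opposite effect can be sketched in the same way. Here too, `difflib.SequenceMatcher` (applied to whitespace-separated tokens) is only a stand-in for the textual-metrics studied in this notebook, and the snippets and the `run` helper are hypothetical.

```python
import difflib

reference = "def area(width, height):\n    return width * height\n"
prediction = "def f(x, y):\n    return x * y\n"  # same logic, different names

def run(source, func_name, *args):
    """Execute the source in a fresh namespace and call the named function."""
    namespace = {}
    exec(source, namespace)
    return namespace[func_name](*args)

# Token-level similarity between the two snippets (1.0 means identical)
similarity = difflib.SequenceMatcher(None, reference.split(), prediction.split()).ratio()

# Both functions return the same result for the same arguments
same_behavior = run(reference, 'area', 3, 4) == run(prediction, 'f', 3, 4)

print(f'similarity: {similarity:.3f}')
print(f'same behavior: {same_behavior}')
```

The two functions behave identically on the tested input, yet fewer than half of their tokens match.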

___
### No duplicate scripts section
Given these results, it is safe to assume that the training of the Logistic Regression models failed. This could be due to the highly unbalanced nature of the dataset: 77% of scripts fail. However, another aspect that could negatively impact the training is the presence of duplicate scripts among the AI-generated code. Because of the way LLMs generate their responses, it is possible to obtain the exact same answer for a given prompt, even across different models and temperatures. Duplicate data could harm the trained models, for example by contributing to *overfitting*, which is why the experimental protocol was repeated on a subset of the original dataset from which all duplicate scripts were removed.
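
As an illustration of what removing duplicates can look like, the sketch below keeps the first occurrence of each script based on a hash of its exact contents. It assumes exact-match deduplication. The helper `remove_duplicates` and the sample scripts are hypothetical and do not reproduce the procedure used to build the subset.

```python
import hashlib

def remove_duplicates(scripts):
    """Keep the first occurrence of each script, comparing exact contents."""
    seen = set()
    unique = []
    for script in scripts:
        digest = hashlib.sha256(script.encode('utf-8')).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(script)
    return unique

# Hypothetical scripts, for illustration only
scripts = [
    "def add(a, b):\n    return a + b\n",
    "def add(a, b):\n    return a + b\n",  # exact duplicate of the first script
    "def add(x, y):\n    return x + y\n",
]
unique_scripts = remove_duplicates(scripts)

print(f'{len(scripts)} scripts -> {len(unique_scripts)} distinct scripts')
```

Exact matching treats scripts that differ only in whitespace or naming as distinct, so it gives a lower bound on the amount of redundancy.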

___

## Duplicate-free subset (to be deleted)

Before continuing with the experimental protocol, it is worth noting that after the removal of duplicate scripts, the subset shrinks to 1.42 million scripts, and its unbalanced nature is even more pronounced, with a fail rate of 87%.

In [None]:
"""
Repeating the experimental protocol on the duplicate-free dataset
"""
distinct_dataset = 'ai_code_distinct'
number_iterations = 50
run_logistic_regression(distinct_dataset, number_iterations)
logreg_average_variance(distinct_dataset, number_iterations)

In [None]:
display_logreg_avg_var(distinct_dataset, metric=bleu)

In [None]:
display_logreg_avg_var(distinct_dataset, metric=codebleu)

In [None]:
display_logreg_avg_var(distinct_dataset, metric=rouge)

In [None]:
display_logreg_avg_var(distinct_dataset, metric=meteor)

In [None]:
display_logreg_avg_var(distinct_dataset, metric=chrf)

In [None]:
generate_confusion_matrix(distinct_dataset, nb_iterations=number_iterations)

![image info](./images/confusion_matrix/ai_code_distinct/bleu.png) ![image info](./images/confusion_matrix/ai_code_distinct/codebleu.png)
![image info](./images/confusion_matrix/ai_code_distinct/rouge.png) ![image info](./images/confusion_matrix/ai_code_distinct/meteor.png)
![image info](./images/confusion_matrix/ai_code_distinct/chrf.png)