In [1]:
import pandas as pd
import json
import glob
import os
from sklearn.metrics import precision_score, recall_score, f1_score
import re
import ast

In [2]:
def load_jsonl_to_dataframes(directory="."):
    """
    Load all .jsonl files in the specified directory and its subdirectories into DataFrames,
    storing them in a dictionary with filenames as keys.

    Args:
    directory (str): The directory to search for .jsonl files. Defaults to the current directory.

    Returns:
    dict: A dictionary where each key is the filename and the value is the corresponding DataFrame.
    """
    data_frames = {}
    # Search for all .jsonl files in the directory and subdirectories
    for file_path in glob.glob(os.path.join(directory, '**/*.jsonl'), recursive=True):
        data = []
        with open(file_path, 'r') as file:
            for line in file:
                data.append(json.loads(line))
        # Create a DataFrame from list of dictionaries
        df = pd.DataFrame(data)
        # Extract filename without extension as the key
        filename = os.path.splitext(os.path.basename(file_path))[0]
        data_frames[filename] = df
    return data_frames

In [3]:
# Example usage:
dataframes = load_jsonl_to_dataframes()  # Use the function for the current directory
file_keys = dataframes.keys()
file_key_list = list(file_keys)
exp_name = file_key_list[0]
df_output = dataframes[exp_name]

In [4]:
def find_symptom_string(main_string):

    main_string = str(main_string)
    
    # Regular expression to find the dictionaries in the string
    pattern1 = "\\{'.*':\\s*'\\w*',\\s\\s'shown_signs':.*\\s*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\s.*\\}\\}"
    pattern2 = "\\{.*\\}"
    
    # Find all matches in the main string
    match1 = re.findall(pattern1, main_string)
    # Find all matches in the main string
    match2 = re.findall(pattern2, main_string)

    final_matches = match1 + match2
    
    return final_matches

In [5]:
def find_animal_string(main_string):

    main_string = str(main_string)

    pattern = r"animal:\s*(\w+)"

    match = re.findall(pattern, main_string)
    
    return match

def check_exact_match(test_answer, response):
    
    # Compare dictionaries
    exact_match = test_answer == response

    return exact_match

def check_animal_match(test_answer, response):

    test_answer_extract = find_symptom_string(test_answer)
    test_answer_dict = ast.literal_eval(test_answer_extract[0])
    test_animal = test_answer_dict['animal']

    response_animal = find_animal_string(response)
    
    # Compare the animal fields
    animal_match = response_animal == response_animal

    return animal_match

def get_first_dict(string_data):
    
    start_idx = string_data.find("{")
    end_idx = string_data.find("}", start_idx) + 1
    first_dict_str = string_data[start_idx:end_idx]
    
    # Count the number of '{' and '}' in first_dict_str
    open_braces_count = first_dict_str.count("{")
    close_braces_count = first_dict_str.count("}")
    
    # If the number of '{' is more than '}', add '}' at the end of first_dict_str
    while open_braces_count > close_braces_count:
        first_dict_str += "}"
        close_braces_count += 1
    
    return first_dict_str


In [6]:
def check_shown_signs_accuracy(test_answer, response):
    
    test_answer_extract = find_symptom_string(test_answer)
    test_answer_dict = ast.literal_eval(test_answer_extract[0])

    response = get_first_dict(response)
    response_symptoms = find_symptom_string(response)

    # Check if response_symptoms is None or an empty list
    if not response_symptoms:
        return 0  # Return 0% accuracy if there are no symptoms in the response
    
    test_signs = test_answer_dict.get('shown_signs', {})
    
    response_signs = ast.literal_eval(response_symptoms[0])
    
    # Calculate the accuracy of shown_signs
    correct_matches = 0
    total_signs = len(test_signs)
    
    for sign, value in test_signs.items():
        if sign in response_signs and response_signs[sign] == value:
            correct_matches += 1
    
    # Calculate accuracy as a percentage
    accuracy = (correct_matches / total_signs) * 100 if total_signs > 0 else 0

    return accuracy

In [7]:
def check_shown_signs_metrics(test_answer, response):
    
    test_answer_extract = find_symptom_string(test_answer)
    test_answer_dict = ast.literal_eval(test_answer_extract[0])
    response_symptoms = find_symptom_string(response)
    
    # Extract shown_signs from both dictionaries
    test_signs = test_answer_dict.get('shown_signs', {})
    
    # Check if response_symptoms is None or an empty list
    if not response_symptoms:
        return 0, 0, 0

    response_symptoms_first = get_first_dict(response_symptoms[0])

    # Handle case where response_symptoms is an empty list
    if response_symptoms:
        response_signs = ast.literal_eval(response_symptoms_first)
    else:
        response_signs = set()
    
    # Get all unique signs
    all_signs = set(test_signs.keys()).union(set(response_signs.keys()))
    
    # Create lists for true values and predicted values
    y_true = [test_signs.get(sign, 0) for sign in all_signs]
    y_pred = [response_signs.get(sign, 0) for sign in all_signs]
    y_pred = [x if x in (-1, 0, 1) else 3 for x in y_pred]
    
    # Calculate precision, recall, and F1 score
    precision = precision_score(y_true, y_pred, average='weighted', zero_division=1)
    recall = recall_score(y_true, y_pred, average='weighted', zero_division=1)
    f1 = f1_score(y_true, y_pred, average='weighted', zero_division=1)

    return precision, recall, f1

In [8]:
def check_missing_extra_signs(test_answer, response):
    
    test_answer_extract = find_symptom_string(test_answer)
    test_answer_dict = ast.literal_eval(test_answer_extract[0])

    response = get_first_dict(response)
    response_symptoms = find_symptom_string(response)
    
    # Extract shown_signs from both dictionaries
    test_signs = set(test_answer_dict.get('shown_signs', {}))
    
    # Handle case where response_symptoms is an empty list
    if response_symptoms:
        response_signs = set(ast.literal_eval(response_symptoms[0]))
    else:
        response_signs = set()
    
    # Identify missing and extra signs
    missing_signs = test_signs - response_signs
    extra_signs = response_signs - test_signs
    
    return missing_signs, extra_signs, len(missing_signs), len(extra_signs)

In [9]:
def check_result_for_df(dataframe, exp_name):
    # Initialize lists to store the results
    exact_match_results = []
    animal_match_results = []
    shown_signs_accuracy_results = []
    shown_signs_precision_results = []
    shown_signs_recall_results = []
    shown_signs_f1_score_results = []
    missing_signs_results = []
    extra_signs_results = []
    num_missing_signs_results = []
    num_extra_signs_results = []

    # Iterate over each row in the dataframe
    for i in range(len(dataframe)):
        
        test_answer = dataframe['test_answer'][i]
        response = dataframe['response'][i]

        exact_match = check_exact_match(test_answer, response)
        animal_match = check_animal_match(test_answer, response)
        shown_signs_accuracy = check_shown_signs_accuracy(test_answer, response)
        precision, recall, f1 = check_shown_signs_metrics(test_answer, response)
        missing_signs, extra_signs, num_missing_signs, num_extra_signs = check_missing_extra_signs(test_answer, response)

        # Append the results to the lists
        exact_match_results.append(exact_match)
        animal_match_results.append(animal_match)
        shown_signs_accuracy_results.append(shown_signs_accuracy)
        shown_signs_precision_results.append(precision)
        shown_signs_recall_results.append(recall)
        shown_signs_f1_score_results.append(f1)
        missing_signs_results.append(list(missing_signs))
        extra_signs_results.append(list(extra_signs))
        num_missing_signs_results.append(num_missing_signs)
        num_extra_signs_results.append(num_extra_signs)

    # Create a new dataframe with the original data and the results
    result_df = pd.DataFrame({
        'exp_name': exp_name,
        'exact_match': exact_match_results,
        'animal_match': animal_match_results,
        'shown_signs_accuracy': shown_signs_accuracy_results,
        'shown_signs_precision': shown_signs_precision_results,
        'shown_signs_recall': shown_signs_recall_results,
        'shown_signs_f1_score': shown_signs_f1_score_results,
        'missing_signs': missing_signs_results,
        'extra_signs': extra_signs_results,
        'num_missing_signs': num_missing_signs_results,
        'num_extra_signs': num_extra_signs_results
    })
    
    return result_df

In [10]:
def calculate_average_metrics(result_df):
    """
    Calculate the average metrics for a given result dataframe.
    
    Args:
    result_df (pd.DataFrame): The result dataframe containing metrics.
    
    Returns:
    pd.DataFrame: A dataframe containing the average metrics.
    """
    average_metrics = {
        'exact_match': result_df['exact_match'].mean(),
        'animal_match': result_df['animal_match'].mean(),
        'shown_signs_accuracy': result_df['shown_signs_accuracy'].mean(),
        'shown_signs_precision': result_df['shown_signs_precision'].mean(),
        'shown_signs_recall': result_df['shown_signs_recall'].mean(),
        'shown_signs_f1_score': result_df['shown_signs_f1_score'].mean(),
        'num_missing_signs': result_df['num_missing_signs'].mean(),
        'num_extra_signs': result_df['num_extra_signs'].mean()
    }
    
    average_metrics_df = pd.DataFrame(average_metrics, index=[result_df['exp_name'].iloc[0]])
    
    return average_metrics_df

In [11]:
def update_and_save_compiled_results(avg_result_df, filename='compiled_results.csv'):
    """
    Update the compiled results DataFrame with new results and save it locally.
    If the file already exists, it updates the DataFrame; otherwise, it creates a new one.

    Args:
    result_df (pd.DataFrame): The result DataFrame containing the new results.
    filename (str): The filename for the compiled results CSV file.

    Returns:
    None
    """
    # Check if the file exists
    if os.path.exists(filename):
        compiled_results_df = pd.read_csv(filename)
    else:
        # Create an empty DataFrame with the specified columns
        compiled_results_df = pd.DataFrame(columns=[
            'exp_num', 'epoch_num', 'train/test', 'exact_match', 'animal_match',
            'shown_signs_accuracy', 'shown_signs_precision', 'shown_signs_recall',
            'shown_signs_f1_score', 'num_missing_signs', 'num_extra_signs'
        ])
    
    # Extract expnum, epochnum, and train/test from the exp_name
    exp_name = avg_result_df.index[0]
    parts = exp_name.split('_')
    epoch_num = parts[-1].replace('ep', '')
    exp_num = parts[-2].replace('exp', '')
    train_test = parts[-4]
    model_name = parts[0]
    
    # Create a dictionary with the new data
    new_data = {
        'exp_num': exp_num,
        'epoch_num': epoch_num,
        'train/test': train_test,
        'model': model_name,
        'exact_match': round(avg_result_df.at[exp_name, 'exact_match'], 2),
        'animal_match': round(avg_result_df.at[exp_name, 'animal_match'], 2),
        'shown_signs_accuracy': round(avg_result_df.at[exp_name, 'shown_signs_accuracy'], 2),
        'shown_signs_precision': round(avg_result_df.at[exp_name, 'shown_signs_precision'], 2),
        'shown_signs_recall': round(avg_result_df.at[exp_name, 'shown_signs_recall'], 2),
        'shown_signs_f1_score': round(avg_result_df.at[exp_name, 'shown_signs_f1_score'], 2),
        'num_missing_signs': round(avg_result_df.at[exp_name, 'num_missing_signs'], 2),
        'num_extra_signs': round(avg_result_df.at[exp_name, 'num_extra_signs'], 2)
    }
    
    # Convert the dictionary to a DataFrame
    new_df = pd.DataFrame(new_data, index=[exp_name])
    
    # Update the compiled results DataFrame
    compiled_results_df = pd.concat([compiled_results_df, new_df], ignore_index=True)

    compiled_results_df = compiled_results_df.drop_duplicates(ignore_index=True)
    
    # Save the updated compiled results DataFrame to CSV
    compiled_results_df.to_csv(filename, index=False)

    return compiled_results_df

In [12]:
for i in range(len(file_key_list)):
    try:
        exp_name = file_key_list[i]
        df_output = dataframes[exp_name]
        result_df = check_result_for_df(df_output, exp_name)
        avg_result_df = calculate_average_metrics(result_df)
        compiled_results_df = update_and_save_compiled_results(avg_result_df)
        print(f'Results added for : {exp_name}')
    except Exception as e:
        print(f'Failed to process {exp_name}')

  compiled_results_df = pd.concat([compiled_results_df, new_df], ignore_index=True)


Results added for : pythia_test_results_exp0_ep10
Results added for : pythia_test_results_exp0_ep20
Results added for : pythia_test_results_exp0_ep3
Results added for : pythia_test_results_exp1_ep10
Results added for : pythia_test_results_exp1_ep20
Results added for : pythia_test_results_exp1_ep3
Results added for : pythia_test_results_exp2_ep10
Results added for : pythia_test_results_exp2_ep20
Results added for : pythia_test_results_exp2_ep3
Results added for : pythia_test_results_exp3_ep10
Results added for : pythia_test_results_exp3_ep3
Results added for : pythia_test_results_exp4_ep20
Results added for : pythia_test_results_exp4_ep3
Results added for : pythia_test_results_exp5_ep10
Results added for : pythia_test_results_exp5_ep20
Results added for : pythia_test_results_exp5_ep3
Results added for : pythia_train_results_exp0_ep10
Results added for : pythia_train_results_exp0_ep20
Results added for : pythia_train_results_exp0_ep3
Results added for : pythia_train_results_exp1_ep10
Res