The goal of this file is to take final results of each experiment and conduct error analysis

- We want to know the Overall Accuracy and LD (Levenshtein distance)
- We also want to know the same values per key field

What needs to be done:
- First, we need to merge the result jsonl files with the initial metadata.jsonl so that each result carries its file name and can be traced back to the original file
- We then analyse the merged jsonl file
- The results should be output as csv files and images

## Set up

In [None]:
!pip install pandas
!pip install numpy
!pip install matplotlib
!pip install seaborn
!pip install sklearn
!pip install Levenshtein
!pip install openpyxl

In [None]:
import os
import json
import pandas as pd
import numpy as np
from Levenshtein import distance
import matplotlib.pyplot as plt
from IPython.display import display
import openpyxl

## Overall and per field values

In [None]:
# Directory paths
final_results_dir = "final_results"  # Replace with the path to your results directory
setup_name = "setup_1"  # Replace with the name of your setup or experiment
# NOTE: this value is reassigned to a path under setup_dir further down in this
# cell, before the report is written.
report_dir = "report"


# Everything belonging to one experiment lives under <final_results_dir>/<setup_name>
setup_dir = os.path.join(final_results_dir, setup_name)
images_dir = os.path.join(setup_dir, "images")
merged_jsonl_dir = os.path.join(setup_dir, "merged_jsonl")

# Ensure the images directory exists (exist_ok avoids a check-then-create race)
os.makedirs(images_dir, exist_ok=True)

# File paths
merged_file = os.path.join(merged_jsonl_dir, 'merged_gpt4.jsonl')

# Load merged data: one JSON object per line.
# The encoding is explicit so the result does not depend on the platform
# default, and blank lines (e.g. a trailing newline at the end of the file)
# are skipped because json.loads rejects an empty string.
with open(merged_file, 'r', encoding='utf-8') as f:
    merged_data = [json.loads(line) for line in f if line.strip()]

# Per-invoice F1 scores and mean Levenshtein distances (one value per invoice)
f1_scores = []
lev_distances = []

# Per-field accumulators, keyed by ground-truth field name
field_f1_scores = {}
field_lev_distances = {}
field_TP_FP_FN = {}  # Store TP, FP, FN for each field

# Iterate over the merged data to calculate metrics
for entry in merged_data:
    # 'ground_truth' and 'predictions' are JSON-encoded strings inside each record
    ground_truth = json.loads(entry['ground_truth'])
    predictions = json.loads(entry['predictions'])

    # Invoice-level counts:
    #   TP - ground-truth field predicted with exactly the same value
    #   FP - predicted field that is absent from the ground truth or has a different value
    #   FN - ground-truth field with no prediction at all
    TP = sum(1 for key in ground_truth if key in predictions and ground_truth[key] == predictions[key])
    FP = sum(1 for key in predictions if key not in ground_truth or ground_truth[key] != predictions[key])
    FN = sum(1 for key in ground_truth if key not in predictions)

    # Precision, recall and F1 for this invoice (0 whenever a denominator is 0)
    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if precision + recall > 0 else 0
    f1_scores.append(f1)

    # Field-level outcomes and Levenshtein distances
    entry_lev_total = 0
    for key in ground_truth:
        if key not in field_f1_scores:
            field_f1_scores[key] = []
            field_lev_distances[key] = []
            field_TP_FP_FN[key] = {"TP": 0, "FP": 0, "FN": 0}

        # Classify the prediction for this field
        if key not in predictions:
            outcome = "FN"
        elif ground_truth[key] == predictions[key]:
            outcome = "TP"
        else:
            outcome = "FP"
        field_TP_FP_FN[key][outcome] += 1
        field_f1_scores[key].append(1 if outcome == "TP" else 0)

        # Both sides are converted to str so numeric or null JSON values
        # do not make distance() raise; a missing prediction counts as "".
        lev_distance = distance(str(ground_truth[key]), str(predictions.get(key, "")))
        field_lev_distances[key].append(lev_distance)
        entry_lev_total += lev_distance

    # Mean Levenshtein distance over the ground-truth fields of this invoice.
    # It reuses the str()-normalised distances from the loop above, so the
    # invoice-level and field-level figures are computed the same way.
    num_fields_in_gt = len(ground_truth)
    avg_lev_distance = entry_lev_total / num_fields_in_gt if num_fields_in_gt > 0 else 0
    lev_distances.append(avg_lev_distance)

# Calculate overall metrics (mean over invoices)
mean_f1_score = np.mean(f1_scores)
mean_lev_distance = np.mean(lev_distances)

# Print overall metrics
print(f"Overall F1 Score: {mean_f1_score:.4f} (Based on {len(merged_data)} invoices)")
print(f"Overall Levenshtein Distance: {mean_lev_distance:.4f} (Based on {len(merged_data)} invoices)")

# Report, for every ground-truth field, the mean of its 0/1 match scores
# together with the raw counts and the precision/recall derived from them.
print("\nField-Level F1 Scores:")
for key in field_f1_scores:
    scores = field_f1_scores[key]
    counts = field_TP_FP_FN[key]
    TP, FP, FN = counts["TP"], counts["FP"], counts["FN"]

    # Fall back to 0 when a denominator would be 0
    if TP + FP > 0:
        precision = TP / (TP + FP)
    else:
        precision = 0
    if TP + FN > 0:
        recall = TP / (TP + FN)
    else:
        recall = 0

    print(f"{key}: {np.mean(scores):.4f} (TP = {TP}, FP = {FP}, FN = {FN}, precision = {precision:.4f}, recall = {recall:.4f}, Total: {TP + FP + FN} invoices)")

# Report the mean Levenshtein distance of every ground-truth field
print("\nField-Level Levenshtein Distances:")
for key in field_lev_distances:
    distances = field_lev_distances[key]
    print(f"{key}: {np.mean(distances):.4f}")

# Define the order of keys for plotting
ordered_keys = [
    "vendor_name",
    "invoice_date",
    "invoice_number",
    "total_amount",
    "charge_period_start_date",
    "charge_period_end_date",
    "mpan",
    "account_number"
]

# A key that never occurs in any ground truth has no accumulated counts;
# skip it (with a notice) instead of failing with a KeyError.
missing_keys = [key for key in ordered_keys if key not in field_TP_FP_FN]
if missing_keys:
    print(f"Skipping keys with no ground-truth occurrences: {missing_keys}")
plot_keys = [key for key in ordered_keys if key in field_TP_FP_FN]

# Accuracy per key: TP / (TP + FP + FN)
accuracy_per_key = pd.Series(
    {key: field_TP_FP_FN[key]["TP"] / sum(field_TP_FP_FN[key].values()) for key in plot_keys},
    dtype=float,
)

# Average Levenshtein distance per key
lev_distance_per_key = pd.Series(
    {key: np.mean(field_lev_distances[key]) for key in plot_keys},
    dtype=float,
)


def _save_bar_chart(series, ylabel, title, file_name):
    """Draw *series* as a bar chart, save it under images_dir and show it."""
    series.plot(kind='bar', figsize=(12, 7))
    plt.ylabel(ylabel)
    plt.title(title)
    plt.xticks(rotation=45)  # Rotate the key names by 45 degrees
    plt.tight_layout()  # Adjust the layout to ensure everything fits
    plt.savefig(os.path.join(images_dir, file_name))
    plt.show()


if plot_keys:
    _save_bar_chart(accuracy_per_key, 'Accuracy', 'Accuracy per Key',
                    f'accuracy_per_key_{setup_name}.png')
    _save_bar_chart(lev_distance_per_key, 'Average Levenshtein Distance',
                    'Average Levenshtein Distance per Key',
                    f'lev_distance_per_key_{setup_name}.png')
else:
    # Nothing to draw: none of the ordered keys has any accumulated counts
    print("No per-key metrics available to plot.")

# Build each sheet from a list of rows in one go; growing a DataFrame one
# row at a time with df.loc[len(df)] copies the frame on every insertion.

# Sheet 1: overall metrics (values are stored as 4-decimal strings)
overall_rows = [
    ["Overall F1 Score", f"{mean_f1_score:.4f}", f"Based on {len(merged_data)} invoices"],
    ["Overall Levenshtein Distance", f"{mean_lev_distance:.4f}", f"Based on {len(merged_data)} invoices"],
]
df_overall = pd.DataFrame(overall_rows, columns=["Metric", "Value", "Description"])

# Sheet 2: field-level scores with the counts they were derived from
field_f1_rows = []
for key, scores in field_f1_scores.items():
    TP = field_TP_FP_FN[key]["TP"]
    FP = field_TP_FP_FN[key]["FP"]
    FN = field_TP_FP_FN[key]["FN"]
    # Fall back to 0 when a denominator would be 0
    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    value = f"{np.mean(scores):.4f}"
    field_f1_rows.append([key, value, TP, FP, FN, f"{precision:.4f}", f"{recall:.4f}", TP + FP + FN])
df_field_f1 = pd.DataFrame(
    field_f1_rows,
    columns=["Field", "F1 Score", "TP", "FP", "FN", "Precision", "Recall", "Total Invoices"],
)

# Sheet 3: mean Levenshtein distance per field
field_ld_rows = [[key, f"{np.mean(distances):.4f}"] for key, distances in field_lev_distances.items()]
df_field_ld = pd.DataFrame(field_ld_rows, columns=["Field", "Levenshtein Distance"])

# Define the directory to save the report
report_dir = os.path.join(setup_dir, "report")

# Ensure the directory exists (exist_ok avoids a check-then-create race)
os.makedirs(report_dir, exist_ok=True)

# Save the DataFrames to different sheets of an Excel file
report_file_path = os.path.join(report_dir, f"metrics_{setup_name}.xlsx")
with pd.ExcelWriter(report_file_path, engine='openpyxl') as writer:
    df_overall.to_excel(writer, sheet_name='Overall', index=False)
    df_field_f1.to_excel(writer, sheet_name='Field level F1 score', index=False)
    df_field_ld.to_excel(writer, sheet_name='Field level LD', index=False)

print(f"Metrics saved to {report_file_path}")

## Invoice level Analysis

In [None]:
import os
import json
import pandas as pd
import numpy as np
from Levenshtein import distance

# Directory paths
csv_dir = os.path.join(setup_dir, "csv")

# Ensure the csv directory exists (exist_ok avoids a check-then-create race)
os.makedirs(csv_dir, exist_ok=True)

# File paths
merged_file = os.path.join(merged_jsonl_dir, 'merged_gpt4.jsonl')
results_file = os.path.join(csv_dir, 'results.csv')

# Load merged data: one JSON object per line.
# The encoding is explicit so the result does not depend on the platform
# default, and blank lines are skipped because json.loads rejects an
# empty string.
with open(merged_file, 'r', encoding='utf-8') as f:
    merged_data = [json.loads(line) for line in f if line.strip()]

# Collect every field name that occurs in any ground truth or prediction
all_keys = set()
for entry in merged_data:
    ground_truth = json.loads(entry['ground_truth'])
    predictions = json.loads(entry['predictions'])
    all_keys.update(ground_truth.keys(), predictions.keys())

# Iterating a set of strings gives an order that can change between runs;
# sort once so the CSV columns come out in a reproducible order.
sorted_keys = sorted(all_keys)

# One list per output column; every list receives exactly one value per invoice
results = {key: [] for key in sorted_keys}
results["file_name"] = []
results["overall_f1"] = []
results["overall_lev_distance"] = []

# For each key, initialize a list to store its Levenshtein distance
for key in sorted_keys:
    results[f"{key}_lev_distance"] = []

# Evaluate each invoice
for entry in merged_data:
    print(f"File name: {entry['file_name']}")

    ground_truth = json.loads(entry['ground_truth'])
    predictions = json.loads(entry['predictions'])

    print("Ground Truth:", json.dumps(ground_truth, indent=2))
    print("Prediction:", json.dumps(predictions, indent=2))

    TP = 0
    FP = 0
    FN = 0
    total_lev_distance = 0

    for key in sorted_keys:
        # A key missing on either side is compared as the empty string
        gt_value = str(ground_truth.get(key, ""))
        pred_value = str(predictions.get(key, ""))

        # Calculate Levenshtein distance for the key
        lev_dist = distance(gt_value, pred_value)
        total_lev_distance += lev_dist
        results[f"{key}_lev_distance"].append(lev_dist)

        if key in ground_truth:
            if gt_value == pred_value and gt_value:  # Both values are equal and not empty
                print(f"TP - Correct prediction for {key}: {pred_value}")
                results[key].append("TP")
                TP += 1
            elif pred_value:  # Prediction has a value but doesn't match ground truth
                print(f"FP - Incorrect prediction for {key}: predicted {pred_value}, actual {gt_value}")
                results[key].append("FP")
                FP += 1
            else:  # Ground truth has a value but prediction doesn't
                print(f"FN - Missing prediction for {key}: actual {gt_value}")
                results[key].append("FN")
                FN += 1
        else:
            if pred_value:  # Prediction has a value but key is not in ground truth
                print(f"FP - Useless prediction for {key}: predicted {pred_value}")
                results[key].append("FP")
                FP += 1
            else:
                results[key].append(np.nan)  # No value for this key in both ground truth and prediction

    # Calculate overall F1 score for the invoice (0 whenever a denominator is 0)
    precision = TP / (TP + FP) if TP + FP > 0 else 0
    recall = TP / (TP + FN) if TP + FN > 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if precision + recall > 0 else 0
    # Guard against an invoice whose ground truth has no fields at all
    avg_lev_distance = total_lev_distance / len(ground_truth) if len(ground_truth) > 0 else 0

    print(f"TP: {TP}, FP: {FP}, FN: {FN}")
    print(f"Overall F1 Score for this file: {f1}")
    print(f"Overall Levenshtein Distance for this file: {avg_lev_distance}")
    print("\n")

    results["file_name"].append(entry['file_name'])
    results["overall_f1"].append(f1)
    results["overall_lev_distance"].append(avg_lev_distance)

# Convert the results to a DataFrame and save as a CSV file
df = pd.DataFrame(results)
df.to_csv(results_file, index=False)

# Announce the save only after the file has actually been written
print("Results saved to setup directory:", setup_dir)

# Print the first 5 rows of the DataFrame
print(df.head())

## Key field level Analysis

In [None]:
import pandas as pd
import json
import os

# Directory paths
setup_dir = os.path.join(final_results_dir, setup_name)
merged_jsonl_dir = os.path.join(setup_dir, "merged_jsonl")
csv_dir = os.path.join(setup_dir, "csv")

# The per-key CSV files are written into csv_dir below, so make sure it
# exists even when this cell is run on its own.
os.makedirs(csv_dir, exist_ok=True)

# File paths
merged_file = os.path.join(merged_jsonl_dir, 'merged_gpt4.jsonl')

# Load merged results: one JSON object per line. The encoding is explicit so
# the result does not depend on the platform default; blank lines are
# skipped because json.loads rejects an empty string.
with open(merged_file, 'r', encoding='utf-8') as f:
    merged_results = [json.loads(line) for line in f if line.strip()]

# Print the setup name
print(f"Setup: {setup_name}\n")

# Specify the keys you are interested in
keys_of_interest = ["vendor_name", "invoice_date", "invoice_number", "total_amount", "charge_period_start_date", "charge_period_end_date", "mpan", "account_number"]

# Per key: parallel lists that become the columns of that key's CSV file
results = {key: {"file_name": [], "ground_truth": [], "prediction": [], "error_type": []} for key in keys_of_interest}

# Adjust pandas display settings.
# NOTE: this is a global pandas option; floats displayed after this point
# are rendered without decimals.
pd.set_option('display.float_format', lambda x: '%.0f' % x)

# Compare predictions and ground truths
for result in merged_results:
    ground_truth = json.loads(result['ground_truth'])
    predictions = json.loads(result['predictions'])

    for key in keys_of_interest:
        # Nothing to record when the key appears on neither side
        if key not in ground_truth and key not in predictions:
            continue

        results[key]["file_name"].append(result['file_name'])
        results[key]["ground_truth"].append(ground_truth.get(key, "N/A"))

        if key not in predictions:
            results[key]["prediction"].append("Missing")
            results[key]["error_type"].append("FN")
        elif key in ground_truth and predictions[key] == ground_truth[key]:
            # The explicit membership test keeps a null prediction for a key
            # that is absent from the ground truth from counting as a match.
            results[key]["prediction"].append("Correct")
            results[key]["error_type"].append("TP")
        else:
            results[key]["prediction"].append(f"{predictions[key]}")
            results[key]["error_type"].append("FP")

# Convert the results to a DataFrame and save as a CSV file for each key
for key in keys_of_interest:
    df = pd.DataFrame(results[key])
    results_file = os.path.join(csv_dir, f'results_{key}.csv')
    df.to_csv(results_file, index=False)

print("Results saved to setup directory:", setup_dir)

# Load the CSV file for each key, show its error rows and print a summary
for key in keys_of_interest:
    print(f"Results for {key}:")
    results_file = os.path.join(csv_dir, f'results_{key}.csv')
    # Read every column as text and disable the default NA markers:
    # otherwise the "N/A" placeholder becomes NaN and digit-only values are
    # parsed as numbers, which drops leading zeros.
    results_df = pd.read_csv(results_file, dtype=str, keep_default_na=False)
    # Keep every row that is not a true positive. Filtering on error_type
    # rather than on the display text means a prediction whose literal value
    # is "Correct" is still classified properly.
    error_rows = results_df.loc[results_df['error_type'] != 'TP']
    display(error_rows)
    print("\n")

    # Provide a summary for each key
    total_items = len(results_df)
    errors = len(error_rows)
    # A key that never occurred has no rows; report 0 instead of dividing by 0
    accuracy = (total_items - errors) / total_items if total_items > 0 else 0
    TP_count = len(results_df.loc[results_df['error_type'] == 'TP'])
    FP_count = len(results_df.loc[results_df['error_type'] == 'FP'])
    FN_count = len(results_df.loc[results_df['error_type'] == 'FN'])

    print(f"Summary for {key}:")
    print(f"Total items: {total_items}")
    print(f"Number of errors: {errors}")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"TP: {TP_count}")
    print(f"FP: {FP_count}")
    print(f"FN: {FN_count}")
    print("========================================\n")


## Per vendor metrics

In [None]:
import os
import json

# Directory paths
setup_dir = os.path.join(final_results_dir, setup_name)
merged_jsonl_dir = os.path.join(setup_dir, "merged_jsonl")

# File paths
merged_file = os.path.join(merged_jsonl_dir, 'merged_gpt4.jsonl')
vendors_file = os.path.join(merged_jsonl_dir, 'vendors.jsonl')

# Load merged data: one JSON object per line. The encoding is explicit so
# the result does not depend on the platform default; blank lines are
# skipped because json.loads rejects an empty string.
with open(merged_file, 'r', encoding='utf-8') as f:
    merged_data = [json.loads(line) for line in f if line.strip()]

# Copy the ground-truth vendor name to the top level of every record and
# write the enriched records to vendors.jsonl, one JSON object per line.
with open(vendors_file, 'w', encoding='utf-8') as vf:
    for entry in merged_data:
        ground_truth = json.loads(entry['ground_truth'])
        # Raises KeyError when a ground truth has no 'vendor_name' field
        vendor_name = ground_truth['vendor_name']
        entry['vendor_name'] = vendor_name
        vf.write(json.dumps(entry) + '\n')

In [None]:
import os
import json
import pandas as pd
import numpy as np

# Locations for this setup's inputs and outputs
setup_dir = os.path.join(final_results_dir, setup_name)
merged_jsonl_dir = os.path.join(setup_dir, "merged_jsonl")
csv_dir = os.path.join(setup_dir, "csv")

# Input: records enriched with a top-level vendor_name; output: per-vendor CSV
vendors_file = os.path.join(merged_jsonl_dir, 'vendors.jsonl')
results_file = os.path.join(csv_dir, 'results_by_vendor.csv')

# Create the csv directory when it is not there yet
if not os.path.exists(csv_dir):
    os.makedirs(csv_dir)

# Parse vendors.jsonl, one JSON object per line
with open(vendors_file, 'r') as f:
    data = list(map(json.loads, f))

# Bucket the records by vendor name, keeping first-seen vendor order
grouped_data = {}
for entry in data:
    grouped_data.setdefault(entry['vendor_name'], []).append(entry)

# Metrics per vendor, keyed by vendor name
vendor_results = {}

for vendor_name, vendor_data in grouped_data.items():
    # Accumulators are reset for every vendor
    field_f1_scores = {}
    overall_f1_scores = []
    field_TP_FP_FN = {}  # Store TP, FP, FN for each field

    for entry in vendor_data:
        ground_truth = json.loads(entry['ground_truth'])
        predictions = json.loads(entry['predictions'])

        # Outcome counts for this single invoice
        tally = {"TP": 0, "FP": 0, "FN": 0}

        for key, expected in ground_truth.items():
            if key not in field_f1_scores:
                field_f1_scores[key] = []
                field_TP_FP_FN[key] = {"TP": 0, "FP": 0, "FN": 0}

            # Missing prediction -> FN, exact match -> TP, anything else -> FP
            if key not in predictions:
                outcome = "FN"
            elif expected == predictions[key]:
                outcome = "TP"
            else:
                outcome = "FP"

            tally[outcome] += 1
            field_TP_FP_FN[key][outcome] += 1
            field_f1_scores[key].append(1 if outcome == "TP" else 0)

        TP, FP, FN = tally["TP"], tally["FP"], tally["FN"]

        # Invoice-level F1 (0 whenever a denominator is 0)
        precision = TP / (TP + FP) if TP + FP > 0 else 0
        recall = TP / (TP + FN) if TP + FN > 0 else 0
        if precision + recall > 0:
            f1 = (2 * precision * recall) / (precision + recall)
        else:
            f1 = 0
        overall_f1_scores.append(f1)

    # Summarise this vendor
    vendor_results[vendor_name] = {
        'invoices_count': len(vendor_data),
        'field_f1_scores': field_f1_scores,
        'overall_f1': np.mean(overall_f1_scores)
    }

# Flatten the per-vendor metrics into one row per vendor
results_list = []
for vendor_name, metrics in vendor_results.items():
    row = {
        'vendor_name': vendor_name,
        'invoices_count': metrics['invoices_count'],
        'overall_f1': metrics['overall_f1']
    }
    # One "<field>_f1" column per field: the mean of its 0/1 match scores
    row.update({f"{key}_f1": np.mean(scores) for key, scores in metrics['field_f1_scores'].items()})
    results_list.append(row)

df = pd.DataFrame(results_list)
df.to_csv(results_file, index=False)

print(f"Results saved to: {results_file}")