# This file compares the teamtat annotation with Extraction performed (Json)

In [1]:
from sklearn.metrics import precision_score, recall_score, f1_score
from difflib import SequenceMatcher
import numpy as np
import json
import os
import xml.etree.ElementTree as ET 
import pandas as pd
import re

## Loading Teamtat Annotation as dataframe

In [2]:
#Teamtat Annotation
annotation_df = pd.read_csv("data/150_papers_json.csv")[["id", "first_num", "output"]]
annotation_df = annotation_df.sort_values(by = ['first_num'])

In [3]:
def str_toJson(string):
    ##The json output from Llama dataframe was not in correct json format
    #We will change the single quotes to double quotes
    # We will change the None to null
    json_string = string.replace("None", "null")

    try:
        # Try to load the JSON string
        json_object = json.loads(json_string)
        return json_object
    except json.JSONDecodeError as e:
        # Catch JSONDecodeError if the string is not valid JSON
        print(f"Error decoding JSON: {e}")
        return None
    except Exception as e:
        # Catch any other exceptions
        print(f"An error occurred: {e}")
        return None

In [4]:
annotation_df['output'] = annotation_df['output'].apply(str_toJson)
annotation_df

Unnamed: 0,id,first_num,output
0,0_54,0,{'perovskite_composition': 'Cs 0.05 FA 0.85 MA...
1,1_22,1,"{'perovskite_composition': None, 'electron_tra..."
2,2_75,2,{'perovskite_composition': 'dibutylammonium le...
3,3_52,3,{'perovskite_composition': 'Cs0.05(MA0.10FA0.8...
4,4_26,4,{'perovskite_composition': '(MAPbBr3)0.05(FAPb...
...,...,...,...
144,145_31,145,{'perovskite_composition': 'Cs0.05(MA0.17FA0.8...
145,146_36,146,{'perovskite_composition': 'Cs0.05(MA0.05FA0.9...
146,147_41,147,{'perovskite_composition': 'formamidinium lead...
147,148_26,148,{'perovskite_composition': 'Cs0.05FA0.85MA0.10...


## Loading in JSON extraction

In [5]:
## extraction performed by Llama (Daniel)
# Read JSON from a file
with open("data/deepseek_newschema_OG.json", 'r') as f:
    extraction = json.load(f)

extraction_df = pd.DataFrame(list(extraction.items()), columns=['paper_num', 'output'])
extraction_df['paper_num'] = pd.to_numeric(extraction_df['paper_num'])
extraction_df = extraction_df.sort_values('paper_num')
extraction_df["output"]


77     {'perovskite_composition': 'FAPbI3', 'electron...
125    {'perovskite_composition': 'FA1-x MAx PbI3', '...
8      {'perovskite_composition': '(BA)₂PbI₄', 'elect...
32     {'perovskite_composition': 'Cs5(MA0.10FA0.90)P...
29     {'perovskite_composition': '(MAPbBr3)0.05(FAPb...
                             ...                        
4      {'perovskite_composition': 'Cs0.1FA0.6MA0.3Sn0...
38     {'perovskite_composition': 'Cs 0.05 (MA 0.17 F...
89     {'perovskite_composition': 'Cs 0.05 (MA 0.05 F...
120    {'perovskite_composition': 'FAPbI3', 'electron...
85     {'perovskite_composition': 'Cs 0.05 FA 0.85 MA...
Name: output, Length: 129, dtype: object

#### Analyzing these outputs for a bit

In [6]:
outputs_annotated = annotation_df["output"]
for i in outputs_annotated:
    # print(i.keys())
    for key in i.keys():
        if key.startswith('test'):
            print(i[key].keys())
            # if 'efficiency_cont' in i[key].keys():
            #     print(f"control{i[key]['efficiency_control']}, cont{i[key]['efficiency_cont']}")

##Founding: THE 4 entity that is to compare is PEROVSKITE COMPOSITION, ETL, HTL, STRUCTURE

##Founding in stability: efficiency_control is wrong, All value is None, so ignore. 
##['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_tret'] is common across every stability entity
## 'efficiency_cont' are included or not

dict_keys(['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_control', 'efficiency_tret'])
dict_keys(['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_control', 'efficiency_tret'])
dict_keys(['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_control', 'efficiency_tret', 'efficiency_cont'])
dict_keys(['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_control', 'efficiency_tret'])
dict_keys(['stability_type', 'passivating_molecule', 'humidity', 'temperature', 'time', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'efficiency_control', 'efficiency_tret'])
dict_keys(['stability_type', 'p

In [7]:
def include_passivating(dictionary):
    ##In extraction json, realized that some extraction has passivating molecule that is NOT included in its stability testing. 
    ## Since passivating molecule (if exist) needs to be in stability testing (nexted dictionary), we will transfer the information and spit out a cleaned dictionary. 
    if "passivating_molecule" in dictionary.keys():
        passivating = dictionary['passivating_molecule']
        del dictionary['passivating_molecule']
        
        for entity in dictionary.keys():
            if entity.startswith('test'):
                # print(i['entity'])
                if type(dictionary[entity]) == dict:
                    if 'passivating_molecule' in i['test_1']:
                        continue
                    else:
                        # print("Have to include passivating molecule in tests")
                        dictionary[entity]['passivating_molecule'] = passivating
        
    return dictionary

In [8]:
extraction_df['output'] = extraction_df['output'].apply(include_passivating)
extraction_df

Unnamed: 0,paper_num,output
77,0,"{'perovskite_composition': 'FAPbI3', 'electron..."
125,1,"{'perovskite_composition': 'FA1-x MAx PbI3', '..."
8,2,"{'perovskite_composition': '(BA)₂PbI₄', 'elect..."
32,3,{'perovskite_composition': 'Cs5(MA0.10FA0.90)P...
29,4,{'perovskite_composition': '(MAPbBr3)0.05(FAPb...
...,...,...
4,144,{'perovskite_composition': 'Cs0.1FA0.6MA0.3Sn0...
38,145,{'perovskite_composition': 'Cs 0.05 (MA 0.17 F...
89,146,{'perovskite_composition': 'Cs 0.05 (MA 0.05 F...
120,147,"{'perovskite_composition': 'FAPbI3', 'electron..."


In [9]:
outputs_extracted = extraction_df["output"]
for i in outputs_extracted:
    print(i.keys())
    for key in i.keys():
        if key.startswith('test'):
            if type(i[key]) == dict:
                print(i[key].keys())

dict_keys(['perovskite_composition', 'electron_transport_layer', 'pin_nip_structure', 'hole_transport_layer', 'test_1'])
dict_keys(['test_name', 'temperature', 'time', 'humidity', 'control_efficiency', 'treatment_efficiency', 'passivating_molecule', 'control_pce', 'control_voc', 'treated_pce', 'treated_voc'])
dict_keys(['perovskite_composition', 'electron_transport_layer', 'pin_nip_structure', 'hole_transport_layer', 'test_1'])
dict_keys(['test_name', 'temperature', 'time', 'humidity', 'control_efficiency', 'treatment_efficiency', 'passivating_molecule', 'control_pce', 'control_voc', 'treated_pce', 'treated_voc'])
dict_keys(['perovskite_composition', 'electron_transport_layer', 'pin_nip_structure', 'hole_transport_layer', 'test_1'])
dict_keys(['test_name', 'temperature', 'time', 'humidity', 'control_efficiency', 'treatment_efficiency', 'passivating_molecule', 'control_pce', 'control_voc', 'treated_pce', 'treated_voc'])
dict_keys(['perovskite_composition', 'electron_transport_layer', 'p

In [10]:
## Convert all numerical data into float for both
def convert_numeric(dictionary):
    numerical_key = ['temperature', 'time', 'humidity', 'efficiency_cont', 'efficiency_tret', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc', 'control_efficiency', 'treatment_efficiency']

    translation_table = str.maketrans('', '', 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*()')
    for key in dictionary.keys():
        if (key.startswith('test')) & (type(dictionary[key]) == dict):
            for entity in dictionary[key].keys():
                if entity in numerical_key:
                    if isinstance(dictionary[key][entity], str): 
                        substitute = re.sub(r'\D', '', dictionary[key][entity])
                        if len(substitute) != 0:
                            numerical_value = float(substitute)
                            dictionary[key][entity] = numerical_value
                        else:
                            dictionary[key][entity] = None
    return dictionary

In [11]:
extraction_df['output'] = extraction_df['output'].apply(convert_numeric)
annotation_df['output'] = annotation_df['output'].apply(convert_numeric)


In [12]:
## Do any json manipulation here 


## Merging dataframe

In [13]:
evaluate_df = annotation_df.merge(extraction_df, left_on='first_num', right_on='paper_num')[["paper_num", "output_x",'output_y']]
evaluate_df.columns = ['paper_num', 'annotation', 'extracted']
evaluate_df

Unnamed: 0,paper_num,annotation,extracted
0,0,{'perovskite_composition': 'Cs 0.05 FA 0.85 MA...,"{'perovskite_composition': 'FAPbI3', 'electron..."
1,1,"{'perovskite_composition': None, 'electron_tra...","{'perovskite_composition': 'FA1-x MAx PbI3', '..."
2,2,{'perovskite_composition': 'dibutylammonium le...,"{'perovskite_composition': '(BA)₂PbI₄', 'elect..."
3,3,{'perovskite_composition': 'Cs0.05(MA0.10FA0.8...,{'perovskite_composition': 'Cs5(MA0.10FA0.90)P...
4,4,{'perovskite_composition': '(MAPbBr3)0.05(FAPb...,{'perovskite_composition': '(MAPbBr3)0.05(FAPb...
...,...,...,...
124,144,"{'perovskite_composition': None, 'electron_tra...",{'perovskite_composition': 'Cs0.1FA0.6MA0.3Sn0...
125,145,{'perovskite_composition': 'Cs0.05(MA0.17FA0.8...,{'perovskite_composition': 'Cs 0.05 (MA 0.17 F...
126,146,{'perovskite_composition': 'Cs0.05(MA0.05FA0.9...,{'perovskite_composition': 'Cs 0.05 (MA 0.05 F...
127,147,{'perovskite_composition': 'formamidinium lead...,"{'perovskite_composition': 'FAPbI3', 'electron..."


### We will first compare paper #0 

In [14]:
evaluation_0 = evaluate_df[evaluate_df['paper_num'] == 0]
evaluation_0

Unnamed: 0,paper_num,annotation,extracted
0,0,{'perovskite_composition': 'Cs 0.05 FA 0.85 MA...,"{'perovskite_composition': 'FAPbI3', 'electron..."


In [15]:
annotation_0 = evaluation_0['annotation'].values[0]
extraction_0 = evaluation_0['extracted'].values[0]

In [16]:
def compare_data(labeled_data, extracted_data, numerical_tolerance=0.01):
    results = {}
    total_fields = len(labeled_data)
    matched_fields = 0
    numerical_differences = []

    # print(labeled_data['structure_pin_nip'] == extracted_data['pin_nip_structure'])
    # print(extracted_data['pin_nip_structure'])

    ##Compare 'perovskite_composition', 'electron_transport_layer', 'pin_nip_structure', 'hole_transport_layer'
    common_key_annotated = ['perovskite_composition', 'electron_transport_layer', 'hole_transport_layer', 'structure_pin_nip']
    commo_key_extracted = ['perovskite_composition', 'electron_transport_layer', 'hole_transport_layer', 'pin_nip_structure']
    for key_i in range(len(common_key_annotated)):

        if (labeled_data[common_key_annotated[key_i]]!=None) & (extracted_data[commo_key_extracted[key_i]]==None):
            results[common_key_annotated[key_i]] == 'Not found'
        elif (labeled_data[common_key_annotated[key_i]]==None) & (extracted_data[commo_key_extracted[key_i]]!=None):
            results[common_key_annotated[key_i]] == 'halucinating'
        
        elif labeled_data[common_key_annotated[key_i]] == extracted_data[commo_key_extracted[key_i]]:
            matched_fields += 1
            results[common_key_annotated[key_i]] = "Match"
        else:
            similarity = SequenceMatcher(None, labeled_data[common_key_annotated[key_i]].lower(), extracted_data[commo_key_extracted[key_i]].lower()).ratio()
            if similarity > 0.8:  # Threshold for similarity
                matched_fields += 1
                results[common_key_annotated[key_i]] = f"Fuzzy Match ({similarity:.2f})"
            else:
                results[common_key_annotated[key_i]] = f"Mismatch ({similarity:.2f})"

    stability_entity_annotated = ['stability_type', 'temperature', 'time', 'humidity', 'passivating_molecule', 'efficiency_cont', 'efficiency_tret', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']
    stability_entity_extracted = ['test_name', 'temperature', 'time', 'humidity', 'passivating_molecule','control_efficiency', 'treatment_efficiency', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']

    ##Find list of stability related keys in annotated
    stability_key_annotated = []
    for key_annotated in labeled_data.keys():
        if key_annotated.startswith('test'):
            stability_key_annotated.append(key_annotated)

    for key_extract in extracted_data.keys():
        if (key_extract.startswith('test')) & (isinstance(extracted_data[key_extract], dict)):
            results[key_extract] = {}
            for stability_annotated in stability_key_annotated:
                for entity_i in range(len(stability_entity_annotated)):
                    if (stability_entity_annotated[entity_i] in labeled_data[stability_annotated]) & (stability_entity_extracted[entity_i] in extracted_data[key_extract]):
                        entity_annotated_value = labeled_data[stability_annotated][stability_entity_annotated[entity_i]]
                        entity_extracted_value = extracted_data[key_extract][stability_entity_extracted[entity_i]]
                        if (entity_annotated_value!=None) & (entity_extracted_value==None):
                            results[key_extract[stability_entity_extracted[entity_i]]] == 'Not found'
                        elif (entity_annotated_value==None) & (entity_extracted_value!=None):
                            results[key_extract[stability_entity_extracted[entity_i]]] == 'halucinating'
                        elif isinstance(entity_annotated_value, (int, float)) and isinstance(entity_extracted_value, (int, float)):
                            value_num = entity_annotated_value
                            extract_num = entity_extracted_value
                            if abs(value_num - extract_num) <= numerical_tolerance * abs(value_num):
                                matched_fields += 1
                                numerical_differences.append(abs(value_num - extract_num))
                                matched_fields += 1
                                results[key_extract][stability_entity_extracted[entity_i]] = 'Numerical Match'
                            else:
                                results[key_extract][stability_entity_extracted[entity_i]] = 'Numerical MisMatch'
                        elif isinstance(entity_annotated_value, (str)) and isinstance(entity_extracted_value, (str)):
                            similarity = SequenceMatcher(None, entity_annotated_value.lower(), entity_extracted_value.lower()).ratio()
                            if similarity > 0.8:  # Threshold for similarity
                                matched_fields += 1
                                results[key_extract][stability_entity_extracted[entity_i]] = f"Fuzzy Match ({similarity:.2f})"
                            else:
                                results[key_extract][stability_entity_extracted[entity_i]] = f"Mismatch ({similarity:.2f})"
            total_fields += len(results[key_extract])

    accuracy = matched_fields / total_fields
    mean_absolute_error = np.mean(numerical_differences) if numerical_differences else None

    
    
    return {
        "results": results,
        "accuracy": accuracy,
        "mean_absolute_error": mean_absolute_error,
        "matched_fields": matched_fields,
        "total_fields": total_fields
    }


In [17]:
comparison_result = compare_data(annotation_0, extraction_0)
print(comparison_result)

{'results': {'perovskite_composition': 'Mismatch (0.35)', 'electron_transport_layer': 'Match', 'hole_transport_layer': 'Mismatch (0.09)', 'structure_pin_nip': 'Match', 'test_1': {'test_name': 'Mismatch (0.73)', 'temperature': 'Numerical MisMatch', 'time': 'Numerical MisMatch', 'passivating_molecule': 'Mismatch (0.11)', 'treatment_efficiency': 'Numerical MisMatch', 'control_pce': 'Numerical MisMatch', 'treated_pce': 'Numerical MisMatch', 'treated_voc': 'Numerical MisMatch'}}, 'accuracy': 0.15384615384615385, 'mean_absolute_error': None, 'matched_fields': 2, 'total_fields': 13}


In [70]:
annotation_0['test_1']['humidity'] == None

True

In [19]:
extraction_0

{'perovskite_composition': 'FAPbI3',
 'electron_transport_layer': 'C60',
 'pin_nip_structure': 'PIN',
 'hole_transport_layer': 'TiO2',
 'test_1': {'test_name': 'ISOS-D',
  'temperature': 85,
  'time': 1500,
  'humidity': None,
  'control_efficiency': 21.4,
  'treatment_efficiency': 23.94,
  'passivating_molecule': 'CMAI',
  'control_pce': 21.4,
  'control_voc': 1.092,
  'treated_pce': 23.94,
  'treated_voc': 1.149}}

## First, we will evaluate one pair of papers and adjust specific formatting

In [20]:
##0th paper prediction
subset_llama

NameError: name 'subset_llama' is not defined

In [581]:
#The actual label
clean_label_data['0']

{'ISOSL3': {'control_pce': '24',
  'efficiency_tret': '95%',
  'time': '1200',
  'treated_pce': '26.9',
  'treated_voc': '1.18'},
 'structure_pin_nip': 'PIN',
 'passivating_molecule': '4Cl-BZS',
 'perovskite_composition': 'Cs 0.05 FA 0.85 MA 0.1 PbI 3',
 'hole_transport_layer': '2PACz and Me-4PACz',
 'electron_transport_layer': 'C60'}

#### Evaluation in parts
- numerical data 
- text data (molecule)
- stability
    - Change how to parse xml
    - Change the output of the model as ID but no specification on number


In [582]:
## We need precision and recall for EACH variable
## For each variable, calculate the F1 score - There is F1 score for each variable
## Take a weighted average ***For now, just take the average. 

Variables (number)
- control_pce
- treatment_pce
- control_voc
- treatment_voc

Variable (text)
- structure_pin_nip
- passivating_molecule
- perovskite_composition
- hole_transport_layer
- electron_transport_layer

Stability (later)

In [21]:
def check_float(value):
    for char in value:
        if (char == ".") | (char.isdigit()):
            continue
        else:
            return False
    return True


In [22]:
import re

def check_float(value):
    """Returns True if value can be converted to a float, otherwise False."""
    try:
        float(value)
        return True
    except ValueError:
        return False

def clean_numeric_value(value):
    """
    Cleans a numeric value by removing non-numeric characters except decimals.
    Ensures the output is either a float or a string that can be converted.
    """
    if value is None:
        return None
    
    value = str(value).strip()  # Convert to string and remove leading/trailing spaces
    cleaned_value = re.sub(r"[^\d.]", "", value)  # Remove all non-numeric characters except "."

    return cleaned_value if check_float(cleaned_value) else None

def numerical_comparison(id, label_annotation, extraction_annotation, numerical_tolerance=0.1):
    """
    Compares numerical values with a tolerance, handling different formatting issues.
    """
    if id not in extraction_annotation:
        return "FN"  # False Negative: Missing extracted value
    
    label_data = label_annotation.get(id)
    extract_data = extraction_annotation.get(id)

    # Clean and convert numeric values
    label_data = clean_numeric_value(label_data)
    extract_data = clean_numeric_value(extract_data)

    if label_data is None or extract_data is None:
        return "FN"  # False Negative: If either value is invalid

    label_data = float(label_data)
    extract_data = float(extract_data)

    # Apply numerical tolerance check
    if abs(label_data - extract_data) <= numerical_tolerance * abs(label_data):
        return "TP"  # True Positive: Correct numerical extraction
    else:
        return "FP"  # False Positive: Incorrect numerical extraction


In [23]:
from difflib import SequenceMatcher

def text_comparison(id, label_annotation, extraction_annotation, text_similarity_threshold=0.8):
    """Compares text values using string similarity matching."""
    common_key_annotated = ['perovskite_composition', 'electron_transport_layer', 'hole_transport_layer', 'structure_pin_nip']
    commo_key_extracted = ['perovskite_composition', 'electron_transport_layer', 'hole_transport_layer', 'pin_nip_structure']

    # Handle special case for structure_pin_nip
    key_to_check = "pin_nip_structure" if id == "structure_pin_nip" else id

    # If the key is missing in the extracted annotation, return False Negative
    if key_to_check not in extraction_annotation:
        return "FN"

    label_data = label_annotation.get(id, "")
    extract_data = extraction_annotation.get(key_to_check, "")

    # Convert lists to strings if necessary
    if isinstance(label_data, list):
        label_data = " ".join(map(str, label_data))  # Convert list to string
    if isinstance(extract_data, list):
        extract_data = " ".join(map(str, extract_data))  # Convert list to string

    # Ensure values are strings
    if not isinstance(label_data, str) or not isinstance(extract_data, str):
        return "FP"  # If data is still not a string, return False Positive

    # Compute similarity score
    similarity = SequenceMatcher(None, label_data.lower(), extract_data.lower()).ratio()

    return "TP" if similarity > text_similarity_threshold else "FP"

        


In [24]:
def stability_comparison(id, label_annotation, extraction_annotation):
    # Ensure "stability_tests" exists and is a list
    if "stability_tests" not in extraction_annotation or not isinstance(extraction_annotation["stability_tests"], list):
        return "FN"  # No stability test data found

    # If "stability_tests" is empty, return False Negative
    if len(extraction_annotation["stability_tests"]) == 0:
        return "FN"

    for dictionary in extraction_annotation["stability_tests"]:
        # Skip if "test_name" is missing or invalid
        if "test_name" not in dictionary or not isinstance(dictionary["test_name"], str):
            continue
        
        test_name = dictionary["test_name"]
        if id[4] == test_name[-1]:  # Match last character of ID and test name
            return "TP"  # True Positive: Test correctly extracted

    return "FN"  # No match found

In [25]:
def safe_division(numerator, denominator):
    """Returns division result, or 0 if the denominator is zero."""
    return numerator / denominator if denominator != 0 else 0

In [587]:
# def safe_division(numerator, denominator):
#     """Returns division result, or 0 if the denominator is zero."""
#     return numerator / denominator if denominator != 0 else 0

# def compare_json(labeled_data, extracted_data):
#     """
#     Compare labeled and extracted JSON data for correctness.

#     TP: Correct value extracted by LLM.
#     FN: LLM didn't extract this variable.
#     FP: LLM extracted a value, but it was incorrect.
#     """
    
#     numerical_variables = ["control_pce", "treated_pce", "control_voc", "treated_voc"]
#     text_variables = ["structure_pin_nip", "passivating_molecule", "perovskite_composition", 
#                       "electron_transport_layer", "hole_transport_layer"]
    
#     # Initialize comparison dictionaries
#     numerical_dict = {var: {"TP": 0, "FP": 0, "FN": 0} for var in numerical_variables}
#     text_dict = {var: {"TP": 0, "FP": 0, "FN": 0} for var in text_variables}
#     stability_dict = {
#         "ISOS-D": {"TP": 0, "FP": 0, "FN": 0},
#         "ISOS-L": {"TP": 0, "FP": 0, "FN": 0},
#         "ISOS-T": {"TP": 0, "FP": 0, "FN": 0}
#     }

#     for key, label_value in labeled_data.items():
#         if key not in extracted_data:
#             print(f"Extraction was not performed. Paper num: {key}")
#             continue
        
#         extracted_value = extracted_data[key]

#         for id, label in label_value.items():
#             # Handle numerical values
#             if id in numerical_variables:
#                 if isinstance(label, str) and label.replace(".", "").isdigit():
#                     result = numerical_comparison(id, label_value, extracted_value)
#                     numerical_dict[id][result] += 1

#             # Handle text values
#             elif id in text_variables:
#                 if isinstance(label, str):
#                     result = text_comparison(id, label_value, extracted_value)
#                     text_dict[id][result] += 1

#             # Handle stability tests
#             elif "ISOS" in id:
#                 result = stability_comparison(id, label_value, extracted_value)
#                 stability_type = f"ISOS-{id[4]}"  # Extract stability type
#                 if stability_type in stability_dict:
#                     stability_dict[stability_type][result] += 1

#     # Merge all results
#     combined_dict = {**numerical_dict, **text_dict, **stability_dict}
#     print("Performance for each variable in dictionary:", combined_dict)

#     # Compute precision, recall, and F1-score
#     variable_list, precision_list, recall_list, f1_list = [], [], [], []
#     for variable, performance in combined_dict.items():
#         TP, FP, FN = performance["TP"], performance["FP"], performance["FN"]
        
#         precision = safe_division(TP, TP + FP)
#         recall = safe_division(TP, TP + FN)
#         f1 = safe_division(2 * precision * recall, precision + recall)

#         variable_list.append(variable)
#         precision_list.append(precision)
#         recall_list.append(recall)
#         f1_list.append(f1)

#     return variable_list, precision_list, recall_list, f1_list

In [111]:
from difflib import SequenceMatcher

def text_comparison(id, label_annotation, extraction_annotation, text_similarity_threshold=0.8):
    """Compares text values using string similarity matching."""
    # Handle special case for structure_pin_nip
    key_to_check = "pin_nip_structure" if id == "structure_pin_nip" else id

    # If the key is missing in the extracted annotation, return False Negative
    if (label_annotation[id]!=None) & (extraction_annotation[key_to_check]==None):
        # print(f"FN, {label_annotation[id]}, {extraction_annotation[key_to_check]}")
        return "FN"
    elif (label_annotation[id]==None) & (extraction_annotation[key_to_check]!=None):
        # print(f"TN, {label_annotation[id]}, {extraction_annotation[key_to_check]}")
        return "TN"

    label_data = label_annotation.get(id, "")
    extract_data = extraction_annotation.get(key_to_check, "")

    # Convert lists to strings if necessary
    if isinstance(label_data, list):
        label_data = " ".join(map(str, label_data))  # Convert list to string
    if isinstance(extract_data, list):
        extract_data = " ".join(map(str, extract_data))  # Convert list to string

    # Ensure values are strings
    if not isinstance(label_data, str) or not isinstance(extract_data, str):
        # print(f"FP, {label_annotation[id]}, {extraction_annotation[key_to_check]}")
        return "FP"  # If data is still not a string, return False Positive

    # Compute similarity score
    similarity = SequenceMatcher(None, label_data.lower(), extract_data.lower()).ratio()

    if similarity > text_similarity_threshold:
        # print(f"TP, {label_annotation[id]}, {extraction_annotation[key_to_check]}")
        return 'TP'
    else:
        return "FP"


In [121]:
def compare_json(df):
    """
    Compare labeled and extracted JSON data for correctness.

    TP: Correct value extracted by LLM.
    FN: LLM didn't extract this variable.
    FP: LLM extracted a value, but it was incorrect.
    TN: LLM halucinated and returned value that was not extracted
    """
    
    text_variables = ['perovskite_composition', 'electron_transport_layer', 'hole_transport_layer', 'structure_pin_nip']

    
    stability_entity_annotated = ['stability_type', 'temperature', 'time', 'humidity', 'passivating_molecule', 'efficiency_cont', 'efficiency_tret', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']
    stability_entity_extracted = ['test_name', 'temperature', 'time', 'humidity', 'passivating_molecule','control_efficiency', 'treatment_efficiency', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']
    
    # Initialize comparison dictionaries
    text_dict = {var: {"TP": 0, "FP": 0, "FN": 0, "TN": 0} for var in text_variables}
    stability_dict = {var: {"TP": 0, "FP": 0, "FN": 0, "TN": 0} for var in stability_entity_annotated}

    # for id, label in extracted_value.items():
    #     if 'test' in id:
    #         stability_dict[id] = {"TP": 0, "FP": 0, "FN": 0}

    # stability_entity_annotated = ['stability_type', 'temperature', 'time', 'humidity', 'passivating_molecule', 'efficiency_cont', 'efficiency_tret', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']
    # stability_entity_extracted = ['test_name', 'temperature', 'time', 'humidity', 'passivating_molecule','control_efficiency', 'treatment_efficiency', 'control_pce', 'treated_pce', 'control_voc', 'treated_voc']

    for row in df.itertuples():       
        label_value = row.annotation
        extracted_value = row.extracted

        # print(label_value)
        # print(extracted_value)

        for id, label in label_value.items():
            if ('test' in id) and (isinstance(label_value[id], dict)):
                ##Plan for stability test
                '''
                For each stability condition in annotation, 
                    Create a list(match_dictionary) where each element will be dictionary
                    Pair them with stability condition in extracted
                        In nested function, calculate the similarity dictionary {stabilitytype: TP......}
                        match_dictionary.append(dictionary returned from the function)
                    
                    Across element in match_dictionary, find the one that has the highest match (implementation brainstorming atm)

                    These highest match value will be increamented in stability_dict
                '''
                matched = 0
                for extract_id, extract_label in extracted_value.items():
                    if ('test' in extract_id) and (isinstance(extracted_value[extract_id], dict)):
                        matched += 1
                        # print(label)
                        # print(extract_label)

                if matched == 0:
                    #No stability were extracted, we will add stability_unmatched
                        ##We need to account for if there was NO stability extracted. 
                    for key in stability_dict:
                        if 'FN' in stability_dict[key]:
                            stability_dict[key]['FN'] += 1

                
            else:  
                result = text_comparison(id, label_value, extracted_value)
                text_dict[id][result] += 1


                
                

            # # Handle numerical values
            # if id in numerical_variables:
            #     if isinstance(label, str) and label.replace(".", "").isdigit():
            #         result = numerical_comparison(id, label_value, extracted_value)
            #         numerical_dict[id][result] += 1

    #         # Handle text values
    #         elif id in text_variables:
    #             if isinstance(label, str):
    #                 result = text_comparison(id, label_value, extracted_value)
    #                 text_dict[id][result] += 1

    #         # Handle stability tests
    #         elif "ISOS" in id:
    #             result = stability_comparison(id, label_value, extracted_value)
    #             stability_type = f"ISOS-{id[4]}"  # Extract stability type
    #             if stability_type in stability_dict:
    #                 stability_dict[stability_type][result] += 1

    # # Merge all results
    # combined_dict = {**numerical_dict, **text_dict, **stability_dict}
    # print("Performance for each variable in dictionary:", combined_dict)

    # # Compute precision, recall, and F1-score
    # variable_list, precision_list, recall_list, f1_list = [], [], [], []
    # for variable, performance in combined_dict.items():
    #     TP, FP, FN = performance["TP"], performance["FP"], performance["FN"]
        
    #     precision = safe_division(TP, TP + FP)
    #     recall = safe_division(TP, TP + FN)
    #     f1 = safe_division(2 * precision * recall, precision + recall)

    #     variable_list.append(variable)
    #     precision_list.append(precision)
    #     recall_list.append(recall)
    #     f1_list.append(f1)

    # return variable_list, precision_list, recall_list, f1_list

In [122]:
text_dict = compare_json(evaluate_df)
text_dict

{'stability_type': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'temperature': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'time': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'humidity': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'passivating_molecule': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'efficiency_cont': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'efficiency_tret': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'control_pce': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'treated_pce': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'control_voc': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}, 'treated_voc': {'TP': 0, 'FP': 0, 'FN': 0, 'TN': 0}}
{'stability_type': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'temperature': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'time': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'humidity': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'passivating_molecule': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'efficiency_cont': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'efficiency_tret': {'TP': 0, 'FP': 0, 'FN': 1, 'TN': 0}, 'control_pce': {'TP': 0,

In [114]:
variables, precisions, recalls, f1s = compare_json(evaluate_df)

set()
set()
set()
set()
set()
set()
set()
set()
set()
set()
set()
set()


TypeError: cannot unpack non-iterable NoneType object

## Calculate Macro f1 score

In [589]:
def macro_f1(f1_list, weight = None):
    if weight == None:
        #If no weight given, do unweighted average of f1 score
        return sum(f1_list) / len(f1_list)
    total_f1 = 0
    for i in range(len(f1_list)):
        total_f1 += (f1_list[i] * weight[i])
    return total_f1 / sum(weight)
    


In [590]:
## The macro f1 score unweighted
macro_f1(f1s)

0.599084595959596

### This concludes the pipeline of evaluating extraction quality

# Todo:
- stability evaluation
    - Need to reiterate on the teamtat annotation

- Putting weights on F1 Score

## Stability annotation brainstorm

- We will have as at most 5 realation (if there is an overlap, choose the stability test that offers more info)
    - Exp there is ISOS L1 and ISOS L2. If there is more iformation in IsosL2, use that as a test and compare on ISOSL that llama extracted (if any)
- We will iterate through different test recorded on annotation and compared it with suitable stability extracted on Llama