In [None]:
import pandas as pd
import os
import httpx
import json

from openai import OpenAI

# Read the credential from the environment so the real key never has to be
# stored in the notebook; the original placeholder is kept as the fallback.
openai_api_key = os.environ.get("OPENAI_API_KEY", "openai-key")

# Proxy configuration forwarded to httpx. `proxies` used to be referenced
# without being defined anywhere, which raised a NameError on a fresh kernel.
# None means "no explicit proxy"; replace it with your proxy settings if needed.
# NOTE(review): recent httpx releases appear to have replaced the `proxies`
# keyword with `proxy` - confirm against the installed httpx version.
proxies = None

client = OpenAI(
    api_key=openai_api_key,
    http_client=httpx.Client(proxies=proxies)
)

In [None]:
import requests

def open_ai_chat_completion(messages, model="gpt-4o-mini"):
    """
    Send a chat-completion request to the OpenAI REST endpoint.

    Parameters:
    messages (list): Chat messages (dicts with "role" and "content") sent as the request body.
    model (str): Name of the model to query.

    Returns:
    dict or None: The decoded JSON response on HTTP 200, otherwise None (the error is printed).
    """
    url = 'https://api.openai.com/v1/chat/completions'
    # The f-prefix has to sit in front of the string literal; with it inside the
    # quotes the header was the literal text "Bearer f{openai_api_key}".
    headers = {'content-type': 'application/json', "Authorization": f"Bearer {openai_api_key}"}
    data = {
        "model": model,
        "messages": messages
    }
    response = requests.post(url, headers=headers, data=json.dumps(data))

    if response.status_code == 200:
        return response.json()

    print(f"Error: {response.status_code} - {response.text}")
    # Explicit so that callers can see they must handle a failed request.
    return None

In [None]:
from src.utils import Generator
# Project helper: the prompt-generation cells read the dataset names from its
# `dataset_builder.datasets` and `dataset_builder.multilabel_datasets` mappings.
generator = Generator()

# Create the prompts:

1) generate the prompt for classification
2) generate the prompt for relation classification
3) generate the prompt for multilabel classification

In [None]:
import json

# Task descriptions: for each dataset, a 'description' text and a 'labels'
# mapping from label to its definition.
task_description_path = os.path.join("llm", "mappings", "task_description.json")
with open(task_description_path, mode='r', encoding='utf-8') as task_file:
    task_descriptions = json.load(task_file)

# Readable label names: for some datasets, a 'labels' mapping from label to
# the name shown in the prompts.
label_annotation_path = os.path.join("llm", "mappings", "label_annotation.json")
with open(label_annotation_path, mode='r', encoding='utf-8') as label_file:
    label_readable_mapping = json.load(label_file)

# task_descriptions['climatext_wiki'] = task_descriptions['climatext']
# task_descriptions['climatext_10k'] = task_descriptions['climatext']
# task_descriptions['climatext_claim'] = task_descriptions['climatext']

In [None]:
# Skeleton appended to every prompt-generation request (through
# `.format(prompt, template)`) so that the generated prompts share one layout:
# [STARTPROMPT]/[ENDPROMPT] markers, a "Label:" / "Explanation:" answer format
# and the [[Insert Text Here]] placeholder for the input text.
template="""Prompt Template:
[STARTPROMPT]
You are a text classifier. Your task is to label the provided input based on the criteria outlined below.

Task: Describe the classification task clearly here (e.g., determine if a paragraph is [specific] or [non-specific]).

Labels and Definitions:

    [Label 1]: Include a definition for this label.
    [Label 2]: Include a definition for this label.
    [Additional Labels]: Include definitions as necessary.

Formatting: Your answer should be formatted in this way to ensure consistency:
```
Label: [Insert Label]
Explanation: [Provide a brief, clear explanation justifying the label chosen]
```

Example of well-formatted answer:
```
Label: [specific]
Explanation: The paragraph provides specific details about a project that the company plans to implement, including measurable goals.
```

Input text: [[Insert Text Here]]
[ENDPROMPT]"""

In [None]:
import json
import os

def save_dict(dict_data, filename):
    """
    Save the dictionary to a JSON file, appending only new keys. Warn if a key is already present and not saved.

    Keys that already exist in the file keep their stored value; the file is only
    rewritten when at least one new key was added.

    Parameters:
    dict_data (dict): The dictionary to save.
    filename (str): The name of the file where the dictionary will be saved.
    """
    # Start from the current file content when there is one. The file is read
    # and written as UTF-8 explicitly, like the other JSON files of this notebook,
    # instead of relying on the platform default encoding.
    existing_data = {}
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as file:
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError:
                # Keep the best-effort behaviour (start from scratch) but say so:
                # the unreadable content is overwritten if new keys are saved.
                print(f"Warning: '{filename}' does not contain valid JSON, starting from an empty dictionary.")
                existing_data = {}

    # Keep only the keys that are not stored yet
    new_data = {}
    for key, value in dict_data.items():
        if key in existing_data:
            print(f"Warning: Key '{key}' already exists, not saving it.")
        else:
            new_data[key] = value

    if not new_data:
        print("No new keys to save.")
        return

    existing_data.update(new_data)
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(existing_data, file, indent=4)
    print(f"New keys saved: {list(new_data.keys())}")

In [None]:
prompts = dict()

# classification: one prompt per dataset known to the generator, 'ClimaINS_ours' excluded.
# NOTE(review): the previous comment read "make sure to use the dataset from
# ClimaINS_ours" although the loop skips that key - confirm which one is intended.
for dataset_name in set(generator.dataset_builder.datasets.keys())-{'ClimaINS_ours'}:
    print(dataset_name)

    # Task summary handed to GPT-4o: the description followed by one line per label
    description = task_descriptions[dataset_name]['description']
    prompt = "description: \n" + description + "\n\n"
    prompt += "Labels: \n"

    for label in task_descriptions[dataset_name]['labels'].keys():

        # Show the readable label name when the dataset has one, the raw label otherwise
        if dataset_name in label_readable_mapping.keys():
            prompt += "- ["+label_readable_mapping[dataset_name]['labels'][str(label)] + "]: "
        else:
            prompt += "- ["+label + "]: "

        prompt += task_descriptions[dataset_name]['labels'][str(label)] + "\n"

    response = open_ai_chat_completion(
          model="gpt-4o",
          messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Could you create a prompt for LLMs (GPT-4 or Llama 3) to make them behave as a Classifier on the following task : \n {} \n\n Make sure to add formatting instruction so the output can easily be parsed and give an example of well formatted answer. Juste give me the prompt. It should be a prompt for one prediction only (not multiple predictions). For the input sentence add this placeholder: [[Insert Text Here]]. Start the prompt with [STARTPROMPT] and end it with [ENDPROMPT]\n\n Here is a template that you should follow:\n{}".format(prompt, template)}
          ])

    # open_ai_chat_completion prints the error and returns None on a non-200 reply.
    # Skip that dataset instead of crashing on `None[...]`, so the prompts that
    # were already generated still reach save_dict below.
    if response is None:
        print(f"Skipping '{dataset_name}': no completion was returned.")
        continue

    prompts[dataset_name] = response['choices'][0]['message']['content']

save_dict(prompts, "llm/prompts_climatext.json")

In [None]:
prompts = {}

# relation classification: these tasks label the relation between a text and a query,
# so the generated prompt needs both the text and the query placeholders.
relation_request = "Could you create a prompt for LLMs (GPT-4 or Llama 3) to make them behave as a Classifier on the following task : \n {} \n\n Make sure to add formatting instruction so the output can easily be parsed and give an example of well formatted answer. Juste give me the prompt. It should be a prompt for one prediction only (not multiple predictions). The tasks rely on the relation between 2 texts: a text and a query. For the input sentence add this placeholder: [[Insert Text Here]] and [[Insert Query Here]]. Start the prompt with [STARTPROMPT] and end it with [ENDPROMPT]\n\n Here is a template that you should follow:\n{}"

for dataset_name in ['climateFEVER_evidence', 'climaQA', 'lobbymap_stance']:
    print(dataset_name)

    description = task_descriptions[dataset_name]['description']
    label_definitions = task_descriptions[dataset_name]['labels']
    has_readable_names = dataset_name in label_readable_mapping

    # One "- [name]: definition" line per label; the readable name is used when available
    label_lines = []
    for label in label_definitions.keys():
        if has_readable_names:
            shown_name = label_readable_mapping[dataset_name]['labels'][str(label)]
        else:
            shown_name = label
        label_lines.append("- [" + shown_name + "]: " + label_definitions[str(label)] + "\n")

    prompt = "description: \n" + description + "\n\n" + "Labels: \n" + "".join(label_lines)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": relation_request.format(prompt, template)},
        ],
    )

    prompts[dataset_name] = response.choices[0].message.content

save_dict(prompts, "llm/prompts.json")

In [None]:
prompts = {}

# multilabel classification: one prompt per dataset listed as multilabel by the generator
multilabel_request = "Could you create a prompt for LLMs (GPT-4 or Llama 3) to make them behave as a Classifier on the following task : \n {} \n\n Make sure to add formatting instruction so the output can easily be parsed and give an example of well formatted answer. Juste give me the prompt. The task is multilabel. For the input sentence add this placeholder: [[Insert Text Here]]. Start the prompt with [STARTPROMPT] and end it with [ENDPROMPT].\n\n Here is a template that you should follow {}"

for dataset_name in generator.dataset_builder.multilabel_datasets.keys():
    print(dataset_name)

    # prepare the prompt zero-shot first
    # system_prompt = "You are an annotator for NLP tasks related to climate-change. You will be provided with the description of a tasks. Please follow the instructions."
    description = task_descriptions[dataset_name]['description']
    label_definitions = task_descriptions[dataset_name]['labels']
    has_readable_names = dataset_name in label_readable_mapping

    # One "- [name]: definition" line per label; the readable name is used when available
    label_lines = []
    for label in label_definitions.keys():
        if has_readable_names:
            shown_name = label_readable_mapping[dataset_name]['labels'][str(label)]
        else:
            shown_name = label
        label_lines.append("- [" + shown_name + "]: " + label_definitions[str(label)] + "\n")

    prompt = "description: \n" + description + "\n\n" + "Labels: \n" + "".join(label_lines)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": multilabel_request.format(prompt, template)},
        ],
    )

    prompts[dataset_name] = response.choices[0].message.content

save_dict(prompts, "llm/prompts.json")

## Create the CoT version (swap the order of label and explanation):

In [None]:
from zero_shot import load_dict
import re

def invert_labels_explanations(text):
    """
    Swap every "Label: ..." line with the "Explanation: ..." line that directly follows it.

    This turns the "label first" answer format into the chain-of-thought one
    (explanation first, label last). Text outside such pairs is left untouched.

    Parameters:
    text (str): The prompt text to rewrite.

    Returns:
    str: The text with each Label/Explanation pair inverted.
    """
    # Group 1: the Label line, group 2: the Explanation line, group 3: what ends
    # the pair. Accepting the end of the string (\Z) as well as a newline makes a
    # pair on the very last line get swapped too; it was skipped before because
    # a trailing newline was mandatory.
    pattern = r"(Label:[^\n]*)\n(Explanation:[^\n]*)(\n|\Z)"

    # Re-emit the two lines in the opposite order, keeping the original terminator
    return re.sub(pattern, r"\2\n\1\3", text)


# Build the chain-of-thought prompts: same prompts as llm/prompts.json, with the
# Label/Explanation lines inverted. save_dict only adds keys that are not yet
# present in llm/prompts_cot.json.
prompts = load_dict("llm/prompts.json")
cot_prompts = {
    name: invert_labels_explanations(prompt_text)
    for name, prompt_text in prompts.items()
}

save_dict(cot_prompts, "llm/prompts_cot.json")

# Set-up

In [None]:
def parse_dataset_name(dataset_name):
    """
    Translate an internal dataset identifier into its display name.

    Parameters:
    dataset_name (str): The dataset identifier.

    Returns:
    str: The display name, or the identifier itself when no display name is registered.
    """
    display_names = {
        "climateFEVER_claim": "climateFEVER claim (our split)",
        "climateFEVER_claim_agg": "climateFEVER claim (ours split, aggregated)",
        "climateFEVER_claim_climabench_agg": "climateFEVER claim (climabench split, aggregated)",
        "climateFEVER_evidence_ours": "climateFEVER evidence (our split)",
        "climateFEVER_evidence_climabench": "climateFEVER evidence (climabench split)",
        "netzero_reduction_duplicated": "Net-Zero/Reduction (with duplicates)",
        "netzero_reduction": "Net-Zero/Reduction",
        "ClimaINS_ours": "ClimaINS (our split)",
        "climateFEVER_evidence": "climateFEVER evidence",
        "climateBUG_data": "climateBUG-data",
        "climate_commitments_actions": "Commitments And Actions",
        "climate_detection": "ClimateBERT's Climate detection",
        "climate_specificity": "Climate Specificity",
        "climate_sentiment": "climate sentiment",
        "climate_tcfd_recommendations": "Climate TCFD recommendations",
        "esgbert_category_water": "esgbert Water",
        "esgbert_category_forest": "esgbert Forest",
        "esgbert_category_biodiversity": "esgbert Biodiversity",
        "esgbert_category_nature": "esgbert Nature",
        "environmental_claims": "Environmental Claims",
        "green_claims": "Green Claims",
        "green_claims_3": "Implicit/Explicit Green Claims",
        "contrarian_claims": "CC-Contrarian Claims",
        "esgbert_e": "ESGBERT E",
        "esgbert_s": "ESGBERT S",
        "esgbert_g": "ESGBERT G",
        "gw_stance_detection": "Global-Warming Stance (GWSD)",
        "sustainable_signals_review": "SUSTAINABLESIGNALS reviews",
        "lobbymap_stance": "LobbyMap (Stance)",
        "lobbymap_query": "LobbyMap (Query)",
        "lobbymap_pages": "LobbyMap (Pages)",
        "lobbymap_query_p": "LobbyMap (Page)",
        "lobbymap_query_stance": "LobbyMap (Stance)",
        "climatext": "climatext (Wiki-doc)",
        "climatext_wiki": "climatext (Wikipedia)",
        "climatext_10k": "climatext (10k)",
        "climatext_claim": "climatext (claim)",
        "esgbert_action500": "ESGBERT action500",
    }
    # Unknown identifiers are returned unchanged
    return display_names.get(dataset_name, dataset_name)

In [None]:
# Test sets to use as a whole:
copy_full_test_set = [
    'netzero_reduction',
    'climate_specificity',
    'climate_sentiment',
    'climate_commitments_actions',
    'climate_detection',
    'climate_tcfd_recommendations',
    'environmental_claims',
    'climateFEVER_claim',
    'sustainable_signals_review',
    'esgbert_e',
    'esgbert_s',
    'esgbert_g',
    'esgbert_action500',
    'esgbert_category_water',
    'esgbert_category_forest',
    'esgbert_category_biodiversity',
    'esgbert_category_nature',
    'green_claims',
    'green_claims_3',
    'climateStance',
    'climateEng',
    'gw_stance_detection',
    # The comma after this entry was missing, so Python joined it with the next
    # literal into the single name 'climateFEVER_evidencelogicClimate'.
    'climateFEVER_evidence',
    'logicClimate',
    'ClimaINS_ours',
]

# Datasets that have not been processed yet
not_done = ['ClimaINS', "climateFEVER_evidence_climabench"]

# Test sets to limit to 1000 samples:
large_dataset_to_downsample = [
    'sciDCC',
    'contrarian_claims',
    'lobbymap_stance',
    'lobbymap_query',
    'lobbymap_pages',
    'climaQA',
    'climatext',
    'ClimaTOPIC',
    'climateBUG_data'
]

In [None]:
# use_cot: when True the notebook loads llm/prompts_cot.json, stores outputs under
# llm/outputs/cot_full and writes to the CoT performance file; otherwise it uses
# llm/prompts.json, llm/outputs/full and the regular performance file.
use_cot = True
# use_gpt4: when True the batch requests and the reported model type use "gpt-4o"
# instead of "gpt-4o-mini".
use_gpt4 = False

In [None]:
import os
import json
import re

from zero_shot import load_dict, extract_prompt, update_question, map_lobbymap_stance, prepare_content


# Task descriptions (per dataset: 'description' and 'labels')
with open(os.path.join("llm", "mappings", "task_description.json"), encoding='utf-8') as description_file:
    task_descriptions = json.load(description_file)

# Readable label names (per dataset: 'labels')
with open(os.path.join("llm", "mappings", "label_annotation.json"), encoding='utf-8') as annotation_file:
    label_readable_mapping = json.load(annotation_file)

# Prompt set matching the selected mode (chain-of-thought or regular)
prompts_path = "llm/prompts_cot.json" if use_cot else "llm/prompts.json"
prompts = load_dict(prompts_path)

In [None]:
import re
def check_prompt_format(prompt):
    """
    Tell whether the prompt contains an "Explanation:" line directly followed by a "Label: [...]" line.

    This is the chain-of-thought layout (explanation first, bracketed label second);
    a prompt that lists the label before the explanation does not pass.

    Parameters:
    prompt (str): The prompt text to inspect.

    Returns:
    bool: True when the Explanation/Label pair is found, False otherwise.
    """
    # "Explanation: ..." up to the end of its line, then "Label: [ ... ]" on the next one
    expected_layout = r"Explanation:\s*.*\nLabel:\s*\[.*?\]"
    return re.search(expected_layout, prompt) is not None
    
# Print every loaded prompt that fails one of the checks, with the first problem found:
# wrong Explanation/Label layout, missing text placeholder, or (relation datasets only)
# missing query placeholder.
relation_datasets = ['climateFEVER_evidence', 'climaQA', 'lobbymap_stance']
separator = "#"*15

for dataset_name in prompts.keys():
    prompt_text = prompts[dataset_name]

    if not check_prompt_format(prompt_text):
        problem = "Wrongly formatted '''label: explaination:''' instruct"
    elif "[[Insert Text Here]]" not in prompt_text:
        problem = "Missing [[Insert Text Here]]"
    elif ("[[Insert Query Here]]" not in prompt_text) and (dataset_name in relation_datasets):
        problem = "Missing [[Insert Query Here]]"
    else:
        # Nothing to report for this prompt
        continue

    print(separator)
    print(dataset_name, problem)
    print(separator)
    print(prompt_text)
    print(separator, "\n\n")

### initialize function

In [None]:
def files_create(file_name, api_key):
    """
    Upload a local file to the OpenAI Files endpoint with purpose "batch".

    Parameters:
    file_name (str): Path of the file to upload.
    api_key (str): API key sent as bearer token.

    Returns:
    dict or None: The decoded JSON response on HTTP 200, otherwise None (the error is printed).
    """
    endpoint = 'https://api.openai.com/v1/files'
    auth_headers = {'Authorization': f'Bearer {api_key}'}
    form_fields = {'purpose': 'batch'}

    # The file has to stay open while the multipart request is sent
    with open(file_name, 'rb') as payload:
        response = requests.post(
            endpoint,
            headers=auth_headers,
            files={'file': (file_name, payload)},
            data=form_fields,
        )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    print("File uploaded successfully!")
    return response.json()

In [None]:
def batches_create(file_id, api_key):
    """
    Create a batch job on the OpenAI Batches endpoint for an uploaded input file.

    The job targets "/v1/chat/completions" with a "24h" completion window.

    Parameters:
    file_id (str): Identifier of the uploaded input file.
    api_key (str): API key sent as bearer token.

    Returns:
    dict or None: The decoded JSON response on HTTP 200, otherwise None (the error is printed).
    """
    payload = json.dumps({
        "input_file_id": file_id,
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h"
    })

    response = requests.post(
        'https://api.openai.com/v1/batches',
        headers={
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        },
        data=payload,
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    print("Batch job created successfully!")
    return response.json()


In [None]:
def batches_retrieve(batch_id, api_key):
    """
    Fetch the current description of a batch job from the OpenAI Batches endpoint.

    Parameters:
    batch_id (str): Identifier of the batch job.
    api_key (str): API key sent as bearer token.

    Returns:
    dict or None: The decoded JSON response on HTTP 200, otherwise None (the error is printed).
    """
    response = requests.get(
        f'https://api.openai.com/v1/batches/{batch_id}',
        headers={
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        },
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    print("Batch details retrieved successfully!")
    return response.json()

In [None]:
import requests

def download_file_content(file_id, api_key, output_file):
    """
    Download the content of an OpenAI file and write it to a local file.

    Parameters:
    file_id (str): Identifier of the file to download.
    api_key (str): API key sent as bearer token.
    output_file (str): Path of the local file to write; missing parent folders are created.

    Returns:
    None. A message is printed on success as well as on a non-200 reply.
    """
    # Define the API endpoint for retrieving the file content
    url = f'https://api.openai.com/v1/files/{file_id}/content'

    # Set up the headers with your API key
    headers = {
        'Authorization': f'Bearer {api_key}'
    }

    # Stream the body so that large result files are not held in memory at once
    response = requests.get(url, headers=headers, stream=True)

    if response.status_code == 200:
        # The output folder (e.g. llm/outputs/cot_full) may not exist yet; without
        # this the download failed with FileNotFoundError once the batch was done.
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Write the content chunk by chunk
        with open(output_file, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"File content saved to {output_file}")
    else:
        print(f"Error: {response.status_code} - {response.text}")

## Launch batch

In [None]:
# Names of the datasets for which the batch-launch loop builds and submits a job.
dataset_to_run = ['climatext_10k']

In [None]:
# Build one batch request file per dataset in `dataset_to_run`, upload it and
# create the corresponding batch job. The created jobs are collected in
# `batch_jobs`, keyed by the (possibly "_origin"-suffixed) dataset name.
batch_jobs = dict()

for dataset_name in dataset_to_run:
    print(dataset_name)
    
    # read dataset
    if dataset_name == "ClimaINS.pkl":
        continue
    
    # The two lobbymap datasets are processed under an "_origin" suffixed name;
    # their prompt is registered under that name as well.
    if dataset_name in ["lobbymap_stance", "lobbymap_query"]:
        prompts[dataset_name+"_origin"] = prompts[dataset_name]
        dataset_name = dataset_name+"_origin"

    # NOTE(review): `done` is only assigned in a later cell ("Compute performance"
    # section); on a fresh kernel run from top to bottom this line raises NameError.
    if dataset_name in done:
        print("pass")
        continue
    
    # NOTE(review): the file carries a .pkl extension but is read with read_parquet -
    # presumably it is a parquet file; confirm.
    test = pd.read_parquet(os.path.join("data", "llm_green_nlp_tasks", f"{dataset_name}.pkl"))

    # Fall back to the raw text when no cleaned version is provided
    if "clean_text" not in test.columns:
        test['clean_text'] = test['text']
        
    tasks = []
    
    # One chat-completion request per row; custom_id is derived from the DataFrame index
    for index, row in test.iterrows():        
        task = {
            "custom_id": f"task-{index}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o" if use_gpt4 else "gpt-4o-mini",
                "temperature": 0.1,
                "seed": 42,
                "messages": [
                    {
                        "role": "system",
                        "content": "You are an AI annotator for NLP tasks related to climate-change. You will be provided with the description of a tasks. Please follow the instructions."
                    },
                    {
                        "role": "user",
                        "content": prepare_content(row, dataset_name, task_descriptions, prompts)
                    }
                ],
            }
        }
        
        tasks.append(task)
   
    # The file is written with one JSON object per line, despite its .json extension
    file_name = f"llm/tasks/{dataset_name}.json"
    
    with open(file_name, 'w') as file:
        for obj in tasks:
            file.write(json.dumps(obj) + '\n')
            
    # NOTE(review): files_create / batches_create return None when the request fails,
    # in which case batch_file['id'] raises a TypeError.
    batch_file = files_create(file_name=file_name, api_key=openai_api_key)
    batch_job = batches_create(file_id=batch_file['id'], api_key=openai_api_key)
    batch_jobs[dataset_name] = batch_job

In [None]:
import json

# Keep a copy of the submitted batch-job descriptions on disk
batch_jobs_path = "batch.json"
with open(batch_jobs_path, mode="w") as handle:
    json.dump(batch_jobs, fp=handle, indent=4)

In [None]:
# ### LOBBYMAP (one shot)

# test = pd.read_csv('llm/dataset/lobbymap.csv', sep="\t")
# dataset_name = "lobbymap"

# batch_jobs = dict()

# tasks = []

# for index, row in test.iterrows():        
#     task = {
#         "custom_id": f"task-{index}",
#         "method": "POST",
#         "url": "/v1/chat/completions",
#         "body": {
#             # This is what you would have in your Chat Completions API call
#             "model": "gpt-4o-mini",
#             "temperature": 0.1,
#             "seed": 42,
#             "messages": [
#                 {
#                     "role": "system",
#                     "content": "You are an AI annotator for NLP tasks related to climate-change. You will be provided with the description of a tasks. Please follow the instructions."
#                 },
#                 {
#                     "role": "user",
#                     "content": prepare_content(row, dataset_name, task_descriptions, prompts)
#                 }
#             ],
#         }
#     }
    
#     tasks.append(task)

# # Creating the file

# # file_name = f"llm/tasks/{dataset_name}.json"

# # with open(file_name, 'w') as file:
# #     for obj in tasks:
# #         file.write(json.dumps(obj) + '\n')
    
# # batch_file = client.files.create(
# #     file=open(file_name, "rb"),
# #     purpose="batch"
# #     )

# # batch_job = client.batches.create(
# #     input_file_id=batch_file.id,
# #     endpoint="/v1/chat/completions",
# #     completion_window="24h"
# #     )

# # batch_jobs[dataset_name] = batch_job

In [None]:
# Show the current status of every submitted batch job
for dataset_name, submitted_job in batch_jobs.items():
    current_status = batches_retrieve(submitted_job['id'], api_key=openai_api_key)['status']
    print(dataset_name, "-", current_status)

In [None]:
# Download the output file of every batch job that has completed
for dataset_name, submitted_job in batch_jobs.items():
    batch_job = batches_retrieve(submitted_job['id'], api_key=openai_api_key)
    if batch_job['status'] != "completed":
        continue

    result_file_id = batch_job['output_file_id']
    # Chain-of-thought and regular runs are stored in separate folders
    output_dir = "llm/outputs/cot_full" if use_cot else "llm/outputs/full"
    result_file_name = f"{output_dir}/{dataset_name}.jsonl"

    download_file_content(result_file_id, openai_api_key, result_file_name)

## Compute performance

In [None]:
# done = [dataset_name for dataset_name in batch_jobs.keys()]
# Datasets considered already processed: the batch-launch loop skips every name listed here.
done = ['climatext_10k']

In [None]:
import re

def parse_label_explanation(text):
    """
    Extract the label and the explanation from a model answer.

    The label is what follows the first "Label:" on its line, with square brackets
    removed; two contrarian-claims label wordings are rewritten to the names used
    in the label mapping. The explanation is everything that follows the first
    "Explanation:" up to the end of the text (so it also contains a "Label:" line
    that comes after it).

    Parameters:
    text (str): The raw answer returned by the model.

    Returns:
    tuple: (label, explanation). Each element is None when it cannot be found;
    (None, None) is returned when `text` is not a string.
    """
    # Answers without textual content cannot be parsed
    if not isinstance(text, str):
        return None, None

    # Find the explanation (DOTALL: it may span several lines)
    explanation_match = re.search(r'Explanation:\s*(.*)', text, re.DOTALL)
    explanation = explanation_match.group(1).strip() if explanation_match else None

    # Find the label. A missing label is reported as None while the explanation is
    # kept; previously the resulting AttributeError was caught by a broad `except`
    # and returned in place of the explanation.
    label_match = re.search(r'Label:\s*(.*)', text)
    if label_match is None:
        return None, explanation

    label = label_match.group(1).replace('[', "").replace(']', "").strip()

    # Align the wording of two contrarian-claims labels with the label mapping
    label = label.replace('Climate solutions won’t work, Climate policies are harmful', "Climate solutions won’t work, Climate policies (mitigation or adaptation) are harmful")
    label = label.replace('Climate solutions won’t work, One country is negligible', 'Climate solutions won’t work, Climate policies are ineffective/flawed')

    return label, explanation

In [None]:
from sklearn.metrics import classification_report
from src.logger import bootstrap_confidence_interval
import numpy as np

In [None]:
# Readable names for the two lobbymap_pages labels, registered in (or overwriting)
# the mapping loaded from label_annotation.json.
lobbymap_pages_labels = {
    '1': 'The page contains one or more evidence about the stance of the company regarding any of the policy mentioned above',
    '0': 'The page does not contain evidence about the stance of the company regarding any of the policy',
}
label_readable_mapping['lobbymap_pages'] = {'labels': lobbymap_pages_labels}

In [None]:
# Chain-of-thought and regular runs are reported in separate CSV files
perf_file_path = (
    "experiment_results/performances/performances_cot_llm.csv"
    if use_cot
    else "experiment_results/performances/performances_llm.csv"
)

In [None]:
from ast import literal_eval
from sklearn.preprocessing import MultiLabelBinarizer

# Score the downloaded batch outputs of every dataset in `batch_jobs` against the
# test labels and append one row per dataset to the `performances` table.
model_type = "gpt-4o" if use_gpt4 else "gpt-4o-mini"
performance_type = "f1_score"

# Continue from the existing performance file when there is one
if os.path.exists(perf_file_path):
    performances = pd.read_csv(perf_file_path)
else:
    performances = pd.DataFrame()

# NOTE(review): the loop runs over `batch_jobs`, which is only filled by the
# batch-launch cell of the current kernel session.
# set(prompts.keys())-{"lobbymap_query", "logicClimate"}
for dataset_name in batch_jobs.keys():#dataset_to_run:
    
    # Loading data from saved file
    results = []
    #result_file_name = f"llm/outputs/gpt-4o/{dataset_name}.jsonl" if use_gpt4 else f"llm/outputs/{dataset_name}.jsonl"
    result_file_name = f"llm/outputs/cot_full/{dataset_name}.jsonl" if use_cot else f"llm/outputs/full/{dataset_name}.jsonl"

    with open(result_file_name, 'r') as file:
        for line in file:
            # Parsing the JSON string into a dict and appending to the list of results
            json_object = json.loads(line.strip())
            results.append(json_object["response"]['body']["choices"][0]['message']['content'])
    
    labels = []
    explainations = []
    
    # Split every answer into its label and explanation
    for result in results:
        label, explanation = parse_label_explanation(result)
        labels += [label]
        explainations += [explanation]
    
    # test = pd.read_parquet(os.path.join("doccano", "random", "parquet", f"{dataset_name}.pkl"))
    test = pd.read_parquet(os.path.join("data", "llm_green_nlp_tasks", f"{dataset_name}.pkl"))
        
    # NOTE(review): multiplying by 1 presumably turns boolean labels into integers - confirm.
    if dataset_name == "lobbymap_pages":
        test['label'] = 1 * test['label']

    # NOTE(review): predictions are attached to the rows by position; nothing here
    # matches them through `custom_id`, so the output file is assumed to have one
    # line per row, in row order - confirm.
    # The column names say "gpt-4o-mini" whatever `model_type` is.
    test['gpt-4o-mini_label'] = labels
    test['gpt-4o-mini_explanation'] = explainations

    # "_origin" datasets: drop the rows whose query reads as None and switch back
    # to the base dataset name (for the stance task, 'stance' becomes the label column)
    if dataset_name == "lobbymap_query_origin":
        test = test[test['query'].astype(str) != "[None]"]
        dataset_name = "lobbymap_query"
    if dataset_name == "lobbymap_stance_origin":
        test = test[test['query'].astype(str) != "None"]
        test.rename(columns={'stance':'label'}, inplace=True)
        dataset_name = "lobbymap_stance"
    
    print(dataset_name)
    
    # Multi-label datasets: predictions are comma-separated label lists
    if dataset_name in ["logicClimate", "lobbymap_query"]:
        if dataset_name == "logicClimate":
            y_true = test['label'].apply(literal_eval)
            # NOTE: the split parts are not stripped here, unlike for lobbymap_query below
            y_pred = test['gpt-4o-mini_label'].apply(lambda x: x.split(","))
        elif dataset_name == "lobbymap_query":
            y_true = test['query'].apply(lambda x: [map_lobbymap_stance[e] for e in x])
            y_pred = test['gpt-4o-mini_label'].apply(lambda x: [e.strip() for e in x.split(",")])   
        
        # Initialize the MultiLabelBinarizer
        mlb = MultiLabelBinarizer()
        
        # Fit the binarizer on the true labels only, then transform both sides
        y_true_binarized = mlb.fit_transform(y_true)
        y_pred_binarized = mlb.transform(y_pred)
        
        report = classification_report(
                    y_pred=y_pred_binarized, 
                    y_true=y_true_binarized,
                    target_names=mlb.classes_,
                    zero_division=0.0,
                    output_dict=True 
                )
        
        f1_lower, f1_upper = bootstrap_confidence_interval(y_pred=y_pred_binarized, y_true=y_true_binarized, num_bootstrap_samples=100)
        print(report['macro avg']['f1-score'], f1_lower, f1_upper)
        
    else:
        # Single-label datasets: map the readable label names back to the label ids
        if dataset_name in label_readable_mapping:
            label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}

            if dataset_name == 'contrarian_claims':
                # Split the keys of label2id based on "," and create a new mapping
                # (only the last comma-separated part of each name is kept as key)
                new_label2id = {}
                for key, value in label2id.items():
                    key_parts = [part.strip().lower() for part in key.split(',')]
                    new_label2id[key_parts[-1]] = value

                # Function to map and replace labels in the dataframe
                def map_labels(label):
                    parts = [part.strip().lower() for part in label.split(',')]
                    return parts[-1]
                    
                test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].apply(map_labels)
                test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].str.lower().map(new_label2id)
            else:
                test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].str.lower().map(label2id)

        # Both sides are compared as strings
        report = classification_report(
                y_pred=test['gpt-4o-mini_label'].astype(str), 
                y_true=test['label'].astype(str),
                zero_division=0.0,
                output_dict=True            
            )
        
        f1_lower, f1_upper = bootstrap_confidence_interval(y_pred=test['gpt-4o-mini_label'].astype(str), y_true=test['label'].astype(str), num_bootstrap_samples=1000)
        print(report['macro avg']['f1-score'], f1_lower, f1_upper)
        
    # When the report has a 'samples avg' entry but no 'accuracy', the samples-average
    # F1 is stored as accuracy
    if ('samples avg' in report.keys()) and ('accuracy' not in report.keys()):
        report['accuracy'] = report['samples avg']['f1-score']
        
    # One result row per dataset, appended to the performance table
    new_row = pd.DataFrame({
        'dataset_name': [dataset_name],
        'model_type': [model_type],
        'performance': [report['macro avg']['f1-score']],
        'performance_type': [performance_type],
        'n_labels': [np.nan],
        'seed': [42],
        "f1_upper": [f1_upper],
        "f1_lower": [f1_lower],
        "n_epoch": [np.nan],
        "precision": [report['macro avg']['precision']],
        "recall": [report['macro avg']['recall']],
        "weighted_f1": [report['weighted avg']['f1-score']],
        "accuracy": [report['accuracy']]
    })
    performances = pd.concat([performances, new_row], ignore_index=True)

In [None]:
# Write the performance table (previous rows plus the ones just added) back to the CSV file
performances.to_csv(perf_file_path, index=False)

# Performance explorer per dataset:

In [None]:
from ast import literal_eval

# Reload the saved performance table, ordered by ascending 'performance'
performances = pd.read_csv(perf_file_path).sort_values(by=['performance'])

In [None]:
def find_errors(dataset_name, mode="mini"):
    """Attach the model's parsed predictions to the test set of a dataset.

    Parameters
    ----------
    dataset_name : str
        Base name of the ``.jsonl`` response file and of the test-set file under
        ``data/llm_green_nlp_tasks``.
    mode : str, default "mini"
        Chooses the folder the responses are read from: ``"gpt-4o"`` ->
        ``llm/outputs/gpt-4o``, ``"cot"`` -> ``llm/outputs/cot_full``, anything
        else -> ``llm/outputs/full``.

    Returns
    -------
    pandas.DataFrame
        The test set with the added columns ``gpt-4o-mini_label`` and
        ``gpt-4o-mini_explanation``. If the dataset has an entry in
        ``label_readable_mapping``, the predicted label text is replaced by the
        matching label id; text with no match becomes NaN.
    """
    if mode == "gpt-4o":
        result_file_name = f"llm/outputs/gpt-4o/{dataset_name}.jsonl"
    elif mode == "cot":
        result_file_name = f"llm/outputs/cot_full/{dataset_name}.jsonl"
    else:
        result_file_name = f"llm/outputs/full/{dataset_name}.jsonl"

    # One JSON document per line; only the assistant message text is kept.
    results = []
    with open(result_file_name, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line carries no response.
                continue
            json_object = json.loads(line)
            results.append(json_object["response"]['body']["choices"][0]['message']['content'])

    labels = []
    explanations = []
    for result in results:
        label, explanation = parse_label_explanation(result)
        # The parser may hand back a non-string label; only strings are stripped.
        labels.append(label.strip() if isinstance(label, str) else label)
        explanations.append(explanation)

    # NOTE(review): the file carries a .pkl extension but is read as parquet.
    test = pd.read_parquet(os.path.join("data", "llm_green_nlp_tasks", f"{dataset_name}.pkl"))

    test['gpt-4o-mini_label'] = labels
    test['gpt-4o-mini_explanation'] = explanations

    if dataset_name in label_readable_mapping:
        # Readable label text (lower-cased) -> label id.
        label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}

        if dataset_name == 'contrarian_claims':
            # These labels are matched on the text after the last comma only.
            new_label2id = {
                key.split(',')[-1].strip().lower(): value
                for key, value in label2id.items()
            }

            def map_labels(label):
                # Non-string labels cannot be split; pass them through so they map to NaN.
                if not isinstance(label, str):
                    return label
                return label.split(',')[-1].strip().lower()

            test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].apply(map_labels)
            test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].str.lower().map(new_label2id)
        else:
            test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].str.lower().map(label2id)

    return test

In [None]:
def get_raw_outputs(dataset_name):
    """Load the raw model responses for a dataset next to their parsed labels.

    Parameters
    ----------
    dataset_name : str
        Base name of ``llm/outputs/full/<name>.jsonl`` and of the test-set file
        under ``data/llm_green_nlp_tasks``.

    Returns
    -------
    tuple (pandas.DataFrame, list of str)
        The test set with the added columns ``gpt-4o-mini_label`` and
        ``gpt-4o-mini_explanation`` and, when the dataset has an entry in
        ``label_readable_mapping``, ``parsing`` (label id matched from the
        predicted text, NaN when nothing matches) and ``label_text`` (readable
        name of the true label); plus the list of raw response texts.
    """
    result_file_name = f"llm/outputs/full/{dataset_name}.jsonl"

    # One JSON document per line; only the assistant message text is kept.
    results = []
    with open(result_file_name, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                # A blank (e.g. trailing) line carries no response.
                continue
            json_object = json.loads(line)
            results.append(json_object["response"]['body']["choices"][0]['message']['content'])

    labels = []
    explanations = []
    for result in results:
        label, explanation = parse_label_explanation(result)
        labels.append(label)
        explanations.append(explanation)

    # NOTE(review): the file carries a .pkl extension but is read as parquet.
    test = pd.read_parquet(os.path.join("data", "llm_green_nlp_tasks", f"{dataset_name}.pkl"))

    test['gpt-4o-mini_label'] = labels
    test['gpt-4o-mini_explanation'] = explanations

    if dataset_name in label_readable_mapping:
        readable_labels = label_readable_mapping[dataset_name]['labels']
        # Readable label text (lower-cased) -> label id.
        label2id = {v.lower(): k for k, v in readable_labels.items()}

        if dataset_name == 'contrarian_claims':
            # These labels are matched on the text after the last comma only.
            new_label2id = {
                key.split(',')[-1].strip().lower(): value
                for key, value in label2id.items()
            }

            def map_labels(label):
                # Non-string labels cannot be split; pass them through so they map to NaN.
                if not isinstance(label, str):
                    return label
                return label.split(',')[-1].strip().lower()

            test['unprocessed_gpt-4o-mini_label'] = test['gpt-4o-mini_label'].copy()
            test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].apply(map_labels)
            test['parsing'] = test['gpt-4o-mini_label'].str.lower().map(new_label2id)
        else:
            # Surrounding whitespace is ignored when matching, as find_errors does.
            test['parsing'] = test['gpt-4o-mini_label'].str.strip().str.lower().map(label2id)

        # The mapping keys are strings, so the ids are compared as strings.
        test['label_text'] = test['label'].astype(str).map(readable_labels)

    return test, results

In [None]:
import json
import os

def create_jsonl_errors_file(test, dataset_name, negative_label, positive_label, n_sample=10):
    """Sample misclassified rows and save them as JSON files for annotation.

    Two files are produced, ``error_analysis/<dataset_name>_fn.json`` (wrong
    predictions equal to ``negative_label``) and
    ``error_analysis/<dataset_name>_fp.json`` (wrong predictions equal to
    ``positive_label``). Every record gets an empty ``comment`` field. A file
    that already exists is never overwritten.

    Parameters
    ----------
    test : pandas.DataFrame
        Must contain the columns ``label`` and ``gpt-4o-mini_label``; both are
        compared as strings.
    dataset_name : str
        Used in the file names and to look up readable label names in
        ``label_readable_mapping``.
    negative_label, positive_label : str
        String form of the predicted value that selects each error group.
    n_sample : int, default 10
        Groups with at least this many rows are down-sampled to ``n_sample``
        rows (``random_state=42``); smaller groups are kept whole.
    """
    predicted = test['gpt-4o-mini_label'].astype(str)
    is_error = predicted != test['label'].astype(str)

    def sample_errors(predicted_value):
        # Wrong predictions of one class, capped at n_sample rows.
        subset = test[is_error & (predicted == predicted_value)]
        if len(subset) >= n_sample:
            subset = subset.sample(n_sample, random_state=42)
        subset = subset.copy()
        subset['comment'] = ""
        return subset

    false_negative = sample_errors(negative_label)
    false_positive = sample_errors(positive_label)

    # Replace the true label id by its readable name when one is known.
    if dataset_name in label_readable_mapping.keys():
        readable_labels = label_readable_mapping[dataset_name]['labels']
        false_negative['label'] = false_negative['label'].astype(str).map(readable_labels)
        false_positive['label'] = false_positive['label'].astype(str).map(readable_labels)

    def save_pretty_json(data, file_path):
        # Existing files are kept as they are.
        if os.path.exists(file_path):
            print(f"File '{file_path}' already exists. Skipping save.")
            return
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=4)
        print(f"File '{file_path}' saved successfully.")

    save_pretty_json(false_negative.to_dict(orient='records'), f'error_analysis/{dataset_name}_fn.json')
    save_pretty_json(false_positive.to_dict(orient='records'), f'error_analysis/{dataset_name}_fp.json')

def get_statistics_erros(dataset_name):
    """Print how often each comment tag occurs in the saved error files of a dataset.

    Reads ``error_analysis/<dataset_name>_fn.json`` and
    ``error_analysis/<dataset_name>_fp.json``. The tags of a record are the
    comma-separated items found before the first ``:`` of its ``comment``
    field. For every tag one line is printed: tag, number of records carrying
    it, number of records, and their ratio. A missing file is reported and
    treated as containing no records.

    Parameters
    ----------
    dataset_name : str
        Name used in the two file paths.
    """
    false_negative_path = f'error_analysis/{dataset_name}_fn.json'
    false_positive_path = f'error_analysis/{dataset_name}_fp.json'

    def read_json(file_path):
        # Returns the parsed file, or an empty list when the file is missing.
        if os.path.exists(file_path):
            with open(file_path, 'r') as f:
                data = json.load(f)
            print(f"Contents of '{file_path}':")
            return data
        print(f"File '{file_path}' does not exist.")
        return []

    def print_tag_statistics(examples):
        # Tags are stripped once so that "a, b" counts for both "a" and "b".
        if len(examples) == 0:
            return
        labels = [
            [tag.strip() for tag in ex['comment'].split(":")[0].split(",")]
            for ex in examples
        ]
        # Sorted so the printed order is the same on every run.
        for label in sorted({tag for tags in labels for tag in tags}):
            count_label = sum(1 for tags in labels if label in tags)
            print(label, count_label, len(labels), count_label/len(labels))

    false_negatives = read_json(false_negative_path)
    false_positive = read_json(false_positive_path)

    print("False Negatives:")
    print_tag_statistics(false_negatives)

    print("False Positives:")
    print_tag_statistics(false_positive)

In [None]:
import json
import os

def create_jsonl_errors_file_multilabel(test, dataset_name, n_sample=10, use_readable_labels=True):
    """Sample misclassified rows and save them as one JSON file for annotation.

    The records are written to ``error_analysis/<dataset_name>_fn.json`` with an
    empty ``comment`` field each. A file that already exists is never
    overwritten.

    Parameters
    ----------
    test : pandas.DataFrame
        Must contain the columns ``label`` and ``gpt-4o-mini_label``; both are
        compared as strings. A ``Date`` column, if present, is left out of the
        export.
    dataset_name : str
        Used in the file name and to look up readable label names in
        ``label_readable_mapping``.
    n_sample : int, default 10
        When at least this many rows are misclassified they are down-sampled to
        ``n_sample`` rows (``random_state=42``); otherwise all are kept.
    use_readable_labels : bool, default True
        Replace the true label id by its readable name. Pass ``False`` when
        ``test['label']`` already holds readable names.
    """
    # presumably dropped because date values do not serialise to JSON — TODO confirm
    if 'Date' in test.columns:
        test = test.drop(columns=['Date'])

    false_negative = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]
    if len(false_negative) >= n_sample:
        false_negative = false_negative.sample(n_sample, random_state=42)

    false_negative = false_negative.copy()
    false_negative['comment'] = ""

    if (dataset_name in label_readable_mapping.keys()) and (use_readable_labels):
        false_negative['label'] = false_negative['label'].astype(str).map(label_readable_mapping[dataset_name]['labels'])

    def save_pretty_json(data, file_path):
        # Existing files are kept as they are.
        if os.path.exists(file_path):
            print(f"File '{file_path}' already exists. Skipping save.")
            return
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=4)
        print(f"File '{file_path}' saved successfully.")

    save_pretty_json(false_negative.to_dict(orient='records'), f'error_analysis/{dataset_name}_fn.json')

def get_statistics_errors_multi(dataset_name):
    """Print how often each comment tag occurs in the saved error file of a dataset.

    Reads ``error_analysis/<dataset_name>_fn.json``. The tags of a record are
    the comma-separated items found before the first ``:`` of its ``comment``
    field. For every tag one line is printed: tag, number of records carrying
    it, number of records, and their ratio. A missing file is reported and
    treated as containing no records.

    Parameters
    ----------
    dataset_name : str
        Name used in the file path.
    """
    false_negative_path = f'error_analysis/{dataset_name}_fn.json'

    def read_json(file_path):
        # Returns the parsed file, or an empty list when the file is missing.
        if os.path.exists(file_path):
            with open(file_path, 'r') as f:
                data = json.load(f)
            print(f"Contents of '{file_path}':")
            return data
        print(f"File '{file_path}' does not exist.")
        return []

    false_negatives = read_json(false_negative_path)

    print("False Negatives:")
    if len(false_negatives) > 0:
        # Tags are stripped once so that "a, b" counts for both "a" and "b".
        labels = [
            [tag.strip() for tag in ex['comment'].split(":")[0].split(",")]
            for ex in false_negatives
        ]
        # Sorted so the printed order is the same on every run.
        for label in sorted({tag for tags in labels for tag in tags}):
            count_label = sum(1 for tags in labels if label in tags)
            print(label, count_label, len(labels), count_label/len(labels))

## ClimateFEVER_claims

In [None]:
dataset_name = "climateFEVER_claim"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Compare labels as strings, exactly as the classification report does, so true and
# predicted values share one representation; a missing prediction becomes "nan".
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class that occurs on either side gets a row/column in the matrix.
all_labels = list(set(true_labels) | set(predicted_labels))

# Encode the label strings as integers over the full class set.
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Raw counts (not normalized).
cm = confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)))

# Class names in the encoder's order, used as tick labels.
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Show a confusion matrix on a fixed 6x6 inch figure with rotated x tick labels."""
    fig, ax = plt.subplots(figsize=(6, 6))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=45, ha="right")  # Rotate labels by 45 degrees
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)


## ClimateFEVER evidence

In [None]:
dataset_name = "climateFEVER_evidence"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Compare labels as strings, exactly as the classification report does, so true and
# predicted values share one representation; a missing prediction becomes "nan".
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class that occurs on either side gets a row/column in the matrix.
all_labels = list(set(true_labels) | set(predicted_labels))

# Encode the label strings as integers over the full class set.
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Row-normalized matrix (each true class sums to 1), rounded for display.
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Class names in the encoder's order, used as tick labels.
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Show a confusion matrix whose figure size grows with the number of labels."""
    cell_width = 0.7
    cell_height = 0.7
    num_labels = len(labels)

    # Fixed cell size in inches times the number of classes.
    figsize = (cell_width * num_labels, cell_height * num_labels)
    fig, ax = plt.subplots(figsize=figsize)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=45, ha="right")  # Rotate labels by 45 degrees
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)




In [None]:
n_sample = 10

# Pass the sample size explicitly so that changing n_sample above actually takes effect.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

## SciDCC

In [None]:
dataset_name = "sciDCC"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Compare labels as strings, exactly as the classification report does, so true and
# predicted values share one representation; a missing prediction becomes "nan".
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class that occurs on either side gets a row/column in the matrix.
all_labels = list(set(true_labels) | set(predicted_labels))

# Encode the label strings as integers over the full class set.
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Row-normalized matrix (each true class sums to 1), rounded for display.
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Class names in the encoder's order, used as tick labels.
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Show a confusion matrix whose figure size grows with the number of labels."""
    cell_width = 0.7
    cell_height = 0.7
    num_labels = len(labels)

    # Fixed cell size in inches times the number of classes.
    figsize = (cell_width * num_labels, cell_height * num_labels)
    fig, ax = plt.subplots(figsize=figsize)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=45, ha="right")  # Rotate labels by 45 degrees
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)


In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import matplotlib.patheffects as PathEffects
import matplotlib.cm as cm

# Relies on `generator` and `dataset_name` from earlier cells.
# The raw test split gets its own name: the global `test` must keep the prediction
# columns added by find_errors, which the error-sampling cell further down reads.
import seaborn as sns  # importing seaborn registers the "mako" colormap with matplotlib

train, test_raw, dev = generator.load_dataset_unprocessed(dataset_name)

timeline_data = pd.concat([train, test_raw, dev], ignore_index=True)
timeline_data['year'] = pd.to_datetime(timeline_data['Date']).dt.year

# Rows: year, columns: label, values: share of that year's samples.
plot_data = timeline_data.groupby(['year', 'label']).size().unstack(fill_value=0)
label_proportions = plot_data.div(plot_data.sum(axis=1), axis=0)

fig, ax = plt.subplots(figsize=(15, 6))

# One colormap entry per label. plt.get_cmap is used because matplotlib.cm.get_cmap
# is no longer available in recent Matplotlib releases.
cmap = plt.get_cmap("mako", len(label_proportions.columns))
colors = [cmap(i) for i in range(len(label_proportions.columns))]

# Plot stacked area chart
label_proportions.plot(kind='area', stacked=True, ax=ax, linewidth=0, alpha=0.8, color=colors)

# Each label name is written inside its own band, at the year where the band is widest.
cumulative = label_proportions.cumsum(axis=1)

for i, column in enumerate(label_proportions.columns):
    max_idx = label_proportions[column].idxmax()
    # Vertical middle of the band: top of the stack minus half the band height.
    max_value = (cumulative[column] - label_proportions[column] / 2).loc[max_idx]

    text = ax.text(
        x=max_idx, y=max_value, s=column,
        verticalalignment='center', horizontalalignment='center',
        fontsize=11, color="white", fontweight="bold"
    )
    text.set_path_effects([PathEffects.withStroke(linewidth=4, foreground='black')])  # Add contrast

# The in-band annotations replace the legend.
ax.legend().set_visible(False)

# Enhance grid and remove unnecessary borders
ax.grid(axis="y", linestyle="dashed", alpha=0.5)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)

# Improve titles and labels
plt.title('Evolution of Labels Over Time', fontsize=16, fontweight="bold", pad=20)
plt.xlabel('Year', fontsize=12)
plt.ylabel('Proportion of Labels', fontsize=12)

plt.xticks(rotation=45, fontsize=10)
plt.yticks(fontsize=10)
plt.tight_layout()
plt.show()



The errors might be explained by the evolution of the label distribution over time, as shown in the chart above.

In [None]:
n_sample = 10

# Pass the sample size explicitly so that changing n_sample above actually takes effect.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

## contrarian_claims

In [None]:
dataset_name = "contrarian_claims"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# From here on both columns hold readable label names instead of label ids.
test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].map(label_readable_mapping[dataset_name]['labels'])
test['label'] = test['label'].map(label_readable_mapping[dataset_name]['labels'])

# test[test['label'].astype(str) != test['gpt-4o-mini_label'].astype(str)]

In [None]:
# Short display names for the contrarian-claims confusion matrix.
# The plotting cell looks a label up by the text after its last comma, so each key must
# equal that trailing fragment exactly (including the curly apostrophes).
# NOTE(review): 'methods & models)' looks like the tail of a longer label that itself
# contains a comma — confirm against label_annotation.json.
shorter_label_mapping = {
    'CO2 is beneficial/not a pollutant': 'CO2 beneficial',
    'Climate sensitivity is low/negative feedbacks reduce warming': 'Low sensitivity',
    'Species/plants/reefs aren’t showing climate impacts/are benefiting from climate change': 'Species benefiting',
    'Climate movement is unreliable/alarmist/corrupt': 'Climate alarmist',
    'methods & models)': 'Science uncertain',
    'Clean energy technology/biofuels won’t work': 'Clean tech won’t work',
    'Climate policies (mitigation or adaptation) are harmful': 'Policies harmful',
    'Climate policies are ineffective/flawed': 'Policies ineffective',
    'People need energy (e.g. from fossil fuels/nuclear)': 'Energy demand',
    'Climate hasn’t warmed/changed over the last (few) decade(s)': 'No warming',
    'Extreme weather isn’t increasing/has happened before/isn’t linked to climate change': 'No extreme link',
    'Ice/permafrost/snow cover isn’t melting': 'Ice stable',
    'Sea level rise is exaggerated/not accelerating': 'Sea level stable',
    'Weather is cold/snowing': 'Cold weather',
    'We’re heading into an ice age/global cooling': 'Global cooling',
    'It’s natural cycles/variation': 'Natural cycles',
    'There’s no evidence for greenhouse effect/carbon dioxide driving climate change': 'No greenhouse effect',
    'No claim': 'No claim'
}

In [None]:
def get_category_ranges(categories):
    """Return the index span of each run of equal, consecutive categories.

    Parameters
    ----------
    categories : sequence
        Category of each position, expected to be grouped so that equal
        categories are adjacent.

    Returns
    -------
    dict
        ``{category: (first_index, last_index)}`` with inclusive bounds. If a
        category occurs in several separate runs, the last run wins. An empty
        input yields an empty dict.
    """
    category_ranges = {}
    n_categories = len(categories)
    if n_categories == 0:
        return category_ranges

    start_idx = 0
    # Walk one step past the end so the final run is closed by the same test.
    for i in range(1, n_categories + 1):
        if i == n_categories or categories[i] != categories[start_idx]:
            category_ranges[categories[start_idx]] = (start_idx, i - 1)
            start_idx = i

    return category_ranges

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder
from matplotlib.patheffects import withStroke
from matplotlib import patches

# Build the category-grouped, row-normalized confusion matrix for the current dataset.
# Labels are compared as strings, as in the classification report.
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Compute confusion matrix (each true-class row sums to 1)
cm = confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true")

# Get the class labels back from the encoder
class_labels = label_encoder.classes_
color_category = [c.split(',')[0] for c in class_labels]  # Text before the first comma is used as the category
short_labels = [shorter_label_mapping[c.split(',')[-1].strip()] for c in class_labels]  # Text after the last comma selects the short display name; raises KeyError if it is not a key of shorter_label_mapping

# Build label info: list of tuples (index, category, label)
label_info = [(i, color_category[i], short_labels[i]) for i in range(len(class_labels))]

# Sort label_info by category to group labels
label_info_sorted = sorted(label_info, key=lambda x: x[1])

# Get new order of indices
new_order = [x[0] for x in label_info_sorted]
rearranged_labels = [x[2] for x in label_info_sorted]
rearranged_categories = [x[1] for x in label_info_sorted]

# Rearrange the confusion matrix according to the new label order (rows and columns alike)
cm_rearranged = cm[np.ix_(new_order, new_order)]

# Generate a high-contrast palette using seaborn
palette = sns.color_palette("tab10", len(set(rearranged_categories)))  # One "tab10" color per distinct category
category_to_color = {
    category: palette[i] for i, category in enumerate(sorted(set(rearranged_categories)))
}

# Get the ranges for each category; the plotting function below reads this global
category_ranges = get_category_ranges(rearranged_categories)

# Display the confusion matrix with colored labels and distinct colors for zero values
def plot_confusion_matrix(cm, labels, categories, cell_width=0.7, cell_height=0.7):
    """Plot a confusion matrix whose cells and label groups are tinted by category.

    Parameters
    ----------
    cm : numpy.ndarray
        Square confusion matrix, already ordered so that labels of one category
        are adjacent. Values are rounded to two decimals for display.
    labels : list
        Tick label of each row/column, in matrix order.
    categories : list
        Category of each row/column, in matrix order.
    cell_width, cell_height : float, default 0.7
        Size of one cell in inches; the figure size is this times the number of
        labels.

    Notes
    -----
    Reads the module-level ``category_to_color`` (category -> color) and
    ``category_ranges`` (category -> inclusive index span).
    """
    num_labels = len(labels)
    cm = np.round(cm, 2)

    figsize = (cell_width * num_labels, cell_height * num_labels)
    fig, ax = plt.subplots(figsize=figsize)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)

    # Uniform light-gray background under all cells.
    ax.matshow(np.zeros_like(cm), cmap="gray", alpha=0.1)

    # Tint every cell: zeros very light, same-category confusions in the category
    # color, cross-category confusions in gray.
    for (i, j), val in np.ndenumerate(cm):
        if val == 0:
            color = (0.95, 0.95, 0.95)
        elif categories[i] == categories[j]:
            color = category_to_color[categories[i]]
        else:
            color = (0.8, 0.8, 0.8)

        # facecolor (not color) is set so that edgecolor='none' is honored; passing
        # color= would override the edge color as well.
        rect = plt.Rectangle((j-0.5, i-0.5), 1, 1, fill=True, facecolor=color, edgecolor='none', alpha=0.2)
        ax.add_patch(rect)

    # Plot confusion matrix values
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=90)

    # Outline the diagonal block of every category.
    for category, (start, end) in category_ranges.items():
        rect = patches.Rectangle(
            (start - 0.5, start - 0.5),
            end - start + 1,
            end - start + 1,
            linewidth=2,
            edgecolor=category_to_color[category],
            facecolor='none'
        )
        ax.add_patch(rect)

    # Legend: one colored line per category, placed below the plot.
    legend_categories = sorted(set(categories))
    handles = [plt.Line2D([0], [0], color=category_to_color[cat], lw=4) for cat in legend_categories]

    plt.legend(handles, legend_categories, title='Categories', bbox_to_anchor=(0.5, -0.03), loc='upper center')

    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm_rearranged, rearranged_labels, rearranged_categories)


In [None]:
n_sample = 10

# `test['label']` was already converted to readable names in the report cell above, so
# the id -> name mapping must not be applied a second time (it would turn every label
# into NaN in the exported file).
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample, use_readable_labels=False)
get_statistics_errors_multi(dataset_name)

## ClimaINS_ours

In [None]:
dataset_name = "ClimaINS_ours"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# From here on both columns hold readable label names instead of label ids.
test['label'] = test['label'].astype(str).map(label_readable_mapping[dataset_name]['labels'])
test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].astype(str).map(label_readable_mapping[dataset_name]['labels'])

# Display the rows where prediction and true label disagree.
test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Compare labels as strings so that a missing value (no readable name / no prediction)
# becomes the class "nan" instead of mixing floats with strings in the encoder.
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class that occurs on either side gets a row/column in the matrix.
all_labels = list(set(true_labels) | set(predicted_labels))

# Encode the label strings as integers over the full class set.
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Row-normalized matrix (each true class sums to 1), rounded for display.
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Class names in the encoder's order, used as tick labels.
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Show a confusion matrix whose figure size grows with the number of labels."""
    cell_width = 0.7
    cell_height = 0.7
    num_labels = len(labels)

    # Fixed cell size in inches times the number of classes.
    figsize = (cell_width * num_labels, cell_height * num_labels)
    fig, ax = plt.subplots(figsize=figsize)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=45, ha="right")  # Rotate labels by 45 degrees
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 10

# Labels in `test` are already readable names here, hence use_readable_labels=False.
# The sample size is passed explicitly so that changing n_sample above takes effect.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample, use_readable_labels=False)
get_statistics_errors_multi(dataset_name)

## climaQA

In [None]:
dataset_name = "climaQA"

# Attach the parsed predictions to the test set, then print per-class metrics.
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Labels are kept as ids for this dataset (no readable-name mapping is applied).

# Display the rows where prediction and true label disagree.
test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report

# One record per query: (query, macro F1, number of rows, precision and recall of class "1").
accuracy_data = []

for q in test['query'].value_counts().index:
    query_rows = test[test['query'] == q]

    # The report is computed once per query and read three times.
    query_report = classification_report(
        y_pred=query_rows['gpt-4o-mini_label'].astype(str),
        y_true=query_rows['label'].astype(str),
        zero_division=0.0,
        output_dict=True
    )

    # NOTE: despite the name, this value is the macro-averaged F1 score.
    accuracy = query_report['macro avg']['f1-score']

    # Class "1" is absent from the report when it occurs neither as truth nor as
    # prediction for this query; its precision and recall then count as 0.
    positive_class = query_report.get('1', {})
    precision = positive_class.get('precision', 0)
    recall = positive_class.get('recall', 0)
    size = len(query_rows)

    accuracy_data.append((q, accuracy, size, precision, recall))

# Convert the list to a DataFrame for plotting
accuracy_df = pd.DataFrame(accuracy_data, columns=['query', 'accuracy', 'size', "precision", "recall"])

# Replace the full question text by a short id (Q0, Q1, ...) for the x axis.
truncate_label = {q: f"Q{i}" for i, q in enumerate(accuracy_df['query'].unique())}
accuracy_df['query'] = accuracy_df['query'].map(truncate_label)


# Macro F1 as bars and subset size as a line, on two y axes sharing the x axis.
fig, ax1 = plt.subplots(figsize=(8, 6))

ax1.bar(accuracy_df['query'], accuracy_df['accuracy'], color='C0', label='Accuracy')
ax1.set_xlabel('Question')
ax1.set_ylabel('macro F1-score', color='C0')
ax1.tick_params(axis='y', labelcolor='C0')
ax1.tick_params(axis='x', rotation=90)

ax1.set_ylim(0, 1)  # Set y-axis limits from 0 to 1 for clarity

ax2 = ax1.twinx()
ax2.plot(accuracy_df['query'], accuracy_df['size'], color='C1', marker='o', label='Size')
ax2.set_ylabel('Size of Dataset', color='C1')
ax2.tick_params(axis='y', labelcolor='C1')

# Set title and adjust layout
plt.title('macro F1-score and Size for Each Question')
fig.tight_layout()

# Show the combined plot
plt.show()

In [None]:
# NOTE(review): this name is capitalised differently from the "climaQA" used to load the
# data above; it determines the error_analysis file names and the readable-label lookup.
# Confirm the difference is intended before changing it (existing files use this name).
dataset_name="ClimaQA"
negative_label = "0"
positive_label = "1"
n_sample = 10

# Pass the sample size explicitly so that changing n_sample above actually takes effect.
create_jsonl_errors_file(test, negative_label=negative_label, positive_label=positive_label, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_erros(dataset_name)

## logicClimate

In [None]:
from ast import literal_eval

dataset_name = "logicClimate"

test = find_errors(dataset_name)

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report

# Gold labels are stored as the string repr of a list; parse them back into lists.
y_true = test['label'].apply(literal_eval)
# Predictions are one comma-separated string. Strip every item so that "a, b"
# becomes ["a", "b"] and not ["a", " b"]: an un-stripped " b" is not a class the
# binarizer knows, so transform() would ignore it (with a warning) and the
# prediction would be scored as missing.
y_pred = test['gpt-4o-mini_label'].apply(lambda x: [e.strip() for e in x.split(",")])
# Initialize the MultiLabelBinarizer
mlb = MultiLabelBinarizer()

# Fit on the gold labels only, then encode the predictions with the same classes
y_true_binarized = mlb.fit_transform(y_true)
y_pred_binarized = mlb.transform(y_pred)

print(
    classification_report(
            y_pred=y_pred_binarized, 
            y_true=y_true_binarized,
            target_names=mlb.classes_,
            zero_division=0.0,
        )
)

In [None]:
get_statistics_errors_multi(dataset_name)

## DESMOG / GW Stance

In [None]:
# NOTE(review): `dataset_name` is not assigned in this section, so this cell runs
# on whatever value an earlier cell left in it — confirm and set it explicitly.
test_cot = find_errors(dataset_name, mode="cot")
# Compare gold and predicted labels as strings so both sides share one dtype
test_cot['label'] = test_cot['label'].astype(str)
test_cot['gpt-4o-mini_label'] = test_cot['gpt-4o-mini_label'].astype(str)

print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is also inherited from an earlier cell; given the "_zero"
# suffix below it is presumably the zero-shot run of the same dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label); columns present in both get
# the "_zero" / "_cot" suffix
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

# Prints: rows without a "_zero" prediction, rows without a "_cot" prediction, total rows
print(len(merged_errors[merged_errors['gpt-4o-mini_label_zero'].isna()]), len(merged_errors[merged_errors['gpt-4o-mini_label_cot'].isna()]), len(merged_errors))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Example: Replace these lists with your actual label data
true_labels = test_cot['label']
predicted_labels = test_cot['gpt-4o-mini_label']

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Compute confusion matrix
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Get the class labels back from the encoder
class_labels = label_encoder.classes_

# Display the confusion matrix
def plot_confusion_matrix(cm, labels):
    """Draw one confusion matrix as a blue heat-map and show it.

    Args:
        cm: the confusion matrix values to display.
        labels: class names used for the tick labels; their count also
            determines the figure size.
    """
    # Each matrix cell gets a fixed 0.7 x 0.7 inch footprint, so the figure
    # grows with the number of classes.
    side = 0.7 * len(labels)
    fig, ax = plt.subplots(figsize=(side, side))

    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(
        ax=ax, cmap="Blues", colorbar=False
    )

    # Slanted x tick labels so long class names do not overlap
    plt.xticks(rotation=45, ha="right")
    plt.title("Confusion Matrix")
    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm, class_labels)

In [None]:
n_sample = 10

create_jsonl_errors_file_multilabel(test_cot, dataset_name=dataset_name, use_readable_labels=False)
get_statistics_errors_multi(dataset_name)

## ClimateStance

In [None]:
# Pin the dataset explicitly. Without this the cell reuses the `dataset_name`
# left behind by whichever section ran last and would score another dataset's
# outputs against the climateStance label mapping.
dataset_name = "climateStance"
# id -> human-readable label for this dataset
readable_labels = label_readable_mapping[dataset_name]['labels']

test_cot = find_errors(dataset_name, mode="cot")
# Cast to str first: the mapping keys come from JSON and are therefore strings
test_cot['label'] = test_cot['label'].astype(str)
test_cot['label'] = test_cot['label'].map(readable_labels)
test_cot['gpt-4o-mini_label'] = test_cot['gpt-4o-mini_label'].astype(str)
test_cot['gpt-4o-mini_label'] = test_cot['gpt-4o-mini_label'].map(readable_labels)

print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str),
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is inherited from an earlier cell; given the "_zero"
# suffix below it is presumably the zero-shot run of this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

# Prints: rows without a "_zero" prediction, rows without a "_cot" prediction, total rows
print(len(merged_errors[merged_errors['gpt-4o-mini_label_zero'].isna()]), len(merged_errors[merged_errors['gpt-4o-mini_label_cot'].isna()]), len(merged_errors))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Example: Replace these lists with your actual label data
true_labels = test_cot['label']
predicted_labels = test_cot['gpt-4o-mini_label']

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Compute confusion matrix
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Get the class labels back from the encoder
class_labels = label_encoder.classes_

# Display the confusion matrix
def plot_confusion_matrix(cm, labels):
    """Show `cm` as a single heat-map whose size scales with the class count.

    Args:
        cm: the confusion matrix values to display.
        labels: class names shown on both axes.
    """
    inches_per_cell = 0.7  # same footprint horizontally and vertically
    n_classes = len(labels)
    fig, ax = plt.subplots(
        figsize=(inches_per_cell * n_classes, inches_per_cell * n_classes)
    )

    display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    display.plot(ax=ax, cmap="Blues", colorbar=False)

    # Slant the x tick labels to keep long names legible
    plt.xticks(rotation=45, ha="right")
    plt.title("Confusion Matrix")
    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm, class_labels)

In [None]:
n_sample = 10

create_jsonl_errors_file_multilabel(test_cot, dataset_name=dataset_name, use_readable_labels=False)
get_statistics_errors_multi(dataset_name)

## Lobbymap query

In [None]:
from ast import literal_eval

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report

dataset_name = "lobbymap_query_origin"

test_cot = find_errors(dataset_name, mode="cot")

# Drop the rows whose `query` prints as "[None]"
test_cot = test_cot[test_cot["query"].astype(str) != "[None]"].copy()

# Gold side: translate every element of the row's `query` collection through
# `map_lobbymap_stance` (defined outside this cell).
# Predicted side: split the comma-separated answer and strip each item.
y_true = test_cot['query'].apply(lambda x: [map_lobbymap_stance[e] for e in x])
y_pred = test_cot['gpt-4o-mini_label'].apply(lambda x: [e.strip() for e in x.split(",")])

# Initialize the MultiLabelBinarizer
mlb = MultiLabelBinarizer()

# Fit on the gold labels only, then encode the predictions with the same classes
y_true_binarized = mlb.fit_transform(y_true)
y_pred_binarized = mlb.transform(y_pred)

print(
    classification_report(
        y_pred=y_pred_binarized,
        y_true=y_true_binarized,
        target_names=mlb.classes_,
        zero_division=0.0,
    )
)

In [None]:
# Loading data from saved file
results = []
result_file_name = f"llm/outputs/{dataset_name}.jsonl"

# Read explicitly as UTF-8 (like the other JSON files opened in this notebook)
# so decoding does not depend on the platform's default encoding.
with open(result_file_name, 'r', encoding='utf-8') as file:
    for line in file:
        line = line.strip()
        if not line:
            # Tolerate blank lines instead of failing in json.loads
            continue
        # Parsing the JSON string into a dict and appending to the list of results
        json_object = json.loads(line)
        results.append(json_object["response"]['body']["choices"][0]['message']['content'])

# Split every raw model answer into its label and its explanation
labels = []
explainations = []

for result in results:
    label, explanation = parse_label_explanation(result)
    labels.append(label)
    explainations.append(explanation)

# NOTE(review): the file carries a ".pkl" extension but is read with
# read_parquet — confirm the on-disk format matches the reader.
test = pd.read_parquet(os.path.join("doccano", "random", "parquet", f"{dataset_name}.pkl"))

# Row i of `test` is paired with answer i; the assignment raises if the counts differ
test['gpt-4o-mini_label'] = labels
test['gpt-4o-mini_explanation'] = explainations

# When a readable-label mapping exists for this dataset, convert the predicted
# readable label back to its id (case-insensitively)
if dataset_name in label_readable_mapping:
    label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}
    test['gpt-4o-mini_label'] = test['gpt-4o-mini_label'].str.lower().map(label2id)

## Lobbymap Page

In [None]:
dataset_name = "lobbymap_pages"

test = find_errors(dataset_name, mode="cot")

# Multiply by 1 so the gold label compares as a number once cast to str below.
# Presumably `label` is boolean here and becomes 0/1 — TODO confirm.
test['label'] = 1 * test['label']

print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

# Display the rows whose prediction differs from the gold label
test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

## Lobbymap Stance

In [None]:
# Collapses the five fine-grained stance labels into three coarse ones:
# both supporting grades -> 'supporting', both negative grades ->
# 'not_supporting', and the mixed/no-position label -> 'no_position'.
relieve_stance = {
        'strongly_supporting': 'supporting',
        'supporting': 'supporting',
        'no_position_or_mixed_position': 'no_position',
        'not_supporting': 'not_supporting',
        'opposing': 'not_supporting',
    }

In [None]:
dataset_name = "lobbymap_stance_origin"

test_cot = find_errors(dataset_name, mode="cot")

# Keep only the rows that have a query
test_cot = test_cot[~test_cot['query'].isna()].copy()

# The gold label for this task is the `stance` column; compare as strings.
# The fine-grained labels are scored here; the coarse `relieve_stance`
# grouping is applied in a separate cell.
test_cot['label'] = test_cot['stance'].copy()
test_cot['label'] = test_cot['label'].astype(str)
test_cot['gpt-4o-mini_label'] = test_cot['gpt-4o-mini_label'].astype(str)

print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'], 
            y_true=test_cot['label'],
            zero_division=0.0,
        )
)

In [None]:
test_cot['relieved_label'] = test_cot['label'].map(relieve_stance)
test_cot['relieved_gpt-4o-mini_label'] = test_cot['gpt-4o-mini_label'].map(relieve_stance)

print(
    classification_report(
            y_pred=test_cot['relieved_gpt-4o-mini_label'], 
            y_true=test_cot['relieved_label'],
            zero_division=0.0,
        )
)

In [None]:
# Empty column reserved for the annotator's remarks
test_cot['comment'] = ""

# Rows whose prediction differs from the gold label
misclassified = test_cot[test_cot['label'] != test_cot['gpt-4o-mini_label']]

# - os.path.join keeps the path valid on every OS (the hard-coded backslashes
#   only worked on Windows);
# - the sample size is capped at the number of available rows, because
#   sample(10) raises when fewer than 10 rows are misclassified;
# - a fixed random_state makes the exported sample reproducible.
misclassified.sample(min(10, len(misclassified)), random_state=0).to_json(
    os.path.join('paper_utils', 'error_analysis', 'lobbymap_query_stance_fn.json'),
    orient="records",
    indent=4,
)

In [None]:
get_statistics_errors_multi(dataset_name="lobbymap_query_stance")

In [None]:
def get_page_idx(l):
    """Return the distinct page indices referenced by a list of evidences.

    Args:
        l: iterable of mappings, each with a 'page_indices' iterable.

    Returns:
        A list without duplicates. It is built from a set, so its order is
        not guaranteed.
    """
    unique_pages = set()
    for evidence in l:
        unique_pages.update(evidence['page_indices'])
    return list(unique_pages)

def get_page_stance_map(ds):
    """Flatten a documents frame into one row per (evidence, page).

    Args:
        ds: DataFrame with a 'document_id' column and an 'evidences' column
            holding, per document, a list of dicts with the keys
            'page_indices', 'comment', 'query' and 'stance'.

    Returns:
        DataFrame with the columns 'index' (the row label of the source
        document in `ds`), 'document_id', 'page_indices' (a single page per
        row), 'query', 'stance' and 'comment'.
    """
    # One row per evidence dict
    per_evidence = ds.explode('evidences')
    # Lift the evidence fields into their own columns
    for field in ('page_indices', 'comment', 'query', 'stance'):
        per_evidence[field] = per_evidence['evidences'].apply(lambda ev, key=field: ev[key])

    wanted = ['document_id', 'page_indices', 'query', 'stance', 'comment']
    # One row per page listed in the evidence
    return per_evidence[wanted].explode('page_indices').reset_index()

def reconstruct_page(dataset_df):
    """Rebuild the text of every page from a document's sentence records.

    Args:
        dataset_df: DataFrame with a 'document_id' column and a 'sentences'
            column holding, per document, a list of dicts with the keys
            'page_idx', 'sentence_id', 'block_idx' and 'text'.

    Returns:
        DataFrame with the columns 'document_id', 'page_idx' and 'text',
        one row per page.
    """
    # One row per sentence dict
    sentence_rows = dataset_df[['document_id', 'sentences']].explode('sentences')

    # Lift the sentence fields into their own columns
    for field in ('page_idx', 'sentence_id', 'block_idx', 'text'):
        sentence_rows[field] = sentence_rows['sentences'].apply(lambda s, key=field: s[key])

    # Concatenate the sentences of each block (no separator between them)
    block_text = (
        sentence_rows
        .groupby(by=['document_id', 'page_idx', 'block_idx'])['text']
        .sum()
        .reset_index()
    )

    # Join the blocks of each page.
    # NOTE(review): the separator is the two-character text backslash + "n",
    # not a newline character — confirm this is intended.
    page_text = block_text.groupby(by=['document_id', 'page_idx'])['text'].apply(
        lambda blocks: "\\n".join(blocks)
    )

    return page_text.reset_index()

In [None]:
folder_path = os.path.join(os.getcwd(), "data", "lobbymap", "lobbymap_dataset")

train_path = os.path.join(folder_path, "train.jsonl")
raw_train = pd.read_json(train_path, lines=True)

test_path = os.path.join(folder_path, "test.jsonl")
raw_test = pd.read_json(test_path, lines=True)

dev_path = os.path.join(folder_path, "valid.jsonl")
raw_dev = pd.read_json(dev_path, lines=True)

In [None]:
def binary_stance_dataset(raw):
    """Pair every annotated (query, stance) evidence page with that page's text.

    Args:
        raw: documents frame accepted by both `get_page_stance_map` and
            `reconstruct_page`.

    Returns:
        DataFrame with the columns 'text', 'query', 'label' (the evidence
        stance), 'comment' and 'document_id'.
    """
    page_labels = get_page_stance_map(raw)
    page_texts = reconstruct_page(raw.copy())

    # Left join keeps every annotated page even when no text was rebuilt for it
    joined = page_labels.merge(
        page_texts,
        how='left',
        left_on=['document_id', 'page_indices'],
        right_on=['document_id', 'page_idx'],
    )

    selected = joined[['text', 'query', 'stance', 'comment', "document_id"]].copy()
    return selected.rename(columns={'stance': 'label'})

x_train = binary_stance_dataset(raw_train)
x_test = binary_stance_dataset(raw_test)
x_dev = binary_stance_dataset(raw_dev)

In [None]:
import pandas as pd
from collections import defaultdict

def reconstruct_pages(sentences):
    """Rebuild the text of each page from a flat list of sentence records.

    Args:
        sentences: iterable of dicts with the keys 'page_idx', 'block_idx',
            'block_sentence_idx' and 'text'.

    Returns:
        dict mapping each page index to the page text: the page's sentences
        ordered by (block_idx, block_sentence_idx) and joined with one space.
        Pages appear in order of first occurrence in `sentences`.
    """
    # Bucket the sentences per page
    by_page = {}
    for item in sentences:
        by_page.setdefault(item['page_idx'], []).append(item)

    def reading_order(item):
        return (item['block_idx'], item['block_sentence_idx'])

    return {
        page: ' '.join(s['text'] for s in sorted(group, key=reading_order))
        for page, group in by_page.items()
    }

# Apply the function to your DataFrame
sampled_documents['reconstructed_pages'] = sampled_documents['sentences'].apply(reconstruct_pages)

In [None]:
y_true = []
X_test = []
for i, r in sampled_documents.iterrows():
    for evidence in r["evidences"]:
        X_test += [r['reconstructed_pages']]
        y_true += [(evidence['query'], evidence['stance'], evidence['page_indices'])]

In [None]:
pd.DataFrame({
    "X":X_test,
    "y":y_true
}).to_csv('llm/dataset/lobbymap.csv', sep="\t", index=False)

## Lobbymap

In [None]:
dataset_name = "lobbymap_query_origin"

test_query = find_errors(dataset_name, mode="cot")

dataset_name = "lobbymap_pages"

test_page = find_errors(dataset_name, mode="cot")
test_page = test_page.drop_duplicates(subset=['document_id', 'page_id'], keep="first")

query_page = test_query.merge(test_page, on=['document_id', 'page_id', "text"], how="left", suffixes=("_query", "_page"))

query_page['comment_page'] = ""
query_page['comment_query'] = ""

In [None]:
import re

def parse_lobbymap_label(text):
    """Extract the 'Evidences:' and 'Explanation:' parts of a model answer.

    Args:
        text: raw answer string.

    Returns:
        (label, explanation). `label` is the rest of the line that follows
        the first 'Evidences:' marker (after skipping whitespace), returned
        as-is. `explanation` is everything after the first 'Explanation:'
        marker up to the end of the text, stripped. Either is None when its
        marker is absent.
    """
    # Without DOTALL, '.' stops at the end of the line
    evidences_hit = re.search(r'Evidences:\s*(.*)', text)
    # With DOTALL, the capture runs to the end of the text
    explanation_hit = re.search(r'Explanation:\s*(.*)', text, re.DOTALL)

    label = None if evidences_hit is None else evidences_hit.group(1)
    explanation = None if explanation_hit is None else explanation_hit.group(1).strip()
    return label, explanation

In [None]:
dataset_name = "lobbymap"

# Loading data from saved file
results = []
result_file_name = f"llm/outputs/{dataset_name}.jsonl"

# Read explicitly as UTF-8 (like the other JSON files opened in this notebook)
# so decoding does not depend on the platform's default encoding.
with open(result_file_name, 'r', encoding='utf-8') as file:
    for line in file:
        line = line.strip()
        if not line:
            # Tolerate blank lines instead of failing in json.loads
            continue
        # Parsing the JSON string into a dict and appending to the list of results
        json_object = json.loads(line)
        results.append(json_object["response"]['body']["choices"][0]['message']['content'])

# Split every raw model answer into its evidences string and its explanation
labels = []
explainations = []

for result in results:
    label, explanation = parse_lobbymap_label(result)
    labels.append(label)
    explainations.append(explanation)

# Row i of the CSV is paired with answer i; the assignment raises if the counts differ
test = pd.read_csv('llm/dataset/lobbymap.csv', sep="\t")
test['gpt-4o-mini_label'] = labels
test['gpt-4o-mini_explanation'] = explainations

In [None]:
parsed_test = test.groupby('X').agg({
    "y":list,
    "gpt-4o-mini_label":"first",
    "gpt-4o-mini_explanation":"first"
}).reset_index()

In [None]:
import ast

# Function to ensure correct tuple format
def ensure_tuple_format(row):
    """Wrap a bare (query, stance, pages) triple into a one-element list.

    A `row` of length 3 in which at least one element is not a tuple is
    treated as a single flat triple and returned as `[tuple(row)]`. Every
    other `row` (any other length, or three tuples) is returned unchanged.
    """
    if len(row) == 3 and not all(isinstance(item, tuple) for item in row):
        return [tuple(row)]
    return row

# Parse the stringified evidences back into Python objects. `ast.literal_eval`
# is called through the `ast` module imported at the top of this cell, so the
# cell no longer depends on a `from ast import literal_eval` in another cell.
# Predicted side: one string per row holding the whole evidence list
parsed_test['predicted_evidences'] = parsed_test['gpt-4o-mini_label'].apply(
    lambda x: ensure_tuple_format(list(ast.literal_eval(x)))
)
# Gold side: a list of strings per row, one stringified triple each
parsed_test['gold_evidences'] = parsed_test['y'].apply(
    lambda x: ensure_tuple_format([ast.literal_eval(el) for el in x])
)

In [None]:
from src.lobbymap.evaluate_f1 import evaluate_strict_f1, evaluate_overlap_f1, evaluate_document_f1

In [None]:
def _as_evidence_dicts(evidences):
    """Convert (query, stance, page_indices) triples into evidence dicts.

    `page_indices` is normalised to a list: a list is kept, a tuple is
    converted element-wise (instead of being wrapped as one item), and any
    other single value is wrapped in a one-element list.
    """
    records = []
    for query, stance, page_indices in evidences:
        if isinstance(page_indices, tuple):
            page_indices = list(page_indices)
        elif not isinstance(page_indices, list):
            page_indices = [page_indices]
        records.append({
            'query': query,
            'stance': stance,
            'page_indices': page_indices
        })
    return records


df = parsed_test.copy()
# The row position doubles as the document id
df['document_id'] = df.index.astype(str)

# Parallel lists, one entry per document, later passed to the evaluate_*_f1 functions
gold_jds = []
pred_jds = []

for idx, row in df.iterrows():
    document_id = row['document_id']

    gold_jds.append({
        'document_id': document_id,
        'evidences': _as_evidence_dicts(row['gold_evidences'])
    })

    pred_jds.append({
        'document_id': document_id,
        'evidences': _as_evidence_dicts(row['predicted_evidences'])
    })

In [None]:
result_strict = evaluate_strict_f1(gold_jds=gold_jds, pred_jds=pred_jds)
result_document = evaluate_document_f1(gold_jds=gold_jds, pred_jds=pred_jds)
result_overlap = evaluate_overlap_f1(gold_jds=gold_jds, pred_jds=pred_jds)

# One LaTeX table row: the model name followed by the page / query / stance
# F-scores of the document, overlap and strict evaluations, in that order.
latex_cells = ["GPT-4o-mini"]
for scores in (result_document, result_overlap, result_strict):
    for level in ('page', 'query', 'stance'):
        latex_cells.append(str(scores[level]['f']))

print(" & ".join(latex_cells), "\\\\")

## Clima Text

In [None]:
# NOTE(review): `dataset_name` is not assigned in this section; on a top-to-bottom
# run the latest assignment above is "lobbymap" — confirm and set it explicitly.
test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is inherited from an earlier cell; given the "_zero"
# suffix below it is presumably the zero-shot run of this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

In [None]:
negative_label = "0"
positive_label = "1"
# NOTE(review): n_sample is assigned but not passed to either call below —
# presumably the helpers read it as a global or use their own default; confirm.
n_sample = 10

# NOTE(review): unlike the other create_jsonl_errors_file calls in this
# notebook, no dataset_name is passed here — confirm this is intended.
create_jsonl_errors_file(test, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## Climatext wiki/10K

In [None]:
dataset_name = "climatext_wiki"

test = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

errors_cot = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
negative_label = "0"
positive_label = "1"
n_sample = 10

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

In [None]:
dataset_name = "climatext_10k"

test = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

errors_cot = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
negative_label = "0"
positive_label = "1"
n_sample = 10

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## climate_detection

In [None]:
# NOTE(review): `dataset_name` is not assigned in this section; on a top-to-bottom
# run the latest assignment above is "climatext_10k" — confirm and set it explicitly.
test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is inherited from an earlier cell (the latest assignment
# above loads "climatext_10k"); presumably it should be the zero-shot run of
# this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

In [None]:
negative_label = "no"
positive_label = "yes"
n_sample = 11

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## Specificity

In [None]:
dataset_name = "climate_specificity"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
n_sample = 10

create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, use_readable_labels=False)
get_statistics_errors_multi(dataset_name)

## climate tcfd recommendations

In [None]:
dataset_name = "climate_tcfd_recommendations"

test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is not assigned in this cell; the latest assignment above
# loads "climate_specificity". Presumably it should be the zero-shot run of this
# dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

print(len(merged_errors[merged_errors['gpt-4o-mini_label_zero'].isna()]), len(merged_errors[merged_errors['gpt-4o-mini_label_cot'].isna()]), len(merged_errors))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Example: Replace these lists with your actual label data
true_labels = test_cot['label'].astype(str)
predicted_labels = test_cot['gpt-4o-mini_label'].astype(str)

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Compute confusion matrix
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize='true'),2)

# Get the class labels back from the encoder
class_labels = label_encoder.classes_

# Display the confusion matrix
def plot_confusion_matrix(cm, labels):
    """Draw a single confusion matrix as a blue heat-map and show it.

    Args:
        cm: the confusion matrix values to display.
        labels: class names used for the tick labels; their count also
            determines the figure size.
    """
    # Fixed 0.7 x 0.7 inch footprint per matrix cell
    side = 0.7 * len(labels)
    fig, ax = plt.subplots(figsize=(side, side))
    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(
        ax=ax, cmap="Blues", colorbar=False
    )

    # Slanted x tick labels so long class names do not overlap
    plt.xticks(rotation=45, ha="right")
    plt.title("Confusion Matrix")
    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 10

create_jsonl_errors_file_multilabel(test_cot, dataset_name=dataset_name)
get_statistics_errors_multi(dataset_name)

## Environmental Claims

In [None]:
dataset_name = "environmental_claims"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
negative_label = "0"
positive_label = "1"
n_sample = 10

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## Explicit/Implicit Claims

In [None]:
dataset_name = "green_claims_3"

test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is not assigned in this cell; the latest assignment above
# loads "environmental_claims". Presumably it should be the zero-shot run of
# this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

print(len(merged_errors[merged_errors['gpt-4o-mini_label_zero'].isna()]), len(merged_errors[merged_errors['gpt-4o-mini_label_cot'].isna()]), len(merged_errors))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Example: Replace these lists with your actual label data
true_labels = test_cot['label'].astype(str)
predicted_labels = test_cot['gpt-4o-mini_label'].astype(str)

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Compute confusion matrix
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Get the class labels back from the encoder
class_labels = label_encoder.classes_

# Display the confusion matrix
def plot_confusion_matrix(cm, labels):
    """Show `cm` as a single heat-map whose size scales with the class count.

    Args:
        cm: the confusion matrix values to display.
        labels: class names shown on both axes.
    """
    inches_per_cell = 0.7  # same footprint horizontally and vertically
    n_classes = len(labels)
    fig, ax = plt.subplots(
        figsize=(inches_per_cell * n_classes, inches_per_cell * n_classes)
    )

    display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    display.plot(ax=ax, cmap="Blues", colorbar=False)

    # Slant the x tick labels to keep long names legible
    plt.xticks(rotation=45, ha="right")
    plt.title("Confusion Matrix")
    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 14

create_jsonl_errors_file_multilabel(test_cot, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

## Green Claims

In [None]:
dataset_name = "green_claims"

test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is not assigned in this cell; the latest assignment above
# loads "environmental_claims". Presumably it should be the zero-shot run of
# this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

print(len(merged_errors[merged_errors['gpt-4o-mini_label_zero'].isna()]), len(merged_errors[merged_errors['gpt-4o-mini_label_cot'].isna()]), len(merged_errors))

In [None]:
negative_label = "not_green"
positive_label = "green_claim"
n_sample = 10

create_jsonl_errors_file(test, negative_label=negative_label, positive_label=positive_label, dataset_name=dataset_name)
get_statistics_erros(dataset_name)

## ClimateBUG data

In [None]:
dataset_name = "climateBUG_data"

test_cot = find_errors(dataset_name, mode="cot")
print(
    classification_report(
            y_pred=test_cot['gpt-4o-mini_label'].astype(str), 
            y_true=test_cot['label'].astype(str),
            zero_division=0.0,
        )
)

# Rows whose prediction differs from the gold label in the CoT run
errors_cot = test_cot[test_cot['gpt-4o-mini_label'].astype(str) != test_cot['label'].astype(str)].copy()
# NOTE(review): `test` is not assigned in this cell; the latest assignment above
# loads "environmental_claims". Presumably it should be the zero-shot run of
# this dataset — confirm.
errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

# Outer join of both error sets on (text, label)
merged_errors = errors.merge(errors_cot, on=['text', 'label'], how="outer", suffixes=("_zero", "_cot"))

In [None]:
negative_label = "non-climate"
positive_label = "climate"
n_sample = 10

create_jsonl_errors_file(test_cot, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## ESG E

In [None]:
# NOTE(review): the section heading reads "ESG E" while the dataset key is
# "esgbert_g" — confirm which of the two is intended.
dataset_name = "esgbert_g"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
negative_label = "0"
positive_label = "1"
n_sample = 10

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## ESGBERT Nature/Forest/Water

In [None]:
dataset_name = "esgbert_category_water"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
negative_label = "0"
positive_label = "1"
n_sample = 10

create_jsonl_errors_file(test, dataset_name=dataset_name, negative_label=negative_label, positive_label=positive_label)
get_statistics_erros(dataset_name)

## Sustainable review

In [None]:
dataset_name = "sustainable_signals_review"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

## ClimateEng

In [None]:
dataset_name = "climateEng"

test = find_errors(dataset_name)
print(
    classification_report(
            y_pred=test['gpt-4o-mini_label'].astype(str), 
            y_true=test['label'].astype(str),
            zero_division=0.0,
        )
)

test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Reference and predicted labels, translated to their readable names.
# Ids that are missing from the mapping keep their raw string instead of turning into
# NaN, so the label set below stays all-string and LabelEncoder can fit it.
true_labels = test['label'].astype(str)
true_labels = true_labels.map(label_readable_mapping['climateEng']['labels']).fillna(true_labels)
predicted_labels = test['gpt-4o-mini_label'].astype(str)
predicted_labels = predicted_labels.map(label_readable_mapping['climateEng']['labels']).fillna(predicted_labels)

# Combine both true and predicted labels to get the full set of unique classes
all_labels = list(set(true_labels) | set(predicted_labels))

# Use LabelEncoder to encode textual labels to integers based on the full label set
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Confusion matrix normalised over the true labels (normalize="true"), rounded to 2 decimals
cm = np.round(confusion_matrix(true_labels_encoded, predicted_labels_encoded, labels=range(len(all_labels)), normalize="true"),2)

# Get the class labels back from the encoder (same order as the encoded ids)
class_labels = label_encoder.classes_

# Display the confusion matrix
def plot_confusion_matrix(cm, labels):
    """Plot a single confusion matrix whose figure size grows with the number of labels.

    Parameters
    ----------
    cm : array-like
        Confusion matrix values to display.
    labels : sequence
        Tick labels, in the same order as the rows/columns of ``cm``.
    """
    cell_width = 0.7
    cell_height = 0.7
    num_labels = len(labels)

    # Calculate figure size based on the number of labels and fixed cell dimensions
    figsize = (cell_width * num_labels, cell_height * num_labels)
    fig, ax = plt.subplots(figsize=figsize)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    plt.xticks(rotation=45, ha="right")  # Rotate labels by 45 degrees
    plt.title("Confusion Matrix")
    plt.show()

# Call the function to display the confusion matrix
plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 10

# Forward n_sample explicitly: it was defined here but never handed to the helper,
# unlike the netzero_reduction cell, which passes n_sample=n_sample.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

## Action 500

In [None]:
dataset_name = "esgbert_action500"

# Compare the GPT-4o-mini predictions with the reference labels (both cast to str).
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))]

In [None]:
# Label strings of the two classes for this binary task.
negative_label, positive_label = "0", "1"
# NOTE(review): n_sample is set but not passed to the helper below — confirm how it is consumed.
n_sample = 10

# Helpers are defined elsewhere in the notebook; by their names they export the
# misclassified rows and report statistics — confirm against their definitions.
create_jsonl_errors_file(
    test,
    dataset_name=dataset_name,
    negative_label=negative_label,
    positive_label=positive_label,
)
get_statistics_erros(dataset_name)

## ClimaTOPIC

In [None]:
dataset_name = "ClimaTOPIC"

# Compare the GPT-4o-mini predictions with the reference labels (both cast to str).
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Reference and predicted labels, compared as strings
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class seen in either series gets a row/column in the matrix
all_labels = list(set(true_labels).union(predicted_labels))

# Integer-encode the string labels over that shared vocabulary
label_encoder = LabelEncoder().fit(all_labels)
true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Confusion matrix normalised over the true labels, rounded to 2 decimals for display
cm = np.round(
    confusion_matrix(
        true_labels_encoded,
        predicted_labels_encoded,
        labels=range(len(all_labels)),
        normalize='true',
    ),
    2,
)

# The encoder's classes give the tick labels, in the same order as the encoded ids
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Draw ``cm`` as one blue heat map on a square figure of side ``0.7 * len(labels)``."""
    side = 0.7 * len(labels)
    fig, ax = plt.subplots(figsize=(side, side))
    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(
        ax=ax, cmap="Blues", colorbar=False
    )

    plt.xticks(rotation=45, ha="right")  # slanted tick labels
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 10

# Forward n_sample explicitly: it was defined here but never handed to the helper,
# unlike the netzero_reduction cell, which passes n_sample=n_sample.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

## Commitment And Actions

In [None]:
dataset_name = "climate_commitments_actions"

# Compare the GPT-4o-mini predictions with the reference labels (both cast to str).
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))]

In [None]:
# Label strings of the two classes for this binary task.
negative_label, positive_label = "no", "yes"
# NOTE(review): n_sample is set but not passed to the helper below — confirm how it is consumed.
n_sample = 10

# Helpers are defined elsewhere in the notebook; by their names they export the
# misclassified rows and report statistics — confirm against their definitions.
create_jsonl_errors_file(
    test,
    dataset_name=dataset_name,
    negative_label=negative_label,
    positive_label=positive_label,
)
get_statistics_erros(dataset_name)

## Net Zero

In [None]:
dataset_name = "netzero_reduction"

# Same report as the other datasets, but on the predictions loaded with mode="cot".
test_cot = find_errors(dataset_name, mode="cot")
print(classification_report(
    y_true=test_cot['label'].astype(str),
    y_pred=test_cot['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test_cot[test_cot['gpt-4o-mini_label'].astype(str).ne(test_cot['label'].astype(str))]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Reference and predicted labels of the "cot" run, compared as strings
true_labels = test_cot['label'].astype(str)
predicted_labels = test_cot['gpt-4o-mini_label'].astype(str)

# Every class seen in either series gets a row/column in the matrix
all_labels = list(set(true_labels).union(predicted_labels))

# Integer-encode the string labels over that shared vocabulary
label_encoder = LabelEncoder().fit(all_labels)
true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Confusion matrix normalised over the true labels, rounded to 2 decimals for display
cm = np.round(
    confusion_matrix(
        true_labels_encoded,
        predicted_labels_encoded,
        labels=range(len(all_labels)),
        normalize='true',
    ),
    2,
)

# The encoder's classes give the tick labels, in the same order as the encoded ids
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Draw ``cm`` as one blue heat map on a square figure of side ``0.7 * len(labels)``."""
    side = 0.7 * len(labels)
    fig, ax = plt.subplots(figsize=(side, side))

    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(
        ax=ax, cmap="Blues", colorbar=False
    )

    plt.xticks(rotation=45, ha="right")  # slanted tick labels
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)


In [None]:
n_sample = 10

# Helpers are defined elsewhere in the notebook; here they receive the "cot" predictions.
create_jsonl_errors_file_multilabel(
    test_cot,
    dataset_name=dataset_name,
    n_sample=n_sample,
)
get_statistics_errors_multi(dataset_name=dataset_name)

## Climate Specificity

In [None]:
dataset_name = "climate_specificity"

# Compare the GPT-4o-mini predictions with the reference labels (both cast to str).
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))]

In [None]:
# Label strings of the two classes for this binary task.
negative_label, positive_label = "non-specific", "specific"
# NOTE(review): n_sample is set but not passed to the helper below — confirm how it is consumed.
n_sample = 10

# Helpers are defined elsewhere in the notebook; by their names they export the
# misclassified rows and report statistics — confirm against their definitions.
create_jsonl_errors_file(
    test,
    dataset_name=dataset_name,
    negative_label=negative_label,
    positive_label=positive_label,
)
get_statistics_erros(dataset_name)

## Climate Sentiment

In [None]:
dataset_name = "climate_sentiment"

# Compare the GPT-4o-mini predictions with the reference labels (both cast to str).
test = find_errors(dataset_name)
print(classification_report(
    y_true=test['label'].astype(str),
    y_pred=test['gpt-4o-mini_label'].astype(str),
    zero_division=0.0,
))

# Cell output: only the rows whose prediction differs from the reference label.
test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))]

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

# Reference and predicted labels, compared as strings
true_labels = test['label'].astype(str)
predicted_labels = test['gpt-4o-mini_label'].astype(str)

# Every class seen in either series gets a row/column in the matrix
all_labels = list(set(true_labels).union(predicted_labels))

# Integer-encode the string labels over that shared vocabulary
label_encoder = LabelEncoder().fit(all_labels)
true_labels_encoded = label_encoder.transform(true_labels)
predicted_labels_encoded = label_encoder.transform(predicted_labels)

# Confusion matrix without normalisation (unlike the other dataset sections), rounded to 1 decimal
cm = np.round(
    confusion_matrix(
        true_labels_encoded,
        predicted_labels_encoded,
        labels=range(len(all_labels)),
    ),
    1,
)

# The encoder's classes give the tick labels, in the same order as the encoded ids
class_labels = label_encoder.classes_


def plot_confusion_matrix(cm, labels):
    """Draw ``cm`` as one blue heat map on a fixed 6x6 figure."""
    fig, ax = plt.subplots(figsize=(6, 6))
    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(
        ax=ax, cmap="Blues", colorbar=False
    )

    plt.xticks(rotation=45, ha="right")  # slanted tick labels
    plt.title("Confusion Matrix")
    plt.show()


plot_confusion_matrix(cm, class_labels)

In [None]:
n_sample = 10

# Forward n_sample explicitly: it was defined here but never handed to the helper,
# unlike the netzero_reduction cell, which passes n_sample=n_sample.
create_jsonl_errors_file_multilabel(test, dataset_name=dataset_name, n_sample=n_sample)
get_statistics_errors_multi(dataset_name)

# Table of fails

In [None]:
# Build one LaTeX table row per dataset: GPT-4o-mini error rate, distilRoBERTa error
# rate, and the share of errors the two models have in common.
results = []

for dataset_name in ['climateFEVER_claim',
#  'lobbymap_stance_origin',
# 'logicClimate',
 'sciDCC',
#  'lobbymap_query_origin',
 'contrarian_claims',
 'climate_tcfd_recommendations',
 'ClimaTOPIC',
 'climatext',
 'ClimaINS_ours',
 'sustainable_signals_review',
 'climateFEVER_evidence',
 'climateStance',
 'lobbymap_pages',
 'climateEng',
 'climate_specificity',
 'environmental_claims',
 'esgbert_action500',
 'gw_stance_detection',
 'climate_commitments_actions',
 'climate_sentiment',
 'green_claims_3',
 'climateBUG_data',
 'green_claims',
 'esgbert_e',
 'esgbert_category_forest',
 'esgbert_category_nature',
 'climaQA',
 'esgbert_g',
 'esgbert_category_biodiversity',
 'esgbert_s',
 'esgbert_category_water',
 'climate_detection',
 'netzero_reduction']:
    # Find errors for GPT-4o-mini
    test = find_errors(dataset_name)
    gpt_errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

    # Find errors for distilRoBERTa (os.path.join keeps the path valid outside Windows)
    test_distilroberta = pd.read_parquet(
        os.path.join('experiment_results', 'performances', 'y_pred', f'{dataset_name}_distilRoBERTa_42.pkl')
    )
    distil_errors = test_distilroberta[test_distilroberta['label'] != test_distilroberta['y_pred']].copy()

    # Join on the text: the inner join keeps errors made by both models,
    # the outer join keeps errors made by at least one of them.
    merged = gpt_errors.merge(distil_errors[['text', 'y_pred', 'label']], on='text', how="inner", suffixes=('_gpt4', '_distil'))
    all_errors = gpt_errors.merge(distil_errors[['text', 'y_pred', 'label']], on='text', how="outer", suffixes=('_gpt4', '_distil'))

    # Calculate percentages
    gpt_error_percentage = np.round(100 * len(gpt_errors) / len(test), 2) if len(test) > 0 else 0
    distil_error_percentage = np.round(100 * len(distil_errors) / len(test_distilroberta), 2) if len(test_distilroberta) > 0 else 0
    # Guard against datasets where neither model makes an error (would divide by zero);
    # NaN matches the guard used by the three-model table in this notebook.
    common_error_percentage = np.round(100 * len(merged) / len(all_errors), 2) if len(all_errors) > 0 else np.nan

    # Append results to the list (underscores escaped for LaTeX)
    results.append([dataset_name.replace('_', '\\_'), gpt_error_percentage, distil_error_percentage, common_error_percentage])

# Create a DataFrame for sorting
results_df = pd.DataFrame(results, columns=['Dataset', 'GPT-4o-mini Error %', 'distilRoBERTa Error %', 'Common Error %'])

# Sort the results by 'Common Error %' in descending order
sorted_results_df = results_df.sort_values(by='Common Error %', ascending=False)

# Print the sorted results as LaTeX table rows ("a & b & c & d \\")
for _, row in sorted_results_df.iterrows():
    print(f"{row['Dataset']} & {row['GPT-4o-mini Error %']} & {row['distilRoBERTa Error %']} & {row['Common Error %']} \\\\")


In [None]:
def get_outputs(data, dataset_name):
    """Return the ``'content'`` value of the third element of every record in ``data[dataset_name]``."""
    return [record[2]['content'] for record in data[dataset_name]]

def get_errors_llama(dataset_name):
    """Attach the parsed Llama predictions to the annotated sample of ``dataset_name``.

    Relies on notebook globals: ``data`` (loaded Llama outputs), ``get_outputs``,
    ``parse_label_explanation`` and ``label_readable_mapping``. The parser and ``data``
    are defined in later cells, so those cells must have been run first.

    Parameters
    ----------
    dataset_name : str
        Key of the dataset in ``data`` and stem of the parquet file to read.

    Returns
    -------
    pandas.DataFrame
        The frame read from ``doccano/random/parquet/<dataset_name>.pkl`` with two
        extra columns, ``model_label`` and ``model_explanation``.
    """
    raw_outputs = get_outputs(data=data, dataset_name=dataset_name)

    labels = []
    explainations = []

    for raw_output in raw_outputs:
        # Pass the dataset name so the dataset-specific normalisation and fallback
        # labels are applied, as in the performance-evaluation cell.
        label, explanation = parse_label_explanation(raw_output, dataset_name=dataset_name)
        labels.append(label)
        explainations.append(explanation)

    # os.path.join keeps the path valid outside Windows
    test = pd.read_parquet(os.path.join("doccano", "random", "parquet", f"{dataset_name}.pkl"))

    if dataset_name == "lobbymap_pages":
        test['label'] = 1 * test['label']

    test['model_label'] = labels
    test['model_explanation'] = explainations

    # The "*_origin" variants drop rows without a query and are then handled under
    # their base name. Copy / rename out of place so the later column assignment
    # does not write into a slice of the original frame.
    if dataset_name == "lobbymap_query_origin":
        test = test[test['query'].astype(str) != "[None]"].copy()
        dataset_name = "lobbymap_query"
    if dataset_name == "lobbymap_stance_origin":
        test = test[test['query'].astype(str) != "None"].rename(columns={'stance': 'label'})
        dataset_name = "lobbymap_stance"

    # Translate readable label names back to their ids (case-insensitive)
    if dataset_name in label_readable_mapping:
        label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}
        test['model_label'] = test['model_label'].str.lower().map(label2id)

    return test

In [None]:
results = []

# for dataset_name in ['climateFEVER_claim',
# #  'lobbymap_stance_origin',
# # 'logicClimate',
#  'sciDCC',
# #  'lobbymap_query_origin',
#  'contrarian_claims',
#  'climate_tcfd_recommendations',
#  'ClimaTOPIC',
#  'climatext',
#  'ClimaINS_ours',
#  'sustainable_signals_review',
# #  'climateFEVER_evidence',
#  'climateStance',
#  'lobbymap_pages',
#  'climateEng',
#  'climate_specificity',
#  'environmental_claims',
#  'esgbert_action500',
#  'gw_stance_detection',
#  'climate_commitments_actions',
#  'climate_sentiment',
#  'green_claims_3',
#  'climateBUG_data',
#  'green_claims',
#  'esgbert_e',
#  'esgbert_category_forest',
#  'esgbert_category_nature',
#  #'climaQA',
#  'esgbert_g',
#  'esgbert_category_biodiversity',
#  'esgbert_s',
#  'esgbert_category_water',
#  'climate_detection',
#  'netzero_reduction']:
for dataset_name in ['netzero_reduction']:
    print("#"*10)
    print(dataset_name)
    print("#"*10)

    # Find errors for GPT-4o-mini
    test = find_errors(dataset_name)
    gpt_errors = test[test['gpt-4o-mini_label'].astype(str) != test['label'].astype(str)].copy()

    # Find errors for distilRoBERTa (os.path.join keeps the path valid outside Windows)
    test_distilroberta = pd.read_parquet(
        os.path.join('experiment_results', 'performances', 'y_pred', f'{dataset_name}_distilRoBERTa_42.pkl')
    )
    distil_errors = test_distilroberta[test_distilroberta['label'] != test_distilroberta['y_pred']].copy()

    # Find errors for Llama
    test_llama = get_errors_llama(dataset_name)
    llama_errors = test_llama[test_llama['label'].astype(str) != test_llama['model_label'].astype(str)].copy()

    # GPT vs distilRoBERTa: the inner join keeps shared errors, the outer join keeps
    # errors of either model; the union is then restricted to texts present in `test`.
    merged = gpt_errors.merge(distil_errors[['text', 'y_pred', 'label']], on='text', how="inner", suffixes=('_gpt4', '_distil'))
    all_errors = gpt_errors.merge(distil_errors[['text', 'y_pred', 'label']], on='text', how="outer", suffixes=('_gpt4', '_distil'))
    all_errors = test[['text']].merge(all_errors, on='text', how='inner')

    # GPT vs Llama: same construction, restricted to texts evaluated by both models.
    merged_llama_gpt = gpt_errors.merge(llama_errors[['text', 'model_label', 'model_explanation', 'label']], on='text', how="inner", suffixes=('_gpt4', '_llama'))
    all_errors_llama = gpt_errors.merge(llama_errors[['text', 'model_label', 'model_explanation', 'label']], on='text', how="outer", suffixes=('_gpt4', '_llama'))
    common_test = test_llama[['text']].merge(test[['text']], how="inner", on="text")
    all_errors_llama = common_test[['text']].merge(all_errors_llama, on='text', how='inner')

    # Calculate percentages
    gpt_error_percentage = np.round(100 * len(gpt_errors) / len(test), 2) if len(test) > 0 else 0
    distil_error_percentage = np.round(100 * len(distil_errors) / len(test_distilroberta), 2) if len(test_distilroberta) > 0 else 0
    common_error_percentage = np.round(100 * len(merged) / len(all_errors), 2) if len(all_errors) > 0 else np.nan

    llama_error_percentage = np.round(100 * len(llama_errors) / len(test_llama), 2) if len(test_llama) > 0 else 0
    llama_common_error_percentage = np.round(100 * len(merged_llama_gpt) / len(all_errors_llama), 2) if len(all_errors_llama) > 0 else np.nan

    # Append results to the list (underscores escaped for LaTeX)
    results.append([dataset_name.replace('_', '\\_'), gpt_error_percentage, distil_error_percentage, common_error_percentage, llama_error_percentage, llama_common_error_percentage])

# Create a DataFrame for sorting
# NOTE(review): the column says "DistilBERT" while the model is distilRoBERTa — the name
# is kept unchanged because it is used as a key below.
results_df = pd.DataFrame(results, columns=['Dataset', 'GPT-4o-mini Error %', 'distilRoBERTa Error %', 'GPT-4o-mini-DistilBERT Common Error %', 'Llama Error %', 'Llama-GPT-4omini Common %'])

# Sort the results by the GPT/distilRoBERTa common error share, in descending order
sorted_results_df = results_df.sort_values(by='GPT-4o-mini-DistilBERT Common Error %', ascending=False)

# Print the sorted results as LaTeX table rows
for _, row in sorted_results_df.iterrows():
    print(f"{row['Dataset']} & {row['GPT-4o-mini Error %']} & {row['distilRoBERTa Error %']} & {row['GPT-4o-mini-DistilBERT Common Error %']} & {row['Llama Error %']} & {row['Llama-GPT-4omini Common %']} \\\\")

# Parse Llama outputs

In [None]:
import re

def parse_label_explanation(text, dataset_name=None):
    """Extract the ``Label:`` and ``Explanation:`` fields from one raw model answer.

    Parameters
    ----------
    text : str
        Raw model output, expected to contain ``Label: ...`` and ``Explanation: ...``.
    dataset_name : str, optional
        Enables the dataset-specific label normalisation and fallback labels below.

    Returns
    -------
    tuple
        ``(label, explanation)``. ``label`` is the cleaned label string, or the
        dataset fallback (``None`` unless the dataset defines one) when no label
        can be parsed. ``explanation`` is the stripped explanation text, or ``None``
        when the answer has no ``Explanation:`` field.
    """
    # Labels substituted when the answer carries no parsable ``Label:`` field.
    fallback_labels = {
        "netzero_reduction": "none",
        "lobbymap_pages": "The page does not contain evidence about the stance of the company regarding any of the policy",
        "logicClimate": "None",
    }

    if not isinstance(text, str):
        return fallback_labels.get(dataset_name), None

    # The explanation may span several lines (DOTALL); the label is a single line.
    explanation_match = re.search(r'Explanation:\s*(.*)', text, re.DOTALL)
    explanation = explanation_match.group(1).strip() if explanation_match else None

    label_match = re.search(r'Label:\s*(.*)', text)
    if label_match is None:
        # Handled explicitly instead of through a blanket ``except``: the parsed
        # explanation is kept rather than being replaced by an exception object.
        return fallback_labels.get(dataset_name), explanation

    label = label_match.group(1).replace('[', "").replace(']', "").strip()

    # Normalise label variants to the wording these replacements target
    label = label.replace('Climate solutions won’t work, Climate policies are harmful', "Climate solutions won’t work, Climate policies (mitigation or adaptation) are harmful")
    label = label.replace('Climate solutions won’t work, One country is negligible', 'Climate solutions won’t work, Climate policies are ineffective/flawed')

    if dataset_name == "ClimaINS_ours":
        # Keep only the first word of the label
        label = label.split(" ")[0]
    elif dataset_name == "climate_tcfd_recommendations":
        label = label.replace("general", "none")

    return label, explanation

In [None]:
import json
from src.logger import bootstrap_confidence_interval_saving

# File with the raw Llama generations to evaluate. Built with os.path.join: the previous
# hand-written backslash literal relied on invalid escape sequences ("\o", "\l") and
# only resolved on Windows.
llama_path = os.path.join("llm", "outputs", "llama", "results_climatext_llama_8B_cot.json")  # alternative: "greenbench_llama_70B_cot_4bit.json" in the same folder
model_type = "Llama-8B" # Llama-70B

# Read the JSON file as a string
with open(llama_path, 'r') as file:
    json_string = file.read()

# The content is decoded twice: the first pass yields a JSON string, the second the data itself
data = json.loads(json.loads(json_string))

In [None]:
def get_outputs(data, dataset_name):
    """Return the ``'content'`` value of the third element of every record in ``data[dataset_name]``."""
    return [record[2]['content'] for record in data[dataset_name]]

In [None]:
# CSV that accumulates the Llama performance rows. os.path.join replaces a backslash
# literal that contained the invalid escape "\p" and only resolved on Windows.
perf_file_path = os.path.join("experiment_results", "performances", "performances_llama.csv")

In [None]:
from ast import literal_eval
from sklearn.preprocessing import MultiLabelBinarizer

performance_type = "f1_score"

# Resume from the saved performance table when it exists, otherwise start empty.
if os.path.exists(perf_file_path):
    performances = pd.read_csv(perf_file_path)
else:
    performances = pd.DataFrame()

# NOTE(review): this loop only loads and parses the outputs; nothing is scored or stored.
# It looks like an unfinished duplicate of the evaluation cell that follows — confirm
# whether it can be removed.
for dataset_name in set(prompts.keys())-{"lobbymap_query", "logicClimate"}:
    # Loading data from saved file
    results = get_outputs(data=data, dataset_name=dataset_name)

    labels = []
    explainations = []

    for result in results:
        # Pass the dataset name so dataset-specific normalisation/fallbacks apply,
        # as in the evaluation cell.
        label, explanation = parse_label_explanation(result, dataset_name=dataset_name)
        labels.append(label)
        explainations.append(explanation)

    # os.path.join keeps the path valid outside Windows
    test = pd.read_parquet(os.path.join('data', 'llm_green_nlp_tasks', f'{dataset_name}.pkl'))

In [None]:
from ast import literal_eval
from sklearn.preprocessing import MultiLabelBinarizer

performance_type = "f1_score"

# Resume from the saved performance table when it exists, otherwise start empty.
# NOTE(review): re-running this cell appends the same datasets again — deduplicate
# before saving if that is not intended.
if os.path.exists(perf_file_path):
    performances = pd.read_csv(perf_file_path)
else:
    performances = pd.DataFrame()

# set(prompts.keys())-{"lobbymap_query", "logicClimate"}
# for dataset_name in set(prompts.keys())-{"lobbymap_query", "logicClimate", "lobbymap", 'climatext_10k', "lobbymap_stance"}:
for dataset_name in ["climatext_10k", "climatext_wiki", "climatext_claim"]:

    # if dataset_name == "map_lobbymap_stance":
    #     continue

    # Raw generations for this dataset, parsed into (label, explanation) pairs
    results = get_outputs(data=data, dataset_name=dataset_name)

    labels = []
    explainations = []

    for result in results:
        label, explanation = parse_label_explanation(result, dataset_name=dataset_name)
        labels.append(label)
        explainations.append(explanation)

    # test = pd.read_parquet(os.path.join("doccano", "random", "parquet", f"{dataset_name}.pkl"))
    # os.path.join keeps the path valid outside Windows
    test = pd.read_parquet(os.path.join('data', 'llm_green_nlp_tasks', f'{dataset_name}.pkl'))

    if dataset_name == "lobbymap_pages":
        test['label'] = 1 * test['label']

    test['model_label'] = labels
    test['model_explanation'] = explainations

    # Align the model's spelling with the reference label names
    if dataset_name == "sustainable_signals_review":
        test['model_label'] = test['model_label'].str.replace("Not Relevant", "Not relevant")

    if dataset_name == "contrarian_claims":
        test['model_label'] = test['model_label'].str.replace("No claim", "No claim, No claim")

    # The "*_origin" variants drop rows without a query and are then scored under
    # their base name. Copy / rename out of place so later column assignments do not
    # write into a slice of the original frame.
    if dataset_name == "lobbymap_query_origin":
        test = test[test['query'].astype(str) != "[None]"].copy()
        dataset_name = "lobbymap_query"
    if dataset_name == "lobbymap_stance_origin":
        test = test[test['query'].astype(str) != "None"].rename(columns={'stance': 'label'})
        dataset_name = "lobbymap_stance"

    print(dataset_name)

    if dataset_name in ["logicClimate", "lobbymap_query"]:
        # Multi-label datasets: labels are lists, scored after binarisation
        if dataset_name == "logicClimate":
            y_true = test['label'].apply(literal_eval)
            y_pred = test['model_label'].apply(lambda x: [e.strip().lower() for e in x.split(",")])
        elif dataset_name == "lobbymap_query":
            y_true = test['label'].apply(lambda x: [map_lobbymap_stance[e] for e in x])
            y_pred = test['model_label'].apply(lambda x: [e.strip() for e in x.split(",")])

        # Initialize the MultiLabelBinarizer
        mlb = MultiLabelBinarizer()

        # Fit the binarizer on the reference labels and transform the predictions with it
        y_true_binarized = mlb.fit_transform(y_true)
        y_pred_binarized = mlb.transform(y_pred)

        report = classification_report(
                    y_pred=y_pred_binarized,
                    y_true=y_true_binarized,
                    target_names=mlb.classes_,
                    zero_division=0.0,
                    output_dict=True
                )

        f1_lower, f1_upper = bootstrap_confidence_interval_saving(y_pred=y_pred_binarized, y_true=y_true_binarized, num_bootstrap_samples=1000, dataset_name=dataset_name, model_name=model_type)
        print(report['macro avg']['f1-score'], f1_lower, f1_upper)

    else:
        # Translate readable label names back to their ids (case-insensitive)
        if dataset_name in label_readable_mapping:
            label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}
            test['model_label'] = test['model_label'].str.lower().map(label2id)

        # Replace non existing labels by the most frequent
        test.loc[~test['model_label'].astype(str).isin(test['label'].astype(str).unique()), "model_label"] = test['label'].mode()[0]

        report = classification_report(
                y_pred=test['model_label'].astype(str),
                y_true=test['label'].astype(str),
                zero_division=0.0,
                output_dict=True
            )

        f1_lower, f1_upper = bootstrap_confidence_interval_saving(y_pred=test['model_label'].astype(str), y_true=test['label'].astype(str), num_bootstrap_samples=1000, dataset_name=dataset_name, model_name=model_type)
        print(report['macro avg']['f1-score'], f1_lower, f1_upper)

    # When the report has a 'samples avg' entry but no 'accuracy', use the samples-average F1
    if ('samples avg' in report.keys()) and ('accuracy' not in report.keys()):
        report['accuracy'] = report['samples avg']['f1-score']

    new_row = pd.DataFrame({
        'dataset_name': [dataset_name],
        'model_type': [model_type],
        'performance': [report['macro avg']['f1-score']],
        'performance_type': [performance_type],
        'n_labels': [np.nan],
        'seed': [42],
        "f1_upper": [f1_upper],
        "f1_lower": [f1_lower],
        "n_epoch": [np.nan],
        "precision": [report['macro avg']['precision']],
        "recall": [report['macro avg']['recall']],
        "weighted_f1": [report['weighted avg']['f1-score']],
        "accuracy": [report['accuracy']]
    })
    performances = pd.concat([performances, new_row], ignore_index=True)

In [None]:
# Write the accumulated performance table to perf_file_path (without the index column),
# then display it as the cell output.
performances.to_csv(perf_file_path, index=False)
performances

# Meta Analysis of errors

In [None]:
import os
import pandas as pd
path = "paper_utils/error_analysis/"

# Collect the annotated error files into one frame with columns 'comment' and 'dataset_name'.
# Frames are gathered in a list and concatenated once, and the directory listing is
# sorted so the row order is reproducible.
annotation_frames = []

for filename in sorted(os.listdir(path)):
    # Skip entries whose name marks them as old versions, archives, zips or IAA files
    if any(marker in filename for marker in ("_old", "archive", ".zip", "iaa")):
        continue

    # Drop the 8-character suffix (e.g. "_fn.json" / "_fp.json") to get the dataset name
    dataset_name = filename[:-8]

    file_df = pd.read_json(os.path.join(path, filename))
    file_df['dataset_name'] = dataset_name

    # Files with page-level comments: drop rows whose page comment is "true:" or empty
    # and use the query-level comment as the annotation.
    if "comment_page" in file_df:
        file_df = file_df[file_df["comment_page"]!="true:"].copy()
        file_df = file_df[file_df["comment_page"]!=""].copy()
        file_df = file_df.rename(columns={"comment_query":"comment"})

    annotation_frames.append(file_df[['comment', 'dataset_name']])

if annotation_frames:
    error_analysis_df = pd.concat(annotation_frames, ignore_index=True)
else:
    # No annotation file found: keep an empty frame with the expected columns
    error_analysis_df = pd.DataFrame(columns=['comment', 'dataset_name'])


In [None]:
# dataset name -> number of rows where the GPT-4o-mini label differs from the reference
number_errors = {}

# Datasets whose predictions are loaded with mode="cot"
cot_list = [
    "netzero_reduction",
    "climate_tcfd_recommendations",
    "climateBUG_data",
    "green_claims",
    "green_claims_3",
    "climateStance",
    "gw_stance_detection",
    "climateFEVER_evidence",
    "lobbymap_query",
    "lobbymap_stance",
    "lobbymap_pages",
    "climatext",
    "climatext_10k",
    "climatext_claim",
    "climatext_wiki",
    "climate_detection",
]

# Dataset names taken from the annotation files; lobbymap is handled separately below
l = []
for filename in os.listdir(path):
    if any(marker in filename for marker in ("lobbymap", "_old", "archive", ".zip", "iaa")):
        continue

    ds_name = filename.replace("_fn.json", "").replace("_fp.json", "")
    l.append(ds_name)

for ds_name in set(l):
    test = find_errors(ds_name, mode="cot") if ds_name in cot_list else find_errors(ds_name)

    number_errors[ds_name] = len(test[test['gpt-4o-mini_label'].astype(str).ne(test['label'].astype(str))])
    # print(test[test['gpt-4o-mini_label'].astype(str)!=test['label'].astype(str)].sample(1)[['gpt-4o-mini_label', 'label']])

# lobbymap pages: the reference label is multiplied by 1 before the string comparison
test = find_errors("lobbymap_pages", mode="cot")
number_errors['lobbymap_query_p'] = len(test[(1*test['label']).astype(str).ne(test['gpt-4o-mini_label'].astype(str))])

# lobbymap stance: only rows with a stance are counted
test = find_errors("lobbymap_stance_origin")
test = test[test['stance'].notna()]
number_errors['lobbymap_query_stance'] = len(test[test['stance'].astype(str).ne(test['gpt-4o-mini_label'].astype(str))])

number_errors

In [None]:
# Tags are the comma-separated tokens in front of the first ":" of each comment
error_analysis_df['error_annotation'] = [
    comment.split(":")[0].split(",") for comment in error_analysis_df['comment']
]

In [None]:
# Flatten the per-row tag lists and keep each distinct tag once
unique_error_types = pd.unique(error_analysis_df['error_annotation'].explode())
unique_error_types

In [None]:
# Tags are the comma-separated tokens in front of the first ":" of each comment
error_analysis_df['error_annotation'] = [
    comment.split(":")[0].split(",") for comment in error_analysis_df['comment']
]

unique_error_types = pd.unique(error_analysis_df['error_annotation'].explode())

# A row is an "Actual Error" when it carries at least one of these tags
# (open point from the author: check "list" once the annotations are updated)
error_analysis_df["Actual Error"] = error_analysis_df['error_annotation'].apply(
    lambda tags: any(
        tag in tags
        for tag in ("error", "honest-mistake", "indirect", "implicit-describe", "table", "list")
    )
)
def debtable(x):
    """Return True when ``x`` contains at least one tag that marks an error as debatable."""
    debatable_tags = (
        "ambiguous",
        "main-branch",
        "cropped-small",
        "off-topic",
        "debatable",
        "out-of-context",  # open question: error/out-of-context, or debatable only?
        "close",
        "page-selection",
        "nature-arg",  # open question: does this really belong here?
        "exhaustif",
    )
    return any(tag in x for tag in debatable_tags)
# One boolean column per remaining error category, derived from the tag lists
error_analysis_df["Debatable Error"] = error_analysis_df['error_annotation'].apply(debtable)
error_analysis_df["Multilabel"] = error_analysis_df['error_annotation'].apply(
    lambda tags: "multi" in tags or "multiple" in tags
)
error_analysis_df["Mislabeled"] = error_analysis_df['error_annotation'].apply(
    lambda tags: "wrong" in tags
)

In [None]:
# Keep the rows flagged with at least one error category. The explicit copy makes this an
# independent frame, so the later `.loc[...] = ...` assignments write to it reliably
# instead of to a slice of error_analysis_df (SettingWithCopyWarning).
annotated_error_analysis = error_analysis_df[(error_analysis_df['Actual Error'] | error_analysis_df['Debatable Error'] | error_analysis_df['Mislabeled'] | error_analysis_df['Multilabel'])].copy()

In [None]:
# Collapse the boolean flags into a single "error" column. Later entries overwrite earlier
# ones, so the precedence is Mislabeled > Actual Error > Multilabel > Debatable Error.
for _error_flag in ("Debatable Error", "Multilabel", "Actual Error", "Mislabeled"):
    annotated_error_analysis.loc[annotated_error_analysis[_error_flag], "error"] = _error_flag

In [None]:
# Alias of number_errors: both names refer to the same dict object (no copy is made).
total_errors = number_errors

In [None]:
# dataset name -> result of parse_dataset_name (helper defined elsewhere in the notebook)
mapping_labels = {
    name: parse_dataset_name(name) for name in annotated_error_analysis['dataset_name']
}

mapping_labels

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Assuming 'number_errors' and 'total_errors' are defined and contain the total number of errors per dataset.
annotated_error_types = ['Actual Error', 'Debatable Error', 'Mislabeled', 'Multilabel']

# One row per dataset, one column per error category.
# unstack() only creates columns for categories that actually occur, so the
# frame is reindexed to the four known categories (missing ones become 0);
# otherwise the subtraction below raises KeyError when a category is absent.
error_counts_per_dataset = (
    annotated_error_analysis
    .groupby(['dataset_name', 'error'])
    .size()
    .unstack(fill_value=0)
    .reindex(columns=annotated_error_types, fill_value=0)
)

# Add 'Not Annotated' column: total errors of the dataset minus the annotated ones.
error_counts_per_dataset['Not Annotated'] = error_counts_per_dataset.index.map(number_errors)
error_counts_per_dataset['Not Annotated'] = (
    error_counts_per_dataset['Not Annotated']
    - error_counts_per_dataset[annotated_error_types].sum(axis=1)
)

error_type_colors = {
    'Actual Error':  '#3498DB',   # Blue (neutral)
    'Debatable Error': '#F39C12',  # Orange (ambiguous)
    'Mislabeled': '#E74C3C', # Red (critical)
    'Multilabel': '#9B59B6',  # Purple (distinct)
    'Not Annotated': '#95A5A6',  # Gray (uncertain)
}

# Replace raw dataset identifiers with the display names from mapping_labels.
error_counts_per_dataset.index = error_counts_per_dataset.index.map(mapping_labels)


# Create a figure with 2 rows (one for the total errors and one for the stacked bars)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10), sharex=True)

# Row-wise sum over all category columns (including 'Not Annotated').
error_counts_per_dataset['Total Errors'] = error_counts_per_dataset.sum(axis=1)

# Categories to stack (everything except the total) and their colors, in column order.
stacked_counts = error_counts_per_dataset.drop('Total Errors', axis=1)
error_types = stacked_counts.columns
colors = [error_type_colors[error_type] for error_type in error_types]

# Upper panel: total number of errors per dataset.
ax1.bar(error_counts_per_dataset.index, error_counts_per_dataset['Total Errors'], color=error_type_colors['Not Annotated'], width=0.5)
ax1.set_title('Total Number of Errors per Dataset')
ax1.set_ylabel('Total Errors')
# The upper panel starts where the lower one stops (25); both limits are hard-coded.
ax1.set_ylim(25, 665)


# Lower panel: stacked bars per error category.
stacked_counts.plot(kind='bar', stacked=True, color=colors, ax=ax2)

ax2.set_title('Error Types per Dataset')
ax2.set_xlabel('Dataset Name')
ax2.set_ylim(0, 25)
ax2.set_ylabel('Count of Error Types')

# Rotate x-axis labels for better visibility
ax2.set_xticklabels(error_counts_per_dataset.index, rotation=90)

# Add a legend below the lower panel
ax2.legend(title="Error Type", bbox_to_anchor=(0.5, -1), loc='upper center', ncol=len(error_types), frameon=False)

# Adjust layout to avoid overlap
plt.tight_layout()

# Show the plot
plt.show()


# Inter-annotator agreement (IAA)

In [None]:
def relax_annotation(comment):
    """Collapse a free-form annotation comment into one coarse error type.

    Only the part of ``comment`` before the first ``":"`` is used; it is read
    as a comma-separated list of tags. Whitespace around each tag is ignored,
    so ``"multi, wrong: ..."`` is treated like ``"multi,wrong: ..."``.

    Args:
        comment: Annotation string, e.g. ``"error,table: free text"``.

    Returns:
        The first matching category, in this priority order:
        ``"wrong"`` (tag ``wrong``), ``"error"`` (any actual-error tag),
        ``"multi"`` (tag ``multi`` or ``multiple``), and ``"debatable"`` for
        everything else, including empty or unrecognised tags.
    """
    tags = {tag.strip() for tag in comment.split(":")[0].split(",")}

    error_tags = {"error", "honest-mistake", "indirect", "implicit-describe", "table", "list"}
    multilabel_tags = {"multi", "multiple"}

    if "wrong" in tags:
        return "wrong"
    if tags & error_tags:
        return "error"
    if tags & multilabel_tags:
        return "multi"
    # Fallback: every remaining annotation is counted as debatable.
    return "debatable"

In [None]:
import os
import json

path_1 = "error_analysis/"
path_2 = "error_analysis/annotator2/"


def _load_relaxed_annotations(file_path, dataset):
    """Read one annotation JSON file and add the `error_type` and `dataset` columns."""
    with open(file_path, "r", encoding="utf-8") as file:
        annotations = pd.read_json(file)
    annotations['error_type'] = annotations['comment'].apply(relax_annotation)
    annotations["dataset"] = dataset
    return annotations


# One merged frame per file; they are concatenated once after the loop instead
# of growing `all_data` with a concat on every iteration.
merged_frames = []

# Every file present for the second annotator is paired with the file of the
# same name in path_1.
for filename in os.listdir(path_2):
    print(filename)

    # Drop the last 8 characters of the file name (the length of "_fp.json" /
    # "_fn.json") to get the dataset name.
    dataset = filename[:-8]

    data_1 = _load_relaxed_annotations(os.path.join(path_1, filename), dataset)
    data_2 = _load_relaxed_annotations(os.path.join(path_2, filename), dataset)

    # Left join: every row of the path_1 file is kept; rows with no match in
    # the path_2 file get NaN in the "_2" columns.
    data = data_1.merge(data_2, how="left", on=['text', 'label', 'gpt-4o-mini_label', 'gpt-4o-mini_explanation', "dataset"], suffixes=("_1", "_2"))
    merged_frames.append(data)

# pd.concat raises on an empty list, so fall back to an empty frame.
all_data = pd.concat(merged_frames) if merged_frames else pd.DataFrame()

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import cohen_kappa_score, confusion_matrix

# Keep only the columns needed to compare the two annotators.
df = all_data[["dataset", "error_type_1", "error_type_2"]].copy()

# Restrict the agreement analysis to these four datasets.
df = df[df["dataset"].isin(['climateEng', 'climate_detection', 'climate_sentiment', 'green_claims_3'])].copy()

# Rows that found no match in the left merge of the two annotation files have
# NaN in error_type_2; agreement cannot be measured on them, so drop them.
df = df.dropna(subset=["error_type_1", "error_type_2"])

# The coarse categories returned by relax_annotation, in display order.
labels = ["error", "wrong", "multi", "debatable"]

# Compute Cohen's Kappa between the two annotators
kappa = cohen_kappa_score(df["error_type_1"], df["error_type_2"], labels=labels)
print(f"Cohen's Kappa: {kappa:.3f}")

# Compute confusion matrix (rows: annotator 1, columns: annotator 2)
conf_matrix = confusion_matrix(df["error_type_1"], df["error_type_2"], labels=labels)

# Convert to DataFrame for visualization
conf_matrix_df = pd.DataFrame(conf_matrix, index=labels, columns=labels)

# Plot the confusion matrix
plt.figure(figsize=(6,5))
sns.heatmap(conf_matrix_df, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels)
plt.xlabel("Annotator 2")
plt.ylabel("Annotator 1")
plt.title("Confusion Matrix of Annotations")
plt.show()

# Comparing LLM errors

In [None]:
from sklearn.metrics import f1_score

# Datasets for which find_errors is called with mode="cot".
cot_list = [
    "netzero_reduction",
    "climate_tcfd_recommendations",
    "climateBUG_data",
    "green_claims",
    "green_claims_3",
    "climateStance",
    "gw_stance_detection",
    "climateFEVER_evidence",
    "lobbymap_query",
    "lobbymap_stance",
    "lobbymap_pages",
    "climatext",
    "climatext_10k",
    "climatext_claim",
    "climatext_wiki",
    "climate_detection",
]

path = "paper_utils/error_analysis/"

# Files whose name contains any of these fragments are ignored.
_skipped_fragments = ("iaa", "lobbymap", "_old", "archive", ".zip", "FEVER", "ClimaQA")

# Dataset name of every remaining file: the file name without its
# "_fn.json" / "_fp.json" ending (a dataset can therefore appear twice).
l = [
    filename.replace("_fn.json", "").replace("_fp.json", "")
    for filename in os.listdir(path)
    if not any(fragment in filename for fragment in _skipped_fragments)
]

# NOTE(review): `test` and `annotated_errors` are overwritten on every
# iteration, so only the values of the last dataset remain after the loop.
for ds_name in set(l):
    test = find_errors(ds_name, mode="cot") if ds_name in cot_list else find_errors(ds_name)

    # Stack whichever of the "_fp" / "_fn" annotation files exist (fp first).
    _annotation_frames = [
        pd.read_json(path + ds_name + suffix)
        for suffix in ("_fp.json", "_fn.json")
        if os.path.exists(path + ds_name + suffix)
    ]
    annotated_errors = pd.concat([pd.DataFrame(), *_annotation_frames])

    # Tags are the comma-separated items before the first ":" of the comment.
    annotated_errors['error_annotation'] = annotated_errors['comment'].apply(
        lambda comment: comment.split(":")[0].split(",")
    )
    annotated_errors["Multilabel"] = annotated_errors['error_annotation'].apply(
        lambda tags: ("multi" in tags) or ("multiple" in tags)
    )
    annotated_errors["Mislabeled"] = annotated_errors['error_annotation'].apply(
        lambda tags: "wrong" in tags
    )

In [None]:
import json
from src.logger import bootstrap_confidence_interval_saving

# Paths are built with os.path.join instead of hard-coded backslashes: the
# former literals relied on invalid escape sequences ("\o", "\l", "\g") and
# only resolved on Windows.
# Other result files used with this cell: results_climatext_llama_8B_cot.json,
# greenbench_llama_70B_cot_4bit.json
llama_path = os.path.join("llm", "outputs", "llama", "greenbench_llama_8B_cot.json")
llama_path_climatext = os.path.join("llm", "outputs", "llama", "results_climatext_llama_8B_cot.json")
model_type = "Llama-8B" # Llama-70B

# Read the JSON file as a string
with open(llama_path, 'r', encoding='utf-8') as file:
    json_string = file.read()

# The file content decodes to a JSON *string*, which itself has to be decoded
# to obtain the dictionary - hence the two json.loads passes.
data = json.loads(json_string)
data = json.loads(data)

with open(llama_path_climatext, 'r', encoding='utf-8') as file:
    json_string = file.read()

data_clima = json.loads(json_string)
data_clima = json.loads(data_clima)

# Merge both result sets; on duplicate keys the climatext file wins.
data = {**data, **data_clima}

def get_outputs(data, dataset_name):
    """Collect the text of the third message of every record of a dataset.

    Args:
        data: Mapping from dataset name to a list of records; each record is
            indexable and its element at position 2 has a ``'content'`` key.
        dataset_name: Key of ``data`` to read.

    Returns:
        List with ``record[2]['content']`` for each record, in order.
    """
    return [record[2]['content'] for record in data[dataset_name]]

In [None]:

# Paths are built with os.path.join instead of hard-coded backslashes: the
# former literals relied on invalid escape sequences ("\o", "\l", "\g") and
# only resolved on Windows.
llama_path = os.path.join("llm", "outputs", "llama", "greenbench_llama_70B_cot_4bit.json")
llama_path_climatext = os.path.join("llm", "outputs", "llama", "results_climatext_llama_70B_cot.json")
# NOTE(review): this cell loads the 70B outputs but keeps the "Llama-8B" tag -
# confirm whether model_type should be "Llama-70B" here.
model_type = "Llama-8B" # Llama-70B

# Read the JSON file as a string
with open(llama_path, 'r', encoding='utf-8') as file:
    json_string = file.read()

# The file content decodes to a JSON *string*, which itself has to be decoded
# to obtain the dictionary - hence the two json.loads passes.
data_70b = json.loads(json_string)
data_70b = json.loads(data_70b)

with open(llama_path_climatext, 'r', encoding='utf-8') as file:
    json_string = file.read()

data_clima = json.loads(json_string)
data_clima = json.loads(data_clima)

# Merge both result sets; on duplicate keys the climatext file wins.
data_70b = {**data_70b, **data_clima}

def get_outputs(data, dataset_name):
    """Return ``record[2]['content']`` for every record of ``data[dataset_name]``.

    Args:
        data: Mapping from dataset name to a list of indexable records.
        dataset_name: Key of ``data`` whose records are read.

    Returns:
        List of the extracted ``'content'`` values, in record order.
    """
    third_messages = (record[2] for record in data[dataset_name])
    return [message['content'] for message in third_messages]

In [None]:
def parse_llama(data, dataset_name):
    """Attach parsed Llama predictions to the evaluation frame of a dataset.

    Args:
        data: Mapping from dataset name to raw model conversations, in the
            layout read by ``get_outputs``.
        dataset_name: Dataset whose outputs and evaluation file are loaded.

    Returns:
        The frame read from ``data/llm_green_nlp_tasks/<dataset_name>.pkl``
        with two added columns, ``model_label`` and ``model_explanation``,
        after the dataset-specific adjustments below.
    """
    # Raw model outputs, one per row of the evaluation frame.
    results = get_outputs(data=data, dataset_name=dataset_name)

    labels = []
    explainations = []
    for result in results:
        label, explanation = parse_label_explanation(result, dataset_name=dataset_name)
        labels.append(label)
        explainations.append(explanation)

    # os.path.join keeps the path usable outside Windows (the previous literal
    # hard-coded backslash separators). The file is read as parquet despite
    # its ".pkl" extension.
    test = pd.read_parquet(os.path.join('data', 'llm_green_nlp_tasks', f'{dataset_name}.pkl'))

    if dataset_name == "lobbymap_pages":
        # presumably turns boolean labels into 0/1 - TODO confirm
        test['label'] = 1 * test['label']

    test['model_label'] = labels
    test['model_explanation'] = explainations

    # Align the spelling of predicted labels with the reference labels.
    if dataset_name == "sustainable_signals_review":
        test['model_label'] = test['model_label'].str.replace("Not Relevant", "Not relevant")

    if dataset_name == "contrarian_claims":
        test['model_label'] = test['model_label'].str.replace("No claim", "No claim, No claim")

    # "_origin" variants: drop the rows whose query is empty.
    if dataset_name == "lobbymap_query_origin":
        test = test[test['query'].astype(str) != "[None]"]
    if dataset_name == "lobbymap_stance_origin":
        test = test[test['query'].astype(str) != "None"]
        # Non in-place rename: `test` is a filtered frame at this point.
        test = test.rename(columns={'stance': 'label'})

    return test

In [None]:
from ast import literal_eval
from sklearn.preprocessing import MultiLabelBinarizer

# Running totals over all datasets, printed in the final "Total" row.
total_llama = 0
total_llama_70b = 0
total_gpt = 0
total_errors = 0
Total_common_8b = 0
Total_8b = 0
Total_common_70b = 0
Total_70b = 0

performance_type = "f1_score"

if os.path.exists(perf_file_path):
    performances = pd.read_csv(perf_file_path)
else:
    performances = pd.DataFrame()

# The header now ends with a double backslash, like every data row below.
# NOTE(review): the header names 6 columns while each row prints 9 values -
# confirm the intended column titles.
print("dataset & Llama 8B & Llama 70B & Sampled & Llama 8B & Llama 70B \\\\")

# Datasets left out of the comparison.
excluded_datasets = {"lobbymap_query", "logicClimate", "lobbymap", "lobbymap_stance"}
# Datasets whose labels are lists; they are not handled by this comparison.
multilabel_datasets = ("logicClimate", "lobbymap_query")
# Datasets whose rows are identified by (text, query) instead of text alone.
query_keyed_datasets = ["lobbymap_stance", "climaQA", 'climateFEVER_evidence']

for dataset_name in set(prompts.keys()) - excluded_datasets:
    # Safety net in case the exclusion set above is edited.
    if dataset_name in multilabel_datasets:
        continue

    test = parse_llama(data, dataset_name)
    test_70b = parse_llama(data_70b, dataset_name)

    # Convert readable predicted labels back to the dataset's label ids.
    if dataset_name in label_readable_mapping:
        label2id = {v.lower(): k for k, v in label_readable_mapping[dataset_name]['labels'].items()}
        test['model_label'] = test['model_label'].str.lower().map(label2id)
        test_70b['model_label'] = test_70b['model_label'].str.lower().map(label2id)

    if dataset_name in cot_list:
        test_gpt = find_errors(dataset_name, mode="cot")
    else:
        test_gpt = find_errors(dataset_name)

    # Replace non existing labels by the most frequent
    test.loc[~test['model_label'].astype(str).isin(test['label'].astype(str).unique()), "model_label"] = test['label'].mode()[0]
    test_70b.loc[~test_70b['model_label'].astype(str).isin(test_70b['label'].astype(str).unique()), "model_label"] = test_70b['label'].mode()[0]

    # Rows on which each model disagrees with the reference label.
    error_llm = test[test['model_label'].astype(str) != test['label'].astype(str)].copy()
    error_llm_70b = test_70b[test_70b['model_label'].astype(str) != test_70b['label'].astype(str)].copy()
    error_gpt = test_gpt[test_gpt['gpt-4o-mini_label'].astype(str) != test_gpt['label'].astype(str)]

    # Manually annotated errors of this dataset ("_fp" file first, then "_fn").
    annotated_errors = pd.DataFrame()
    if os.path.exists(path + dataset_name + "_fp.json"):
        annotated_errors = pd.concat([annotated_errors, pd.read_json(path + dataset_name + "_fp.json")])
    if os.path.exists(path + dataset_name + "_fn.json"):
        annotated_errors = pd.concat([annotated_errors, pd.read_json(path + dataset_name + "_fn.json")])

    if len(annotated_errors) == 0:
        continue

    merge_keys = ['text', 'query'] if dataset_name in query_keyed_datasets else 'text'

    # After these left joins, "model_label" / "model_label_70b" are non-null
    # exactly for the annotated rows that are also errors of the 8B / 70B model.
    annotated_errors = annotated_errors.merge(error_llm, on=merge_keys, how='left')
    annotated_errors = annotated_errors.merge(error_llm_70b, on=merge_keys, how='left', suffixes=("", "_70b"))

    # Errors shared with GPT (inner join) and the model's own error count.
    common_gpt_70b = error_gpt.merge(error_llm_70b, how="inner", on=merge_keys)
    all_gpt_70b = error_gpt.merge(error_llm_70b, how="outer", on=merge_keys)
    prop_gpt_70b = len(common_gpt_70b)
    prop_not_gpt_70b = len(error_llm_70b)

    common_gpt_8b = error_gpt.merge(error_llm, how="inner", on=merge_keys)
    all_gpt_8b = error_gpt.merge(error_llm, how="outer", on=merge_keys)
    prop_gpt_8b = len(common_gpt_8b)
    prop_not_gpt_8b = len(error_llm)

    annotated_also_8b = len(annotated_errors[annotated_errors["model_label"].notna()])
    annotated_also_70b = len(annotated_errors[annotated_errors["model_label_70b"].notna()])

    total_llama += annotated_also_8b
    total_llama_70b += annotated_also_70b
    total_errors += len(annotated_errors)
    total_gpt += len(error_gpt)

    Total_common_8b += prop_gpt_8b
    Total_8b += prop_not_gpt_8b
    Total_common_70b += prop_gpt_70b
    Total_70b += prop_not_gpt_70b

    # One LaTeX table row per dataset.
    print(parse_dataset_name(dataset_name), "&",
          annotated_also_8b, "&",
          annotated_also_70b, "&",
          len(annotated_errors), "&",
          prop_gpt_8b, "&",
          prop_not_gpt_8b, "&",
          prop_gpt_70b, "&",
          prop_not_gpt_70b, "&",
          len(error_gpt),
          "\\\\")

print("\\midrule")
print("Total", "&",
      total_llama, "&",
      total_llama_70b, "&",
      total_errors, "&",
      Total_common_8b, "&",
      Total_8b, "&",
      Total_common_70b, "&",
      Total_70b, "&",
      total_gpt,
      "\\\\")