"""
01_judgments_list.ipynb
"""

In [307]:
### IMPORT ###
import csv
import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import openai
import pandas as pd
from bs4 import BeautifulSoup
from openai import OpenAI

In [308]:
### LOCAL IMPORT ###
from config import config_reader
from utilities import create_directory_with_gitkeep

In [309]:
### GLOBALS ###
# Settings are read once from config.yml; every value used below is coerced to str.
yaml_config = config_reader.config_read_yaml("config.yml", "config")

# OpenAI: the key comes from the OPENAI_API_KEY environment variable and is
# given both to the module-level setting and to the explicit client instance.
openai.api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Inputs: where the raw judgments live, which extension to pick up, and the
# headings accepted as the start of the "P.Q.M." section.
judgments_raw_dir, judgments_ext = (
    str(yaml_config[key]) for key in ("JUDGMENTS_RAW_DIR", "JUDGMENTS_EXT")
)
starting_pqm_markers = ['PQM', 'P.Q.M.', 'pqm', 'p.q.m.']

# Outputs: destination directory plus the file names for texts and labels.
judgments_clean_dir, pqm_file_texts, pqm_file_labels = (
    str(yaml_config[key])
    for key in ("JUDGMENTS_CLEAN_DIR", "PQM_FILE_TEXTS", "PQM_FILE_LABELS")
)


# Functions

In [310]:
def get_files_with_extension(directory: str, extension: str) -> List[str]:
    """
    Recursively collect the files under a directory whose names end with an extension.

    Args:
        directory (str): Root directory to search; sub-directories are visited too.
        extension (str): Suffix to match, e.g. '.txt'. It is appended to a '*' wildcard.

    Returns:
        List[str]: Paths (as strings) of every regular file matching the pattern.
    """
    pattern = "*" + extension
    matches: List[str] = []
    for candidate in Path(directory).rglob(pattern):
        # Directories can match the pattern as well; keep regular files only.
        if candidate.is_file():
            matches.append(str(candidate))
    return matches

In [311]:
def extract_paragraphs(file_path: str, starting_markers: list) -> list:
    """Return the non-empty paragraphs that follow the starting marker in an HTML file.

    The file is parsed with BeautifulSoup. The starting point is the first
    <p class="fatto"> element whose stripped text equals one of the given markers.
    Every later <p> element contributes its stripped text, if not empty, until a
    <p> carrying the class "tabula" is met. Both the marker paragraph and the
    "tabula" paragraph are left out of the result.

    Args:
        file_path (str): The path to the HTML file (read as UTF-8).
        starting_markers (list): Strings accepted as the text of the starting paragraph.

    Returns:
        list: The extracted paragraph texts in document order; an empty list when no
        starting paragraph is found.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        soup = BeautifulSoup(handle.read(), 'html.parser')

    # First <p class="fatto"> whose text is exactly one of the markers, if any.
    anchor = None
    for candidate in soup.find_all('p', class_='fatto'):
        heading = candidate.get_text(strip=True)
        if any(marker == heading for marker in starting_markers):
            anchor = candidate
            break
    if anchor is None:
        return []

    collected = []
    for node in anchor.find_all_next('p'):
        # The "tabula" paragraph closes the section of interest.
        if 'tabula' in node.get('class', []):
            break
        content = node.get_text(strip=True)
        if not content:
            continue
        collected.append(content)
    return collected

In [312]:
def store_paragraphs_in_dataframe(file_path: str, starting_markers: Optional[list] = None) -> pd.DataFrame:
    """Store the file name and extracted paragraphs in a pandas DataFrame.

    The paragraphs are obtained from the HTML file through extract_paragraphs, then
    returned as a DataFrame holding the file name (without the directory part) and
    one extracted paragraph per row.

    Args:
        file_path (str): The path to the HTML file.
        starting_markers (Optional[list]): Strings accepted as the starting marker.
            When omitted, the module-level starting_pqm_markers list is used.

    Returns:
        pd.DataFrame: A DataFrame with two columns: 'file_name' and 'paragraph'. Each row
        contains the file name and one of the extracted paragraphs. When nothing is
        extracted the DataFrame is empty but still has both columns.
    """
    if starting_markers is None:
        starting_markers = starting_pqm_markers
    # The extraction step was missing: 'paragraphs' used to be an undefined name here.
    paragraphs = extract_paragraphs(file_path, starting_markers)
    file_name = Path(file_path).name
    data = [{'file_name': file_name, 'paragraph': paragraph} for paragraph in paragraphs]
    # Explicit columns keep the schema stable even for an empty result.
    return pd.DataFrame(data, columns=['file_name', 'paragraph'])

In [313]:
def analyse_judgement(text: str) -> Optional[str]:
    """Analyse the given judgement paragraph and determine whether the decision is 'accolta' or 'respinta'.

    The paragraph is sent to the chat completion API together with a system prompt
    asking for a one-word answer. The reply is trimmed, stripped of surrounding
    quotes and full stops, and lower-cased before it is checked.

    Args:
        text (str): The paragraph text to analyse.

    Returns:
        Optional[str]: 'accolta' or 'respinta' when the reply is one of those words;
        None when the reply is anything else (including an empty or missing reply);
        the string 'Error' when the API request itself raised an exception.
    """
    messages = [
        {
            "role": "system",
            "content": (
                "You are a legal expert. When provided with the text of a judgement, determine if the decision is "
                "'accolta' (granted) or 'respinta' (dismissed). If the text contains only non-decisive content such as "
                "headers, notes, lists of names, or procedural indications, respond exclusively with 'None'. "
                "Reply only with one of the following words: 'accolta', 'respinta', or 'None'."
            )
        },
        {
            "role": "user",
            "content": f"Please analyse the following judgement text:\n\n{text}"
        }
    ]

    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=0,
            max_tokens=10,
        )
    except Exception as e:
        # Best-effort: a failed request must not abort the whole labelling run.
        print("Error during API request:", e)
        return "Error"

    # A response without choices or without text carries no decision.
    if not response.choices:
        return None
    content = response.choices[0].message.content
    if not content:
        return None

    # The prompt quotes the expected words, so the reply may come back as
    # "'accolta'" or "Respinta."; normalise before comparing.
    result = content.strip().strip("'\".").strip().lower()

    return result if result in {"accolta", "respinta"} else None

# Main

In [314]:
# Opening banner, surrounded by one blank line on each side.
print("\n*** PROGRAM START ***\n")


*** PROGRAM START ***



In [315]:
# Reference timestamp for the elapsed-time report, truncated to whole seconds.
start_time = datetime.now().replace(microsecond=0)
print(f"Start process: {start_time}\n")

Start process: 2025-02-04 14:47:27



In [316]:
# Make sure the output directory is in place before any result file is written.
# NOTE(review): create_directory_with_gitkeep is imported from the local
# utilities module; judging by its name it also adds a .gitkeep placeholder —
# confirm against that module.
print("Preparing outputs")
create_directory_with_gitkeep(judgments_clean_dir)

Preparing outputs
Directory 'judgments_clean' and .gitkeep file created


In [317]:
# Labels file: rows are appended to it during the labelling phase, so it is
# opened in append mode and only receives a header when it has none yet.
output_file_path = Path(judgments_clean_dir) / pqm_file_labels
file_exists = output_file_path.exists()
# A file that exists but is empty (e.g. left by an interrupted run) still
# needs its header row.
needs_header = (not file_exists) or output_file_path.stat().st_size == 0
with output_file_path.open(mode='a', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile, delimiter=";")
    if needs_header:
        writer.writerow(["file_name", "label"])

In [318]:
# Collect every raw judgment file with the configured extension and report the count.
print("> Listing file in TAR Judgments")
print(f"Directory: {judgments_raw_dir}")
list_files = get_files_with_extension(judgments_raw_dir, judgments_ext)
list_files_len = len(list_files)
print(f"Files found: {list_files_len}")

> Listing file in TAR Judgments
Directory: judgments_raw
Files found: 129


# PQM extraction

In [319]:
print("> Reading TAR Judgments file")
all_data = []
for count, judgment_file in enumerate(list_files, start=1):
    print(f"[{count} / {list_files_len}] - {judgment_file}")
    # Only the base name is stored, not the full path.
    file_name = Path(judgment_file).name
    list_para = extract_paragraphs(judgment_file, starting_pqm_markers)
    # One record per extracted paragraph, all tagged with the same file name.
    all_data.extend(
        {'file_name': file_name, 'paragraph': paragraph} for paragraph in list_para
    )
# A single DataFrame gathering the paragraphs of every file.
df_judg = pd.DataFrame(all_data)

> Reading TAR Judgments file
[1 / 129] - judgments_raw/tar_to_202200005_01.html
[2 / 129] - judgments_raw/tar_to_202200008_01.html
[3 / 129] - judgments_raw/tar_to_202200023_20.html
[4 / 129] - judgments_raw/tar_to_202200041_01.html
[5 / 129] - judgments_raw/tar_to_202200042_01.html
[6 / 129] - judgments_raw/tar_to_202200045_01.html
[7 / 129] - judgments_raw/tar_to_202200046_01.html
[8 / 129] - judgments_raw/tar_to_202200053_01.html
[9 / 129] - judgments_raw/tar_to_202200055_01.html
[10 / 129] - judgments_raw/tar_to_202200056_01.html
[11 / 129] - judgments_raw/tar_to_202200058_01.html
[12 / 129] - judgments_raw/tar_to_202200061_01.html
[13 / 129] - judgments_raw/tar_to_202200063_01.html
[14 / 129] - judgments_raw/tar_to_202200077_20.html
[15 / 129] - judgments_raw/tar_to_202200086_01.html
[16 / 129] - judgments_raw/tar_to_202200091_01.html
[17 / 129] - judgments_raw/tar_to_202200092_01.html
[18 / 129] - judgments_raw/tar_to_202200116_20.html
[19 / 129] - judgments_raw/tar_to_202200125_

In [320]:
# Preview the first ten extracted rows (notebook display only).
df_judg.head(10)

Unnamed: 0,file_name,paragraph
0,tar_to_202200005_01.html,Il Tribunale Amministrativo Regionale per il P...
1,tar_to_202200005_01.html,Spese compensate.
2,tar_to_202200005_01.html,Ordina che la presente sentenza sia eseguita d...
3,tar_to_202200005_01.html,Così deciso in Torino nella camera di consigli...
4,tar_to_202200005_01.html,"Gianluca Bellucci,\tPresidenteSilvia Cattaneo,..."
5,tar_to_202200008_01.html,Il Tribunale Amministrativo Regionale per il P...
6,tar_to_202200008_01.html,Condanna la società ricorrente alla rifusione ...
7,tar_to_202200008_01.html,Ordina che la presente sentenza sia eseguita d...
8,tar_to_202200008_01.html,Così deciso in Torino nella camera di consigli...
9,tar_to_202200008_01.html,"Vincenzo Salamone,\tPresidenteFlavia Risso,\tP..."


In [321]:
# Persist the extracted paragraphs as a semicolon-separated CSV without the index column.
print("> Saving")
path_out = Path(judgments_clean_dir).joinpath(pqm_file_texts)
print(f"Path: {path_out}")
df_judg.to_csv(
    path_out,
    sep=";",
    index=False,
    quoting=csv.QUOTE_MINIMAL,
)

> Saving
Path: judgments_clean/pqm_texts.csv


In [None]:
# Labelling phase
# Each extracted paragraph is classified; only clear decisions are appended
# to the labels file.
count = 0
df_judg_len = len(df_judg)
if df_judg_len > 0:
    # The output file is opened once for the whole run instead of once per row.
    with output_file_path.open(mode='a', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile, delimiter=";")
        for count, (_, row) in enumerate(df_judg.iterrows(), start=1):
            file_name = row["file_name"]
            paragraph = row["paragraph"]
            result = analyse_judgement(paragraph)
            print(f"[{count} / {df_judg_len}] Result ({file_name}): {result}")
            # Accept the two decision labels explicitly: a plain "not None"
            # check would also store the "Error" value returned when the API
            # request fails.
            if result in ("accolta", "respinta"):
                writer.writerow([file_name, result])
                # Flush per row so an interrupted run keeps what was labelled so far.
                csvfile.flush()

Result (tar_to_202200005_01.html): respinta respinta
Result (tar_to_202200005_01.html): None None
Result (tar_to_202200005_01.html): None None
Result (tar_to_202200005_01.html): None None
Result (tar_to_202200005_01.html): None None
Result (tar_to_202200008_01.html): respinta respinta
Result (tar_to_202200008_01.html): accolta accolta
Result (tar_to_202200008_01.html): None None
Result (tar_to_202200008_01.html): None None
Result (tar_to_202200008_01.html): None None
Result (tar_to_202200023_20.html): respinta respinta
Result (tar_to_202200023_20.html): None None
Result (tar_to_202200023_20.html): None None
Result (tar_to_202200023_20.html): None None
Result (tar_to_202200023_20.html): None None
Result (tar_to_202200041_01.html): respinta respinta
Result (tar_to_202200041_01.html): respinta respinta
Result (tar_to_202200041_01.html): None None
Result (tar_to_202200041_01.html): None None
Result (tar_to_202200041_01.html): None None
Result (tar_to_202200042_01.html): respinta respinta
R

In [None]:
# Elapsed-time report: whole seconds since start_time, split into minutes and seconds.
end_time = datetime.now().replace(microsecond=0)
delta_time = end_time - start_time
total_seconds = int(delta_time.total_seconds())
minutes = total_seconds // 60
seconds = total_seconds % 60

print(f"\nEnd process: {end_time}")
print(f"Time to finish ({delta_time}): {minutes} minutes, {seconds} seconds")

In [None]:
# Closing banner. It previously repeated the "PROGRAM START" text, which made
# the end of a run indistinguishable from its beginning in the output.
print()
print("*** PROGRAM END ***")
print()