# Guida all'Analisi Automatica dei Documenti KID

Questo notebook è stato sviluppato per automatizzare il processo di acquisizione, trasformazione ed estrazione di dati da documenti in formato "Key Information Document" (KID) o altri documenti PDF simili. L'obiettivo è facilitare l'analisi e la comprensione delle informazioni contenute in questi documenti.

## Panoramica del Flusso Operativo:

Il processo si articola in diverse fasi principali, come evidenziato nelle sezioni successive di questo notebook:

1.  **Installazione delle Dipendenze**: Vengono installate le librerie Python necessarie per la manipolazione dei PDF e la loro conversione. Nello specifico, si installa `pdf-to-markdown` e le sue dipendenze, come `pdfminer.six`, essenziali per convertire i PDF in un formato testuale più gestibile [2].

2.  **Scarico dei PDF**: Il notebook procede al download dei documenti PDF da un elenco di URL. I log mostrano che vengono trovati e scaricati numerosi file PDF, con alcuni tentativi di recupero in caso di timeout o errori di accesso (es. "Access denied" o "File not found") [3-5]. I file scaricati vengono salvati in una directory locale [4, 5].

3.  **Conversione da PDF a Markdown**: Dopo aver scaricato i PDF, il notebook avvia un processo per convertirli nel formato Markdown. Questo passaggio è cruciale per standardizzare il contenuto e renderlo facilmente processabile da modelli di linguaggio [6, 7]. I file Markdown risultanti vengono caricati e la loro dimensione (in caratteri) viene registrata [7].

4.  **Generazione e Lancio dei Prompt (Analisi Intelligente)**: Vengono generati dei "prompt intelligenti" basati sul contenuto dei file Markdown. Questi prompt vengono poi utilizzati per interrogare un modello di linguaggio (probabilmente GPT, dato il riferimento a "Processing KID documents with GPT") per estrarre informazioni strutturate dai documenti. Il processo monitora l'avanzamento, il numero di KID estratti e segnala eventuali risposte troncate o problemi nel formato JSON [7-9].

5.  **Estrazione e Pulizia dei Dati in Pandas**: Le informazioni estratte dal modello di linguaggio vengono caricate e organizzate in un DataFrame Pandas [9]. Successivamente, viene eseguito un processo di "pulizia del dato" e uniformazione avanzata del dataset per standardizzare le colonne e i valori, preparandoli per ulteriori analisi [9-11].

Questo flusso permette di trasformare una collezione di documenti PDF non strutturati in un dataset pulito e analizzabile, pronto per essere utilizzato per scopi di ricerca, comparazione o reportistica.

## 1. Installazione delle Dipendenze Essenziali

Questa sezione del notebook si occupa di configurare l'ambiente installando tutte le librerie Python necessarie per l'analisi dei documenti. È un passaggio fondamentale per assicurare che tutte le funzionalità successive possano operare correttamente.

### Scopo:

Lo scopo principale è fornire al notebook gli strumenti software per:
*   Interagire con i file PDF.
*   Convertire il contenuto dei PDF in un formato testuale leggibile e processabile.

### Dettagli Tecnici:

Come puoi vedere dall'output sottostante, il processo installa le seguenti librerie chiave [1]:

*   **`pdf-to-markdown`**: Questa è la libreria principale per convertire i documenti PDF nel formato Markdown. È cruciale per trasformare il contenuto non strutturato dei PDF in un formato più standardizzato, facilitando l'estrazione di informazioni.
*   **`pdfminer.six`**: È una dipendenza di `pdf-to-markdown` e costituisce un'infrastruttura robusta per l'estrazione di testo e altre informazioni dai documenti PDF [1].
*   Altre dipendenze come `charset-normalizer`, `cryptography`, `cffi`, e `pycparser` sono automaticamente gestite e installate (o verificate se già presenti) per supportare il funzionamento di `pdfminer.six` [1].

L'output mostra il download e la preparazione dei metadati, la risoluzione delle dipendenze e, infine, la conferma dell'avvenuta installazione di `pdf-to-markdown` e `pdfminer.six` [1]. Questo assicura che il notebook sia pronto per procedere con le fasi successive di scaricamento e conversione dei documenti.

In [None]:
pip install pdf-to-markdown

Collecting pdf-to-markdown
  Downloading pdf-to-markdown-0.1.0.tar.gz (6.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pdfminer.six (from pdf-to-markdown)
  Downloading pdfminer_six-20250506-py3-none-any.whl.metadata (4.2 kB)
Downloading pdfminer_six-20250506-py3-none-any.whl (5.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m60.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pdf-to-markdown
  Building wheel for pdf-to-markdown (setup.py) ... [?25l[?25hdone
  Created wheel for pdf-to-markdown: filename=pdf_to_markdown-0.1.0-py3-none-any.whl size=8139 sha256=bbdd6d80fd1a09c9ae7c8fac64e816d1b04afd7371fb0fdf2549db20de7f3f72
  Stored in directory: /root/.cache/pip/wheels/fd/dd/fe/f94c16513eef3f4f229e16a9c88c3e14f6f38b400d24f3b631
Successfully built pdf-to-markdown
Installing collected packages: pdfminer.six, pdf-to-markdown
Successfully installed pdf-to-markdown-0.1.0 pdfminer.six-20250506


## 2. Scaricamento dei Documenti PDF

Questa sezione si occupa del reperimento dei documenti "Key Information Document" (KID) o altri file PDF rilevanti da una lista di URL predefinita (indicata come "CSV" nei log [2]). È un passo critico per raccogliere i dati grezzi che verranno successivamente analizzati.

### Scopo:

L'obiettivo di questa fase è scaricare in modo automatizzato tutti i documenti PDF necessari per l'analisi, creando una collezione locale di file da processare.

### Dettagli del Processo:

Il notebook esegue le seguenti operazioni, come evidenziato dagli output [2, 3]:

*   **Identificazione dei Record**: Il processo inizia con la rilevazione di un certo numero di record (es. "Found 60 records in CSV" [2]), presumibilmente corrispondenti a URL di documenti.
*   **Tentativi di Download**: Per ogni URL, il notebook tenta di scaricare il file PDF. I log mostrano un "attempt 1" (tentativo 1) per ogni download [2, 3].
*   **Gestione degli Errori**: Il sistema è progettato per gestire diverse casistiche durante lo scaricamento:
    *   **Download Riuscito (`✓ Saved`)**: Quando un file viene scaricato correttamente, viene visualizzato un messaggio di successo e il percorso in cui è stato salvato, ad esempio `✓ Saved: downloaded_pdfs/A>>>>>a_Assicurazioni_S.p.A._Partner_M>>>>_Opportunity.pdf` [2].
    *   **Accesso Negato (`❌ Access denied (403)`)**: Questo errore si verifica quando il server rifiuta la richiesta di accesso al file, spesso a causa di permessi insufficienti. Un esempio è `❌ Access denied (403) for https://www.c>>vita.it/documenti/...` [2].
    *   **File Non Trovato (`❌ File not found (404)`)**: Indica che l'URL è valido ma il file specifico non esiste sul server. Un esempio è `❌ File not found (404) for https://www.z>>>>>.it/api/ArchivioDigitale/...` [2].
    *   **Timeout**: Si verifica quando la connessione al server impiega troppo tempo per rispondere o completare il download. Il notebook tenta di riprovare il download dopo un breve intervallo (`Retrying in 1 seconds...`, `Retrying in 2 seconds...`) come mostrato per i file `n>>>>>>>>.it` [2].

Questa fase è fondamentale per costruire il corpus di documenti su cui verrà eseguita l'analisi, e la gestione degli errori è cruciale per identificare quali documenti non sono stati acquisiti e perché.

In [None]:
import pandas as pd
import requests
import os
import re
from urllib.parse import urlparse
import time

# Assicurati che il file CSV sia stato caricato nell'ambiente di Colab
csv_file_path = 'kid_compagnie_vita_italiane.csv'

def clean_filename(filename):
    """Pulisce il nome del file rimuovendo caratteri non validi"""
    # Rimuovi caratteri non validi per i nomi file
    filename = re.sub(r'[<>:"/\\|?*()[\]]', '_', filename)
    # Rimuovi spazi multipli e sostituisci con underscore
    filename = re.sub(r'\s+', '_', filename)
    # Rimuovi punti multipli
    filename = re.sub(r'\.+', '.', filename)
    # Limita la lunghezza del nome file
    if len(filename) > 200:
        filename = filename[:200]
    return filename

def download_with_retry(url, file_path, max_retries=3, delay=1):
    """Scarica un file con retry automatico"""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'application/pdf,application/octet-stream,*/*',
        'Accept-Language': 'it-IT,it;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }

    for attempt in range(max_retries):
        try:
            print(f"Downloading (attempt {attempt + 1}): {url}")

            # Timeout più lungo e headers per evitare blocchi
            response = requests.get(url, stream=True, headers=headers, timeout=30)
            response.raise_for_status()

            # Verifica che sia effettivamente un PDF
            content_type = response.headers.get('content-type', '')
            if 'pdf' not in content_type.lower() and 'octet-stream' not in content_type.lower():
                print(f"Warning: Content type is {content_type}, might not be a PDF")

            # Salva il PDF
            with open(file_path, 'wb') as pdf_file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # Filtra i chunk vuoti
                        pdf_file.write(chunk)

            print(f"✓ Saved: {file_path}")
            return True

        except requests.exceptions.Timeout:
            print(f"Timeout for {url} (attempt {attempt + 1})")
        except requests.exceptions.ConnectionError as e:
            print(f"Connection error for {url}: {e} (attempt {attempt + 1})")
        except requests.exceptions.HTTPError as e:
            if response.status_code == 403:
                print(f"❌ Access denied (403) for {url}")
                return False
            elif response.status_code == 404:
                print(f"❌ File not found (404) for {url}")
                return False
            else:
                print(f"HTTP error {response.status_code} for {url} (attempt {attempt + 1})")
        except Exception as e:
            print(f"Unexpected error for {url}: {e} (attempt {attempt + 1})")

        if attempt < max_retries - 1:
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2  # Aumenta il delay per ogni retry

    return False

try:
    # Leggi il file CSV
    df = pd.read_csv(csv_file_path, delimiter=';')
    print(f"Found {len(df)} records in CSV")

    # Crea una directory per salvare i PDF (se non esiste)
    output_dir = 'downloaded_pdfs'
    os.makedirs(output_dir, exist_ok=True)

    # Statistiche
    successful_downloads = 0
    failed_downloads = 0
    skipped_downloads = 0

    # Itera sulle righe del DataFrame e scarica i PDF
    for index, row in df.iterrows():
        compagnia = row['Compagnia']
        prodotto = row['Prodotto']
        link_kid = row['Link_KID']

        # Verifica che il link non sia vuoto
        if pd.isna(link_kid) or not link_kid.strip():
            print(f"⚠️ Skipping empty link for {compagnia} - {prodotto}")
            skipped_downloads += 1
            continue

        # Pulisci i nomi per usarli nei nomi dei file
        clean_compagnia = clean_filename(compagnia)
        clean_prodotto = clean_filename(prodotto)
        file_name = f"{clean_compagnia}_{clean_prodotto}.pdf"
        file_path = os.path.join(output_dir, file_name)

        # Controlla se il file esiste già
        if os.path.exists(file_path):
            print(f"File already exists, skipping: {file_path}")
            skipped_downloads += 1
            continue

        # Scarica il PDF
        if download_with_retry(link_kid, file_path):
            successful_downloads += 1
        else:
            failed_downloads += 1

        # Piccola pausa tra i download per essere gentili con i server
        time.sleep(0.5)

    # Statistiche finali
    print("\n" + "="*50)
    print("DOWNLOAD SUMMARY")
    print("="*50)
    print(f"✓ Successful downloads: {successful_downloads}")
    print(f"❌ Failed downloads: {failed_downloads}")
    print(f"⚠️ Skipped downloads: {skipped_downloads}")
    print(f"📊 Total processed: {len(df)}")
    print(f"📁 Files saved in: {output_dir}")

except FileNotFoundError:
    print(f"❌ Error: The file '{csv_file_path}' was not found. Please upload it to your Colab environment.")
except Exception as e:
    print(f"❌ An error occurred while reading the CSV: {e}")

## 3. Conversione da PDF a Markdown

Dopo aver scaricato i documenti PDF, questa sezione del notebook si dedica alla loro trasformazione in un formato più gestibile e standardizzato: il Markdown. Questo passaggio è fondamentale per preparare i testi all'analisi automatizzata.

### Scopo:

L'obiettivo principale di questa fase è convertire il contenuto non strutturato dei documenti PDF in testo formattato in Markdown [1]. Il formato Markdown facilita l'estrazione di informazioni e la successiva elaborazione da parte di modelli di linguaggio, rendendo i documenti "leggibili" per gli algoritmi [1].

### Dettagli del Processo:

Come indicato dagli output, il processo di conversione include i seguenti passaggi:

*   **Avvio dell'Elaborazione**: Il notebook avvia l'elaborazione dei documenti PDF [2].
*   **Rilevamento dei File**: Viene riportato il numero di file PDF trovati e pronti per essere processati (ad esempio, "Trovati 67 file PDF da processare") [2].
*   **Conversione e Caricamento**: Ogni file PDF scaricato viene convertito in un file Markdown corrispondente. Successivamente, questi file Markdown vengono caricati, e il loro contenuto viene misurato in termini di numero di caratteri (ad esempio, "Loaded /content/markdown_output/...md: XXXXX characters") [3]. Questo controllo sulla dimensione dei file Markdown indica che la conversione è avvenuta con successo e che il contenuto testuale è stato acquisito [3].

Questa fase è cruciale per la normalizzazione del formato dei documenti, trasformando una collezione eterogenea di PDF in un corpus di testo uniforme pronto per l'analisi intelligente.

In [None]:
import os
import pandas as pd
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import io
import json
from datetime import datetime

def pdf_to_markdown_with_ocr(pdf_path, lang='ita+eng'):
    """
    Converte un PDF in markdown estraendo testo normale e facendo OCR sulle immagini

    Args:
        pdf_path: percorso del file PDF
        lang: lingue per OCR (default: italiano + inglese)

    Returns:
        tuple: (markdown_text, extraction_stats)
    """
    try:
        doc = fitz.open(pdf_path)
        markdown_text = ""
        stats = {
            'pages': len(doc),
            'text_pages': 0,
            'images_processed': 0,
            'ocr_text_found': 0,
            'errors': []
        }

        markdown_text += f"# {os.path.basename(pdf_path)}\n\n"
        markdown_text += f"*Estratto il: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"

        for page_num in range(len(doc)):
            page = doc.load_page(page_num)

            # Estrai testo normale
            text = page.get_text()
            markdown_text += f"## Pagina {page_num + 1}\n\n"

            if text.strip():
                stats['text_pages'] += 1
                # Pulisci il testo (rimuovi righe vuote eccessive)
                cleaned_text = '\n'.join(line for line in text.split('\n') if line.strip())
                markdown_text += cleaned_text + "\n\n"

            # Estrai e processa immagini
            image_list = page.get_images()

            if image_list:
                markdown_text += f"### Immagini trovate: {len(image_list)}\n\n"

            for img_index, img in enumerate(image_list):
                try:
                    # Estrai l'immagine
                    xref = img[0]
                    pix = fitz.Pixmap(doc, xref)

                    if pix.n - pix.alpha < 4:  # GRAY o RGB
                        stats['images_processed'] += 1
                        img_data = pix.tobytes("png")
                        img_pil = Image.open(io.BytesIO(img_data))

                        # OCR sull'immagine
                        ocr_text = pytesseract.image_to_string(img_pil, lang=lang)

                        if ocr_text.strip():
                            stats['ocr_text_found'] += 1
                            markdown_text += f"#### Testo estratto da immagine {img_index + 1}:\n"
                            markdown_text += f"```\n{ocr_text.strip()}\n```\n\n"

                    pix = None

                except Exception as e:
                    error_msg = f"Errore nel processare immagine {img_index + 1} a pagina {page_num + 1}: {str(e)}"
                    stats['errors'].append(error_msg)
                    print(f"⚠️ {error_msg}")

        doc.close()
        return markdown_text, stats

    except Exception as e:
        error_msg = f"Errore nel processare {pdf_path}: {str(e)}"
        return f"# ERRORE\n\nImpossibile processare il file: {error_msg}\n", {'errors': [error_msg]}

def process_all_pdfs(pdf_directory='downloaded_pdfs', output_directory='markdown_output'):
    """
    Processa tutti i PDF in una directory e salva i risultati in markdown

    Args:
        pdf_directory: directory contenente i PDF
        output_directory: directory dove salvare i file markdown
    """

    # Installa le dipendenze necessarie
    print("🔧 Installazione dipendenze...")
    os.system('pip install pymupdf pillow pytesseract -q')
    os.system('apt install tesseract-ocr tesseract-ocr-ita -y -q')

    # Crea directory di output
    os.makedirs(output_directory, exist_ok=True)

    # Trova tutti i PDF
    if not os.path.exists(pdf_directory):
        print(f"❌ Directory {pdf_directory} non trovata!")
        return

    pdf_files = [f for f in os.listdir(pdf_directory) if f.lower().endswith('.pdf')]

    if not pdf_files:
        print(f"❌ Nessun PDF trovato in {pdf_directory}")
        return

    print(f"📁 Trovati {len(pdf_files)} file PDF da processare")

    # Statistiche globali
    global_stats = {
        'total_files': len(pdf_files),
        'successful': 0,
        'failed': 0,
        'total_pages': 0,
        'total_text_pages': 0,
        'total_images': 0,
        'total_ocr_extractions': 0,
        'processing_times': []
    }

    # Salva anche un file di log dettagliato
    log_data = []

    # Processa ogni PDF
    for i, pdf_file in enumerate(pdf_files, 1):
        pdf_path = os.path.join(pdf_directory, pdf_file)
        output_filename = pdf_file.replace('.pdf', '.md')
        output_path = os.path.join(output_directory, output_filename)

        print(f"\n📄 [{i}/{len(pdf_files)}] Processando: {pdf_file}")

        start_time = datetime.now()

        try:
            # Converti PDF in markdown
            markdown_content, stats = pdf_to_markdown_with_ocr(pdf_path)

            # Salva il file markdown
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)

            processing_time = (datetime.now() - start_time).total_seconds()

            # Aggiorna statistiche
            global_stats['successful'] += 1
            global_stats['total_pages'] += stats.get('pages', 0)
            global_stats['total_text_pages'] += stats.get('text_pages', 0)
            global_stats['total_images'] += stats.get('images_processed', 0)
            global_stats['total_ocr_extractions'] += stats.get('ocr_text_found', 0)
            global_stats['processing_times'].append(processing_time)

            # Log dettagliato
            log_entry = {
                'file': pdf_file,
                'status': 'success',
                'processing_time': processing_time,
                'stats': stats,
                'output_file': output_filename
            }
            log_data.append(log_entry)

            print(f"✅ Completato in {processing_time:.1f}s - {stats.get('pages', 0)} pagine, {stats.get('images_processed', 0)} immagini")

        except Exception as e:
            global_stats['failed'] += 1
            error_msg = str(e)

            log_entry = {
                'file': pdf_file,
                'status': 'failed',
                'error': error_msg,
                'processing_time': (datetime.now() - start_time).total_seconds()
            }
            log_data.append(log_entry)

            print(f"❌ Errore: {error_msg}")

    # Salva log dettagliato
    log_path = os.path.join(output_directory, 'processing_log.json')
    with open(log_path, 'w', encoding='utf-8') as f:
        json.dump(log_data, f, indent=2, ensure_ascii=False, default=str)

    # Crea report riassuntivo
    create_summary_report(global_stats, log_data, output_directory)

    # Stampa statistiche finali
    print_final_statistics(global_stats, output_directory)

def create_summary_report(stats, log_data, output_dir):
    """Crea un report riassuntivo in markdown"""

    avg_time = sum(stats['processing_times']) / len(stats['processing_times']) if stats['processing_times'] else 0

    report = f"""# Report di Elaborazione PDF

*Generato il: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*

## Statistiche Generali

- **File totali processati:** {stats['total_files']}
- **Successi:** {stats['successful']} ✅
- **Fallimenti:** {stats['failed']} ❌
- **Tasso di successo:** {(stats['successful']/stats['total_files']*100):.1f}%

## Statistiche di Contenuto

- **Pagine totali elaborate:** {stats['total_pages']}
- **Pagine con testo:** {stats['total_text_pages']}
- **Immagini processate:** {stats['total_images']}
- **Estrazioni OCR riuscite:** {stats['total_ocr_extractions']}

## Performance

- **Tempo medio per file:** {avg_time:.1f} secondi
- **Tempo totale:** {sum(stats['processing_times']):.1f} secondi

## Dettagli per File

| File | Status | Tempo (s) | Pagine | Immagini | OCR |
|------|--------|-----------|---------|----------|-----|
"""

    for entry in log_data:
        status_icon = "✅" if entry['status'] == 'success' else "❌"
        time_val = f"{entry['processing_time']:.1f}"

        if entry['status'] == 'success':
            pages = entry['stats'].get('pages', 0)
            images = entry['stats'].get('images_processed', 0)
            ocr = entry['stats'].get('ocr_text_found', 0)
            report += f"| {entry['file']} | {status_icon} | {time_val} | {pages} | {images} | {ocr} |\n"
        else:
            report += f"| {entry['file']} | {status_icon} | {time_val} | - | - | - |\n"

    # Salva il report
    report_path = os.path.join(output_dir, 'REPORT_ELABORAZIONE.md')
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)

def print_final_statistics(stats, output_dir):
    """Stampa le statistiche finali"""
    print("\n" + "="*60)
    print("🎉 ELABORAZIONE COMPLETATA!")
    print("="*60)
    print(f"📊 File processati: {stats['successful']}/{stats['total_files']}")
    print(f"📄 Pagine elaborate: {stats['total_pages']}")
    print(f"🖼️ Immagini processate: {stats['total_images']}")
    print(f"🔍 Estrazioni OCR: {stats['total_ocr_extractions']}")

    if stats['processing_times']:
        avg_time = sum(stats['processing_times']) / len(stats['processing_times'])
        print(f"⏱️ Tempo medio: {avg_time:.1f}s per file")

    print(f"📁 File salvati in: {output_dir}")
    print(f"📋 Log dettagliato: {output_dir}/processing_log.json")
    print(f"📄 Report completo: {output_dir}/REPORT_ELABORAZIONE.md")

    if stats['failed'] > 0:
        print(f"⚠️ {stats['failed']} file non sono stati processati correttamente")

# Esegui il processamento
if __name__ == "__main__":
    print("🚀 Avvio elaborazione PDF...")
    process_all_pdfs()
    print("\n✨ Processo completato!")

## 4. Generazione e Lancio dei Prompt per l'Analisi Intelligente

Questa sezione rappresenta il cuore dell'analisi automatizzata, dove il contenuto testuale estratto dai documenti Markdown viene utilizzato per interrogare un modello di linguaggio avanzato, al fine di estrarre informazioni strutturate e rilevanti.

### Scopo:

L'obiettivo primario di questa fase è trasformare il testo non strutturato dei documenti in dati utilizzabili, attraverso l'interazione con un modello di Intelligenza Artificiale. Il processo è progettato per identificare e raccogliere automaticamente le informazioni chiave dai documenti, come indicato nel formato "Key Information Document" (KID) o simili.

### Dettagli del Processo:

Come evidenziato dagli output di questa sezione, il flusso di lavoro comprende:

*   **Preparazione dei Prompt**: Vengono creati dei "prompt intelligenti" (ad esempio, "Generated 68 intelligent analysis prompts for 68 documents") basati sul contenuto dei file Markdown precedentemente generati. Questi prompt sono formulati per guidare il modello di linguaggio nell'estrazione delle informazioni desiderate.
*   **Avvio dell'Estrazione**: Il processo di estrazione delle informazioni viene avviato, con un'indicazione chiara che si tratta di "Processing KID documents with GPT..." [1]. Questo suggerisce l'utilizzo di un modello avanzato per l'elaborazione.
*   **Monitoraggio dell'Avanzamento**: Durante l'esecuzione, il notebook fornisce aggiornamenti sullo stato di avanzamento (es. "Progress: 10/68", "Progress: 20/68", ecc.) [2]. Questo permette di tenere traccia di quanti documenti sono stati processati.
*   **Validazione e Stato delle Risposte**: Il sistema monitora la qualità delle risposte ottenute dal modello di linguaggio, classificandole in:
    *   `Valid JSON`: Risposte correttamente formattate in JSON e completamente elaborate [2].
    *   `Partial`: Risposte JSON parziali, che potrebbero indicare problemi nell'estrazione completa dei dati [2].
    *   `Invalid`: Risposte che non sono conformi al formato JSON atteso [2].
    *   Viene anche segnalato il numero di `Truncated responses` (risposte troncate), il che indica che il modello potrebbe aver raggiunto il limite di token nella sua risposta, impedendo un'estrazione completa [2].
*   **Conteggio delle Informazioni Estratte**: Il notebook traccia il `Total KIDs extracted` (numero totale di informazioni chiave estratte) [2], fornendo una metrica cumulativa del successo dell'estrazione.

Questa fase è cruciale per la trasformazione dei dati grezzi in un formato strutturato e pronto per l'analisi successiva, anche se occasionalmente si possono riscontrare limitazioni o problemi nelle risposte del modello.

*Prima provvediamo a mettere al sicuro i markdown che abbiamo generato*:

In [None]:
!sudo apt install p7zip-full p7zip-rar

In [3]:
!7z x mds.7z -o/markdown_output/


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan         1 file, 564451 bytes (552 KiB)

Extracting archive: mds.7z
--
Path = mds.7z
Type = 7z
Physical Size = 564451
Headers Size = 2062
Method = LZMA2:23
Solid = +
Blocks = 1

  0%    Everything is Ok

Files: 69
Size:       6849850
Compressed: 564451


In [None]:
!mkdir -p /content/markdown_output/
!mv /markdown_output/markdown_output/*.md /content/markdown_output/

In [17]:
import json
from datetime import datetime
import re

def generate_kid_analysis_prompts(kid_documents, output_file='prompts_kid_analysis.txt'):
    """
    Genera prompt per GPT-4.1 per identificare automaticamente e estrarre informazioni
    da uno o più KID contenuti in un documento markdown. GPT gestisce autonomamente
    la rilevazione e separazione dei KID multipli.

    Args:
        kid_documents (list): Lista di documenti markdown che possono contenere uno o più KID
        output_file (str): Nome del file in cui salvare i prompt generati

    Returns:
        list: Lista dei prompt generati in formato JSON
    """

    # System prompt che delega tutto a GPT
    system_prompt = """You are an expert financial analyst specializing in KID (Key Information Documents) analysis for insurance and investment products. Your task is to:

1. AUTOMATICALLY DETECT how many individual KID documents are contained within the provided markdown content
2. EXTRACT comprehensive structured data for EACH individual KID found
3. GENERATE descriptive texts for each KID
4. RETURN a JSON response that reflects the actual number of KIDs found

You must analyze the entire document and identify distinct KID sections based on headers like:
- "DOCUMENTO CONTENENTE LE INFORMAZIONI CHIAVE"
- "DOCUMENTO CONTENENTE LE INFORMAZIONI SPECIFICHE"
- Product name changes (e.g., different investment options)
- Clear section breaks between different financial products

For each KID found, extract complete information including numerical data, risk metrics, cost structures, target market, and generate engaging descriptive content. Return everything in a structured JSON format that clearly shows how many individual KIDs were processed."""

    prompts = []

    for i, kid_doc in enumerate(kid_documents):
        # Gestione input (stringa o dict)
        if isinstance(kid_doc, str):
            doc_content = kid_doc
            doc_id = f"DOC_{i+1}"
        elif isinstance(kid_doc, dict):
            doc_content = kid_doc.get('content', '')
            doc_id = kid_doc.get('id', f"DOC_{i+1}")
        else:
            print(f"Warning: Invalid document format for document {i+1}")
            continue

        if not doc_content.strip():
            print(f"Warning: Empty content for document {doc_id}")
            continue

        # User prompt che chiede a GPT di gestire tutto
        user_prompt = f"""Analyze the following markdown document and automatically identify how many individual KID (Key Information Documents) sections it contains. Then extract comprehensive information for each KID found.

DOCUMENT ID: "{doc_id}"
DOCUMENT CONTENT:
"{doc_content}"

INSTRUCTIONS:
1. First, identify how many distinct KID documents are present in this content
2. For each KID found, extract all information according to the structure below
3. Return a JSON with an array containing one object per KID identified

REQUIRED JSON STRUCTURE:
{{
  "document_analysis": {{
    "source_document_id": "{doc_id}",
    "analysis_date": "{datetime.now().isoformat()}",
    "total_kids_found": 0,
    "processing_notes": "Brief description of how many KIDs were identified and any challenges"
  }},
  "kids": [
    {{
      "kid_id": "auto_generated_unique_id",
      "kid_sequence": 1,
      "product_identification": {{
        "nome_prodotto": "",
        "compagnia": "",
        "data_documento": "",
        "tipo_prodotto": "",
        "durata_contratto": "",
        "periodo_versamenti": ""
      }},
      "investment_structure": {{
        "opzioni_investimento": [
          {{
            "nome_opzione": "",
            "allocazione_percentuale": "",
            "tipologia": "gestione_separata | fondo_interno",
            "obiettivo_investimento": "",
            "livello_rischio": "1-7",
            "tematiche_esg": []
          }}
        ]
      }},
      "risk_return_profile": {{
        "indicatore_rischio_sintetico": "1-7",
        "periodo_detenzione_raccomandato": "",
        "periodo_minimo": "",
        "protezione_capitale": "si/no",
        "valuta_rischio": "",
        "scenari_performance": {{
          "stress": {{
            "1_anno": {{"rimborso": "", "rendimento": ""}},
            "5_anni": {{"rimborso": "", "rendimento": ""}},
            "10_anni": {{"rimborso": "", "rendimento": ""}}
          }},
          "sfavorevole": {{
            "1_anno": {{"rimborso": "", "rendimento": ""}},
            "5_anni": {{"rimborso": "", "rendimento": ""}},
            "10_anni": {{"rimborso": "", "rendimento": ""}}
          }},
          "moderato": {{
            "1_anno": {{"rimborso": "", "rendimento": ""}},
            "5_anni": {{"rimborso": "", "rendimento": ""}},
            "10_anni": {{"rimborso": "", "rendimento": ""}}
          }},
          "favorevole": {{
            "1_anno": {{"rimborso": "", "rendimento": ""}},
            "5_anni": {{"rimborso": "", "rendimento": ""}},
            "10_anni": {{"rimborso": "", "rendimento": ""}}
          }}
        }}
      }},
      "cost_structure": {{
        "costi_ingresso": {{
          "percentuale_annua": "",
          "descrizione": ""
        }},
        "costi_uscita": {{
          "penali_primi_7_anni": "",
          "percentuale_max": "",
          "descrizione": ""
        }},
        "costi_gestione_annui": {{
          "commissioni_gestione": "",
          "costi_amministrativi": "",
          "costi_transazione": ""
        }},
        "incidenza_costi_totali": {{
          "1_anno": "",
          "5_anni": "",
          "10_anni": ""
        }},
        "ric_totale_costi": {{
          "min": "",
          "max": ""
        }}
      }},
      "insurance_benefits": {{
        "copertura_decesso": {{
          "base": "",
          "formula_calcolo": "",
          "bonus_completamento": ""
        }},
        "coperture_aggiuntive": [
          {{
            "nome": "",
            "prestazione": "",
            "premio_annuo": "",
            "descrizione": ""
          }}
        ],
        "prestazioni_vita": {{
          "riscatto_disponibile": "si/no",
          "modalita_riscatto": "",
          "vincoli_temporali": ""
        }}
      }},
      "target_market": {{
        "profilo_cliente": {{
          "tipologia": "retail/professionale",
          "esperienza_richiesta": "",
          "orizzonte_temporale": "",
          "profilo_rischio": "",
          "capacita_perdite": "",
          "importo_minimo_premio": "",
          "bisogni_primari": []
        }}
      }},
      "actuarial_metrics": {{
        "ter_medio_10_anni": "",
        "volatilita_rendimenti": "",
        "duration_portafoglio": "",
        "max_drawdown_stimato": "",
        "break_even_anni": ""
      }},
      "compliance_flags": {{
        "costi_elevati": "si/no",
        "complessita_alta": "si/no",
        "rischio_cambio": "si/no",
        "liquidita_limitata": "si/no",
        "mifid_compatibile": "si/no",
        "warnings_presenti": []
      }},
      "descriptive_texts": {{
        "executive_summary": "Brief 2-3 sentence summary of this specific KID's key characteristics",
        "product_description": "Detailed 200-300 word description of this specific product",
        "risk_assessment": "100-150 word analysis of the main risks for this specific product",
        "cost_analysis": "100-150 word explanation of the cost structure for this specific product",
        "suitability_notes": "100-150 word guidance on client suitability for this specific product"
      }},
      "extracted_text_sections": {{
        "product_objectives": "Verbatim text describing this KID's objectives",
        "investment_policy": "Verbatim text describing this KID's investment policy",
        "risk_warnings": "Verbatim risk warnings specific to this KID",
        "cost_disclosures": "Verbatim cost information for this KID",
        "target_market_description": "Verbatim target market text for this KID"
      }},
      "data_quality": {{
        "completeness_score": "1-10",
        "missing_fields": [],
        "extraction_confidence": "high/medium/low"
      }}
    }}
  ]
}}

CRITICAL REQUIREMENTS:
1. Identify ALL distinct KID sections in the document automatically
2. Do NOT split a single KID into multiple parts
3. Extract complete information for each individual KID found
4. Generate unique, descriptive kid_id for each KID (use product name + option if available)
5. The "total_kids_found" field must match the actual number of objects in the "kids" array
6. Use "N/A" for fields not present in the specific KID
7. Preserve all numerical precision from the original documents
8. Generate engaging, professional descriptive texts specific to each KID
9. Include verbatim text extractions from key sections of each KID

Return ONLY the JSON response. Be thorough and accurate in identifying distinct KID sections."""

        # Formato compatibile con il sistema di processing
        prompt_json = json.dumps([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ], ensure_ascii=False)

        prompts.append(prompt_json)

    # Salva i prompt
    with open(output_file, 'w', encoding='utf-8') as f:
        for prompt in prompts:
            f.write(prompt + '\n')

    print(f"Generated {len(prompts)} intelligent analysis prompts for {len(kid_documents)} documents")
    print(f"Each prompt will automatically detect and extract multiple KIDs per document")
    print(f"Saved to {output_file}")

    return prompts

def load_kid_documents_from_markdown(file_paths):
    """
    Carica documenti KID da file markdown (versione semplificata)

    Args:
        file_paths (list): Lista di percorsi ai file markdown

    Returns:
        list: Lista di documenti KID processati
    """
    documents = []

    for i, file_path in enumerate(file_paths):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                documents.append({
                    'id': f"KID_DOC_{i+1}",
                    'file_path': file_path,
                    'content': content,
                    'content_length': len(content)
                })
                print(f"Loaded {file_path}: {len(content)} characters")
        except Exception as e:
            print(f"Error loading {file_path}: {e}")

    return documents

def process_gpt_responses(response_file, output_file='processed_kid_data.json'):
    """
    Processa le risposte JSON di GPT per aggregare i dati di tutti i KID estratti

    Args:
        response_file (str): File contenente le risposte JSON di GPT
        output_file (str): File di output per i dati aggregati

    Returns:
        dict: Dati aggregati di tutti i KID processati
    """
    all_kids_data = []
    processing_stats = {
        'total_documents_processed': 0,
        'total_kids_extracted': 0,
        'avg_kids_per_document': 0,
        'extraction_errors': [],
        'data_quality_summary': {}
    }

    try:
        with open(response_file, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    response_data = json.loads(line.strip())

                    # Estrai metadati documento
                    doc_analysis = response_data.get('document_analysis', {})
                    kids_found = doc_analysis.get('total_kids_found', 0)

                    processing_stats['total_documents_processed'] += 1
                    processing_stats['total_kids_extracted'] += kids_found

                    # Aggiungi ogni KID ai dati aggregati
                    for kid in response_data.get('kids', []):
                        kid['source_line'] = line_num
                        kid['source_document'] = doc_analysis.get('source_document_id', f'DOC_{line_num}')
                        all_kids_data.append(kid)

                except json.JSONDecodeError as e:
                    error_msg = f"JSON decode error at line {line_num}: {e}"
                    processing_stats['extraction_errors'].append(error_msg)
                    print(f"Warning: {error_msg}")

                except Exception as e:
                    error_msg = f"Processing error at line {line_num}: {e}"
                    processing_stats['extraction_errors'].append(error_msg)
                    print(f"Error: {error_msg}")

        # Calcola statistiche finali
        if processing_stats['total_documents_processed'] > 0:
            processing_stats['avg_kids_per_document'] = processing_stats['total_kids_extracted'] / processing_stats['total_documents_processed']

        # Analizza qualità dati
        quality_scores = [int(kid.get('data_quality', {}).get('completeness_score', 0)) for kid in all_kids_data]
        if quality_scores:
            processing_stats['data_quality_summary'] = {
                'avg_completeness': sum(quality_scores) / len(quality_scores),
                'min_completeness': min(quality_scores),
                'max_completeness': max(quality_scores)
            }

        # Salva risultati aggregati
        final_output = {
            'processing_metadata': processing_stats,
            'extraction_timestamp': datetime.now().isoformat(),
            'total_kids_count': len(all_kids_data),
            'kids_data': all_kids_data
        }

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_output, f, ensure_ascii=False, indent=2)

        print(f"\nPROCESSING COMPLETE:")
        print(f"- Documents processed: {processing_stats['total_documents_processed']}")
        print(f"- Total KIDs extracted: {processing_stats['total_kids_extracted']}")
        print(f"- Average KIDs per document: {processing_stats['avg_kids_per_document']:.1f}")
        print(f"- Processing errors: {len(processing_stats['extraction_errors'])}")
        print(f"- Output saved to: {output_file}")

        if processing_stats['data_quality_summary']:
            print(f"- Average data completeness: {processing_stats['data_quality_summary']['avg_completeness']:.1f}/10")

        return final_output

    except FileNotFoundError:
        print(f"Error: Response file {response_file} not found")
        return None
    except Exception as e:
        print(f"Error processing responses: {e}")
        return None



In [11]:
import glob

kid_files = glob.glob('/content/markdown_output/*.md')

In [None]:
# Esempio di utilizzo
if __name__ == "__main__":
    # Carica documenti markdown (possono contenere uno o più KID)
    kid_documents = load_kid_documents_from_markdown(kid_files)


    # Genera prompt intelligenti (GPT gestisce automaticamente i KID multipli)
    prompts = generate_kid_analysis_prompts(kid_documents, 'smart_kid_analysis.txt')

    print(f"\nSUCCESS: Generated {len(prompts)} intelligent prompts")
    print("Each prompt will automatically:")
    print("- Detect how many KIDs are in each document")
    print("- Extract complete data for each KID found")
    print("- Return JSON with proper structure reflecting actual KID count")

    # Esempio con contenuto diretto
    sample_content = """
    Il tuo documento markdown con uno o più KID...
    GPT detecterà automaticamente la struttura!
    """

    direct_prompts = generate_kid_analysis_prompts([sample_content], 'direct_smart_analysis.txt')

    # Dopo aver ottenuto le risposte da GPT, processale
    # processed_data = process_gpt_responses('gpt_responses.jsonl', 'final_kid_database.json')

## 5. Estrazione e Pulizia dei Dati

Questa sezione del notebook si concentra sulla trasformazione delle informazioni estratte dal modello di linguaggio in un formato strutturato e sulla successiva fase di pulizia e uniformazione dei dati. È un passaggio cruciale per rendere i dati pronti per l'analisi, la visualizzazione o l'ulteriore elaborazione.

### Scopo:

L'obiettivo di questa fase è duplice:
1.  **Strutturazione dei dati**: Convertire le risposte non ancora completamente strutturate del modello di linguaggio in un "DataFrame" Pandas, che è una tabella organizzata e facile da manipolare per l'analisi.
2.  **Qualità e uniformità dei dati**: Pulire, standardizzare e arricchire il dataset, risolvendo incoerenze e preparandolo per analisi quantitative.

### Dettagli del Processo:

Gli output mostrano le seguenti operazioni [1-3]:

*   **Creazione del Dataset Completo**: Il processo inizia con la creazione di un dataset completo delle informazioni chiave (KID) estratte ("🚀 Creating complete KID dataset...") [1].
*   **Caricamento ed Elaborazione**: I dati chiave vengono caricati e processati. Il notebook indica che sono state elaborate un certo numero di singole informazioni chiave (ad esempio, "📊 Processed 81 individual KIDs") [1].
*   **Creazione del DataFrame**: Le informazioni elaborate vengono consolidate in un DataFrame Pandas, con indicazione delle sue dimensioni (es. "📋 Created DataFrame with 81 rows and 171 columns") [1]. Questo significa che ogni riga rappresenta un'informazione chiave estratta e ogni colonna un attributo di tale informazione.
*   **Arricchimento del DataFrame**: Il DataFrame viene ulteriormente migliorato, ad esempio con l'aggiunta di colonne numeriche, utili per analisi quantitative (es. "🔧 Enhancing DataFrame... ✅ DataFrame enhanced with 18 numeric columns") [1].
*   **Uniformazione Avanzata**: Si avvia un processo di uniformazione avanzata del dataset [2]. Questo passaggio è essenziale per standardizzare i formati dei dati, gestire valori mancanti o inconsistenti e preparare il dataset per l'analisi comparativa. Viene mostrato il confronto tra le dimensioni del "Dataset originale" e del "Dataset pulito", indicando una riduzione delle colonne ma un aumento delle colonne numeriche a seguito della pulizia e uniformazione (es. "Dataset originale: 81 righe, 49 colonne" e "Dataset pulito: (81, 119) Colonne numeriche: 62") [2, 3]. Questo indica che alcune colonne non essenziali o ridondanti sono state rimosse o consolidate, mentre nuove colonne numeriche sono state create o identificate.

Questa fase è fondamentale per trasformare i dati grezzi e testuali, estratti dal modello di linguaggio, in un formato strutturato e pulito, essenziale per qualsiasi successiva analisi o modellazione predittiva.

*riestraiamo i risultati ottenuti dalle sezioni precedenti*

In [None]:
!7z a pdfs.7z downloaded_pdfs/*

In [None]:
!7z a mds.7z markdown_output/*

# lancio degli prompt

In [None]:
import asyncio
import openai
import json
import re
from tqdm.notebook import tqdm
from datetime import datetime

def clean_and_fix_json_response(response_text):
    """
    Tenta di pulire e riparare risposte JSON malformate da GPT
    """
    if not response_text or not response_text.strip():
        return None

    # Rimuovi caratteri di controllo e spazi extra
    cleaned = response_text.strip()

    # Rimuovi eventuali ``` markdown
    cleaned = re.sub(r'^```json\s*', '', cleaned, flags=re.MULTILINE)
    cleaned = re.sub(r'^```\s*$', '', cleaned, flags=re.MULTILINE)

    # Rimuovi eventuali commenti o testo extra prima/dopo il JSON
    json_start = cleaned.find('{')
    json_end = cleaned.rfind('}')

    if json_start != -1 and json_end != -1 and json_end > json_start:
        cleaned = cleaned[json_start:json_end + 1]

    # Tenta di parsare il JSON
    try:
        parsed = json.loads(cleaned)
        return parsed
    except json.JSONDecodeError as e:
        # Tenta riparazioni comuni
        try:
            # Riparazione 1: Aggiungi virgole mancanti
            fixed = re.sub(r'"\s*\n\s*"', '",\n"', cleaned)
            parsed = json.loads(fixed)
            return parsed
        except:
            pass

        try:
            # Riparazione 2: Rimuovi virgole trailing
            fixed = re.sub(r',\s*}', '}', cleaned)
            fixed = re.sub(r',\s*]', ']', fixed)
            parsed = json.loads(fixed)
            return parsed
        except:
            pass

        # Se tutte le riparazioni falliscono, ritorna None
        return None

def extract_partial_kid_data(response_text):
    """
    Estrae dati parziali anche da risposte JSON malformate
    """
    try:
        # Cerca pattern specifici nel testo
        patterns = {
            'nome_prodotto': r'"nome_prodotto":\s*"([^"]+)"',
            'compagnia': r'"compagnia":\s*"([^"]+)"',
            'indicatore_rischio': r'"indicatore_rischio_sintetico":\s*"([^"]+)"',
            'total_kids_found': r'"total_kids_found":\s*(\d+)'
        }

        extracted = {}
        for key, pattern in patterns.items():
            match = re.search(pattern, response_text, re.IGNORECASE)
            if match:
                extracted[key] = match.group(1)

        return extracted if extracted else None

    except Exception:
        return None

def estimate_tokens_needed():
    """
    Stima i token necessari per una risposta KID completa
    """
    # Esempio di JSON KID completo tipico
    sample_json_structure = {
        "document_analysis": {"source_document_id": "DOC_1", "total_kids_found": 6},
        "kids": [
            {
                "kid_id": "G>>>>Sviluppo_MultiPlan_G>>>>>>_Consumo_>>>>>nsabile_KID_1",
                "product_identification": {
                    "nome_prodotto": "G>>>>>Sviluppo MultiPlan",
                    "compagnia": "G>>>>>>> ITALIA S.p.A.",
                    "data_documento": "2024-01-15",
                    "tipo_prodotto": "Assicurazione sulla vita di tipo I",
                    "durata_contratto": "Tutta la vita",
                    "periodo_versamenti": "Primo versamento più eventuali versamenti aggiuntivi"
                },
                "descriptive_texts": {
                    "executive_summary": "G>>>>>Sviluppo MultiPlan è un prodotto assicurativo vita di tipo I che combina protezione assicurativa e opportunità di investimento attraverso diverse opzioni di gestione separata e fondi interni, offrendo flessibilità nella scelta dell'allocazione degli investimenti con un indicatore di rischio moderato.",
                    "product_description": "Il prodotto G>>>>Sviluppo MultiPlan rappresenta una soluzione assicurativa vita completa che offre al contraente la possibilità di combinare protezione del capitale investito con opportunità di crescita attraverso diverse opzioni di investimento."
                }
            }
        ]
    }

    # Stima approssimativa: 1 token ≈ 4 caratteri per l'italiano
    sample_text = json.dumps(sample_json_structure, ensure_ascii=False, indent=2)
    estimated_tokens_per_kid = len(sample_text) // 4

    print(f"📏 TOKEN ESTIMATION:")
    print(f"Sample JSON length: {len(sample_text)} characters")
    print(f"Estimated tokens per KID: {estimated_tokens_per_kid}")
    print(f"For 6 KIDs: ~{estimated_tokens_per_kid * 6} tokens")
    print(f"Current max_tokens: 12000")
    print(f"Recommended max_tokens: {max(10000, estimated_tokens_per_kid * 8)}")

    return estimated_tokens_per_kid

async def call_gpt_kid_analysis(prompt_json, client):
    """Esegue una singola chiamata a GPT per analisi KID con recovery"""
    try:
        # Parsa il JSON delle messages
        messages = json.loads(prompt_json)

        response = await asyncio.to_thread(
            client.chat.completions.create,
            model="gpt-4.1",
            messages=messages,
            temperature=0.1,
            max_tokens=12000  # Aumentato ulteriormente per gestire JSON molto complessi
        )

        response_content = response.choices[0].message.content

        # Verifica se la risposta è stata troncata
        finish_reason = response.choices[0].finish_reason
        if finish_reason == 'length':
            print(f"⚠️ Response truncated due to max_tokens limit")

        # Tenta di pulire e parsare la risposta
        cleaned_response = clean_and_fix_json_response(response_content)

        if cleaned_response:
            kids_found = cleaned_response.get('document_analysis', {}).get('total_kids_found', 0)
            validation_status = "valid_json"
            final_response = json.dumps(cleaned_response, ensure_ascii=False)
        else:
            # Tenta estrazione parziale
            partial_data = extract_partial_kid_data(response_content)
            kids_found = 0
            validation_status = "invalid_json"
            final_response = response_content

            # Se abbiamo dati parziali, segna come "partial"
            if partial_data:
                validation_status = "partial_data"
                if 'total_kids_found' in partial_data:
                    try:
                        kids_found = int(partial_data['total_kids_found'])
                    except:
                        kids_found = 0

        return {
            "prompt": prompt_json,
            "response": final_response,
            "raw_response": response_content,
            "finish_reason": finish_reason,
            "status": "success",
            "validation": validation_status,
            "kids_found": kids_found,
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        return {
            "prompt": prompt_json,
            "response": f"Error: {str(e)}",
            "raw_response": "",
            "finish_reason": "error",
            "status": "error",
            "validation": "error",
            "kids_found": 0,
            "timestamp": datetime.now().isoformat()
        }

async def process_kid_prompts(api_key, input_file, output_file, max_concurrent=15):
    """Processa tutti i prompt KID dal file con recovery avanzato"""
    # Inizializza il client OpenAI
    client = openai.OpenAI(api_key=api_key)

    # Leggi i prompt
    with open(input_file, 'r', encoding='utf-8') as f:
        prompts = [line.strip() for line in f if line.strip()]

    print(f"Loaded {len(prompts)} KID analysis prompts")

    # Crea un semaforo per limitare le richieste concorrenti
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded_call(prompt):
        async with sem:
            return await call_gpt_kid_analysis(prompt, client)

    # Esegui le richieste con progress bar
    tasks = [bounded_call(prompt) for prompt in prompts]
    results = []

    print("Processing KID documents with GPT...")
    for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
        result = await coro
        results.append(result)

        # Log progress dettagliato ogni 10 documenti
        if len(results) % 10 == 0:
            successful = sum(1 for r in results if r['status'] == 'success')
            valid_json = sum(1 for r in results if r['validation'] == 'valid_json')
            partial_data = sum(1 for r in results if r['validation'] == 'partial_data')
            invalid_json = sum(1 for r in results if r['validation'] == 'invalid_json')
            truncated = sum(1 for r in results if r.get('finish_reason') == 'length')
            total_kids = sum(r['kids_found'] for r in results)

            print(f"Progress: {len(results)}/{len(prompts)}")
            print(f"  Valid JSON: {valid_json}, Partial: {partial_data}, Invalid: {invalid_json}")
            print(f"  Truncated responses: {truncated}")
            print(f"  Total KIDs extracted: {total_kids}")

    # Salva i risultati grezzi
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    # Salva solo le risposte valide in formato JSONL
    jsonl_file = output_file.replace('.json', '_valid.jsonl')
    valid_count = 0
    with open(jsonl_file, 'w', encoding='utf-8') as f:
        for result in results:
            if result['validation'] == 'valid_json':
                f.write(result['response'] + '\n')
                valid_count += 1

    # Salva anche le risposte problematiche per debugging
    debug_file = output_file.replace('.json', '_debug.txt')
    with open(debug_file, 'w', encoding='utf-8') as f:
        f.write("PROBLEMATIC RESPONSES FOR DEBUGGING\n")
        f.write("=" * 50 + "\n\n")

        for i, result in enumerate(results):
            if result['validation'] in ['invalid_json', 'partial_data']:
                f.write(f"RESPONSE {i+1} - Status: {result['validation']}\n")
                f.write("-" * 30 + "\n")
                f.write(result['raw_response'][:500] + "...\n\n")

    # Statistiche finali dettagliate
    stats = {
        'total_prompts': len(prompts),
        'successful_responses': sum(1 for r in results if r['status'] == 'success'),
        'valid_json_responses': sum(1 for r in results if r['validation'] == 'valid_json'),
        'partial_data_responses': sum(1 for r in results if r['validation'] == 'partial_data'),
        'invalid_json_responses': sum(1 for r in results if r['validation'] == 'invalid_json'),
        'error_responses': sum(1 for r in results if r['status'] == 'error'),
        'truncated_responses': sum(1 for r in results if r.get('finish_reason') == 'length'),
        'total_kids_extracted': sum(r['kids_found'] for r in results)
    }

    print(f"\n📊 DETAILED PROCESSING SUMMARY:")
    print(f"Total prompts: {stats['total_prompts']}")
    print(f"Successful responses: {stats['successful_responses']}")
    print(f"✅ Valid JSON: {stats['valid_json_responses']}")
    print(f"🔧 Partial data: {stats['partial_data_responses']}")
    print(f"❌ Invalid JSON: {stats['invalid_json_responses']}")
    print(f"💥 Errors: {stats['error_responses']}")
    print(f"✂️ Truncated (need more tokens): {stats['truncated_responses']}")
    print(f"📈 Total KIDs extracted: {stats['total_kids_extracted']}")

    if stats['truncated_responses'] > 0:
        print(f"\n⚠️ WARNING: {stats['truncated_responses']} responses were truncated!")
        print(f"Consider increasing max_tokens beyond 12000 for better results.")

    if stats['valid_json_responses'] > 0:
        avg_kids = stats['total_kids_extracted'] / stats['valid_json_responses']
        print(f"📊 Average KIDs per valid document: {avg_kids:.1f}")

    success_rate = stats['successful_responses'] / stats['total_prompts'] * 100
    valid_rate = stats['valid_json_responses'] / stats['successful_responses'] * 100 if stats['successful_responses'] > 0 else 0

    print(f"🎯 Success rate: {success_rate:.1f}%")
    print(f"🎯 JSON validation rate: {valid_rate:.1f}%")
    print(f"📁 Valid responses saved to: {jsonl_file}")
    print(f"🐛 Debug file created: {debug_file}")

    return results, stats

def analyze_kid_extraction_results(results_file):
    """Analizza i risultati dell'estrazione KID"""
    with open(results_file, 'r', encoding='utf-8') as f:
        results = json.load(f)

    # Analisi dettagliata
    analysis = {
        'total_documents': len(results),
        'successful_extractions': 0,
        'failed_extractions': 0,
        'json_parse_errors': 0,
        'total_kids_found': 0,
        'kids_per_document': [],
        'extraction_errors': [],
        'data_quality_issues': []
    }

    for result in results:
        if result['status'] == 'success':
            analysis['successful_extractions'] += 1

            if result['validation'] == 'valid_json':
                kids_count = result['kids_found']
                analysis['total_kids_found'] += kids_count
                analysis['kids_per_document'].append(kids_count)

                # Analisi qualità dati
                try:
                    response_data = json.loads(result['response'])
                    actual_kids = len(response_data.get('kids', []))
                    reported_kids = response_data.get('document_analysis', {}).get('total_kids_found', 0)

                    if actual_kids != reported_kids:
                        analysis['data_quality_issues'].append({
                            'issue': 'kids_count_mismatch',
                            'reported': reported_kids,
                            'actual': actual_kids
                        })
                except:
                    pass
            else:
                analysis['json_parse_errors'] += 1
        else:
            analysis['failed_extractions'] += 1
            analysis['extraction_errors'].append(result['response'])

    # Statistiche distribuzione
    if analysis['kids_per_document']:
        analysis['avg_kids_per_doc'] = sum(analysis['kids_per_document']) / len(analysis['kids_per_document'])
        analysis['max_kids_per_doc'] = max(analysis['kids_per_document'])
        analysis['min_kids_per_doc'] = min(analysis['kids_per_document'])

        # Distribuzione
        from collections import Counter
        distribution = Counter(analysis['kids_per_document'])
        analysis['kids_distribution'] = dict(distribution)

    print("KID EXTRACTION ANALYSIS:")
    print(f"📄 Total documents processed: {analysis['total_documents']}")
    print(f"✅ Successful extractions: {analysis['successful_extractions']}")
    print(f"❌ Failed extractions: {analysis['failed_extractions']}")
    print(f"🔧 JSON parse errors: {analysis['json_parse_errors']}")
    print(f"📊 Total KIDs extracted: {analysis['total_kids_found']}")

    if analysis['kids_per_document']:
        print(f"📈 Average KIDs per document: {analysis['avg_kids_per_doc']:.1f}")
        print(f"📊 KIDs range: {analysis['min_kids_per_doc']} - {analysis['max_kids_per_doc']}")
        print(f"📋 Distribution: {analysis['kids_distribution']}")

    if analysis['data_quality_issues']:
        print(f"⚠️ Data quality issues: {len(analysis['data_quality_issues'])}")

    return analysis

# Per Google Colab - usa questo invece del main normale
async def main_kid_processing_colab():
    # Configurazione
    API_KEY = "sk-proj-OPENAI-API-KEY"  # Sostituisci con la tua chiave API

    # Percorsi dei file
    INPUT_FILE = "smart_kid_analysis.txt"  # File generato da generate_kid_analysis_prompts()
    OUTPUT_FILE = "kid_extraction_results.json"

    # Numero massimo di richieste concorrenti (ridotto per analisi complesse)
    MAX_CONCURRENT = 15

    print("🚀 Starting KID extraction process...")

    # Esegui il processo di estrazione
    results, stats = await process_kid_prompts(API_KEY, INPUT_FILE, OUTPUT_FILE, MAX_CONCURRENT)

    print(f"✅ Extraction completed! Results saved in {OUTPUT_FILE}")

    # Analizza i risultati
    analysis = analyze_kid_extraction_results(OUTPUT_FILE)

    # Mostra un esempio di risposta se disponibile
    valid_responses = [r for r in results if r['validation'] == 'valid_json']
    if valid_responses:
        try:
            first_response = json.loads(valid_responses[0]["response"])
            doc_analysis = first_response.get('document_analysis', {})
            kids = first_response.get('kids', [])

            print(f"\n📋 EXAMPLE EXTRACTION:")
            print(f"Document: {doc_analysis.get('source_document_id', 'N/A')}")
            print(f"KIDs found: {doc_analysis.get('total_kids_found', 0)}")

            if kids:
                first_kid = kids[0]
                product_name = first_kid.get('product_identification', {}).get('nome_prodotto', 'N/A')
                risk_level = first_kid.get('risk_return_profile', {}).get('indicatore_rischio_sintetico', 'N/A')
                print(f"First KID: {product_name} (Risk Level: {risk_level})")

                # Mostra summary se disponibile
                summary = first_kid.get('descriptive_texts', {}).get('executive_summary', '')
                if summary:
                    print(f"Summary: {summary[:150]}...")

        except Exception as e:
            print(f"\n❌ Could not parse example response: {e}")

    return results, stats

# Esempio per testare i token
# estimate_tokens_needed()

# Per eseguire in Google Colab:


In [None]:
results, stats = await main_kid_processing_colab()

## 5. Estrazione e Pulizia dei Dati

Questa sezione del notebook si concentra sulla trasformazione delle informazioni estratte dal modello di linguaggio in un formato strutturato e sulla successiva fase di pulizia e uniformazione dei dati. È un passaggio cruciale per rendere i dati pronti per l'analisi, la visualizzazione o l'ulteriore elaborazione.

### Scopo:

L'obiettivo di questa fase è duplice:
1.  **Strutturazione dei dati**: Convertire le risposte non ancora completamente strutturate del modello di linguaggio in un "DataFrame" Pandas, che è una tabella organizzata e facile da manipolare per l'analisi.
2.  **Qualità e uniformità dei dati**: Pulire, standardizzare e arricchire il dataset, risolvendo incoerenze e preparandolo per analisi quantitative.

### Dettagli del Processo:

Gli output mostrano le seguenti operazioni:

*   **Creazione del Dataset Completo**: Il processo inizia con la creazione di un dataset completo delle informazioni chiave (KID) estratte ("🚀 Creating complete KID dataset...") [1].
*   **Caricamento ed Elaborazione**: I dati chiave vengono caricati e processati. Il notebook indica che sono state elaborate un certo numero di singole informazioni chiave (ad esempio, "📊 Processed 81 individual KIDs") [1].
*   **Creazione del DataFrame**: Le informazioni elaborate vengono consolidate in un DataFrame Pandas, con indicazione delle sue dimensioni (es. "📋 Created DataFrame with 81 rows and 171 columns") [1]. Questo significa che ogni riga rappresenta un'informazione chiave estratta e ogni colonna un attributo di tale informazione.
*   **Arricchimento del DataFrame**: Il DataFrame viene ulteriormente migliorato, ad esempio con l'aggiunta di colonne numeriche, utili per analisi quantitative (es. "🔧 Enhancing DataFrame... ✅ DataFrame enhanced with 18 numeric columns") [1].
*   **Uniformazione Avanzata**: Si avvia un processo di uniformazione avanzata del dataset ("Inizio uniformazione avanzata dataset KID...") [2]. Questo passaggio è essenziale per standardizzare i formati dei dati, gestire valori mancanti o inconsistenti e preparare il dataset per l'analisi comparativa. Viene mostrato il confronto tra le dimensioni del "Dataset originale" (ad esempio, "81 righe, 49 colonne") e del "Dataset pulito" (ad esempio, "(81, 119) Colonne numeriche: 62") [2]. Questo indica che alcune colonne non essenziali o ridondanti sono state rimosse o consolidate, mentre nuove colonne numeriche sono state create o identificate.

Questa fase è fondamentale per trasformare i dati grezzi e testuali, estratti dal modello di linguaggio, in un formato strutturato e pulito, essenziale per qualsiasi successiva analisi o modellazione predittiva.

In [None]:
import pandas as pd
import json
import numpy as np
from datetime import datetime
import re

def flatten_nested_dict(nested_dict, parent_key='', sep='_'):
    """
    Appiattisce un dizionario nested in un dizionario flat
    """
    items = []
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            items.extend(flatten_nested_dict(value, new_key, sep=sep).items())
        elif isinstance(value, list):
            # Per le liste, crea colonne separate per ogni elemento
            if value and isinstance(value[0], dict):
                # Lista di dizionari (es. opzioni_investimento)
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        items.extend(flatten_nested_dict(item, f"{new_key}_{i}", sep=sep).items())
                    else:
                        items.append((f"{new_key}_{i}", item))
            else:
                # Lista semplice, converti in stringa
                items.append((new_key, ' | '.join(map(str, value)) if value else ''))
        else:
            items.append((new_key, value))

    return dict(items)

def extract_numeric_value(value_str):
    """
    Estrae valori numerici da stringhe (es. "5.2%" -> 5.2)
    """
    if pd.isna(value_str) or value_str == '' or value_str == 'N/A':
        return np.nan

    # Rimuovi caratteri non numerici eccetto . , - %
    cleaned = re.sub(r'[^\d.,%\-]', '', str(value_str))

    # Estrai il primo numero trovato
    number_match = re.search(r'[\-]?[\d,]+\.?\d*', cleaned)
    if number_match:
        number_str = number_match.group().replace(',', '')
        try:
            return float(number_str)
        except ValueError:
            return np.nan

    return np.nan

def convert_kids_to_dataframe(jsonl_file):
    """
    Converte il file JSONL di KID estratti in un DataFrame pandas strutturato

    Args:
        jsonl_file (str): Path al file JSONL con i KID validi

    Returns:
        pd.DataFrame: DataFrame completo con tutti i KID
    """

    all_kids = []
    processing_errors = []

    print("🔄 Loading and processing KID data...")

    # Leggi tutti i KID dal file JSONL
    with open(jsonl_file, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            try:
                response_data = json.loads(line.strip())

                # Estrai metadati documento
                doc_analysis = response_data.get('document_analysis', {})
                source_doc = doc_analysis.get('source_document_id', f'DOC_{line_num}')

                # Processa ogni KID nel documento
                for kid_data in response_data.get('kids', []):
                    try:
                        # Aggiungi metadati documento
                        kid_data['source_document_id'] = source_doc
                        kid_data['extraction_line'] = line_num
                        kid_data['processing_timestamp'] = datetime.now().isoformat()

                        # Appiattisci la struttura nested
                        flattened_kid = flatten_nested_dict(kid_data)

                        all_kids.append(flattened_kid)

                    except Exception as e:
                        processing_errors.append({
                            'line': line_num,
                            'error': str(e),
                            'kid_id': kid_data.get('kid_id', 'unknown')
                        })

            except json.JSONDecodeError as e:
                processing_errors.append({
                    'line': line_num,
                    'error': f'JSON decode error: {e}',
                    'kid_id': 'parse_error'
                })

    print(f"📊 Processed {len(all_kids)} individual KIDs")
    if processing_errors:
        print(f"⚠️ {len(processing_errors)} processing errors encountered")

    # Crea DataFrame
    df = pd.DataFrame(all_kids)

    if df.empty:
        print("❌ No valid KID data found!")
        return pd.DataFrame()

    print(f"📋 Created DataFrame with {len(df)} rows and {len(df.columns)} columns")

    # Post-processing per migliorare i dati
    df = enhance_dataframe(df)

    return df

def enhance_dataframe(df):
    """
    Migliora il DataFrame con conversioni di tipo e colonne calcolate
    """
    print("🔧 Enhancing DataFrame...")

    # Conversioni numeriche per colonne finanziarie chiave
    numeric_columns = [
        'risk_return_profile_indicatore_rischio_sintetico',
        'cost_structure_costi_ingresso_percentuale_annua',
        'cost_structure_incidenza_costi_totali_1_anno',
        'cost_structure_incidenza_costi_totali_5_anni',
        'cost_structure_incidenza_costi_totali_10_anni',
        'data_quality_completeness_score'
    ]

    for col in numeric_columns:
        if col in df.columns:
            df[f'{col}_numeric'] = df[col].apply(extract_numeric_value)

    # Estrazioni specifiche per scenari performance
    scenario_columns = [col for col in df.columns if 'scenari_performance' in col and 'rendimento' in col]
    for col in scenario_columns:
        if col in df.columns:
            df[f'{col}_numeric'] = df[col].apply(extract_numeric_value)

    # Conversioni booleane
    boolean_columns = [
        'risk_return_profile_protezione_capitale',
        'insurance_benefits_prestazioni_vita_riscatto_disponibile'
    ]

    for col in boolean_columns:
        if col in df.columns:
            df[f'{col}_bool'] = df[col].map({
                'si': True, 'sì': True, 'yes': True, 'true': True, 'True': True,
                'no': False, 'false': False, 'False': False
            })

    # Colonne calcolate
    if 'data_quality_completeness_score_numeric' in df.columns:
        df['data_quality_high'] = df['data_quality_completeness_score_numeric'] >= 8

    # Categorizzazione livello rischio
    if 'risk_return_profile_indicatore_rischio_sintetico_numeric' in df.columns:
        df['risk_category'] = pd.cut(
            df['risk_return_profile_indicatore_rischio_sintetico_numeric'],
            bins=[0, 2, 4, 6, 7],
            labels=['Basso', 'Medio-Basso', 'Medio-Alto', 'Alto'],
            include_lowest=True
        )

    # Pulizia nomi colonne
    df.columns = df.columns.str.replace('_', ' ').str.title()

    print(f"✅ DataFrame enhanced with {len([c for c in df.columns if 'numeric' in c.lower()])} numeric columns")

    return df

def create_summary_statistics(df):
    """
    Crea statistiche riassuntive del dataset KID
    """
    print("\n📈 DATASET SUMMARY STATISTICS")
    print("=" * 50)

    # Statistiche generali
    print(f"📊 Total KIDs: {len(df)}")
    print(f"📄 Unique source documents: {df['Source Document Id'].nunique()}")
    print(f"🏢 Unique companies: {df['Product Identification Compagnia'].nunique()}")

    # Distribuzione prodotti
    if 'Product Identification Nome Prodotto' in df.columns:
        product_counts = df['Product Identification Nome Prodotto'].value_counts().head(10)
        print(f"\n🏆 TOP 10 PRODUCTS:")
        for product, count in product_counts.items():
            print(f"  {product}: {count}")

    # Distribuzione rischio
    if 'Risk Category' in df.columns:
        risk_dist = df['Risk Category'].value_counts()
        print(f"\n⚠️ RISK DISTRIBUTION:")
        for risk, count in risk_dist.items():
            print(f"  {risk}: {count} ({count/len(df)*100:.1f}%)")

    # Statistiche qualità dati
    if 'Data Quality Completeness Score Numeric' in df.columns:
        quality_stats = df['Data Quality Completeness Score Numeric'].describe()
        print(f"\n📋 DATA QUALITY STATS:")
        print(f"  Average completeness: {quality_stats['mean']:.1f}/10")
        print(f"  Min-Max: {quality_stats['min']:.0f}-{quality_stats['max']:.0f}")

        high_quality = (df['Data Quality Completeness Score Numeric'] >= 8).sum()
        print(f"  High quality (8+): {high_quality} ({high_quality/len(df)*100:.1f}%)")

    return df

def save_enhanced_dataset(df, output_file='kid_complete_dataset.xlsx'):
    """
    Salva il dataset in formato Excel con fogli separati
    """
    print(f"\n💾 Saving enhanced dataset to {output_file}...")

    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        # Foglio principale con tutti i dati
        df.to_excel(writer, sheet_name='All KIDs', index=False)

        # Foglio riassuntivo con statistiche chiave
        summary_cols = [
            'Kid Id', 'Source Document Id',
            'Product Identification Nome Prodotto',
            'Product Identification Compagnia',
            'Risk Return Profile Indicatore Rischio Sintetico Numeric',
            'Risk Category',
            'Cost Structure Incidenza Costi Totali 1 Anno Numeric',
            'Data Quality Completeness Score Numeric',
            'Descriptive Texts Executive Summary'
        ]

        summary_df = df[[col for col in summary_cols if col in df.columns]].copy()
        summary_df.to_excel(writer, sheet_name='Summary', index=False)

        # Foglio con solo dati numerici per analisi
        numeric_cols = [col for col in df.columns if 'numeric' in col.lower() or 'bool' in col.lower()]
        if numeric_cols:
            numeric_df = df[['Kid Id'] + numeric_cols].copy()
            numeric_df.to_excel(writer, sheet_name='Numeric Data', index=False)

    print(f"✅ Dataset saved with {len(df)} KIDs across multiple sheets")

def create_concise_kid_dataset(df_full):
    """
    Crea un DataFrame conciso con solo le informazioni più importanti

    Args:
        df_full (pd.DataFrame): DataFrame completo con tutte le colonne

    Returns:
        pd.DataFrame: DataFrame conciso con ~30 colonne chiave
    """

    print("✂️ Creating concise KID dataset with key information only...")

    # Definisci le colonne più importanti per analisi business
    key_columns = {
        # Identificativi
        'kid_id': 'KID_ID',
        'source_document_id': 'Source_Document',

        # Prodotto base
        'product_identification_nome_prodotto': 'Product_Name',
        'product_identification_compagnia': 'Company',
        'product_identification_tipo_prodotto': 'Product_Type',
        'product_identification_durata_contratto': 'Contract_Duration',

        # Struttura investimento (prima opzione)
        'investment_structure_opzioni_investimento_0_nome_opzione': 'Investment_Option_Name',
        'investment_structure_opzioni_investimento_0_tipologia': 'Investment_Type',
        'investment_structure_opzioni_investimento_0_livello_rischio': 'Investment_Risk_Level',

        # Profilo rischio-rendimento
        'risk_return_profile_indicatore_rischio_sintetico': 'Risk_Indicator',
        'risk_return_profile_indicatore_rischio_sintetico_numeric': 'Risk_Level_Numeric',
        'risk_category': 'Risk_Category',
        'risk_return_profile_periodo_detenzione_raccomandato': 'Recommended_Holding_Period',
        'risk_return_profile_protezione_capitale': 'Capital_Protection',

        # Scenari performance chiave (moderato e favorevole)
        'risk_return_profile_scenari_performance_moderato_1_anno_rendimento': 'Return_1Y_Moderate',
        'risk_return_profile_scenari_performance_moderato_5_anni_rendimento': 'Return_5Y_Moderate',
        'risk_return_profile_scenari_performance_favorevole_1_anno_rendimento': 'Return_1Y_Favorable',
        'risk_return_profile_scenari_performance_favorevole_5_anni_rendimento': 'Return_5Y_Favorable',

        # Costi chiave
        'cost_structure_costi_ingresso_percentuale_annua': 'Entry_Costs_Pct',
        'cost_structure_costi_gestione_annui_commissioni_gestione': 'Management_Fees',
        'cost_structure_incidenza_costi_totali_1_anno': 'Total_Costs_1Y',
        'cost_structure_incidenza_costi_totali_5_anni': 'Total_Costs_5Y',
        'cost_structure_incidenza_costi_totali_10_anni': 'Total_Costs_10Y',

        # Benefici assicurativi
        'insurance_benefits_copertura_decesso_base': 'Death_Benefit_Base',
        'insurance_benefits_prestazioni_vita_riscatto_disponibile': 'Surrender_Available',

        # Target market
        'target_market_profilo_cliente_tipologia': 'Client_Type',
        'target_market_profilo_cliente_orizzonte_temporale': 'Time_Horizon',
        'target_market_profilo_cliente_importo_minimo_premio': 'Min_Premium',

        # Testi descrittivi chiave - COMPLETI
        'descriptive_texts_executive_summary': 'Executive_Summary',
        'descriptive_texts_risk_assessment': 'Risk_Assessment',
        'descriptive_texts_product_description': 'Product_Description',

        # Qualità dati
        'data_quality_completeness_score': 'Data_Quality_Score',
        'data_quality_completeness_score_numeric': 'Data_Quality_Numeric',
        'data_quality_extraction_confidence': 'Extraction_Confidence'
    }

    # Seleziona solo le colonne che esistono nel DataFrame
    available_columns = {}
    missing_columns = []

    for old_name, new_name in key_columns.items():
        # Cerca la colonna con match flessibile (case insensitive, spazi/underscore)
        matched_col = find_column_match(df_full, old_name)
        if matched_col:
            available_columns[matched_col] = new_name
        else:
            missing_columns.append(old_name)

    print(f"📋 Found {len(available_columns)} out of {len(key_columns)} key columns")
    if missing_columns:
        print(f"⚠️ Missing columns: {len(missing_columns)}")

    # Crea DataFrame conciso
    df_concise = df_full[list(available_columns.keys())].copy()
    df_concise = df_concise.rename(columns=available_columns)

    # Post-processing per migliorare i dati concisi
    df_concise = enhance_concise_dataframe(df_concise)

    print(f"✅ Created concise dataset: {len(df_concise)} rows × {len(df_concise.columns)} columns")

    return df_concise

def find_column_match(df, target_column):
    """
    Trova una colonna nel DataFrame con match flessibile
    """
    # Normalizza nome target
    target_normalized = target_column.lower().replace('_', ' ').replace('-', ' ')

    for col in df.columns:
        col_normalized = col.lower().replace('_', ' ').replace('-', ' ')
        if target_normalized in col_normalized or col_normalized in target_normalized:
            return col
    return None

def enhance_concise_dataframe(df):
    """
    Migliora il DataFrame conciso con conversioni e colonne calcolate
    """
    print("🔧 Enhancing concise DataFrame...")

    # Conversioni numeriche per costi e rendimenti
    numeric_cols = [
        'Risk_Level_Numeric', 'Entry_Costs_Pct', 'Total_Costs_1Y', 'Total_Costs_5Y',
        'Total_Costs_10Y', 'Data_Quality_Numeric'
    ]

    for col in numeric_cols:
        if col in df.columns:
            df[f'{col}_Clean'] = df[col].apply(extract_numeric_value)

    # Conversioni per rendimenti
    return_cols = [col for col in df.columns if 'Return_' in col]
    for col in return_cols:
        if col in df.columns:
            df[f'{col}_Numeric'] = df[col].apply(extract_numeric_value)

    # Categorizzazioni semplificate
    if 'Risk_Level_Numeric_Clean' in df.columns:
        df['Risk_Simple'] = df['Risk_Level_Numeric_Clean'].map({
            1: 'Very Low', 2: 'Low', 3: 'Medium-Low',
            4: 'Medium', 5: 'Medium-High', 6: 'High', 7: 'Very High'
        })

    # Flag per caratteristiche importanti
    if 'Capital_Protection' in df.columns:
        df['Has_Capital_Protection'] = df['Capital_Protection'].str.lower().isin(['si', 'sì', 'yes', 'true'])

    if 'Total_Costs_1Y_Clean' in df.columns:
        df['High_Costs'] = df['Total_Costs_1Y_Clean'] > 3.0  # Costi > 3%

    if 'Data_Quality_Numeric_Clean' in df.columns:
        df['Reliable_Data'] = df['Data_Quality_Numeric_Clean'] >= 8

    # Pulizia testi - MANTIENI TESTI COMPLETI
    # Rimuoviamo il troncamento automatico per preservare informazioni importanti
    if 'Executive_Summary' in df.columns:
        # Mantieni il testo completo, rimuovi solo caratteri problematici
        df['Executive_Summary_Clean'] = df['Executive_Summary'].str.replace('\n', ' ').str.replace('\r', ' ').str.strip()

    if 'Risk_Assessment' in df.columns:
        df['Risk_Assessment_Clean'] = df['Risk_Assessment'].str.replace('\n', ' ').str.replace('\r', ' ').str.strip()

    return df

def create_business_insights(df_concise):
    """
    Crea insights business dal DataFrame conciso
    """
    print("\n📊 BUSINESS INSIGHTS FROM CONCISE DATASET")
    print("=" * 60)

    insights = {}

    # 1. Distribuzione per livello di rischio
    if 'Risk_Simple' in df_concise.columns:
        risk_dist = df_concise['Risk_Simple'].value_counts()
        insights['risk_distribution'] = risk_dist
        print("🎯 RISK DISTRIBUTION:")
        for risk, count in risk_dist.items():
            print(f"  {risk}: {count} products ({count/len(df_concise)*100:.1f}%)")

    # 2. Prodotti più comuni
    if 'Product_Name' in df_concise.columns:
        product_dist = df_concise['Product_Name'].value_counts().head(5)
        insights['top_products'] = product_dist
        print(f"\n🏆 TOP 5 PRODUCTS:")
        for product, count in product_dist.items():
            print(f"  {product}: {count} variants")

    # 3. Analisi costi
    if 'Total_Costs_1Y_Clean' in df_concise.columns:
        costs_stats = df_concise['Total_Costs_1Y_Clean'].describe()
        insights['cost_statistics'] = costs_stats
        print(f"\n💰 COST ANALYSIS (1-Year):")
        print(f"  Average: {costs_stats['mean']:.2f}%")
        print(f"  Range: {costs_stats['min']:.2f}% - {costs_stats['max']:.2f}%")
        print(f"  Median: {costs_stats['50%']:.2f}%")

        high_cost_count = (df_concise['Total_Costs_1Y_Clean'] > 3.0).sum()
        print(f"  High cost products (>3%): {high_cost_count} ({high_cost_count/len(df_concise)*100:.1f}%)")

    # 4. Protezione capitale
    if 'Has_Capital_Protection' in df_concise.columns:
        protection_count = df_concise['Has_Capital_Protection'].sum()
        insights['capital_protection'] = protection_count
        print(f"\n🛡️ CAPITAL PROTECTION:")
        print(f"  Protected products: {protection_count} ({protection_count/len(df_concise)*100:.1f}%)")

    # 5. Qualità dati
    if 'Reliable_Data' in df_concise.columns:
        reliable_count = df_concise['Reliable_Data'].sum()
        insights['data_quality'] = reliable_count
        print(f"\n📊 DATA QUALITY:")
        print(f"  High quality extractions: {reliable_count} ({reliable_count/len(df_concise)*100:.1f}%)")

    return insights

def save_concise_dataset(df_concise, insights, output_file='kid_concise_dataset.xlsx'):
    """
    Salva il dataset conciso con insights
    """
    print(f"\n💾 Saving concise dataset to {output_file}...")

    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        # Foglio principale conciso
        df_concise.to_excel(writer, sheet_name='Sheet1', index=False)

        # Foglio con solo dati numerici per analisi
        numeric_cols = [col for col in df_concise.columns if any(x in col.lower() for x in ['numeric', 'clean', 'level'])]
        key_cols = ['KID_ID', 'Product_Name', 'Company', 'Risk_Simple']
        analysis_cols = key_cols + [col for col in numeric_cols if col in df_concise.columns]

        if analysis_cols:
            df_analysis = df_concise[analysis_cols].copy()
            df_analysis.to_excel(writer, sheet_name='Analysis Ready', index=False)

        # Foglio insights
        insights_data = []
        for key, value in insights.items():
            if hasattr(value, 'items'):  # Series o dict
                for item, count in value.items():
                    insights_data.append({'Category': key, 'Item': item, 'Count': count})
            else:
                insights_data.append({'Category': key, 'Item': 'Total', 'Count': value})

        if insights_data:
            df_insights = pd.DataFrame(insights_data)
            df_insights.to_excel(writer, sheet_name='Business Insights', index=False)

    print(f"✅ Concise dataset saved with {len(df_concise)} KIDs and business insights")

# Funzione principale per dataset conciso
def create_concise_kid_analysis(df_full, output_file='kid_concise_dataset.xlsx'):
    """
    Crea un dataset conciso e analisi business dai KID estratti

    Args:
        df_full (pd.DataFrame): DataFrame completo
        output_file (str): File di output

    Returns:
        tuple: (DataFrame conciso, insights business)
    """

    print("🎯 Creating concise KID dataset for business analysis...")

    # Crea dataset conciso
    df_concise = create_concise_kid_dataset(df_full)

    # Genera insights business
    insights = create_business_insights(df_concise)

    # Salva tutto
    save_concise_dataset(df_concise, insights, output_file)

    print(f"\n🎉 CONCISE DATASET CREATED!")
    print(f"📊 {len(df_concise)} KIDs × {len(df_concise.columns)} key columns")
    print(f"📁 Saved to: {output_file}")
    print(f"📈 Business insights included")

    return df_concise, insights

# Funzione principale
def create_complete_kid_dataset(jsonl_file='kid_extraction_results_valid.jsonl',
                               output_file='kid_complete_dataset.xlsx'):
    """
    Funzione principale per convertire tutti i KID in un dataset pandas completo

    Args:
        jsonl_file (str): File JSONL con i KID estratti
        output_file (str): File Excel di output

    Returns:
        pd.DataFrame: Dataset completo
    """

    print("🚀 Creating complete KID dataset...")

    # Converti JSON in DataFrame
    df = convert_kids_to_dataframe(jsonl_file)

    if df.empty:
        print("❌ No data to process!")
        return df

    # Crea statistiche riassuntive
    df = create_summary_statistics(df)

    # Salva dataset completo
    save_enhanced_dataset(df, output_file)

    print(f"\n🎉 SUCCESS! Complete KID dataset created:")
    print(f"📊 {len(df)} KIDs processed")
    print(f"📁 Saved to: {output_file}")
    print(f"📋 Columns: {len(df.columns)}")

    return df

# Esempio di utilizzo completo
if __name__ == "__main__":
    # 1. Crea il dataset completo
    df_full = create_complete_kid_dataset(
        jsonl_file='kid_extraction_results_valid.jsonl',
        output_file='kid_complete_dataset.xlsx'
    )

    # 2. Crea il dataset conciso
    df_concise, insights = create_concise_kid_analysis(
        df_full,
        output_file='kid_concise_dataset.xlsx'
    )

    print(f"\n📋 FINAL SUMMARY:")
    print(f"Full dataset: {df_full.shape[0]} rows × {df_full.shape[1]} columns")
    print(f"Concise dataset: {df_concise.shape[0]} rows × {df_concise.shape[1]} columns")
    print(f"Data reduction: {(1 - df_concise.shape[1]/df_full.shape[1])*100:.1f}% fewer columns")

In [30]:
# salvo in excel
df_full.to_excel('kid_complete_dataset.xlsx', index=False)
df_concise.to_excel('kid_concise_dataset.xlsx', index=False)

In [42]:
# Codice per l'uniformazione del dataset KID (Key Information Document) assicurativo
# Versione avanzata con gestione di range, valori condizionali e formule complesse
# Mantiene la terminologia tecnica assicurativa secondo gli standard normativi

import pandas as pd
import numpy as np
import re
from typing import Dict, List, Tuple, Union

def uniforma_dataset_kid(df_concise: pd.DataFrame) -> pd.DataFrame:
    """
    Uniforma le variabili categoriali del dataset KID mantenendo
    la terminologia tecnica assicurativa e gestendo range/valori condizionali
    """

    # Copia del dataframe per evitare modifiche al dataset originale
    df = df_concise.copy()

    print("=== UNIFORMAZIONE DATASET KID ASSICURATIVO (VERSIONE AVANZATA) ===")
    print(f"Dataset originale: {df.shape[0]} righe, {df.shape[1]} colonne")

    # 1. UNIFORMAZIONE PRODUCT_TYPE
    print("\n1. Uniformazione Product_Type...")
    df['Product_Type_Clean'] = df['Product_Type'].apply(uniforma_product_type)

    # 2. UNIFORMAZIONE INVESTMENT_TYPE
    print("2. Uniformazione Investment_Type...")
    df['Investment_Type_Clean'] = df['Investment_Type'].apply(uniforma_investment_type)

    # 3. UNIFORMAZIONE RISK_CATEGORY
    print("3. Uniformazione Risk_Category...")
    df['Risk_Category_Clean'] = df['Risk_Category'].apply(uniforma_risk_category)

    # 4. UNIFORMAZIONE CAPITAL_PROTECTION
    print("4. Uniformazione Capital_Protection...")
    df['Capital_Protection_Clean'] = df['Capital_Protection'].apply(uniforma_capital_protection)

    # 5. UNIFORMAZIONE CLIENT_TYPE
    print("5. Uniformazione Client_Type...")
    df['Client_Type_Clean'] = df['Client_Type'].apply(uniforma_client_type)

    # 6. UNIFORMAZIONE CONTRACT_DURATION
    print("6. Uniformazione Contract_Duration...")
    df['Contract_Duration_Clean'] = df['Contract_Duration'].apply(uniforma_contract_duration)

    # 7. UNIFORMAZIONE TIME_HORIZON
    print("7. Uniformazione Time_Horizon...")
    df['Time_Horizon_Clean'] = df['Time_Horizon'].apply(uniforma_time_horizon)

    # 8. UNIFORMAZIONE SURRENDER_AVAILABLE
    print("8. Uniformazione Surrender_Available...")
    df['Surrender_Available_Clean'] = df['Surrender_Available'].apply(uniforma_surrender_available)

    # 9. PULIZIA AVANZATA VARIABILI NUMERICHE CON GESTIONE RANGE
    print("9. Pulizia avanzata variabili numeriche con gestione range...")
    df = pulisci_variabili_numeriche_avanzate(df)

    # 10. CREAZIONE VARIABILI DERIVATE
    print("10. Creazione variabili derivate...")
    df = crea_variabili_derivate(df)

    # 11. GESTIONE MANAGEMENT_FEES CONDIZIONALI
    print("11. Gestione Management_Fees condizionali...")
    df = gestisci_management_fees_condizionali(df)

    # Report finale
    print(f"\nDataset finale: {df.shape[0]} righe, {df.shape[1]} colonne")
    print("Nuove colonne create:", [col for col in df.columns if col.endswith('_Clean') or col.startswith('Flag_') or col.endswith('_Min') or col.endswith('_Max')])

    return df

def estrai_range_numerici(valore: str) -> Tuple[float, float, float]:
    """
    Estrae valori min, max e medio da stringhe che contengono range
    Restituisce (min_val, max_val, avg_val)
    """
    if pd.isna(valore) or valore == "" or valore == "N/A":
        return np.nan, np.nan, np.nan

    valore_str = str(valore).replace(',', '.')

    # Pattern per range con trattino: "1,2% - 2,5%", "€1.000 - €5.000"
    range_pattern1 = r'([\d\.,]+).*?[-–—]\s*([\d\.,]+)'

    # Pattern per range con "da...a": "da 1% a 3%", "from 1000 to 5000"
    range_pattern2 = r'(?:da|from)\s*([\d\.,]+).*?(?:a|to)\s*([\d\.,]+)'

    # Pattern per "fino a": "fino a 2%", "up to 1000"
    fino_pattern = r'(?:fino a|up to|max|massimo)\s*([\d\.,]+)'

    # Pattern per "almeno": "almeno 1000", "at least 500"
    almeno_pattern = r'(?:almeno|at least|min|minimo)\s*([\d\.,]+)'

    # Pattern per "circa": "circa 1,5%", "approximately 1000"
    circa_pattern = r'(?:circa|approximately|~|±)\s*([\d\.,]+)'

    # Cerca range completo
    match_range1 = re.search(range_pattern1, valore_str)
    match_range2 = re.search(range_pattern2, valore_str)

    if match_range1:
        try:
            min_val = float(re.sub(r'[^\d\.]', '', match_range1.group(1)))
            max_val = float(re.sub(r'[^\d\.]', '', match_range1.group(2)))
            avg_val = (min_val + max_val) / 2
            return min_val, max_val, avg_val
        except:
            pass

    if match_range2:
        try:
            min_val = float(re.sub(r'[^\d\.]', '', match_range2.group(1)))
            max_val = float(re.sub(r'[^\d\.]', '', match_range2.group(2)))
            avg_val = (min_val + max_val) / 2
            return min_val, max_val, avg_val
        except:
            pass

    # Cerca "fino a" (solo max)
    match_fino = re.search(fino_pattern, valore_str)
    if match_fino:
        try:
            max_val = float(re.sub(r'[^\d\.]', '', match_fino.group(1)))
            return 0, max_val, max_val / 2
        except:
            pass

    # Cerca "almeno" (solo min)
    match_almeno = re.search(almeno_pattern, valore_str)
    if match_almeno:
        try:
            min_val = float(re.sub(r'[^\d\.]', '', match_almeno.group(1)))
            return min_val, np.nan, min_val
        except:
            pass

    # Cerca "circa" (valore approssimativo)
    match_circa = re.search(circa_pattern, valore_str)
    if match_circa:
        try:
            val = float(re.sub(r'[^\d\.]', '', match_circa.group(1)))
            return val * 0.9, val * 1.1, val  # ±10% di tolleranza
        except:
            pass

    # Se non è un range, prova a estrarre un singolo valore
    numeri = re.findall(r'[\d\.,]+', valore_str.replace(',', '.'))
    if numeri:
        try:
            val = float(re.sub(r'[^\d\.]', '', numeri[0]))
            return val, val, val
        except:
            pass

    return np.nan, np.nan, np.nan

def pulisci_variabili_numeriche_avanzate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Pulisce e uniforma le variabili numeriche gestendo range, valori condizionali e formule
    """

    # Variabili numeriche da processare
    numeric_cols = [
        'Risk_Level_Numeric', 'Entry_Costs_Pct', 'Management_Fees',
        'Total_Costs_1Y', 'Total_Costs_5Y', 'Total_Costs_10Y',
        'Return_1Y_Moderate', 'Return_5Y_Moderate',
        'Return_1Y_Favorable', 'Return_5Y_Favorable',
        'Min_Premium'
    ]

    for col in numeric_cols:
        if col in df.columns:
            print(f"   Processando {col}...")

            # Estrai range per ogni valore
            ranges = df[col].apply(estrai_range_numerici)

            # Crea colonne separate per min, max e valore medio
            df[f'{col}_Min'] = ranges.apply(lambda x: x[0] if isinstance(x, tuple) else np.nan)
            df[f'{col}_Max'] = ranges.apply(lambda x: x[1] if isinstance(x, tuple) else np.nan)
            df[f'{col}_Avg'] = ranges.apply(lambda x: x[2] if isinstance(x, tuple) else np.nan)

            # Crea flag per indicare se è un range
            df[f'{col}_IsRange'] = ranges.apply(lambda x:
                isinstance(x, tuple) and not pd.isna(x[0]) and not pd.isna(x[1]) and x[0] != x[1]
            )

            # Colonna principale con valore rappresentativo (media se range, valore singolo altrimenti)
            df[f'{col}_Clean'] = df[f'{col}_Avg']

    return df

def gestisci_management_fees_condizionali(df: pd.DataFrame) -> pd.DataFrame:
    """
    Gestisce i Management_Fees con formule condizionali complesse
    """
    if 'Management_Fees' in df.columns:

        def estrai_fees_base_e_condizionali(fees_str):
            if pd.isna(fees_str) or fees_str == "" or fees_str == "N/A":
                return np.nan, np.nan, "Fisso"

            fees_str = str(fees_str).lower()

            # Pattern per fee base + maggiorazione condizionale
            # Es: "1,10% annuo sul rendimento della gestione separata; maggiorazione di 0,03% per ogni 0,1% di rendimento oltre il 3%"
            if 'maggiorazione' in fees_str or 'performance' in fees_str or 'oltre' in fees_str:
                # Estrae fee base
                base_match = re.search(r'([\d,\.]+)%', fees_str)
                base_fee = float(base_match.group(1).replace(',', '.')) if base_match else np.nan

                # Estrae maggiorazione
                magg_match = re.search(r'maggiorazione.*?([\d,\.]+)%', fees_str)
                extra_fee = float(magg_match.group(1).replace(',', '.')) if magg_match else np.nan

                return base_fee, extra_fee, "Performance-Based"

            # Pattern per fee variabile
            elif 'variabile' in fees_str or 'variable' in fees_str:
                min_val, max_val, avg_val = estrai_range_numerici(fees_str)
                return avg_val, max_val - min_val if not pd.isna(max_val) and not pd.isna(min_val) else np.nan, "Variabile"

            # Fee fisso
            else:
                min_val, max_val, avg_val = estrai_range_numerici(fees_str)
                return avg_val, np.nan, "Fisso"

        # Applica la funzione
        fees_data = df['Management_Fees'].apply(estrai_fees_base_e_condizionali)

        df['Management_Fees_Base'] = fees_data.apply(lambda x: x[0] if isinstance(x, tuple) else np.nan)
        df['Management_Fees_Extra'] = fees_data.apply(lambda x: x[1] if isinstance(x, tuple) else np.nan)
        df['Management_Fees_Type'] = fees_data.apply(lambda x: x[2] if isinstance(x, tuple) else "Non Specificato")

    return df

def uniforma_product_type(product_type: str) -> str:
    """Uniforma la tipologia di prodotto secondo classificazione Solvency II"""
    if pd.isna(product_type) or product_type == "":
        return "Non Specificato"

    product_type = str(product_type).lower()

    # Ramo I - Assicurazioni sulla vita
    if any(term in product_type for term in ['vita intera', 'vita_intera', 'whole life']):
        return "Ramo I - Vita Intera"
    elif any(term in product_type for term in ['mista', 'endowment']):
        return "Ramo I - Assicurazione Mista"
    elif any(term in product_type for term in ['rivalutabile', 'rivalutabil']):
        return "Ramo I - Rivalutabile"
    elif any(term in product_type for term in ['capitale differito', 'capital deferred']):
        return "Ramo I - Capitale Differito"
    elif any(term in product_type for term in ['temporanea', 'temporary']):
        return "Ramo I - Temporanea Caso Morte"

    # Ramo III - Unit Linked
    elif any(term in product_type for term in ['unit linked', 'unit_linked', 'ramo iii', 'ramo_iii']):
        return "Ramo III - Unit Linked"

    # Ramo V - Capitalizzazione
    elif any(term in product_type for term in ['capitalizzazione', 'capitalization']):
        return "Ramo V - Capitalizzazione"

    # Multiramo
    elif any(term in product_type for term in ['multiramo', 'multi-ramo']):
        return "Multiramo"

    # Previdenza Complementare
    elif any(term in product_type for term in ['previdenza', 'pensione', 'pension']):
        return "Previdenza Complementare"

    else:
        return "Ramo I - Altre Forme"

def uniforma_investment_type(investment_type: str) -> str:
    """Uniforma il tipo di investimento"""
    if pd.isna(investment_type) or investment_type == "" or investment_type == "N/A":
        return "Non Applicabile"

    investment_type = str(investment_type).lower()

    if 'gestione_separata' in investment_type and 'fondo_interno' in investment_type:
        return "Gestione Separata + Fondo Interno"
    elif 'gestione_separata' in investment_type or 'gestione separata' in investment_type:
        return "Gestione Separata"
    elif 'fondo_interno' in investment_type or 'fondo interno' in investment_type:
        return "Fondo Interno"
    elif 'fondo_esterno' in investment_type or 'fondo esterno' in investment_type:
        return "Fondo Esterno"
    elif 'etf' in investment_type:
        return "ETF"
    elif 'sicav' in investment_type:
        return "SICAV"
    else:
        return "Altro"

def uniforma_risk_category(risk_category: str) -> str:
    """Uniforma la categoria di rischio secondo scala 1-7 Solvency II"""
    if pd.isna(risk_category) or risk_category == "":
        return "Non Classificato"

    risk_category = str(risk_category).lower()

    if any(term in risk_category for term in ['molto basso', 'very low']):
        return "Molto Basso (1-2)"
    elif any(term in risk_category for term in ['basso', 'low']):
        return "Basso (2-3)"
    elif any(term in risk_category for term in ['medio-basso', 'medio basso', 'medium-low']):
        return "Medio-Basso (3-4)"
    elif any(term in risk_category for term in ['medio', 'medium']) and 'alto' not in risk_category:
        return "Medio (4-5)"
    elif any(term in risk_category for term in ['medio-alto', 'medio alto', 'medium-high']):
        return "Medio-Alto (5-6)"
    elif any(term in risk_category for term in ['alto', 'high']):
        return "Alto (6-7)"
    elif any(term in risk_category for term in ['molto alto', 'very high']):
        return "Molto Alto (7)"
    else:
        return "Non Classificato"

def uniforma_capital_protection(capital_protection: str) -> str:
    """Uniforma la protezione del capitale secondo terminologia MiFID II"""
    if pd.isna(capital_protection) or capital_protection == "":
        return "Non Specificato"

    capital_protection = str(capital_protection).lower()

    # Protezione totale
    if capital_protection == "si" or capital_protection == "sì":
        return "Protezione Totale"
    elif capital_protection == "no":
        return "Nessuna Protezione"

    # Protezioni condizionate
    elif 'solo decesso' in capital_protection or 'case of death' in capital_protection:
        return "Protezione Solo Caso Morte"
    elif 'finestre' in capital_protection or 'window' in capital_protection or 'specifiche' in capital_protection:
        return "Protezione a Finestre Temporali"
    elif 'scadenza' in capital_protection or 'maturity' in capital_protection:
        return "Protezione Solo a Scadenza"
    elif 'decesso' in capital_protection and ('riscatto' in capital_protection or 'finestre' in capital_protection):
        return "Protezione Mista (Decesso + Finestre)"
    elif '10 anni' in capital_protection or '10anni' in capital_protection:
        return "Protezione Dopo 10 Anni"

    # Protezioni parziali
    elif 'parziale' in capital_protection:
        return "Protezione Parziale"
    elif any(perc in capital_protection for perc in ['80%', '90%', '95%', '100%']):
        return "Protezione Parziale Percentuale"
    elif 'almeno' in capital_protection and any(perc in capital_protection for perc in ['80', '90', '95']):
        return "Protezione Parziale Percentuale"

    # Protezioni con rischi
    elif 'cambio' in capital_protection or 'currency' in capital_protection or 'usd' in capital_protection:
        return "Protezione con Rischio Cambio"
    elif 'penali' in capital_protection or 'penalty' in capital_protection:
        return "Protezione con Penalità"

    else:
        return "Protezione Condizionata"

def uniforma_client_type(client_type: str) -> str:
    """Uniforma il tipo di cliente secondo MiFID II"""
    if pd.isna(client_type) or client_type == "":
        return "Non Specificato"

    client_type = str(client_type).lower()

    if 'retail' in client_type and 'professionale' in client_type:
        return "Retail e Professionale"
    elif 'retail' in client_type:
        return "Clientela al Dettaglio"
    elif 'professionale' in client_type or 'professional' in client_type:
        return "Cliente Professionale"
    elif 'istituzionale' in client_type or 'institutional' in client_type:
        return "Cliente Istituzionale"
    elif 'controparte qualificata' in client_type or 'eligible counterparty' in client_type:
        return "Controparte Qualificata"
    else:
        return "Non Specificato"

def uniforma_contract_duration(duration: str) -> str:
    """Uniforma la durata contrattuale"""
    if pd.isna(duration) or duration == "":
        return "Non Specificato"

    duration = str(duration).lower()

    if 'vita intera' in duration or 'whole life' in duration:
        return "Vita Intera"
    elif 'anni' in duration:
        # Estrae il numero di anni
        anni = re.findall(r'\d+', duration)
        if anni:
            n_anni = int(anni[0])
            if n_anni <= 5:
                return "Breve Termine (≤5 anni)"
            elif n_anni <= 10:
                return "Medio Termine (6-10 anni)"
            elif n_anni <= 20:
                return "Lungo Termine (11-20 anni)"
            else:
                return "Lunghissimo Termine (>20 anni)"
    elif 'fino' in duration and 'decesso' in duration:
        return "Vita Intera"
    else:
        return "Durata Variabile"

def uniforma_time_horizon(time_horizon: str) -> str:
    """Uniforma l'orizzonte temporale raccomandato"""
    if pd.isna(time_horizon) or time_horizon == "":
        return "Non Specificato"

    time_horizon = str(time_horizon).lower()

    if any(term in time_horizon for term in ['breve', 'short', '1 anno', '2 anni']):
        return "Breve Termine"
    elif any(term in time_horizon for term in ['medio', 'medium']):
        return "Medio Termine"
    elif any(term in time_horizon for term in ['lungo', 'long']):
        return "Lungo Termine"
    elif 'almeno' in time_horizon:
        # Estrae gli anni
        anni = re.findall(r'\d+', time_horizon)
        if anni:
            n_anni = int(anni[0])
            if n_anni <= 3:
                return "Breve Termine"
            elif n_anni <= 7:
                return "Medio Termine"
            else:
                return "Lungo Termine"
    else:
        return "Non Specificato"

def uniforma_surrender_available(surrender: str) -> str:
    """Uniforma la disponibilità del riscatto"""
    if pd.isna(surrender) or surrender == "":
        return "Non Specificato"

    surrender = str(surrender).lower()

    if surrender == "si" or surrender == "sì":
        return "Riscatto Disponibile"
    elif surrender == "no":
        return "Riscatto Non Disponibile"
    elif 'limitato' in surrender or 'limited' in surrender:
        return "Riscatto Limitato"
    elif 'penalità' in surrender or 'penalty' in surrender:
        return "Riscatto con Penalità"
    else:
        return "Riscatto Condizionato"

def crea_variabili_derivate(df: pd.DataFrame) -> pd.DataFrame:
    """Crea variabili derivate per l'analisi attuariale"""

    # Assicuriamo che Risk_Level_Numeric sia numerico
    if 'Risk_Level_Numeric_Clean' in df.columns:
        risk_numeric = df['Risk_Level_Numeric_Clean']
    elif 'Risk_Level_Numeric' in df.columns:
        risk_numeric = pd.to_numeric(df['Risk_Level_Numeric'], errors='coerce')
    else:
        risk_numeric = pd.Series([np.nan] * len(df))

    # Flag per prodotti complessi secondo PRIIPs
    df['Flag_Prodotto_Complesso'] = (
        (df['Product_Type_Clean'].str.contains('Unit Linked|Multiramo', na=False)) |
        (df['Investment_Type_Clean'].isin(['Fondo Esterno', 'ETF', 'SICAV'])) |
        (risk_numeric >= 5)
    ).astype(int)

    # Flag per protezione del capitale
    df['Flag_Capital_Protection'] = (
        df['Capital_Protection_Clean'].isin([
            'Protezione Totale', 'Protezione Parziale',
            'Protezione Solo Caso Morte', 'Protezione Mista (Decesso + Finestre)',
            'Protezione Parziale Percentuale'
        ])
    ).astype(int)

    # Flag per protezione condizionata
    df['Flag_Capital_Protection_Conditioned'] = (
        df['Capital_Protection_Clean'].isin([
            'Protezione a Finestre Temporali', 'Protezione Solo a Scadenza',
            'Protezione Dopo 10 Anni', 'Protezione con Penalità'
        ])
    ).astype(int)

    # Categoria di costo basata su Total_Costs_1Y_Clean
    cost_col = None
    for col in ['Total_Costs_1Y_Clean', 'Total_Costs_1Y_Avg', 'Total_Costs_1Y_Numeric_Clean']:
        if col in df.columns:
            cost_col = col
            break

    if cost_col and df[cost_col].notna().sum() > 0:
        df['Categoria_Costo'] = pd.cut(
            df[cost_col],
            bins=[-np.inf, 1, 2.5, np.inf],
            labels=['Basso', 'Medio', 'Alto']
        )

    # Flag per clientela retail
    df['Flag_Retail'] = (
        df['Client_Type_Clean'].str.contains('Retail', na=False)
    ).astype(int)

    # Flag per management fees variabili/performance-based
    if 'Management_Fees_Type' in df.columns:
        df['Flag_Fees_Variabili'] = (
            df['Management_Fees_Type'].isin(['Performance-Based', 'Variabile'])
        ).astype(int)

    # Durata contratto in anni (estratta da Contract_Duration)
    df['Durata_Anni'] = df['Contract_Duration'].astype(str)\
        .str.extract(r'(\d+)').astype(float)

    # Flag per range di valori (indica incertezza nei dati)
    range_cols = [col for col in df.columns if col.endswith('_IsRange')]
    if range_cols:
        df['Flag_Dati_Con_Range'] = df[range_cols].any(axis=1).astype(int)

    # Categoria di rischio semplificata
    df['Risk_Category_Simple'] = risk_numeric.apply(categorizza_rischio_semplice)

    # Flag per prodotti ESG/sostenibili (basato su nomi e descrizioni)
    if 'Product_Name' in df.columns:
        df['Flag_ESG'] = df['Product_Name'].str.contains(
            'ESG|Sostenibil|Green|Climate|Environment|Social',
            case=False, na=False
        ).astype(int)

    return df

def categorizza_rischio_semplice(risk_level: float) -> str:
    """Categorizza il livello di rischio in modo semplificato"""
    if pd.isna(risk_level):
        return "Non Classificato"
    elif risk_level <= 2:
        return "Basso Rischio"
    elif risk_level <= 4:
        return "Medio Rischio"
    elif risk_level <= 6:
        return "Alto Rischio"
    else:
        return "Rischio Molto Alto"

def analizza_uniformazione_avanzata(df_original: pd.DataFrame, df_clean: pd.DataFrame):
    """Analizza i risultati dell'uniformazione avanzata"""

    print("\n=== ANALISI POST-UNIFORMAZIONE AVANZATA ===")

    # Variabili categoriali principali
    categorical_vars = [
        ('Product_Type', 'Product_Type_Clean'),
        ('Investment_Type', 'Investment_Type_Clean'),
        ('Risk_Category', 'Risk_Category_Clean'),
        ('Capital_Protection', 'Capital_Protection_Clean'),
        ('Client_Type', 'Client_Type_Clean')
    ]

    for original, clean in categorical_vars:
        if original in df_original.columns and clean in df_clean.columns:
            print(f"\n{original}:")
            print(f"  Valori unici originali: {df_original[original].nunique()}")
            print(f"  Valori unici uniformati: {df_clean[clean].nunique()}")
            print(f"  Valori uniformati: {list(df_clean[clean].unique())}")

    # Analisi range e valori condizionali
    print("\n=== GESTIONE RANGE E VALORI CONDIZIONALI ===")

    range_cols = [col for col in df_clean.columns if col.endswith('_IsRange')]
    if range_cols:
        for col in range_cols[:5]:  # Prime 5 colonne con range
            base_col = col.replace('_IsRange', '')
            n_ranges = df_clean[col].sum()
            print(f"{base_col}: {n_ranges} valori con range su {len(df_clean)} totali")

    # Analisi Management Fees condizionali
    if 'Management_Fees_Type' in df_clean.columns:
        print("\nTipologie Management Fees:")
        fees_types = df_clean['Management_Fees_Type'].value_counts()
        for fee_type, count in fees_types.items():
            print(f"  {fee_type}: {count}")

    # Analisi Capital Protection dettagliata
    if 'Capital_Protection_Clean' in df_clean.columns:
        print("\nTipologie Capital Protection:")
        protection_types = df_clean['Capital_Protection_Clean'].value_counts()
        for prot_type, count in protection_types.items():
            print(f"  {prot_type}: {count}")

def genera_report_qualita_dati(df: pd.DataFrame) -> pd.DataFrame:
    """Genera un report sulla qualità dei dati uniformati"""

    report_data = []

    # Analisi completezza dati per variabili principali
    main_vars = [
        'Product_Type_Clean', 'Investment_Type_Clean', 'Risk_Category_Clean',
        'Capital_Protection_Clean', 'Client_Type_Clean'
    ]

    for var in main_vars:
        if var in df.columns:
            completezza = (df[var] != 'Non Specificato').sum() / len(df) * 100
            report_data.append({
                'Variabile': var,
                'Completezza_%': round(completezza, 1),
                'Valori_Unici': df[var].nunique(),
                'Valori_Mancanti': (df[var] == 'Non Specificato').sum()
            })

    # Analisi range nelle variabili numeriche
    numeric_vars = [col for col in df.columns if col.endswith('_IsRange')]
    for var in numeric_vars:
        base_var = var.replace('_IsRange', '')
        if var in df.columns:
            n_ranges = df[var].sum()
            perc_ranges = n_ranges / len(df) * 100
            report_data.append({
                'Variabile': base_var,
                'Range_%': round(perc_ranges, 1),
                'N_Range': n_ranges,
                'N_Valori_Singoli': len(df) - n_ranges
            })

    return pd.DataFrame(report_data)

# Esempio di utilizzo completo
if __name__ == "__main__":
    # Supponiamo che df_concise sia già caricato
    # df_concise = pd.read_excel('kid_concise_dataset.xlsx')

    print("Inizio uniformazione avanzata dataset KID...")

    # Uniformazione del dataset
    df_uniformato = uniforma_dataset_kid(df_concise)

    # Analisi dei risultati
    analizza_uniformazione_avanzata(df_concise, df_uniformato)

    # Genera report qualità dati
    report_qualita = genera_report_qualita_dati(df_uniformato)
    print("\n=== REPORT QUALITÀ DATI ===")
    print(report_qualita.to_string(index=False))

    # Statistiche finali
    print(f"\n=== STATISTICHE FINALI ===")
    print(f"Dataset finale: {df_uniformato.shape[0]} righe, {df_uniformato.shape[1]} colonne")

    # Conta nuove colonne create
    new_cols = [col for col in df_uniformato.columns if
                col.endswith('_Clean') or col.endswith('_Min') or col.endswith('_Max') or
                col.endswith('_Avg') or col.endswith('_IsRange') or col.startswith('Flag_') or
                col.endswith('_Type') or col.endswith('_Base') or col.endswith('_Extra')]

    print(f"Nuove colonne create: {len(new_cols)}")
    print("Tipologie di colonne aggiunte:")
    print(f"  - Uniformazione categoriali: {len([c for c in new_cols if c.endswith('_Clean')])}")
    print(f"  - Gestione range (Min/Max/Avg): {len([c for c in new_cols if c.endswith(('_Min', '_Max', '_Avg'))])}")
    print(f"  - Flag di qualità: {len([c for c in new_cols if c.endswith('_IsRange')])}")
    print(f"  - Flag derivati: {len([c for c in new_cols if c.startswith('Flag_')])}")
    print(f"  - Management fees dettagliati: {len([c for c in new_cols if 'Management_Fees' in c and c != 'Management_Fees'])}")

    # Salvataggio del dataset uniformato
    # df_uniformato.to_excel('kid_dataset_uniformato_avanzato.xlsx', index=False)
    # df_uniformato.to_csv('kid_dataset_uniformato_avanzato.csv', index=False)
    # report_qualita.to_excel('report_qualita_dati_kid.xlsx', index=False)

    print("\nUniformazione avanzata completata con successo!")
    print("Dataset pronto per analisi attuariali avanzate, pricing e modellazione del rischio.")
    print("\nFunzionalità implementate:")
    print("✓ Gestione range e valori approssimativi")
    print("✓ Management fees condizionali e performance-based")
    print("✓ Capital protection dettagliata con tutte le casistiche")
    print("✓ Flag di qualità per identificare incertezze nei dati")
    print("✓ Variabili derivate per analisi di compliance e rischio")
    print("✓ Report automatico qualità dati")

Inizio uniformazione avanzata dataset KID...
=== UNIFORMAZIONE DATASET KID ASSICURATIVO (VERSIONE AVANZATA) ===
Dataset originale: 81 righe, 49 colonne

1. Uniformazione Product_Type...
2. Uniformazione Investment_Type...
3. Uniformazione Risk_Category...
4. Uniformazione Capital_Protection...
5. Uniformazione Client_Type...
6. Uniformazione Contract_Duration...
7. Uniformazione Time_Horizon...
8. Uniformazione Surrender_Available...
9. Pulizia avanzata variabili numeriche con gestione range...
   Processando Risk_Level_Numeric...
   Processando Entry_Costs_Pct...
   Processando Management_Fees...
   Processando Total_Costs_1Y...
   Processando Total_Costs_5Y...
   Processando Total_Costs_10Y...
   Processando Return_1Y_Moderate...
   Processando Return_5Y_Moderate...
   Processando Return_1Y_Favorable...
   Processando Return_5Y_Favorable...
   Processando Min_Premium...
10. Creazione variabili derivate...
11. Gestione Management_Fees condizionali...

Dataset finale: 81 righe, 119 col

In [44]:
df_uniformato.to_excel('kid_concise_dataset_CLEAN.xlsx', sheet_name='KID Clean',index=False)

In [None]:
# Carica il dataset pulito
df_clean = pd.read_excel('kid_concise_dataset_CLEAN.xlsx', sheet_name='KID Clean')

# Verifica la qualità
print(f"Dataset pulito: {df_clean.shape}")
print(f"Colonne numeriche: {len(df_clean.select_dtypes(include=[np.number]).columns)}")

# Prime analisi
df_clean['Risk_Level_Numeric_Clean'].value_counts()
df_clean['Company'].value_counts()
df_clean.groupby('Company')['Risk_Level_Numeric_Clean'].mean()