# Datenextraktion
#### Marc Peter Gr√ºniger
---

## Bibliotheken & Plot-Einstellungen

In [49]:
# System & Standardbibliotheken
import os
import time
import json
import asyncio

# Drittanbieterbibliotheken
import requests
import nest_asyncio
from dotenv import load_dotenv
import tiktoken

# LlamaIndex ‚Äì Core
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    StorageContext,
    load_index_from_storage,
    QueryBundle
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import TextNode

# LlamaIndex ‚Äì LLM & Parser
from llama_index.llms.openai import OpenAI
from llama_parse import LlamaParse


## API Keys

In [51]:
load_dotenv(override=True)

# API-Keys laden
llama_cloud_api_key = os.getenv("LLAMA_CLOUD_API_KEY")
azure_api_key = os.getenv("AZURE_OPENAI_API_KEY")
azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT").rstrip("/")
azure_embedding_deployment = os.getenv("AZURE_EMBEDDING_DEPLOYMENT")
azure_api_version = os.getenv("AZURE_API_VERSION")
openai_api_key = os.getenv("OPENAI_API_KEY")

---

## Funktionen

### Prompts laden

In [53]:
def load_prompt(prompt_name: str) -> str:
    """
    L√§dt den Inhalt einer Prompt-Datei aus dem Verzeichnis 'prompts'.

    Args:
        prompt_name (str): Der Name der Prompt-Datei (ohne Dateiendung '.txt').

    Returns:
        str: Der geladene Prompt-Text als String.
    """

    
    prompt_path = os.path.join("prompts", f"{prompt_name}.txt")
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read()


### Extrahierte Werte speichern

In [None]:
def save_to_data_json(pdf_path, key, value):
    """
    Speichert einen extrahierten Wert in eine zentrale JSON-Datei, gruppiert nach Pensionskassenname.

    Args:
        pdf_path (str): Pfad zur PDF-Datei, aus dem der Pensionskassenname abgeleitet wird.
        key (str): Der Name des Datenpunkts, der gespeichert werden soll.
        value (Any): Der extrahierte Wert, der gespeichert werden soll.

    Raises:
        ValueError: Wenn der PDF-Pfad nicht gen√ºgend Informationen enth√§lt, um den Pensionskassennamen zu bestimmen.
    """
    
    # Pfad aufteilen und den Ordnernamen vor der PDF-Datei als Pensionskassenname verwenden
    parts = pdf_path.replace("\\", "/").split("/")
    if len(parts) < 2:
        raise ValueError("PDF-Pfad ist zu kurz, um den Pensionskassennamen zu extrahieren.")
    pk_name = parts[-2]

    json_path = "../0. Daten/extrahierte_daten.json"


    # Vorhandene Daten laden oder neues Dict anlegen
    if os.path.exists(json_path):
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    else:
        data = {}

    # Neuen Eintrag f√ºr die Pensionskasse erstellen, falls noch nicht vorhanden
    if pk_name not in data:
        data[pk_name] = {}

    # Datenpunkt hinzuf√ºgen oder aktualisieren
    data[pk_name][key] = value

    # JSON-Datei speichern
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

    print(f"Wert f√ºr '{key}' unter '{pk_name}' gespeichert.")


---

## 1. Parsing

In [None]:
# Vorbereitung f√ºr Jupyter-Notebooks
nest_asyncio.apply()

# === Parsing-Konfiguration ===

# Sicherstellen, dass der API-Key f√ºr LlamaParse geladen wurde
if not "llama_cloud_api_key" in globals() or not llama_cloud_api_key:
    raise ValueError("Die Variable 'llama_cloud_api_key' ist nicht definiert. Bitte API-Keys zuvor laden.")

# Pfad zur obersten Ordnerstruktur mit den PDF-Berichten
input_root = "pensionskassen gb"

# Relevante PDF-Dateinamen, die extrahiert werden sollen
target_filenames = {"GB.pdf", "GB_FR.pdf", "GB_IT.pdf"}

# Funktion zur Verarbeitung einer einzelnen PDF-Datei mit LlamaParse
async def parse_single_pdf(pdf_path, output_path):
    """
    Parst eine einzelne PDF-Datei mithilfe von LlamaParse in Markdown-Text
    und speichert das Ergebnis in einer .md-Datei.
    """
    parser = LlamaParse(api_key=llama_cloud_api_key, result_type="markdown")
    reader = SimpleDirectoryReader(input_files=[pdf_path], file_extractor={".pdf": parser})
    documents = await reader.aload_data()

    if not documents:
        print(f"‚ö†Ô∏è Kein Text extrahiert: {pdf_path}")
        return False

    with open(output_path, "w", encoding="utf-8") as f:
        for doc in documents:
            f.write(doc.text + "\n\n")

    print(f"‚úÖ Markdown gespeichert: {output_path}")
    return True

# Funktion zur Verarbeitung aller relevanten PDFs
async def parse_pdfs():
    """
    Durchsucht die Ordnerstruktur nach relevanten PDFs und erzeugt Markdown-Dateien,
    sofern sie noch nicht vorhanden sind.
    """
    tasks = []
    any_parsed = False  # Marker, ob etwas verarbeitet wurde

    for root, _, files in os.walk(input_root):
        for file in files:
            if file in target_filenames:
                pdf_path = os.path.join(root, file)
                md_path = os.path.join(root, "Text.md")

                # √úberspringe Datei, wenn bereits Markdown vorhanden ist
                if os.path.exists(md_path):
                    continue

                # Async-Task zur Verarbeitung hinzuf√ºgen
                tasks.append(parse_single_pdf(pdf_path, md_path))

    # F√ºhre alle Tasks parallel aus
    if tasks:
        results = await asyncio.gather(*tasks)
        any_parsed = any(results)

    # Abschluss-Info
    if any_parsed:
        print("\nüéâ Alle fehlenden PDF-Dateien wurden erfolgreich geparst.")
    else:
        print("\n‚ÑπÔ∏è Keine neuen PDF-Dateien mussten geparst werden ‚Äì alles bereits vorhanden.")

# Starte den Parsing-Prozess
await parse_pdfs()


‚ÑπÔ∏è Keine neuen PDF-Dateien mussten geparst werden ‚Äì alles bereits vorhanden.


## 2. Ingestion

In [105]:
# Initialisiere den SentenceSplitter zur Textzerlegung
parser = SentenceSplitter(chunk_size=512, chunk_overlap=64)

# Marker, ob neue Indizes erstellt wurden
any_index_created = False

# Durchlaufe alle Unterordner im input_root-Verzeichnis
for root, _, files in os.walk(input_root):
    if "Text.md" not in files:
        continue  # √úberspringen, wenn keine Text.md vorhanden ist

    storage_path = os.path.join(root, "index_storage")

    # Falls der Index bereits vorhanden ist, √ºberspringen
    if os.path.exists(storage_path):
        continue

    print(f"\nüìÇ Erzeuge Index (ohne Embedding) f√ºr: {root}")

    # Lade das Text.md-Dokument
    docs = SimpleDirectoryReader(input_files=[os.path.join(root, "Text.md")]).load_data()

    # Zerlege das Dokument in TextNodes mit automatischer Metadatenerkennung
    nodes = parser.get_nodes_from_documents(docs)

    # Erzeuge den Index ohne Embedding-Modell (reine Struktur & Metadaten)
    index = VectorStoreIndex(nodes, embed_model=None)

    # Speichere den Index dauerhaft im jeweiligen Ordner
    index.storage_context.persist(persist_dir=storage_path)

    print(f"‚úÖ Index gespeichert mit Metadaten (ohne Embeddings): {os.path.basename(root)}")
    any_index_created = True

# Abschluss-Info
if not any_index_created:
    print("‚ÑπÔ∏è Keine neuen Indizes erforderlich ‚Äì alle bereits vorhanden.")
else:
    print("üéâ Neue Indizes wurden erfolgreich erstellt und gespeichert.")

‚ÑπÔ∏è Keine neuen Indizes erforderlich ‚Äì alle bereits vorhanden.


## 3. Embedding

In [115]:
# Azure Embedding-Funktion
def get_azure_embeddings(texts):
    """
    Sendet eine Liste von Texten an die Azure OpenAI API und gibt Embeddings zur√ºck.
    """
    url = f"{azure_endpoint}/openai/deployments/{azure_embedding_deployment}/embeddings?api-version={azure_api_version}"
    headers = {"Content-Type": "application/json", "api-key": azure_api_key}
    data = {"input": texts, "model": "text-embedding-3-large"}

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        return [entry["embedding"] for entry in response.json()["data"]]
    elif response.status_code == 429:
        print("‚ö†Ô∏è Rate Limit ‚Äì warte 10 Sekunden...")
        time.sleep(10)
        return get_azure_embeddings(texts)
    else:
        raise ValueError(f"‚ùå Fehler bei Embedding: {response.status_code} - {response.text}")

# === Verarbeitung: Embeddings hinzuf√ºgen und speichern ===
any_embedded = False  # Marker, ob √ºberhaupt neue Embeddings generiert wurden

for root, _, files in os.walk(input_root):
    storage_path = os.path.join(root, "index_storage")
    if not os.path.exists(storage_path):
        continue  # Kein Index vorhanden ‚Üí √ºberspringen

    # Index laden
    storage_context = StorageContext.from_defaults(persist_dir=storage_path)
    index = load_index_from_storage(storage_context)

    # Nodes laden
    docstore = storage_context.docstore
    vector_store = storage_context.vector_store
    all_nodes = list(docstore.docs.values())

    # Check: Gibt es bereits Embeddings?
    already_embedded = [
        node.node_id in vector_store._data.embedding_dict
        for node in all_nodes
    ]

    if all(already_embedded):
        continue  # √úberspringen

    # Embeddings generieren
    texts = [node.text for node in all_nodes]
    embeddings = get_azure_embeddings(texts)

    if embeddings is None or len(embeddings) != len(all_nodes):
        print(f"‚ùå Fehler ‚Äì Anzahl Embeddings stimmt nicht")
        continue

    # Embeddings zuweisen
    for node, emb in zip(all_nodes, embeddings):
        node.embedding = emb

    # Speichern
    storage_context.vector_store.add(all_nodes)
    storage_context.persist(persist_dir=storage_path)

    print(f"‚úÖ Embeddings gespeichert f√ºr: {os.path.basename(root)}")
    any_embedded = True

# Hinweis am Schluss, wenn nichts gemacht wurde
if not any_embedded:
    print("‚ÑπÔ∏è Keine neuen Embeddings erforderlich ‚Äì alle bereits vorhanden.")


‚ÑπÔ∏è Keine neuen Embeddings erforderlich ‚Äì alle bereits vorhanden.


## 4. Querying-Funktion

In [107]:
# Embedding-Funktion √ºber Azure
def get_query_embedding(prompt):
    url = f"{azure_endpoint}/openai/deployments/{azure_embedding_deployment}/embeddings?api-version={azure_api_version}"
    headers = {"Content-Type": "application/json", "api-key": azure_api_key}
    data = {"input": [prompt], "model": "text-embedding-3-large"}

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        return response.json()["data"][0]["embedding"]
    else:
        raise ValueError(f"Fehler bei Embedding: {response.status_code} - {response.text}")

## 5. Prompting + Output

In [118]:
# Definition der Kennzahlen f√ºr die Extraktion
# Jede Kennzahl besteht aus:
# - "key": Name f√ºr die JSON-Speicherung
# - "query": Suchbegriff f√ºr semantische Embedding-Suche im PDF
# - "prompt": spezifische GPT-Aufforderung zur Antwort

metrics = {
    "verm√∂gensverwaltungskosten": {
        "key": "Verm√∂gensverwaltungskosten",
        "query": "Verm√∂gensverwaltungskosten per 31.12.2023 ‚Äì Verwaltungsaufwand der Verm√∂gensanlagen, Kostenquote, Ausweis in % der transparenten Verm√∂gensanlagen, TER, OAK",
        "prompt": load_prompt("vvk")
    },
    "bilanzsumme": {
        "key": "Bilanzsumme",
        "query": "Bilanzsumme per 31.12.2023 ‚Äì Total Aktiven, Gesamtverm√∂gen, Summe Aktiven laut Jahresrechnung oder Bilanz",
        "prompt": load_prompt("bilanzsumme")
    },
    "aktienquote": {
        "key": "Total Aktien (%)",
        "query": "Aktienquote per 31.12.2023 ‚Äì Aktien Schweiz, Ausland, Total Aktien laut Verm√∂gensaufteilung nach Anlagekategorien",
        "prompt": load_prompt("aktienquote")
    },
    "immobilienquote": {
        "key": "Total Immobilien (%)",
        "query": "Immobilienquote per 31.12.2023 ‚Äì Immobilien Schweiz, Ausland, Total Immobilien laut Verm√∂gensaufteilung nach Anlagekategorien",
        "prompt": load_prompt("immobilienquote")
    },
    "nominalwertquote": {
        "key": "Total Nominalwerte (%)",
        "query": "Nominalwertquote per 31.12.2023 ‚Äì Obligationen, Hypotheken, Liquidit√§t laut Verm√∂gensaufteilung nach Anlagekategorien",
        "prompt": load_prompt("nominalwertquote")
    },
    "alternativquote": {
        "key": "Total Alternative Anlagen (%)",
        "query": "Alternative Anlagen per 31.12.2023 ‚Äì Private Equity, Hedgefonds, Infrastruktur laut Verm√∂gensaufteilung nach Anlagekategorien",
        "prompt": load_prompt("alternativquote")
    }
}


In [None]:
# Funktion: Stelle GPT-gest√ºtzte semantische Abfrage an einen Index
def query_pensionskasse(path_to_index, retrieval_query, user_prompt, top_k=5):
    """
    F√ºhrt eine Retrieval-Augmented Generation (RAG) durch:
    1. Findet relevante Textstellen im Index basierend auf einem semantischen Suchbegriff (Query).
    2. Stellt eine gezielte Frage an GPT-4o mit diesem Kontext.

    Args:
        path_to_index (str): Pfad zum Ordner mit dem gespeicherten LlamaIndex.
        retrieval_query (str): Semantischer Suchbegriff f√ºr die Kontextsuche.
        user_prompt (str): Konkrete GPT-Aufforderung.
        top_k (int): Anzahl der Top-Kontextabschnitte f√ºr die GPT-Antwort (Default: 5).

    Returns:
        str: Die Antwort von GPT-4o als Klartext.
    """
    # 1. Index & Retriever laden
    storage_context = StorageContext.from_defaults(persist_dir=path_to_index)
    index = load_index_from_storage(storage_context)
    retriever = index.as_retriever(similarity_top_k=top_k)

    # 2. Embedding f√ºr die semantische Suche erzeugen
    retrieval_embedding = get_query_embedding(retrieval_query)
    query_bundle = QueryBundle(query_str=retrieval_query, embedding=retrieval_embedding)
    nodes = retriever.retrieve(query_bundle)

    # 3. Kontext aus den gefundenen Textstellen erstellen
    context = "\n\n".join([node.text.strip() for node in nodes])
    full_prompt = f"Kontext:\n{context}\n\nPrompt:\n{user_prompt}"

    # 4. Anfrage an GPT-4o stellen
    llm = OpenAI(model="gpt-4o", api_key=openai_api_key)
    response = llm.complete(full_prompt)

    # 5. Ausgabe f√ºr Debug-Zwecke
    pk_name = path_to_index.rstrip("/").split("/")[-1]
    print(f"üîç {pk_name} ‚Üí {retrieval_query}\nüß† GPT-4o antwortet:\n\n{response.text.strip()}")

    return response.text.strip()


# Funktion: Extrahiere alle definierten Metriken f√ºr alle Pensionskassen im Verzeichnis
def extract_all_metrics(root_dir="pensionskassen gb"):
    """
    F√ºhrt f√ºr alle vorhandenen Indexe im Zielverzeichnis eine automatische Extraktion der Metriken durch.
    Die Ergebnisse werden in einer JSON-Datei pro PDF gespeichert.

    Args:
        root_dir (str): Hauptordner mit allen Pensionskassen-Unterordnern und jeweiligen Indexen.
    """
    for folder in os.listdir(root_dir):
        pk_dir = os.path.join(root_dir, folder)
        index_dir = os.path.join(pk_dir, "index_storage")

        if not os.path.isdir(index_dir):
            continue  # Kein g√ºltiger Index vorhanden ‚Üí √ºberspringen

        for metric_id, metric in metrics.items():
            try:
                print(f"üîç {folder} ‚Üí {metric['key']}")
                
                # 1. Semantische Suche & GPT-Antwort
                response = query_pensionskasse(
                    path_to_index=index_dir,
                    retrieval_query=metric["query"],
                    user_prompt=metric["prompt"],
                    top_k=5
                )

                # 2. Extraktion des numerischen Werts aus der GPT-Antwort
                value_raw = str(response)
                value = float(value_raw.replace("'", "").replace(" ", "").replace(",", "").replace("‚Äô", ""))

                # 3. Zuordnen zum richtigen PDF (Deutsch, FR oder IT)
                pdf_path = os.path.join(pk_dir, "GB.pdf")
                if not os.path.exists(pdf_path):
                    for alt in ["GB_FR.pdf", "GB_IT.pdf"]:
                        alt_path = os.path.join(pk_dir, alt)
                        if os.path.exists(alt_path):
                            pdf_path = alt_path
                            break

                if not os.path.exists(pdf_path):
                    print(f"‚ö†Ô∏è Kein PDF gefunden f√ºr {folder}")
                    continue

                # 4. Ergebnis speichern
                save_to_data_json(pdf_path, metric["key"], value)
                print(f"‚úÖ {metric['key']}: {value}")

            except Exception as e:
                print(f"‚ö†Ô∏è Fehler bei {folder} ‚Üí {metric['key']}: {e}")


In [113]:
# === Finaler Abruf der Extraktionsfunktion ===
# ‚ùó Hinweis: Auskommentiert, da jeder Abruf GPT-4o verwendet und somit API-Kosten verursacht.
# ‚ùó Nur ausf√ºhren, wenn alle Indizes und Embeddings bereit sind.

# extract_all_metrics()