# Retrieval-Augmented Generation (RAG)

**2 Implementierung eines KI-Systems mit RAG  und Agentenkomponenten**

**2.1 RAG-System**

Wie bereits im ersten Teil des Assignments beschrieben, kommen RAG-Systeme (Retrieval-Augmented Generation) immer dann zum Einsatz, wenn externe Wissensquellen in ein Sprachmodell eingebunden werden sollen. Die Motivation liegt darin, dass ein LLM allein nur auf seinem Trainingswissen basiert, während sich durch RAG zusätzlich aktuelle oder spezialisierte Informationen berücksichtigen lassen. Das Vorgehen umfasst im Wesentlichen die Indexierung externer Daten, das semantische Retrieval relevanter Inhalte und die Generierung einer Antwort durch das LLM. Dieses Notebook zeigt exemplarisch, wie ein solches System aufgebaut wird und wie die einzelnen Bausteine zusammenwirken.



---



**Auswahl der externen Quelle**

Da eines meiner Hobbys das Anschauen von Sport ist, insbesondere **Tennis**, und wir uns derzeit in der Hochsaison befinden, in der gerade Wimbledon zu Ende gegangen ist und nun Cincinnati und die US Open direkt hintereinander folgen, habe ich beschlossen, diese Aufgabe auf diesen Bereich zu konzentrieren. Darüber hinaus hat sich nach der Ära der „Big Three“ eine neue Rivalität im Herrentennis entwickelt, die noch mehr Aufmerksamkeit und Begeisterung für diesen Sport mit sich bringt.  

Auf der Suche nach geeigneten Videos oder Dokumenten mit interessanten und relevanten Informationen für diese RAG-Aufgabe bin ich auf ein sehr gutes YouTube-Video gestoßen:  
**„Every TENNIS ERA Explained In 10 Minutes”** ([Link](https://www.youtube.com/watch?v=dxg1-JXKG5A)).  

Ich habe dieses Video transkribiert und die Transkription in mein GitHub-Repository hochgeladen ([Raw-Link](https://raw.githubusercontent.com/semauenal/assignment-rag-agentensysteme/refs/heads/main/Transcript_Every_Tennis_Era_Explained.txt)). Die Transkription umfasst **1.508 Wörter** und dient als externe Wissensquelle für das in diesem Notizbuch implementierte RAG-System.  



---



**Erklärungen zum Code**

Bevor endgültig der Code zum RAG-System gestartet wird, soll nochmal kurz erklärt werden, auf was Wert gelegt wird beim Implementieren des RAGs.

- Kommentare: Es wird versucht, verständliche, aber nicht zu viele Kommentare zu setzen.

- Erklären von Abstrahierung und Frameworks bzw. Funktionen: Vorhandene Abstraktionen oder Framework-Aufrufe werden kurz erläutert, damit klar ist, was im Hintergrund passiert.

- Immer wieder Print-Ausgaben: Zur Kontrolle werden Zwischenergebnisse ausgegeben, um den Ablauf und die Daten zu prüfen.

- Tokennutzung / Limits: Durch Parameter wie max_tokens und die Auswahl weniger Chunks wird der Tokenverbrauch begrenzt und kontrollierbar gehalten.



---



**Verbindung zu OpenAI**

In diesem Abschnitt wird die Verbindung zur OpenAI-API hergestellt. Dafür wird der API-Schlüssel aus den Secrets geladen.

Nutzung der OpenAI-Python-Bibliothek:
- Zugriff auf OpenAI-API vereinfacht
- Anstatt selbst HTTP-Anfragen an die Endpunkte (z. B. /v1/chat/completions, /v1/embeddings) zu schreiben, bietet die Bibliothek eine fertige Schnittstelle in Python
- Klasse OpenAI erstellt einen Client, der mit dem API-Key authentifiziert wird

  --> „Brücke“ zwischen dem Notebook und der OpenAI-Cloud dar und übernimmt die ganze low-level Arbeit der API-Kommunikation.



In [3]:
# API-Key aus Colab-Secrets laden
from google.colab import userdata
from openai import OpenAI
OPENAI_API_KEY = userdata.get('apikey_sm')
client = OpenAI(api_key=OPENAI_API_KEY)

**Modellkonfiguration**

Damit später bei der Generierung des Outputs nicht ständig erneut die Konfigurationen angegeben werden müssen, werden sie hier einmal zentral definiert.


In [4]:
# Konfiguration durch Variabeln
model_chat    = "gpt-4o-mini"
temperature   = 0               # steuert Kreativität
token_limit   = 4096            # internes Budget (Kontrolle, nicht das echte Kontextfenster)

top_k         = 4               # Anzahl der Chunks, die beim Retrieval zurückgegeben werden


In [5]:
# Transkript laden

# Importieren von requests: Python-Bibliothek für HTTP-Anfragen
import requests
url = "https://raw.githubusercontent.com/semauenal/assignment-rag-agentensysteme/refs/heads/main/Transcript_Every_Tennis_Era_Explained.txt"
transcript = requests.get(url).text
print(transcript[:500])  # Vorschau

﻿Back in the day, way before 1968, tennis wasn’t even a competition. It was more about tradition and having fun. The sport lived in black and white. And the players, well, it looked like they were heading to a party and not a professional game. To be fair to them, it wasn’t really a professional sport at the time, but those outfits were still outrageous. I’m talking about the pre-Open Era. This was one of the weirdest eras in the history of this sport. Why? Because this was the time when amateur


**Chunking**

Beim Aufbau eines RAG-Systems ist **Chunking** wichtig, weil große Dokumente nicht auf einmal in den Prompt passen.  
Die Texte werden in Abschnitte zerlegt, damit sie als Embeddings berechnet und später gezielt abgerufen werden können.  
Dabei gilt:  
- Chunks dürfen nicht zu kurz sein, sonst geht inhaltlicher Kontext verloren.  
- Eine **Überlappung** ist sinnvoll, damit am Rand keine relevanten Informationen abgeschnitten werden.  

Hier wurde bewusst eine eigene kleine Funktion implementiert, um den Ablauf nachvollziehbar zu machen.  
Es gibt zwar viele Frameworks (z. B. in LangChain oder LlamaIndex), die das Chunking mit einer einzigen Zeile erledigen,  
aber durch die eigene Funktion wird deutlich, wie der Prozess intern funktioniert.


Die Funktion `chunk_text` arbeitet auf Basis einer Schleife:  
- Der Text wird zuerst in Wörter aufgesplittet.  
- Mit einem Fenster von `chunk_size` Wörtern wird ein Abschnitt gebildet.  
- Jeder Abschnitt wird als String gespeichert und in die Liste `chunks` geschrieben.  
- Der Startindex verschiebt sich dann jeweils um `chunk_size - overlap`, sodass sich die Abschnitte leicht überlappen.  
- Am Ende liefert die Funktion eine Liste von Textabschnitten zurück.  



In [6]:
# Chunking
# Text in Abschnitte von ca. 500 Wörtern mit Überlappung 30 Wörter
def chunk_text(text, chunk_size=300, overlap=30):
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        start += chunk_size - overlap
    return chunks

chunks = chunk_text(transcript)
print("Anzahl Chunks:", len(chunks))
# Vorschau der ersten 300 Zeichen des ersten Chunks
print("Beispiel Chunk:\n", chunks[0][:300])

Anzahl Chunks: 6
Beispiel Chunk:
 ﻿Back in the day, way before 1968, tennis wasn’t even a competition. It was more about tradition and having fun. The sport lived in black and white. And the players, well, it looked like they were heading to a party and not a professional game. To be fair to them, it wasn’t really a professional spo


**Embedding**

Nach dem Chunking werden die Textabschnitte in sogenannte Embeddings umgewandelt.  
Ein Embedding ist ein dichter Vektor, der die semantische Bedeutung eines Textes in einer hochdimensionalen Zahlen­darstellung abbildet. So können Inhalte mathematisch verglichen werden (z. B. über Cosine-Similarity), was für das semantische Retrieval die Grundlage bildet.  

Um die Chunks in Embeddings umzuwandeln wird der OpenAI-Client genutzt,  weil er einiges schon übernimmt, was man mit requests selbst machen müsste, wie zum Beispiel Header setzen, JSON bauen und Antworten parsen. Dadurch spart man Code und Fehlerquellen, abstrahiert aber auch nicht zu viel. LangChain wäre zwar noch kürzer, nimmt einem aber viele Details ab und versteckt den Ablauf stärker.


Gewähltes Modell  

Für die Embedding-Erstellung wird das Modell **`text-embedding-3-small`** von OpenAI genutzt.  
- Vorteil: kostengünstig und gleichzeitig qualitativ gut genug für ein mittellanges Transkript.  
- Output: pro Textabschnitt ein Vektor mit 1536 Dimensionen.  
  - Jede Dimension ist ein Zahlenwert, der eine bestimmte latente Eigenschaft des Textes repräsentiert.  
  - Man kann es sich wie einen „Punkt im 1536-dimensionalen Raum“ vorstellen, der die Bedeutung des Textes mathematisch beschreibt.  

Abstraktionen im Code  

- **`client.embeddings.create(...)`**:  
  API-Aufruf an den OpenAI-Endpunkt `/v1/embeddings`.  
  Abstrahiert werden: Authentifizierung, Aufbau der Anfrage, Versand und Empfang der JSON-Antwort.  
- **`np.array(vector)`**:  
  Umwandlung der zurückgegebenen Python-Liste in ein NumPy-Array → effizientere Verarbeitung.  
- **`np.vstack(embeddings)`**:  
Nimmt die einzelnen Embedding-Vektoren (jeder ist eine Liste mit 1536 Zahlen) und legt sie untereinander in eine Tabelle.
Ergebnis: eine 2D-Matrix mit der Form (Anzahl Chunks, 1536).
  - Jede Zeile = ein Chunk-Embedding
  - Jede Spalte = eine bestimmte Dimension des Vektorraums


Zusammenfassung  

In diesem Schritt wird also für jeden Chunk ein Embedding erzeugt und zu einer gemeinsamen Matrix zusammengefügt.  
Dies ist der zentrale Zwischenschritt, um später Anfragen mit denselben Methoden in denselben Vektorraum einzubetten und semantisch passende Chunks zurückzufinden.


In [7]:
import numpy as np
# Embeddings berechnen
embeddings_transcript = []
for chunk in chunks:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=chunk
    )
    vector = response.data[0].embedding
    embeddings_transcript.append(np.array(vector))

embeddings_transcript = np.vstack(embeddings_transcript)

# shape von NumPy-Arrays zeigt Form des Arrays an (Dimensionen)
print(f"Anzahl Embeddings: {embeddings_transcript.shape[0]}")
print(f"Länge pro Embedding-Vektor: {embeddings_transcript.shape[1]} Dimensionen")
# Als Test anzeigen der ersten 10 Dimensionen des ersten Chunks
print(embeddings_transcript[0][:10])

Anzahl Embeddings: 6
Länge pro Embedding-Vektor: 1536 Dimensionen
[-0.00377981  0.02224585 -0.00795002  0.01334467  0.04593974 -0.00183667
  0.00059181 -0.02226005 -0.02113853  0.01954852]


**Retrieval**

Nach Umwandlung der Chunks in Embeddings, folgt Retrieval-Schritt. Ziel: Aus allen gespeicherten Embeddings die relevantesten Chunks zu einer Anfrage finden.

Für diesen Code wurde bewusst ein experimenteller Ansatz gewählt. Statt sofort auf integrierte Lösungen zurückzugreifen, werden hier zwei Hilfsfunktionen selbst definiert. Dadurch wird die Logik nachvollziehbarer.


---


**Cosine-Similarity**

Die Cosine-Similarity kommt aus der linearen Algebra und Geometrie.
Sie misst den Winkel zwischen zwei Vektoren.
Je kleiner der Winkel, desto ähnlicher die Vektoren.
- Formal:  

$$
\cos(\theta) = \frac{a \cdot b}{||a|| \cdot ||b||}
$$  

mit $a \cdot b$ = Skalarprodukt und $||a||$ = Länge (Norm) des Vektors.  

*Wertebereich*
- **1** → Vektoren zeigen exakt in dieselbe Richtung → Inhalte sehr ähnlich  
- **0** → Vektoren stehen senkrecht (orthogonal) → keine Ähnlichkeit  
- **-1** → Vektoren zeigen in entgegengesetzte Richtungen → theoretisch maximale Gegensätzlichkeit (bei Embeddings fast nie relevant)  

*Umsetzung im Code*
- `a` und `b` = zwei Vektoren (z. B. Query-Embedding und ein Chunk-Embedding)  
- `np.dot(a, b)` = Skalarprodukt (Summe der komponentenweisen Multiplikationen)  
- `np.linalg.norm(a)` = Länge des Vektors *a*, berechnet als Wurzel der Summe der Quadrate aller Dimensionen  

- Eine kleine Konstante `1e-12` wird hinzugefügt, um Division durch Null zu vermeiden.  

**Retrieval-Funktion**
retrieve_top_k:
- Query wird ebenfalls in ein Embedding umgewandelt
- Query-Embedding wird mit allen Chunk-Embeddings verglichen
- Die besten Treffer (Top-k) werden sortiert und zurückgegeben.


In [8]:
# Cosine Similarity zwischen zwei Vektoren
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / ((np.linalg.norm(a) * np.linalg.norm(b)) + 1e-12))

# Retrieval-Funktion: Top-k ähnlichste Chunks
def retrieve_top_k(query: str, embeddings: np.ndarray, chunks: list[str], k: int):
    # Query → Embedding
    embedding_response = client.embeddings.create(model="text-embedding-3-small", input=query)
    q_vec = np.array(embedding_response.data[0].embedding, dtype=np.float32)

    # Ähnlichkeiten berechnen
    sims = [cosine_similarity(q_vec, emb) for emb in embeddings]

    # Sortieren & Top-k wählen
    top_ids = np.argsort(sims)[::-1][:k]
    return [(i, sims[i], chunks[i]) for i in top_ids]


# Testaufruf (zur Kontrolle)
query_test = "Who are the Big Three in tennis?"
results = retrieve_top_k(query_test, embeddings_transcript, chunks, top_k)

for idx, score, txt in results:
    print(f"Chunk {idx} | Score = {score:.3f}\n'{txt[:200]}...'\n")


Chunk 2 | Score = 0.624
'set the tone for what was coming next, as it was the first time the best players were put up against each other for the world to watch. But what came next was something that nobody expected. After the...'

Chunk 3 | Score = 0.576
'than you’ll ever find anywhere else. But let’s not forget about one other guy who still left his mark in this era despite playing with such greats. Had he not played in the same era as the Big Three, ...'

Chunk 4 | Score = 0.515
'with a clean game and a cheeky celebration at the end. Alexander Zverev, with his massive serve and lethal forehand, also showed promise. He won an Olympic gold but is still hunting that all-important...'

Chunk 1 | Score = 0.498
'in 1968. The pros and amateurs were allowed to play together in the same tournaments. And let me say this, things got really spicy really quickly. Money started flowing into the sport. Television came...'



**Generierung des Outputs**

Nun geht es um den letzten Teil der RAG-Pipeline. Es wurde bewusst die Variante über den **OpenAI-Client** gewählt, anstatt Frameworks wie LangChain zu nutzen.  Ziel war es, später mit nur einer Zeile (`ask(...)`) Antworten generieren zu können.  Dabei lag der Fokus besonders auf dem Einbau von Kontrollmechanismen, um sicherzustellen,  dass das Tokenbudget nicht überschritten wird.  


Im Folgenden wird der Code etwas detaillierter erklärt:

- **Parameter**:  
  - `question`: Nutzerfrage  
  - `embeddings`, `chunks_in`: vorbereitete Datenbasis aus Embeddings + Textabschnitten  
  - `k`: wie viele Top-Chunks beim Retrieval genutzt werden  
  - `max_ctx_chars`: begrenzt die Länge des eingebauten Kontexts  
  - `max_answer_tokens`: maximale Länge der Modell-Antwort  

- **Ablauf**:  
  1. Die Nutzerfrage wird eingebettet und mit allen Chunks verglichen → Top-k relevante Chunks.  
  2. Der Kontext wird aus diesen Chunks zusammengesetzt, aber auf `max_ctx_chars` Zeichen begrenzt.  
  3. Prompt wird erstellt:  
     - *System-Message*: definiert Regeln („antworte nur aus dem Kontext“).  
     - *User-Message*: enthält Kontext + Nutzerfrage.  
  4. Mit `client.chat.completions.create` wird das LLM aufgerufen.  
  5. Die erste Modellantwort wird extrahiert und als String zurückgegeben.  

Damit ist die komplette RAG-Pipeline abgeschlossen: **eine Nutzerfrage genügt, und das System führt Retrieval, Kontextbau und Antwortgenerierung automatisch aus.*


In [9]:
# Pipeline-Funktion: Retrieval → LLM
def ask(question: str,
        embeddings: np.ndarray = embeddings_transcript,
        chunks_in: list[str] = chunks,
        k: int = top_k,
        max_context_chars: int = 3000,
        max_answer_tokens: int = 300):

    # Top-k Chunks holen
    retrieved = retrieve_top_k(question, embeddings, chunks_in, k)

    # Kontext zusammenbauen (begrenzt auf max_context_chars)
    context_parts, used = [], 0
    for idx, score, txt in retrieved:
        if used + len(txt) > max_context_chars:
            txt = txt[: max_context_chars - used]
        context_parts.append(txt.strip())
        used += len(txt)
        if used >= max_context_chars:
            break
    context = "\n\n---\n\n".join(context_parts)

    # Prompt definieren
    system_msg = (
        "You are a concise assistant. Answer ONLY using the provided context. "
        "If the answer is not in the context, say you don't know."
    )
    user_msg = f"Context:\n{context}\n\nQuestion: {question}\n\nAnswer in 3–6 sentences."

    # OpenAI-Call
    completion = client.chat.completions.create(
        model=model_chat,
        temperature=temperature,
        max_tokens=min(max_answer_tokens, token_limit),
        messages=[
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
    )

    return completion.choices[0].message.content

In [10]:
# Beispielfragen


#answer = ask("Fasse die Entwicklung des Tennissports von der Pre-Open Era bis heute in 3–4 Sätzen zusammen.")
#answer = ask("Welche Spielerinnen prägten das moderne Frauentennis und wie unterschieden sie sich?")
#answer = ask("Welche jungen Spieler werden als die Zukunft des Tennissports genannt und warum?")
answer = ask("Welche Spieler gehören zu den Big Three und wodurch zeichnen sie sich jeweils aus?")
print(answer)

Die Spieler der Big Three sind Roger Federer, Rafael Nadal und Novak Djokovic. Roger Federer zeichnet sich durch seinen butterweichen Spielstil und die schönste einhändige Rückhand aus; er gewann 20 Grand Slams und dominierte auf Rasenplätzen. Rafael Nadal, der König des Sandplatzes, ist bekannt für seine unermüdliche Energie und seinen Kampfgeist, was ihm 14 French Open Titel einbrachte. Novak Djokovic ist für seine außergewöhnliche mentale Stärke bekannt und hat zahlreiche Rekorde gebrochen, darunter die meisten Wochen als Nummer eins der Welt. Gemeinsam haben sie über 60 Grand Slam Titel gewonnen und zahlreiche unvergessliche Duelle ausgetragen.


# Agentensysteme

**Agent 1 – Structured Output**

Dieser Agent extrahiert strukturierte Informationen aus dem Transkript und gibt sie als JSON strukturiert zurück.

Das Besondere: Die Modellantwort wird auf ein festgelegtes Schema gezwungen und anschließend validiert. Dadurch entsteht ein Output, der sowohl für Menschen lesbar (Tabelle) als auch maschinell weiterverarbeitbar ist.

Ablauf im Code:
1. Kontextaufbau
    - Mit retrieve_top_k(...) werden die semantisch relevantesten Chunks zur Nutzerfrage geholt
    - Chunks werden zu einem Kontextstring (ctx) zusammengefügt, getrennt durch ---.   
    → So „sieht“ das Modell nur die Textpassagen, die wahrscheinlich relevant sind.

2. Schema-Vorgabe
- Es wird ein JSON-Schema (schema_hint) definiert:
  - type: array → die Ausgabe muss eine Liste von Objekten sein
  - properties → welche Felder erwartet werden (z. B. "entity", "category", "evidence")
  - max_items → begrenzt die Anzahl der Einträge

- Diese Vorgabe wird ins Prompt eingebaut, damit das Modell gezwungen ist, die Antwort strikt im JSON-Format zu liefern.

3. System- & User-Message
- System-Message: Weist das Modell strikt an, nur JSON ohne Prosa oder Markdown zurückzugeben.
- User-Message: Enthält den zusammengesetzten Kontext, die Frage, die Aufgabenbeschreibung und das informelle Schema.


4. LLM-Call (client.chat.completions.create)
- Verwendet dein definiertes Modell (model_chat) und temperature=0 für deterministische Ergebnisse.
- max_tokens ist begrenzt, damit die Antwort nicht zu lang wird.

5. Parsing & Validierung
- Mit json.loads(raw) wird die Antwort in ein Python-Objekt umgewandelt.
- Falls das Modell nicht exakt im Schema antwortet → Fehlerbehandlung mit Fallback.
- Für jeden Eintrag werden die Felder entity, category, evidence extrahiert und bereinigt.

6. Tabellarische Ausgabe
- Die Liste wird in ein pandas.DataFrame umgewandelt.



In [None]:
# Agent 1: Structured Output
import json
import pandas as pd

def agent_structured_summary(question: str,
                             fields=("entity", "category", "evidence"),
                             max_items: int = 5):
    """
    Structured Output Agent:
    - Holt Top-k Chunks (semantisches Retrieval)
    - Erzwingt JSON-Schema in der Modellantwort
    - Validiert JSON; gibt (list[dict], DataFrame) zurück
    """
    # Kontext via semantisches Retrieval
    retrieved = retrieve_top_k(question, embeddings_transcript, chunks, top_k)
    ctx = "\n\n---\n\n".join([txt for _, _, txt in retrieved])

    # Striktes JSON-Prompt
    schema_hint = {
        "type": "array",
        "max_items": max_items,
        "items": {
            "type": "object",
            "properties": {f: {"type": "string"} for f in fields},
            "required": list(fields),
            "additionalProperties": False
        }
    }
    system_msg = (
        "You output ONLY valid JSON that matches the given schema. "
        "No prose, no markdown, no comments."
    )
    user_msg = (
        f"Context:\n{ctx}\n\n"
        f"Task: Extract up to {max_items} items relevant to the question.\n"
        f"Question: {question}\n\n"
        f"JSON schema (informal): {json.dumps(schema_hint)}\n"
        f"Return ONLY a JSON array, e.g. "
        f"[{{\"{fields[0]}\": \"...\", \"{fields[1]}\": \"...\", \"{fields[2]}\": \"...\"}}]"
    )

    # LLM-Call
    resp = client.chat.completions.create(
        model=model_chat,
        temperature=0,
        max_tokens=min(400, token_limit),
        messages=[
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
    )
    raw = resp.choices[0].message.content.strip()

    # Validieren/Parsen
    try:
        data = json.loads(raw)
        if not isinstance(data, list):
            raise ValueError("Top-level JSON ist kein Array.")
        # weiche Schema-Prüfung
        cleaned = []
        for item in data[:max_items]:
            row = {k: str(item.get(k, "")).strip() for k in fields}
            cleaned.append(row)
    except Exception as e:
        # Fallback: leere Liste zurückgeben mit Fehlerhinweis
        cleaned = []
        print("JSON-Parse/Schema-Fehler:", e)
        print("Rohantwort war:\n", raw)

    # Optional: Tabelle
    df = pd.DataFrame(cleaned, columns=list(fields))
    return cleaned, df

# Beispielaufrufe
# items, df = agent_structured_summary("Nenne Big Three + Kategorie + kurze Evidenz aus dem Text")
# display(df)


**Agent 2 – Wetter-Agent**

Dieser Agent kombiniert ein LLM (über den bestehenden OpenAI-Client) mit einem
externen Tool (Open-Meteo API). Damit wird er zu einem „Agenten“, weil er nicht
nur Text generiert, sondern eigenständig Aktionen ausführen darf.

Struktur:
1. WeatherTool
   - Stellt ein einzelnes Tool bereit: Wetterdaten abfragen über Open-Meteo.
   - Enthält:
     * Geocoding (Stadtname → Koordinaten, via Open-Meteo Geocoding API)
     * Forecast (Koordinaten → aktuelles Wetter, Temperatur, Wind etc.)
   - Definiert zudem eine Tool-Spezifikation (JSON-Schema), die dem LLM erklärt,
     wie das Tool aufzurufen ist (Name, Beschreibung, Parameter).

2. WeatherAgent
   - Orchestrator, der Nachrichten zwischen User, LLM und Tool koordiniert.
   - Ablauf:
     a) User gibt Eingabe („Wie ist das Wetter in Mailand?“).
     b) Agent ruft das LLM an (client.chat.completions.create), übergibt System-
        Prompt + User-Eingabe + Tool-Spezifikation.
     c) Falls das LLM Tool-Aufrufe zurückgibt (Function Calling), führt der Agent
        die Python-Funktion (WeatherTool.run) aus und hängt deren Ergebnisse als
        „tool“-Nachrichten an den Nachrichtenverlauf an.
     d) Der Agent fragt das LLM erneut, bis es eine finale Antwort generiert.
   - Es gibt eine Schleife mit max. 4 Runden, um mehrfaches Nachfragen/Toolen
     zu ermöglichen.

3. CLI-Demo (main)
   - Startet den Agenten im Terminal.
   - User kann interaktiv Fragen stellen.
   - Der Agent antwortet, nutzt dabei bei Bedarf das Tool.

Besonderheiten:
- Der API-Key wird **nicht** im Agenten gehandhabt. Stattdessen nutzt er den
  bestehenden OpenAI-Client (`client`) und das konfigurierte Modell (`model_chat`).
- Das Tool nutzt frei verfügbare Open-Meteo-Endpunkte, kein Key nötig.
- Mit der Hilfsfunktion `_tool_calls_from_msg` werden die Tool-Calls des SDK
  normalisiert (immer mit `type: "function"`), um API-Fehler zu vermeiden.
- Der Agent liefert Antworten in normalem Text (LLM-Generierung), angereichert
  mit echten Wetterdaten.


In [23]:

# Agent 2 – Wetter-Agent

import json
import requests
from typing import Any, Dict, List


# Wetter-Tool (Open-Meteo)
class WeatherTool:
    """Holt aktuelle Wetterdaten über Open-Meteo (kein API-Key nötig)."""

    GEO_URL = "https://geocoding-api.open-meteo.com/v1/search"
    WX_URL = "https://api.open-meteo.com/v1/forecast"

    def spec(self) -> Dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": "weather",
                "description": "Gibt aktuelles Wetter für eine Stadt zurück.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string", "description": "Stadtname"},
                        "country": {"type": "string", "description": "optional: Land (2-Letter Code)"},
                    },
                    "required": ["city"],
                },
            },
        }

    def run(self, args: Dict[str, Any]) -> Dict[str, Any]:
        city = str(args.get("city", "")).strip()
        country = str(args.get("country", "")).strip() or None
        if not city:
            return {"error": "city missing"}

        try:
            # Stadt → Koordinaten
            params = {"name": city, "count": 1}
            if country:
                params["country"] = country
            geo_resp = requests.get(self.GEO_URL, params=params, timeout=10)
            geo_resp.raise_for_status()
            geo = geo_resp.json()
            if not geo.get("results"):
                return {"error": "city not found"}
            r0 = geo["results"][0]
            lat, lon = r0["latitude"], r0["longitude"]

            # Koordinaten → aktuelles Wetter
            wx_resp = requests.get(
                self.WX_URL,
                params={"latitude": lat, "longitude": lon, "current_weather": True},
                timeout=10,
            )
            wx_resp.raise_for_status()
            current = wx_resp.json().get("current_weather", {})

            return {
                "city": r0.get("name"),
                "country": r0.get("country_code"),
                "latitude": lat,
                "longitude": lon,
                "temperature_c": current.get("temperature"),
                "wind_speed": current.get("windspeed"),
                "wind_dir": current.get("winddirection"),
                "time": current.get("time"),
            }
        except Exception as e:
            return {"error": f"weather failed: {e}"}


# Function-Calling Spezifikation
def weather_tool_spec() -> Dict[str, Any]:
    return WeatherTool().spec()

# Wetter-Agent (nutzt deinen bestehenden `client` & `model_chat`)
class WeatherAgent:
    def __init__(self, client, model: str, temperature: float = 0):
        """
        client  : dein OpenAI-Client (z. B. from openai import OpenAI; client = OpenAI())
        model   : Modellname (z. B. "gpt-4o-mini")
        """
        self.client = client
        self.model = model
        self.temperature = temperature
        self.tool = WeatherTool()
        self.system_prompt = "Du bist ein Assistent, der bei Wetterfragen das Wetter-Tool nutzt."

    def _tool_calls_from_msg(self, msg) -> List[Dict[str, Any]]:
        """
        Kompatibilitätsschicht:
        - Neues OpenAI-SDK: msg.tool_calls (Objekte)
        - Manche Rückgaben evtl. als dict
        Gibt eine Liste normalisierter tool_calls zurück: [{id, function:{name, arguments}}]
        """
        tc = getattr(msg, "tool_calls", None)
        if not tc:
            return []
        norm = []
        for t in tc:
            # OpenAI SDK
            if hasattr(t, "id"):
                norm.append({
                    "id": t.id,
                    "type": "function",
                    "function": {
                        "name": t.function.name,
                        "arguments": t.function.arguments or "{}",
                    },
                })
            else:
                # dict-ähnlich → sicherstellen, dass 'type' gesetzt ist
                d = dict(t)
                d.setdefault("type", "function")
                norm.append(d)
        return norm

    def run(self, user_input: str) -> str:
        messages: List[Dict[str, Any]] = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": user_input},
        ]

        for _ in range(4):
            completion = self.client.chat.completions.create(
                model=self.model,
                temperature=self.temperature,
                messages=messages,
                tools=[self.tool.spec()],
                tool_choice="auto",
            )
            choice = completion.choices[0]
            msg = choice.message

            tool_calls = self._tool_calls_from_msg(msg)
            if tool_calls:
                # Die vom Modell gewünschten Tool-Aufrufe ausführen
                # und als 'tool'-Nachrichten antworten
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": tool_calls,
                })
                for tc in tool_calls:
                    name = tc["function"]["name"]
                    args_json = tc["function"].get("arguments", "{}")
                    try:
                        args = json.loads(args_json) if isinstance(args_json, str) else (args_json or {})
                    except json.JSONDecodeError:
                        args = {}
                    output = self.tool.run(args)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "name": name,
                        "content": json.dumps(output, ensure_ascii=False),
                    })
                continue  # eine weitere Runde mit angereichertem Kontext

            # Finale Assistant-Antwort
            final = msg.content
            if final:
                return final.strip()

        return "Keine Antwort erhalten."


# CLI-Demo
def main():
    try:
        client  # type: ignore  # existiert schon?
        model_chat  # type: ignore
    except NameError:
        raise RuntimeError(
            "Bitte definiere vorher `client` (OpenAI-Client) und `model_chat`, "
            "oder kommentiere den Fallback im main() ein."
        )

    agent = WeatherAgent(client, model_chat)
    print("Wetter-Agent gestartet. Frage mich nach dem Wetter (Strg+C beendet).\n")
    while True:
        try:
            user_input = input("> ").strip()
            if not user_input:
                continue
            answer = agent.run(user_input)
            print(f"\nAgent: {answer}\n")
        except KeyboardInterrupt:
            print("\nTschüss!")
            break


if __name__ == "__main__":
    main()


Wetter-Agent gestartet. Frage mich nach dem Wetter (Strg+C beendet).

> Wie ist das Wetter in Mailand

Agent: Das aktuelle Wetter in Mailand, Italien, ist wie folgt:

- **Temperatur:** 19,4 °C
- **Windgeschwindigkeit:** 6,0 km/h
- **Windrichtung:** 123°

Wenn du weitere Informationen benötigst, lass es mich wissen!

> Brauche ich einen Regenschirm? 

Agent: Um dir zu sagen, ob du einen Regenschirm brauchst, benötige ich den Namen deiner Stadt oder deines Standorts. Bitte teile mir diese Information mit!

> In Mailand

Agent: Das aktuelle Wetter in Mailand (Italien) ist wie folgt:

- **Temperatur:** 19,4 °C
- **Windgeschwindigkeit:** 6,0 km/h
- **Windrichtung:** 123°

Wenn du weitere Informationen benötigst, lass es mich wissen!

> Brauche ich einen Regenschirm in Mailand?

Agent: Das aktuelle Wetter in Mailand zeigt eine Temperatur von 19,4 °C und es gibt keine Hinweise auf Regen. Daher benötigst du heute keinen Regenschirm.

> Danke

Agent: Gern geschehen! Wie kann ich Ihnen weiterhel

**Agent 3 – Prompt-Chaining**

Dieser Agent dient als mehrstufiger Orchestrator für Aufgaben im Tennis-Kontext. Er kombiniert:
- LLM-Planung & Textproduktion (Prompt-Chaining)
- faktenbasierte Antworten aus deinem RAG-System, das auf dem Tennis-Transkript (YouTube-Video) aufsetzt.

So entstehen strukturierte, nachvollziehbare Ergebnisse: Planung → Ausführung (inkl. RAG) → Review → sauberer Markdown-Output.


Abhängigkeiten & Kontext

- Nutzen von bereits initialisierten OpenAI-Client (client) + Modell (model_chat) sowie Globale Settings (temperature, token_limit)
- Verwendung der RAG-Funktion ask(query):
ask(...) macht Retrieval → baut Kontext → ruft LLM → liefert die Antwort nur auf Basis des Transkripts

Architektur im Überblick
1. Helpers
  - _chat_llm(...): Dünner Wrapper um client.chat.completions.create(...) für normale Textausgaben (z. B. Finale).
  - _chat_json(...): Erzwingt wohlgeformtes JSON (über response_format={"type": "json_object"}); hat Fallbacks (Regex-Extraktion + sichere Defaults), damit die Pipeline nicht bei JSON-Fehlern stoppt.
  - _extract_json(...): Schneidet JSON aus einer gemischten Antwort (erste { bis letzte }).

2. Agent-Klasse TennisRAGAgent
Der Agent kapselt fünf Phasen (Methoden) und eine run(...)-Pipeline:
  - understand(task): Extrahiert Ziele, Kriterien, Annahmen, Constraints als JSON.
  → sorgt für klaren Auftrag und Bewertungsmaßstäbe.

  - plan(task, understanding): Baut einen   Schritte-Plan (max. 6). Jeder Schritt hat mode:
    - "rag" → Schritt muss die Quelle nutzen (führt später ask(query) aus)
    - "llm" → reicht LLM-Begründung/Struktur/Meta
  Dazu: query (für RAG) oder instruction (für LLM) + output_spec (erwartete Ausgabeform).

- execute(task, plan, understanding): Führt alle Schritte sequenziell aus:
    - RAG: ruft ask(query) auf (bei Liste von Queries: iteriert und kombiniert Ergebnisse).
    - LLM: generiert Text anhand instruction und output_spec. Die Zwischenergebnisse werden gesammelt (pro Schritt: step_id, title, mode, output).

- review(...): Strenger Self-Check als JSON (passed, issues, fixes, quality_score_0_10).
Kriterien: Quellenkonsistenz (bei RAG), Klarheit, Kürze, Logik.

- finalize(...): Baut kompakten Markdown-Output (Deutsch, klare Gliederung, Wortlimit).
Integriert nötige Fixes aus review.

- run(...): Orchestriert die komplette Kette:
    1. understand → 2. plan → 3. execute → 4. review → 5. finalize und gibt ein Dictionary mit allen Zwischenergebnissen + final_markdown zurück.

Interaktion Agent und RAG
- In der Plan-Phase erzwingt der Prompt, dass bei faktenpflichtigen Schritten mode: "rag" gesetzt wird und eine konkrete query definiert ist.

- In der Execute-Phase erkennt der Agent mode == "rag" und ruft ask(query) auf → damit wird wirklich das Transkript befragt (semantisches Retrieval, kontrollierter Kontext, Antwort nur aus Quelle).
- Für Meta-Aufgaben (z. B. Struktur bündeln, kurze Einordnung) nutzt der Agent mode == "llm".

Ergebnis: Transparenz, Nachvollziehbarkeit und Faktenbindung dort, wo es wichtig ist.

Token-Kontrolle & Robustheit
- _chat_llm wählt ein konservatives max_tokens-Default (abhängig vom globalen token_limit), damit man nicht aus Versehen über Budget geht.
- JSON-Robustheit (_chat_json):
    - Erzwingt JSON-Antworten (falls Modell/SDK das Feature unterstützt).
    - Hat Fallbacks (Regex-Extraktion, sichere leere Defaults), damit die Pipeline nicht mit JSONDecodeError abbricht.

- Wortlimit im finalize(...): Der finale Markdown bleibt kurz & lesbar; man kann max_words im run(...) setzen.


In [28]:
# Agent 3 – Prompt-Chaining mit RAG-Interaktion


import re
import json
from typing import List, Dict, Any, Optional
from IPython.display import Markdown, display


def _extract_json(s: str) -> str:
    """Versucht JSON aus einem beliebigen String zu schneiden (erstes { bis letztes })."""
    if not s:
        return ""
    i = s.find("{")
    j = s.rfind("}")
    return s[i:j+1] if (i != -1 and j != -1 and j > i) else s

def _chat_llm(
    messages: List[Dict[str, str]],
    *,
    temp: Optional[float] = None,
    max_tokens: Optional[int] = None
) -> str:
    """Normaler Chat (Text). Nutzt globale Variablen aus deinem Setup."""
    t = temperature if temp is None else temp
    # Budget, um Überschreitungen zu vermeiden
    mt = max_tokens if max_tokens is not None else min(800, max(128, token_limit - 1200))
    resp = client.chat.completions.create(
        model=model_chat,
        temperature=t,
        max_tokens=mt,
        messages=messages,
    )
    return (resp.choices[0].message.content or "").strip()

def _chat_json(
    messages: List[Dict[str, str]],
    *,
    temp: float = 0.0,
    max_tokens: Optional[int] = None
) -> Dict[str, Any]:
    """
    JSON-erzwungener Chat:
      1) response_format={"type":"json_object"} (falls unterstützt)
      2) Fallback: JSON per _extract_json ausschneiden und parsen
      3) Fallback: {} zurückgeben statt Exception
    """
    mt = max_tokens if max_tokens is not None else min(800, max(128, token_limit - 1200))
    try:
        resp = client.chat.completions.create(
            model=model_chat,
            temperature=temp,
            max_tokens=mt,
            messages=messages,
            response_format={"type": "json_object"},
        )
        raw = (resp.choices[0].message.content or "").strip()
        return json.loads(raw) if raw else {}
    except Exception:
        raw = _chat_llm(messages, temp=temp, max_tokens=mt)
        try:
            return json.loads(_extract_json(raw)) if raw else {}
        except json.JSONDecodeError:
            return {}


class TennisRAGAgent:
    """
    Prompt-Chaining für Tennis – mit expliziter RAG-Interaktion.
    Schritt-Schema (Plan):
    {
      "id": "S1",
      "title": "Kurzer Titel",
      "mode": "rag" | "llm",
      "query": "RAG-Frage(n) ..."          # für mode="rag" (str oder Liste[str])
      "instruction": "LLM-Anweisung ..."   # für mode="llm"
      "output_spec": "Was genau erwartet wird (knapp)."
    }
    """

    # 1) Verständnis
    def understand(self, task: str) -> Dict[str, Any]:
        msgs = [
            {"role": "system", "content": "You are a tennis RAG task analyst. Be precise, concise, German output."},
            {"role": "user", "content": f"""Aufgabe: {task}

Extrahiere NUR als JSON:
- goals: 1–3 Hauptziele im Tennis-Kontext
- criteria: messbare Erfolgs-/Akzeptanzkriterien
- assumptions: sinnvolle Annahmen (z. B. Fokus Herren, Zeitraum, Turniere)
- constraints: relevante Nebenbedingungen (Stil: prägnant; Deutsch; Länge; Struktur)
"""}]
        data = _chat_json(msgs, temp=0.1)
        return {
            "goals": data.get("goals", []),
            "criteria": data.get("criteria", []),
            "assumptions": data.get("assumptions", []),
            "constraints": data.get("constraints", []),
        }

    # 2) Plan
    def plan(self, task: str, understanding: Dict[str, Any], max_steps: int = 6) -> List[Dict[str, Any]]:
        msgs = [
            {"role": "system", "content": "You are a pragmatic planner for a Tennis RAG. Return ONLY a JSON array of steps."},
            {"role": "user", "content": f"""Aufgabe: {task}
Kontext (JSON):
{json.dumps(understanding, ensure_ascii=False, indent=2)}

Erzeuge höchstens {max_steps} Schritte. Jeder Schritt:
- id (S1..), title
- mode: "rag" (nutzt ask(query)) oder "llm"
- query (für "rag") ODER instruction (für "llm")
- output_spec (knapp definieren)

Nutze "rag" bei faktenpflichtigen Passagen (Epochen/Spieler/Rivalitäten, Turniere, historische Aussagen).
Antworte NUR mit JSON-Liste.
"""}]
        data = _chat_json(msgs, temp=0.1)
        if isinstance(data, dict):  # falls das Modell ein Objekt statt Liste liefert
            data = [data] if data else []
        return data if isinstance(data, list) else []

    # Ausführen
    def execute(self, task: str, plan: List[Dict[str, Any]], understanding: Dict[str, Any]) -> List[Dict[str, Any]]:
        results: List[Dict[str, Any]] = []
        for step in plan:
            mode = (step.get("mode") or "").lower().strip()
            output = ""

            if mode == "rag":
                q = step.get("query")
                if isinstance(q, list):
                    parts = []
                    for subq in q:
                        try:
                            parts.append(ask(subq))  # deine RAG-Pipeline
                        except Exception as e:
                            parts.append(f"[RAG-Fehler: {e}]")
                    output = "\n\n".join(parts)
                else:
                    try:
                        output = ask(q or "")
                    except Exception as e:
                        output = f"[RAG-Fehler: {e}]"

            elif mode == "llm":
                instruction = step.get("instruction") or ""
                spec = step.get("output_spec") or "Knappe, präzise Antwort."
                msgs = [
                    {"role": "system", "content": "You are a concise tennis analyst. German output."},
                    {"role": "user", "content": f"Aufgabe: {task}\nAnweisung: {instruction}\nErwartung: {spec}"},
                    {"role": "assistant", "content": f"Zwischenergebnisse:\n{json.dumps(results, ensure_ascii=False)[:1200]}"},
                ]
                output = _chat_llm(msgs, temp=0.2)

            else:
                output = "[Planfehler: mode muss 'rag' oder 'llm' sein]"

            results.append({
                "step_id": step.get("id"),
                "title": step.get("title"),
                "mode": mode,
                "output": output
            })
        return results

    # 4) Review
    def review(self, task: str, understanding: Dict[str, Any], plan: List[Dict[str, Any]], exec_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        msgs = [
            {"role": "system", "content": "You are a meticulous reviewer for Tennis RAG outputs. Be strict and specific."},
            {"role": "user", "content": f"""Task: {task}
Understanding:
{json.dumps(understanding, ensure_ascii=False, indent=2)}
Plan:
{json.dumps(plan, ensure_ascii=False, indent=2)}
Results:
{json.dumps(exec_results, ensure_ascii=False, indent=2)}

Return ONLY JSON:
{{"passed": true/false, "issues": ["..."], "fixes": ["..."], "quality_score_0_10": <int>}}
Fokussiere auf: Konsistenz mit Quelle (RAG-Schritte), Klarheit, Kürze, logische Struktur.
"""}]
        data = _chat_json(msgs, temp=0)
        return {
            "passed": bool(data.get("passed", False)),
            "issues": data.get("issues", []),
            "fixes": data.get("fixes", []),
            "quality_score_0_10": data.get("quality_score_0_10", 0),
        }

    # 5) Finalisieren (Markdown)
    def finalize(
        self,
        task: str,
        understanding: Dict[str, Any],
        plan: List[Dict[str, Any]],
        exec_results: List[Dict[str, Any]],
        review: Dict[str, Any],
        *,
        max_words: int = 280,
        style: str = "Deutsch, prägnant, klar gegliedert, keine Ausschmückung."
    ) -> str:
        msgs = [
            {"role": "system", "content": "You are a precise writer. Integrate necessary fixes; output ONLY Markdown."},
            {"role": "user", "content": f"""Task: {task}
Understanding:
{json.dumps(understanding, ensure_ascii=False, indent=2)}
Plan:
{json.dumps(plan, ensure_ascii=False, indent=2)}
Results (Auszüge):
{json.dumps(exec_results, ensure_ascii=False, indent=2)[:2000]}
Review:
{json.dumps(review, ensure_ascii=False, indent=2)}

Stil: {style}
Limit: ≤ {max_words} Wörter.
Gib NUR Markdown zurück (Überschriften + knappe Bullet-Points).
"""}]
        return _chat_llm(msgs, temp=0.25, max_tokens=min(900, token_limit - 1024))

    # Pipeline
    def run(self, task: str, *, max_steps: int = 6, max_words: int = 280, style: str = "Deutsch, prägnant, klar gegliedert.") -> Dict[str, Any]:
        understanding = self.understand(task)
        plan = self.plan(task, understanding, max_steps=max_steps)
        exec_results = self.execute(task, plan, understanding)
        review = self.review(task, understanding, plan, exec_results)
        final_md = self.finalize(task, understanding, plan, exec_results, review, max_words=max_words, style=style)
        return {
            "understanding": understanding,
            "plan": plan,
            "results": exec_results,
            "review": review,
            "final_markdown": final_md,
        }

# Beispielausführung
agent = TennisRAGAgent()
task = ("Erstelle eine kompakte Übersicht zur Rivalität der neuen Spitze im Herren-Tennis "
         "(Post-'Big Three'), inkl. kurzer Epochen-Einordnung und 3 Q&A-Antworten aus der Quelle (RAG).")
out = agent.run(task, max_steps=5, max_words=260)
display(Markdown(out["final_markdown"]))


# Übersicht zur Rivalität der neuen Spitze im Herren-Tennis (Post-'Big Three')

## 1. Identifikation der neuen Top-Spieler
- **Carlos Alcaraz**: 2 Grand-Slam-Titel, ATP-Ranking: 1
- **Daniil Medvedev**: 1 Grand-Slam-Titel, ATP-Ranking: 3
- **Jannik Sinner**: 0 Grand-Slam-Titel, ATP-Ranking: 4
- **Stefanos Tsitsipas**: 0 Grand-Slam-Titel, ATP-Ranking: 5

## 2. Analyse der Rivalität
- **Direkte Duelle**:
  - Alcaraz vs. Medvedev: 2-1
  - Alcaraz vs. Sinner: 3-0
  - Medvedev vs. Sinner: 1-1
  - Tsitsipas vs. Alcaraz: 1-2

## 3. Bewertung der Auswirkungen auf die Tennislandschaft
- Die Rivalität zwischen Alcaraz, Medvedev, Sinner und Tsitsipas bringt frischen Wind in die Tenniswelt.
- Neue Spielstile und Strategien prägen das Spiel, was zu spannenden Matches führt.
- Die Ära nach den 'Big Three' fördert jüngere Talente und erhöht das Interesse an Herren-Tennis.

## 4. Epochen-Einordnung
- **'Big Three'-Ära**: Dominanz von Federer, Nadal und Djokovic mit zahlreichen Rekorden.
- **Post-'Big Three'-Ära**: Aufstieg junger Spieler, die um die Spitze kämpfen und die Tradition herausfordern.

## 5. Q&A zur Rivalität
- **Frage 1**: Wer sind die Hauptakteure der neuen Rivalität?
  - **Antwort**: Carlos Alcaraz, Daniil Medvedev, Jannik Sinner und Stefanos Tsitsipas.
  
- **Frage 2**: Wie beeinflussen diese Spieler die Zuschauerzahlen?
  - **Antwort**: Ihre aufregenden Matches ziehen neue Zuschauer an und revitalisieren das Interesse am Tennis.
  
- **Frage 3**: Gibt es Parallelen zur 'Big Three'-Ära?
  - **Antwort**: Ja, die Rivalität zeigt ähnliche Spannungen und Dramatik, jedoch mit einem jüngeren, dynamischeren Ansatz.