![GenAI Banner](https://raw.githubusercontent.com/ralf-42/Image/main/genai-banner-2.jpg)



<p><font size="5" color='grey'> <b> Chat & Memory </b></font> </br></p>

---

In [None]:
#@title üîß Umgebung einrichten{ display-mode: "form" }
!uv pip install --system -q git+https://github.com/ralf-42/GenAI.git#subdirectory=04_modul
!uv pip install --system -q langgraph langchain_openai
from genai_lib.utilities import check_environment, get_ipinfo, setup_api_keys, mprint, install_packages
setup_api_keys(['OPENAI_API_KEY'], create_globals=False)
print()
check_environment()
print()
get_ipinfo()

In [None]:
#@title üîß Installationen { display-mode: "form" }
install_packages([('langgraph-checkpoint-sqlite', 'langgraph.checkpoint.sqlite')])



# 1 | Intro
---


<p><font color='black' size="5">
Zustandslosigkeit von LLMs
</font></p>

Large Language Models (LLMs) wie GPT sind von Natur aus **zustandslos** ‚Äì sie verf√ºgen √ºber kein eingebautes Ged√§chtnis. Jede Anfrage wird isoliert verarbeitet, ohne Bezug zu vorherigen Interaktionen. Deshalb muss der Chatverlauf (Historie) bei jeder Anfrage neu √ºbergeben werden.

```
Ohne Memory:
User: "Mein Name ist Max"
AI: "Hallo Max!"
User: "Wie hei√üe ich?"
AI: "Das habe ich nicht gespeichert." ‚ùå
```


# 2 | Short-term Memory
---


Kurzzeit-Memory speichert den Gespr√§chsverlauf einer aktiven Sitzung (eines Threads) und erm√∂glicht kontextbezogene Antworten.

## 2.1 | ... mit Python-Liste

Um zu verstehen, warum LangGraph in vielen F√§llen n√∂tig ist, hier ein Beispiel mit *manueller* Verwaltung.

In [None]:
# Importe
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser

# System-Prompt
system_prompt = "Du bist ein hilfreicher und humorvoller KI-Assistent."

# Prompt-Template mit Historie (MessagesPlaceholder nimmt die Historie entgegen)
prompt = ChatPromptTemplate.from_messages([
    ("system", "{system_prompt}"),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{user_input}")
])

# LLM
model_provider="openai"
model_name = "gpt-4o-mini"
temperature = 0
llm = init_chat_model(model_name, model_provider=model_provider, temperature=temperature)

# Parser
parser = StrOutputParser()

# Chain
chain = prompt | llm | parser

In [None]:
# Chat-Funktion mit manueller Historien-Verwaltung
def chat_manually(system_prompt, chat_history, user_input):
    """F√ºhrt eine Chat-Interaktion mit manueller Historien-Verwaltung durch."""

    # Chain aufrufen (Historie wird im Prompt mitgeschickt)
    response = chain.invoke({
        'system_prompt': system_prompt,
        'chat_history': chat_history,
        'user_input': user_input
    })

    # Ausgabe
    mprint(f"### üßë‚Äçü¶± Mensch: \n{user_input}")
    mprint(f"### ü§ñ KI: \n{response}\n")

    # ‚≠ê WICHTIG: Memory (Liste) muss MANUELL nach JEDEM Call aktualisiert werden
    chat_history.extend([HumanMessage(content=user_input), AIMessage(content=response)])

    return chat_history

In [None]:
# Historie initialisieren
chat_history = []

# Konversation
chat_manually(system_prompt, chat_history, "Mein Name ist Max")
chat_manually(system_prompt, chat_history, "Ich mag Python-Programmierung")
chat_manually(system_prompt, chat_history, "Wei√üt du noch, wie ich hei√üe und was ich mag?")

mprint("### üìù Gespeicherte Nachrichten (Liste):\n---")
for msg in chat_history:
    mprint(f"  **{msg.type}**:   {msg.content}")

mprint("\n\n‚ùå Problem: Keine Session-Verwaltung und fehleranf√§lliges, manuelles Memory-Management.")

## 2.2 | ... mit LangGraph



**LangGraph** automatisiert das Session- und Memory-Management mit dem **Checkpointer**. Dieser speichert den gesamten Zustand (`MessagesState`) eines **Threads** und l√§dt ihn beim n√§chsten Aufruf automatisch wieder. Das manuelle Aktualisieren entf√§llt.

Der Workflow/Graph ist sehr einfach: **START** -> **Chat** (LLM-Aufruf) -> **END**. Die Magie passiert durch den **`MessagesState`** und den **`Checkpointer`**.



In [None]:
# Importe
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, SystemMessage

from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver

from IPython.display import Image, display

In [None]:
# LLM
model_provider="openai"
model_name = "gpt-4o-mini"
temperature = 0.2
llm = init_chat_model(model_name, model_provider=model_provider, temperature=temperature)

<p><font color='black' size="5">
Workflow
</font></p>

In [None]:
# Workflow: Ruft das Modell auf (der eigentliche Chatbot)
def chat_node(state: MessagesState):
    """ Diese Funktion wird bei jedem Chat-Schritt aufgerufen. Der 'state' enth√§lt automatisch alle bisherigen Nachrichten.  """

    # System-Prompt hinzuf√ºgen (vor der Historie)
    # NOTE: Der MessagesState reduziert die Nachrichten beim Hinzuf√ºgen automatisch.
    messages = [
        SystemMessage(content=system_prompt)
    ] + state["messages"]

    # LLM aufrufen
    response = llm.invoke(messages)

    # üõë WICHTIG: Die R√ºckgabe MUSS eine Liste von Nachrichten sein!
    # Der MessagesState Reducer f√ºgt die Liste der Historie hinzu.
    return {"messages": [response]}

<p><font color='black' size="5">
Graph
</font></p>

In [None]:
# Graph
workflow = StateGraph(state_schema=MessagesState)

# Knoten hinzuf√ºgen
workflow.add_node("chat", chat_node)

# Kanten definieren
workflow.add_edge(START, "chat")
workflow.add_edge("chat", END)

# Checkpointer (MemorySaver) verwaltet den Zustand pro Thread_ID
checkpointer = MemorySaver()

# Graph kompilieren
graph =  workflow.compile(checkpointer=checkpointer)
display(Image(graph.get_graph().draw_mermaid_png()))

<p><font color='black' size="5">
Ausf√ºhrung
</font></p>

In [None]:
# Ausf√ºhrung
def chat(thread_id, user_input):
    """Chattet mit dem Bot in einem bestimmten Thread."""

    # Config enth√§lt die Thread-ID (Session-ID)
    config = {"configurable": {"thread_id": thread_id}}

    # Input vorbereiten
    input_message = {"messages": [HumanMessage(content=user_input)]}

    # Graph aufrufen - Memory wird vom Checkpointer automatisch geladen/gespeichert!
    result = graph.invoke(input_message, config=config)

    # Ausgabe
    mprint(f"**üßë‚Äçü¶± [Thread: {thread_id}] Mensch:** \n{user_input}")
    mprint(f"**ü§ñ [Thread: {thread_id}] KI:** \n{result['messages'][-1].content}\n")

    return result['messages'][-1].content


<p><font color='black' size="5">
Multi-User (Parallele Threads)
</font></p>

Jede **`thread_id`** erh√§lt ihre **eigene, isolierte Historie**.

In [None]:
# Thread 1: Max
chat("thread_max", "Ich mag Python-Programmierung!")

# Thread 2: Emma (komplett separate Konversation)
chat("thread_emma", "Ich interessiere mich f√ºr Machine Learning!")

# Zur√ºck zu Thread 1 - Max's Memory bleibt erhalten!
chat("thread_max", "Was war nochmal mein Interesse?")

# Zur√ºck zu Thread 2 - Emma's Memory bleibt erhalten!
chat("thread_emma", "An welchem Thema bin ich interessiert?")
print()

In [None]:
def show_thread_history(thread_id):
    """Zeigt die komplette Historie eines Threads."""

    config = {"configurable": {"thread_id": thread_id}}
    state = graph.get_state(config)

    mprint(f"### üìù Thread '{thread_id}' - {len(state.values['messages'])} Nachrichten:\n")
    mprint("---")

    for msg in state.values["messages"]:
        role = "üßë‚Äçü¶±" if msg.type == "human" else "ü§ñ"
        mprint(f"{role} {msg.type.upper()}: {msg.content}")

# Beispiel
show_thread_history("thread_max")
show_thread_history("thread_emma")


# 3 | Memory Management
---

**Problem:** Lange Konversationen sprengen das **Kontextfenster** und werden teuer (Kosten, Latenz). Auch moderne, gro√üe Modelle leiden unter dem **"Lost in the middle"**-Problem, bei dem wichtige Informationen in der Mitte eines langen Prompt leicht ignoriert werden.

**L√∂sung:** Intelligentes Memory-Management.

<p><font color='black' size="5">
Funktionsstruktur mit Management
</font></p>

Hinzuf√ºgen eines **Pre-Processing-Schritts** vor dem Chat, um die Nachrichten zu trimmen oder zusammenzufassen.




## 3.1 | Trimming (Sliding Window)



**Strategie:** Behalte nur die letzten *n* Nachrichten. Alles √§ltere wird entfernt (oder nur der letzte Teil des Prompts wird genutzt). LangChain bietet hierf√ºr eingebaute Utilities.

In [None]:
# Import
from langchain_core.messages import trim_messages

In [None]:
# Memory-Einstellungen
max_messages_before_trim = 10
messages_to_summarize = 8
recent_messages_to_keep = 2

<p><font color='black' size="5">
Workflow
</font></p>

In [None]:
# Trimming: Nur die letzten n Token/Nachrichten behalten
def chat_node_with_trimming(state: MessagesState):
    """Ruft das Modell mit getrimmter Historie auf."""

    # System-Prompt
    system_msg = SystemMessage(content=system_prompt)

    # Nur die letzten N Nachrichten (hier: max_messages_before_trim)
    # 'token_counter=len' z√§hlt die Anzahl der Nachrichten statt Token
    trimmed = trim_messages(
        state["messages"],
        max_tokens=max_messages_before_trim,
        strategy="last",
        token_counter=len, # ‚Üê Z√§hlt Nachrichten statt Tokens
        include_system=False # System-Nachricht nicht in das Limit einbeziehen
    )

    messages = [system_msg] + trimmed
    response = llm.invoke(messages)

    return {"messages": [response]}

<p><font color='black' size="5">
Graph
</font></p>

In [None]:
# Graph mit Trimming
workflow_trimmed = StateGraph(state_schema=MessagesState)
workflow_trimmed.add_node("chat", chat_node_with_trimming)
workflow_trimmed.add_edge(START, "chat")
workflow_trimmed.add_edge("chat", END)

graph_trimmed = workflow_trimmed.compile(checkpointer=MemorySaver())
display(Image(graph.get_graph().draw_mermaid_png()))

<p><font color='black' size="5">
Ausf√ºhrung
</font></p>

In [None]:
# Ausf√ºhrung
def chat_with_trimming(thread_id, user_input):
    """Chattet mit dem Bot unter Verwendung der Summary-Strategie."""

    config = {"configurable": {"thread_id": thread_id}}
    # User-Input ist die NEUE Nachricht und wird vom Checkpointer zur Historie hinzugef√ºgt.
    input_message = {"messages": [HumanMessage(content=user_input)]}

    result = graph_trimmed.invoke(input_message, config=config)

    mprint(f"### üßë‚Äçü¶± Mensch: \n{user_input}")
    mprint(f"### ü§ñ KI: \n{result['messages'][-1].content}\n")

    return result['messages'][-1].content

In [None]:
# Beispiel: Lange Konversation (mehr als max_messages_before_trim)
thread = "trimming_test"

# Erste Nachrichten (unterhalb des Limits)
chat_with_trimming(thread, "Mein Name ist Max")
chat_with_trimming(thread, "Ich wohne in K√∂ln")
chat_with_trimming(thread, "Ich mag Python")
chat_with_trimming(thread, "Ich habe eine Katze namens Neo")

# Jetzt wird das Limit √ºberschritten und die Zusammenfassung ausgel√∂st
for i in range(7):
    chat_with_trimming(thread, f"Dies ist Test-Nachricht Nummer {i+1} zur F√ºllung der Historie.")

# Nach der Zusammenfassung: Die KI sollte trotzdem wichtige Infos kennen
chat_with_trimming(thread, "Wie hie√ü meine Katze?")
print()

## 3.2 | Summarization (Zusammenfassung)



**Strategie:** Fasse alte Nachrichten zusammen. Die Zusammenfassung ersetzt dann die √§lteren Nachrichten in der Historie, wodurch Platz gespart wird.

In [None]:
# Import
from langchain_core.prompts import ChatPromptTemplate

<p><font color='black' size="5">
Workflow
</font></p>

In [None]:
# Zusammenfassungs-Prompt
summarize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Fasse die folgende Konversation in 2-3 S√§tzen zusammen. Behalte wichtige Fakten und Pr√§ferenzen."),
    ("human", "{conversation}")
])

def summarize_conversation(messages):
    """Erstellt eine Zusammenfassung der Nachrichten."""

    # Konversation als Text formatieren
    conversation_text = "\n".join([
        f"{msg.type}: {msg.content}" for msg in messages
    ])

    # Zusammenfassung erstellen
    summary_chain = summarize_prompt | llm | StrOutputParser()
    summary = summary_chain.invoke({"conversation": conversation_text})

    return summary

# Zusammenfassung erstellen
def chat_node_with_summary(state: MessagesState):
    """Nutzt Zusammenfassung statt alter Nachrichten, wenn die Historie zu lang wird."""

    all_messages = state["messages"]
    summary = "‚Äî"

    # Wenn die Historie zu lang ist: Zusammenfassen
    if len(all_messages) > max_messages_before_trim:
        mprint(f"\n‚ö†Ô∏è Historie zu lang ({len(all_messages)}). Fasse √§ltere Nachrichten zusammen.\n")

        # 1. √Ñltere Nachrichten zum Zusammenfassen ausw√§hlen
        to_summarize = all_messages[:messages_to_summarize]
        summary = summarize_conversation(to_summarize)

        # 2. Den Prompt mit der Zusammenfassung und nur den letzten Nachrichten f√ºllen
        messages_for_prompt = [
            SystemMessage(content=f"{system_prompt}\n\nBisheriger Kontext (Zusammenfassung): {summary}"),
        ] + all_messages[-recent_messages_to_keep:]

    else:
        # Wenn Historie kurz genug: Gesamte Historie (inkl. System-Prompt) verwenden
        messages_for_prompt = [SystemMessage(content=system_prompt)] + all_messages

    # LLM aufrufen (antwortet auf die letzte User-Nachricht + Kontext)
    response = llm.invoke(messages_for_prompt)

    # Wichtig: Wir geben NUR die KI-Antwort zur√ºck, LangGraph speichert die User-Nachricht und die KI-Antwort im State!
    return {"messages": [response]}

<p><font color='black' size="5">
Graph
</font></p>

In [None]:
# Graph mit Summary-Funktion erstellen
workflow_summary = StateGraph(state_schema=MessagesState)
workflow_summary.add_node("chat", chat_node_with_summary)
workflow_summary.add_edge(START, "chat")
workflow_summary.add_edge("chat", END)

# Mit Memory kompilieren
graph_summary = workflow_summary.compile(checkpointer=MemorySaver())
display(Image(graph.get_graph().draw_mermaid_png()))

<p><font color='black' size="5">
Ausf√ºhrung
</font></p>

In [None]:
# Ausf√ºhrung
def chat_with_summary(thread_id, user_input):
    """Chattet mit dem Bot unter Verwendung der Summary-Strategie."""

    config = {"configurable": {"thread_id": thread_id}}
    # User-Input ist die NEUE Nachricht und wird vom Checkpointer zur Historie hinzugef√ºgt.
    input_message = {"messages": [HumanMessage(content=user_input)]}

    result = graph_summary.invoke(input_message, config=config)

    mprint(f"### üßë‚Äçü¶± Mensch: \n{user_input}")
    mprint(f"### ü§ñ KI: \n{result['messages'][-1].content}\n")

    return result['messages'][-1].content

In [None]:
# Beispiel: Lange Konversation (mehr als max_messages_before_trim)
thread = "summary_test"

# Erste Nachrichten (unterhalb des Limits)
chat_with_summary(thread, "Mein Name ist Max")
chat_with_summary(thread, "Ich wohne in K√∂ln")
chat_with_summary(thread, "Ich mag Python")
chat_with_summary(thread, "Ich habe eine Katze namens Neo")

# Jetzt wird das Limit √ºberschritten und die Zusammenfassung ausgel√∂st
for i in range(7):
    chat_with_summary(thread, f"Dies ist Test-Nachricht Nummer {i+1} zur F√ºllung der Historie.")

# Nach der Zusammenfassung: Die KI sollte trotzdem wichtige Infos kennen
chat_with_summary(thread, "Wie hie√ü meine Katze?")
print()


# 4 | Long-term Memory
---

**SQLite Checkpointer f√ºr persistente Konversationen**

**Problem:**

Ohne Persistenz geht die Konversationshistorie beim Neustart der Anwendung verloren. In-Memory-Speicher (wie MemorySaver) ist nur f√ºr Tests und Demos geeignet, nicht f√ºr Production-Anwendungen, die Benutzer-Sessions √ºber l√§ngere Zeitr√§ume aufrechterhalten m√ºssen.    
**L√∂sung:**

SQLite Checkpointer (lokale Entwicklung) bzw. PostgreSQL Checkpointer (Production). Der Checkpointer speichert jeden Graph-State automatisch in einer Datenbank und organisiert Konversationen √ºber thread_ids. Dadurch k√∂nnen:

+ Benutzer-Sessions nach Neustart fortgesetzt werden
+ Mehrere parallele Konversationen verwaltet werden (Multi-User/Multi-Thread)
+ Konversationshistorien abgerufen und analysiert werden
+ Fehlerhafte Ausf√ºhrungen vom letzten erfolgreichen Checkpoint wiederaufgenommen werden (Error Recovery)

<p><font color='darkblue' size="4">
‚ú® <b>Empfehlung:</b>
</font></p>
SQLite f√ºr lokale Entwicklung und z.B. PostgreSQL f√ºr Produktion

<p><font color='black' size="5">
Importe
</font></p>

In [None]:
from typing import Annotated
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from IPython.display import Image, display
import os

import sqlite3
import uuid

# Konfiguration
DB_PATH = "./chatbot_memory.db"

<p><font color='black' size="5">
Workflow
</font></p>

In [None]:
class ChatState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]

# LLM einmal initialisieren
llm = init_chat_model("gpt-4o-mini", temperature=0.7)

# Chatbot-Knoten
def chatbot_node(state: ChatState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

<p><font color='black' size="5">
Graph
</font></p>

In [None]:
def create_graph(checkpointer):
    builder = StateGraph(ChatState)
    builder.add_node("chatbot", chatbot_node)
    builder.add_edge(START, "chatbot")
    builder.add_edge("chatbot", END)
    return builder.compile(checkpointer=checkpointer)

# SQLite Connection DIREKT erstellen
conn = sqlite3.connect(
    DB_PATH,
    check_same_thread=False  # Wichtig f√ºr Jupyter Notebooks!
)

# Checkpointer mit bestehender Connection
checkpointer = SqliteSaver(conn=conn)
checkpointer.setup()
graph = create_graph(checkpointer)

# Visualisierung
print("üìä Graph-Struktur:")
display(Image(graph.get_graph().draw_mermaid_png()))

<p><font color='black' size="5">
Funktionen
</font></p>

In [None]:
# --- Config-Helper
cfg = lambda tid: {"configurable": {"thread_id": tid}}

# --- Chat
def chat(tid: str, msg: str) -> str:
    return graph.invoke({"messages": [HumanMessage(msg)]}, cfg(tid))["messages"][-1].content

# --- History
def show_history(tid: str):
    msgs = graph.get_state(cfg(tid)).values.get("messages", [])
    if not msgs:
        return print(f"‚ö†Ô∏è Keine Nachrichten in '{tid}'\n")

    print(f"üìú {tid} ({len(msgs)} Nachrichten)\n" + "=" * 60)
    for i, m in enumerate(msgs, 1):
        print(f"{i}. {'üë§' if isinstance(m, HumanMessage) else 'ü§ñ'} {m.content}\n")
    print("=" * 60 + "\n")

# --- List Threads mit allen Nachrichten
def list_all_threads():
    with sqlite3.connect(DB_PATH) as conn:
        threads = conn.execute(
            "SELECT DISTINCT thread_id, COUNT(*) FROM checkpoints GROUP BY thread_id"
        ).fetchall()

    print("üìã Threads:\n" + "-" * 60)
    for tid, cnt in threads:
        state = graph.get_state(cfg(tid))
        messages = state.values.get("messages", [])
        mcnt = len(messages)
        print(f"  ‚úÖ {tid}: {mcnt} Nachrichten, {cnt} Checkpoints")
        print("    Nachrichten:")
        for i, msg in enumerate(messages, 1):
            print(f"      {i}. {msg.content}")
    print()

# --- Stats
def get_thread_stats():
    with sqlite3.connect(DB_PATH) as conn:
        tc, cc = conn.execute("SELECT COUNT(DISTINCT thread_id), COUNT(*) FROM checkpoints").fetchone()
    print(f"üìä {tc} Threads, {cc} Checkpoints\n")

<p><font color='black' size="5">
Ausf√ºhrung
</font></p>

<p><font color='black' size="4">
Test-Historie aufbauen
</font></p>

In [None]:
def test_historie():
    # 1. Neue Konversation erstellen
    test_thread = "test-history"
    print("1Ô∏è‚É£ Sende Test-Nachrichten...")
    chat(test_thread, "Hallo, ich bin ein Test")
    chat(test_thread, "Wie geht es dir?")
    print("‚úÖ Nachrichten gesendet\n")

    # 2. State direkt pr√ºfen
    print("2Ô∏è‚É£ State direkt pr√ºfen...")
    config = {"configurable": {"thread_id": test_thread}}
    state = graph.get_state(config)
    print(f"State Type: {type(state)}")
    print(f"State Values: {state.values}")
    print(f"Messages: {state.values.get('messages', [])}\n")

    # 3. History-Funktion testen
    print("3Ô∏è‚É£ History-Funktion aufrufen...")
    show_history(test_thread)

test_historie()

<p><font color='black' size="4">
Interaktiver Thread
</font></p>

In [None]:
def main():
    thread = "interactive"
    msg_count = 0

    # Das commands-Dictionary beh√§lt die print-Anweisung bei, f√ºhrt aber kein exit() mehr aus.
    commands = {
        "exit": lambda: print(f"\nüëã Auf Wiedersehen! ({msg_count} Nachrichten)\n"), # <-- Nur Textausgabe, kein Exit mehr!
        "history": lambda: show_history(thread),
        "all": lambda: list_all_threads(),
        "stats": lambda: get_thread_stats(),
    }

    # Help-Text
    print("""ü§ñ Interaktiver Chatbot
    ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    Befehle: exit | new | history | thread | all | stats
    ‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê‚ïê
    """)

    while True:
        user_input = input("üë§ Du: ").strip()

        if not user_input:
            continue

        # --- exit
        if user_input.lower() == "exit":
            commands["exit"]()
            break
        # --- new
        if user_input == "new":
            old, thread = thread, f"session-{str(uuid.uuid4())[:8]}"
            print(f"üÜï {old} ‚Üí {thread}\n")
            msg_count = 0
        # --- thread
        elif user_input in ["thread", "threads"]:
            print(f"üìã {thread} ({msg_count} Nachrichten)\n")
        # --- history, all, stats
        elif user_input in commands:
            print()
            commands[user_input]() # F√ºhrt die anderen Befehle aus (history, all, stats)
        else:
            # Chat
            try:
                print(f"ü§ñ {chat(thread, user_input)}\n")
                msg_count += 1
            except Exception as e:
                print(f"‚ùå {e}\n")

In [None]:
main()

# A | Aufgaben
---


Die Aufgabenstellungen unten bieten Anregungen. Sie k√∂nnen aber auch gerne eigene Aufgaben verwenden.



<p><font color='black' size="5">
Aufgabe 1: Multi-User Chatbot mit LangGraph (Short-term)
</font></p>

**Schwierigkeit:** ‚≠ê‚≠ê

Erstellen Sie einen Chatbot unter Verwendung von **Abschnitt 2** (`graph`), der:
- Mindestens 3 verschiedene User-Threads verwaltet (z.B. "max_session", "emma_session", "ralf_session").
- Jeden User beim Namen kennt.
- Beim Thread-Wechsel den Kontext des jeweiligen Users korrekt beh√§lt (zeigen Sie dies, indem Sie zwischen zwei Threads hin- und herwechseln).


<p><font color='black' size="5">
Aufgabe 2: Memory mit Trimming-Strategie
</font></p>

**Schwierigkeit:** ‚≠ê‚≠ê‚≠ê

Implementieren Sie einen Chatbot unter Verwendung von **Abschnitt 3.2** (`graph_summary`), der:
1. **`max_messages_before_trim`** auf einen kleinen Wert (z.B. 5) setzt.
2. Eine l√§ngere Konversation (z.B. 7 Schritte) durchf√ºhrt, sodass die **Zusammenfassungslogik** ausgel√∂st wird (achten Sie auf den `‚ö†Ô∏è` Hinweis).
3. Pr√ºfen Sie mit `show_thread_history(thread_id)`, ob der Checkpointer immer noch **alle 7 Nachrichten** speichert, aber der Chat-Node **nur die relevanten Nachrichten** verwendet.

*(Hinweis: Der Checkpointer speichert immer die volle Historie; die Management-Strategie entscheidet, was dem LLM pr√§sentiert wird.)*

# B | Datenbank auslesen
---

In [None]:
import sqlite3
import msgpack
import os
import re
from typing import List, Dict, Any

DB_PATH = "chatbot_memory.db"

In [None]:
def decode_langchain_ext_type(code, data):
    """Gibt die rohen Bytes zur√ºck, da die innere Dekodierung fehlschl√§gt."""
    if code == 5:
        return msgpack.ExtType(code, data)
    return msgpack.ExtType(code, data)

def read_all_threads_from_db():
    """
    Liest den neuesten Checkpoint aller Threads aus der Datenbank und extrahiert
    die bereinigten Nachrichten mithilfe der String-Such-Heuristik.
    """
    if not os.path.exists(DB_PATH):
        print(f"‚ùå Fehler: Datenbankdatei '{DB_PATH}' wurde nicht gefunden.")
        return

    # --- SQL-Abfrage 1: Alle Threads und deren neuesten Checkpoint holen ---
    # Die Abfrage wird vereinfacht, um alle Threads ohne 'WHERE thread_id = ?' zu holen.
    sql_query_all_threads = """
    SELECT
        t1.thread_id,
        t1.checkpoint AS state_data
    FROM
        checkpoints t1
    INNER JOIN (
        SELECT
            thread_id,
            MAX(checkpoint_id) AS max_checkpoint_id
        FROM
            checkpoints
        GROUP BY
            thread_id
    ) AS t2 ON t1.thread_id = t2.thread_id AND t1.checkpoint_id = t2.max_checkpoint_id;
    """

    mprint(f"### üì¨ Lese alle Threads aus der Datenbank '{DB_PATH}'...")
    mprint("---")

    try:
        with sqlite3.connect(DB_PATH) as conn:
            # Jetzt werden alle neuesten Checkpoints der DB abgerufen
            all_thread_results = conn.execute(sql_query_all_threads).fetchall()

        if not all_thread_results:
            print(f"‚ÑπÔ∏è Keine Threads in der Datenbank gefunden.")
            return

        print(f"‚úÖ {len(all_thread_results)} Threads gefunden. Starte Extraktion...")

        # --- Iteration √ºber alle gefundenen Threads ---
        for thread_id, state_blob in all_thread_results:

            # --- Deserialisierung und Extraktion (Ihr alter Funktionskern) ---
            try:
                # 1. √ÑUSSERE DESERIALISIERUNG: MessagePack
                state_data = msgpack.unpackb(state_blob, raw=False, ext_hook=decode_langchain_ext_type)

                # Pfad zur Nachrichtenliste
                channel_values = state_data.get('channel_values', {})
                messages_ext_list = channel_values.get('messages', [])

                final_messages_list: List[Dict[str, str]] = []

                if isinstance(messages_ext_list, list):
                    for ext_obj in messages_ext_list:
                        msg_bytes = ext_obj.data if isinstance(ext_obj, msgpack.ExtType) else ext_obj

                        if isinstance(msg_bytes, bytes):
                            # 2. ROH-SUCHE & BEREINIGUNG
                            full_string = msg_bytes.decode('utf-8', errors='ignore')

                            # Typ
                            type_start = full_string.find("langchain_core.messages.")
                            type_text = full_string[type_start:].split()[0].replace("langchain_core.messages.", "").replace("Message", "")

                            # Inhalt
                            content_start = full_string.find("content")
                            content_text = ""
                            if content_start != -1:
                                potential_content = full_string[content_start + 7:].strip()
                                next_key_start = potential_content.find("additional_kwargs")
                                if next_key_start != -1:
                                    content_text = potential_content[:next_key_start].strip()
                                else:
                                    content_text = potential_content.strip()

                            # --- BEREINIGUNG DER EXTRAHIERTEN STRINGS ---
                            match = re.search(r'^(Human|Ai)', type_text, re.IGNORECASE)
                            clean_type = match.group(0).capitalize() if match else "Unbekannt"
                            clean_content = content_text.lstrip(' \x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !@#$%^&*-+,.')

                            final_messages_list.append({
                                "type": clean_type,
                                "content": clean_content
                            })

                # --- Ausgabe pro Thread ---
                if final_messages_list:
                    mprint(f"\n‚úÖ Thread ID: **{thread_id}** (GEFUNDEN: {len(final_messages_list)} Nachrichten)")
                    print("-" * 30)

                    for i, msg_data in enumerate(final_messages_list, 1):
                        mprint(f"  {i}. [**{msg_data['type']}**] {msg_data['content'][:100].strip()}...")
                else:
                    print(f"\n‚ö†Ô∏è Thread ID: **{thread_id}**: Konnte keine lesbaren Nachrichten finden.")

            except Exception as e:
                print(f"‚ùå Thread ID: **{thread_id}**: Fehler bei der Verarbeitung des Checkpoints: {e}")

    except Exception as e:
        print(f"\n‚ùå Allgemeiner Fehler beim Datenbankzugriff: {e}")

In [None]:
read_all_threads_from_db()