# Sparklehorse - Der SQL-Chatbot der Magpie

## Vorbereitung der Arbeitsumgebung

In einem ersten Schritt definierne wir unser Arbeitsverzeichnis. 

In [None]:
import os
os.getcwd()
os.chdir("c:/Users/mhu/Documents/gitHub/magpie_chatbot")
# Pfad Privatrechner
# os.chdir("c:/Users/Hueck/OneDrive/Dokumente/GitHub/magpie_langchain")

Lade Umgebungsvariablen (inkl. OpenAI-API-Key) und initialisiere den Chatbot mit dem Modell "gpt-4o" von OpenAI.

In [None]:
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

load_dotenv()
llm = ChatOpenAI(model="gpt-4o")

  Im folgenden stellen wir Verbindung zur Magpie her. Wir schauen uns dan alle in der Magpie befindlichen Tabellen an. Schließlich wählen wir `view_daten_reichweite_menge` aus und speichern diese als Pandas Data Frame zur einfachen exploration der Datentabelle.

In [None]:
import pandas as pd
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit 

db = SQLDatabase.from_uri("duckdb:///data/view_magpie.db") 

db.run("SHOW TABLES")

query = "SELECT * FROM view_daten_reichweite_menge;"
df = pd.read_sql(query, db._engine)
df

## Tools

### Standardisierte Langchain Tools

Wir intitalsieren ein standardisiertes Toolkit. Es stellt Funktionen bereit, um SQL-Queries über natürliche Sprache zu erzeugen und auszuführen. Wir lassen uns Namen und Funktion der standardisierten Tools anzeigen:

In [None]:
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

tools = toolkit.get_tools()

for tool in tools:
    print(f"Tool Name: {tool.name}")
    print(f"Description: {tool.description}")
    print("-" * 40)

Die Standardtools aus`SQLDatabaseToolkit`können also die folgenden Standardfunktionen übernehmen: 

- `sql_db_query`  
  Führt eine übergebene SQL-Abfrage aus. Gibt das Ergebnis oder eine Fehlermeldung zurück. Bei Fehlern wie „Unknown column“ sollte zuvor das Tabellenschema geprüft werden.

- `sql_db_schema`  
  Gibt das Schema (Spaltennamen und -typen) sowie Beispielzeilen für angegebene Tabellen zurück. Vorher sollte geprüft werden, ob die Tabellen existieren.

- `sql_db_list_tables`  
  Listet alle Tabellen in der verbundenen Datenbank auf.

- `sql_db_query_checker`  
  Prüft eine SQL-Abfrage auf syntaktische Korrektheit, bevor sie mit sql_db_query ausgeführt wird. Sollte immer vorher verwendet werden.


### Maßgeschneiderte Langchain Tools

#### Retriever `rt_beschr_variable`

`rt_beschr_variable` erlaubt die semantischen Suche über Werte aus einer Datenbankspalte. 

1. Wir sammeln sämtliche Unique Werte aus `beschr_variable` und wandeln diese mit OpenAIs Embeddings-Methdode `text-embedding-3-large` in Embeddings um. Die werden in einen Vektorstore gesichert.
2. Der Vektorstore wird in einen Retriever umgewandelt, der bei einer Anfrage die 5 ähnlichsten Begriffe zurückgibt.
3. Schließlich wird mit `create_retriever_tool` ein Tool erzeugt, das den Retriever kapselt. Dieses Tool kann von Sparklehorse genutzt werden, um Benutzereingaben mit unsicherer Schreibweise oder unvollständigen Begriffen mit den tatsächlichen Werten in der Datenbank abzugleichen.

In [None]:
from langchain_openai import OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain.agents.agent_toolkits import create_retriever_tool
import ast
import re

##################################################################
# Generiere `rt_beschr_variable`
##################################################################

def query_as_list(db, query):
    res = db.run(query)
    res = [el for sub in ast.literal_eval(res) for el in sub if el]
    res = [re.sub(r"\b\d+\b", "", string).strip() for string in res]
    return list(set(res))


beschr_variable = query_as_list(db, "SELECT variable_beschr FROM view_daten_reichweite_menge")
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = InMemoryVectorStore(embeddings)
_ = vector_store.add_texts(beschr_variable)

retriever_beschr_variable  = vector_store.as_retriever(search_kwargs={"k": 5})

description = (
    "Verwenden, um Werte für Filterabfragen nachzuschlagen. Die Eingabe ist eine ungefähre Schreibweise "
    "eines Eigennamens, die Ausgabe sind gültige Eigennamen. Verwende den Begriff, der der Eingabe am ähnlichsten ist."
)

rt_beschr_variable = create_retriever_tool(
    retriever_beschr_variable,
    name="rt_beschr_variable",
    description=description,
)

Wir testen nun den Retriever:

In [None]:
def print_clean_result(result):
    print("\n".join(result.split("\n\n")))

result = rt_beschr_variable.invoke("Wie viele Studienanfänger ohne Abitur gab es 2016?")

print_clean_result(result)

#### Retriever `rt_reichweite_variable` & `rt_werteinheit_variable`

Wir bauen einen baugleichen Retriever nun noch für die Variablen `reichweite_beschr_list` und `wert_einheit`. 

In [59]:
##################################################################
# Generiere `rt_reichweite_variable`
##################################################################

reichweite_variable = query_as_list(db, "SELECT reichweite_beschr_list FROM view_daten_reichweite_menge")

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

vector_store = InMemoryVectorStore(embeddings)

_ = vector_store.add_texts(reichweite_variable)

retriever_reichweite_variable = vector_store.as_retriever(search_kwargs={"k": 5})

description = (
"Verwenden, um Werte für Filterabfragen nachzuschlagen. Die Eingabe ist eine ungefähre Schreibweise "
"eines Eigennamens, die Ausgabe sind gültige Eigennamen. Verwende den Begriff, der der Suche am ähnlichsten ist."
)

rt_reichweite_variable = create_retriever_tool(
    retriever_reichweite_variable,
    name="rt_reichweite_variable",
    description=description,
)

##################################################################
# Generiere `rt_beschr_wert_einheit`
##################################################################

werteinheit_variable = query_as_list(db, "SELECT wert_einheit FROM view_daten_reichweite_menge")

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

vector_store = InMemoryVectorStore(embeddings)

_ = vector_store.add_texts(werteinheit_variable)

retriever_beschr_wert_einheit = vector_store.as_retriever(search_kwargs={"k": 5})

description = (
    "Verwenden, um Werte für Filterabfragen nachzuschlagen. Die Eingabe ist eine ungefähre Schreibweise "
    "eines Eigennamens, die Ausgabe sind gültige Eigennamen. Verwende den Begriff, der der Suche am ähnlichsten ist."
)

rt_werteinheit_variable = create_retriever_tool(
    retriever_beschr_wert_einheit,
    name="rt_beschr_wert_einheit",
    description=description,
)

In [60]:
result = rt_reichweite_variable.invoke("Wie hoch war die Anzahl von Studierenden ohne Abitur 2006?")

print_clean_result(result)

Sonstiges Orientierungsstudium
Hochschulsektor | Deutschland
Berlin | Schulfach Mathematik | . Schulklasse | Ohne Migrationshintergrund
Deutschland | Schulfach Mathematik | . Schulklasse | Niedriger sozioökonomischer Status (ISEI)
Sonstige Hochschule | Deutschland


In [61]:
result = rt_werteinheit_variable.invoke("Wie hoch war die Anzahl von Studierenden ohne Abitur 2006?")
print_clean_result(result)

Anzahl
Prozent
in Tsd. Euro
Punkte
Mittelwert


In [70]:
from langchain_core.tools import tool
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

@tool
def variable_beschr(user_question: str) -> str:
    """
    Sucht nach variable_beschr in Tabelle
    """
    docs = retriever_beschr_variable.get_relevant_documents(user_question)
    if not docs:
        return "Error: Keine passende Variable gefunden."
    
    best_match = docs[0].page_content.strip()
    query = f"SELECT variable_beschr FROM view_daten_reichweite_menge WHERE variable_beschr = '{best_match}' LIMIT 1;" 
    result = db.run_no_throw(query)

    return result if result else "Error: Keine passende Variable gefunden."


@tool
def get_reichweite_beschr_list(user_question: str) -> str:
    """
    Ermittelt die passende Reichweite für eine SQL-Abfrage.

    Gibt standardmäßig 'Deutschland' zurück, außer die Frage enthält Hinweise 
    auf spezifischere Reichweiten (z. B. Bundesländer oder Sektoren). In dem Fall 
    wird ein semantischer Retriever genutzt.

    Rückgabe ist ein Wert aus 'reichweite_beschr_list' oder eine Fehlermeldung.
    """
    spezifisch_keywords = [
        "hochschulsektor", "privater träger", "berufsakademie", "bundesland", 
        "bayern", "nrw", "sachsen", "baden-württemberg", "region", "sektor"
    ]

    user_question_lower = user_question.lower()

    if any(kw in user_question_lower for kw in spezifisch_keywords):
        matches = rt_reichweite_variable.invoke(user_question).split("\n")
        best_match = matches[0].strip()
    else:
        best_match = "Deutschland"

    query = f"""
        SELECT reichweite_beschr_list 
        FROM view_daten_reichweite_menge 
        WHERE reichweite_beschr_list = '{best_match}' 
        LIMIT 1;
    """
    result = db.run_no_throw(query)
    return result if result else "Error: Keine passende Variable gefunden."


# @tool
# def get_wert_einheit_id(user_question: str) -> str:
#     """Sucht in der Tabelle 'wert_einheit' nach einer passenden ID basierend auf der Beschreibung."""
#     query = f"SELECT id FROM wert_einheit WHERE beschr LIKE '%{description}%' LIMIT 1;"
#     result = db.run_no_throw(query)
#     return result if result else "Error: Keine passende Werteinheit gefunden."

 


tools.extend([variable_beschr, get_reichweite_beschr_list])

In [73]:
test_input = "Wie viele Studienanfänger ohne Abitur gab es 2016?"
output = variable_beschr(test_input)
print(output)

  docs = retriever_beschr_variable.get_relevant_documents(user_question)


[('Anzahl der Studienanfänger ohne Abitur',)]


In [None]:
from langchain import hub

prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")

assert len(prompt_template.messages) == 1, "Die Anzahl der Nachrichten im Template ist nicht 1!"
# Bearbeite die bestehende Nachricht, indem du Text hinzufügst
prompt_template.messages[0].prompt.template += (
    "\nYou are Sparklehorse, a chatbot for the Stifterverband organization. "
    "Your primary task is to answer questions related to the Magpie database."
)

prompt_template.messages[0].pretty_print()

In [None]:
system_message = prompt_template.format(
    dialect=db.dialect, 
    top_k=5
)

print(system_message)

In [76]:
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent

# Systemnachricht mit extra Anweisungen
suffix = (
    "Bevor eine SQL-Abfrage generiert wird, beachte folgendes:\n"
    "1. Verwende in Deinen Antworten immer die Werte von 'variable_beschr' bzw. 'get_reichweite_beschr_list', die du in deiner SQL-Abfrage benutzt hast, um Irrtümer zu vermeiden.\n"
    "2. Verwende immer das Tool 'variable_beschr', um die korrekte Variablenbeschreibung aus der Nutzeranfrage zu bestimmen.\n"
    "3. Verwende in der SQL-Abfrage ausschließlich den exakten Rückgabewert dieses Tools.\n"
    "3. Verwende immer das Tool 'get_reichweite_beschr_list', um die passende Reichweite zu bestimmen.\n"
    "4. Nutze immer die Tabelle 'view_daten_reichweite_menge', um Daten abzufragen.\n"
    "6. Gib immer die finale SQL-Abfrage aus und erkläre sie. Rate niemals einen Wert oder eine ID – nutze immer die bereitgestellten Tools.\n"
    "7. Falls eine ID oder ein Wert nicht gefunden werden kann, gib eine klare Fehlermeldung und begründe dies.\n"
    "8. Filter Zeiträume mit: date_part('year', zeit_start) = <Jahr>\n"
    "9. Beachte auch 'wert_einheit': Das sind mögliche Werte wie 'in Tsd. Euro', 'Anzahl', 'Prozent', 'VZÄ', 'Mitarbeiter'."
)

# wert_einheit
# in Tsd. Euro    158051
# Anzahl           88177
# Prozent          11387
# VZÄ               4078
# Mitarbeiter       1380
# Mittelwert        1379
# Faktorlevel        262
# Punkte              70
# Name: count, dtype: int64


system = f"{system_message}\n\n{suffix}"

# Neuen ReAct-Agent erstellen mit den vollständigen Tools
agent_executor = create_react_agent(llm, tools, state_modifier=system)


In [77]:
# Testanfrage an den Agenten
question = "Wie viele Studienanfänger ohne Abitur gab es 2016?"

for step in agent_executor.stream(
    {"messages": [HumanMessage(content=question)]}, 
    stream_mode="values"
):
    step["messages"][-1].pretty_print()


Wie viele Studienanfänger ohne Abitur gab es 2016?
Tool Calls:
  variable_beschr (call_solyRsVREtB18IOrWBwZj1kv)
 Call ID: call_solyRsVREtB18IOrWBwZj1kv
  Args:
    user_question: Wie viele Studienanfänger ohne Abitur gab es?
  get_reichweite_beschr_list (call_unfSbvGoa878O26x7olRRsDA)
 Call ID: call_unfSbvGoa878O26x7olRRsDA
  Args:
    user_question: Wie viele Studienanfänger ohne Abitur gab es?
Name: get_reichweite_beschr_list

[('Deutschland',)]
Tool Calls:
  sql_db_list_tables (call_0NqimURZUdWcelmQtegNSrHr)
 Call ID: call_0NqimURZUdWcelmQtegNSrHr
  Args:
Name: sql_db_list_tables

view_daten_reichweite_menge
Tool Calls:
  sql_db_schema (call_DDbWgS6xlqXi6dnvUbvsFkIF)
 Call ID: call_DDbWgS6xlqXi6dnvUbvsFkIF
  Args:
    table_names: view_daten_reichweite_menge
Name: sql_db_schema


CREATE TABLE view_daten_reichweite_menge (
	daten_id INTEGER, 
	variable_id INTEGER, 
	variable_beschr VARCHAR, 
	zeit_start TIMESTAMP WITHOUT TIME ZONE, 
	zeit_ende TIMESTAMP WITHOUT TIME ZONE, 
	zeit_ei

In [None]:
%%sql duckdb:///C:/Users/mhu/Documents/github/magpie_chatbot/data/view_magpie.db
SELECT * FROM view_daten_reichweite_menge LIMIT 10;


In [None]:
%%sql duckdb:///C:/Users/mhu/Documents/github/magpie_chatbot/data/view_magpie.db
SELECT * FROM view_daten_reichweite_menge LIMIT 10;
