# Employee Directory Chatbot

Use this Databricks notebook to provision the `employees` table and run a SQL-aware chatbot that answers questions about your employee directory.

## Prerequisites

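1. Run the `employees_table.sql` script (included in this repository) or execute the setup cell below to create and seed the `employees` table. Note that the setup cell deletes all existing rows from the table before inserting the sample data.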
2. Deploy or reuse a Databricks Model Serving endpoint that supports Chat Completions (for example, DBRX Instruct).
3. Ensure the user or service principal running this notebook has permission to query the SQL warehouse or Unity Catalog schema that holds the `employees` table.

In [None]:
# COMMAND ----------
# Widget-driven configuration so the notebook can run in multiple environments.
import os

# Make sure a `dbutils` handle exists before any widget call below.
# Evaluating the bare name is a cheap probe: it raises NameError when the
# runtime has not predefined `dbutils`.
try:
    dbutils
except NameError:  # pragma: no cover
    # Fallback: build the handle from the active `spark` session.
    # NOTE(review): presumably this path is for running outside a notebook
    # (e.g. Databricks Connect); it still requires `spark` to be defined — confirm.
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)

def ensure_widget(name: str, default: str, label: str) -> None:
    """Create the text widget ``name`` unless the notebook already defines it.

    Args:
        name: Programmatic widget name, later read with ``dbutils.widgets.get``.
        default: Value the widget starts with when it is created.
        label: Human-readable label shown next to the widget.

    The existence check relies on ``dbutils.widgets.getArgumentNames()``.
    NOTE(review): that method does not appear in the documented widgets API
    (text/dropdown/combobox/multiselect/get/getAll/remove/removeAll) — verify
    on the target runtime. When it is missing, the widget is simply declared;
    re-declaring an existing text widget is expected to keep the value the
    user already entered — confirm.
    """
    try:
        existing_names = dbutils.widgets.getArgumentNames()
    except AttributeError:
        # Runtime without the lookup helper: fall through and declare the widget.
        existing_names = ()
    if name not in existing_names:
        dbutils.widgets.text(name, default, label)

# Declare the configuration widgets (name, default value, UI label).
ensure_widget("catalog_name", "main", "Unity Catalog (or hive_metastore)")
ensure_widget("schema_name", "default", "Schema")
ensure_widget("serving_endpoint_name", "", "Serving Endpoint Name")
ensure_widget("table_name", "employees", "Employees Table")

# Read the widget values. Each value is stripped first so that a blank or
# whitespace-only entry falls back to its default instead of producing an
# invalid identifier such as "  .default.employees".
CATALOG = (dbutils.widgets.get("catalog_name") or "").strip() or "main"
SCHEMA = (dbutils.widgets.get("schema_name") or "").strip() or "default"
TABLE = (dbutils.widgets.get("table_name") or "").strip() or "employees"
# The widget takes precedence; the SERVING_ENDPOINT env var is the fallback.
SERVING_ENDPOINT = (
    (dbutils.widgets.get("serving_endpoint_name") or "").strip()
    or (os.getenv("SERVING_ENDPOINT") or "").strip()
)

if not SERVING_ENDPOINT:
    raise ValueError("Set the `serving_endpoint_name` widget or SERVING_ENDPOINT env var.")

# CATALOG always has a value (it defaults to "main"), so the name is always
# three-part: catalog.schema.table.
EMPLOYEE_TABLE_FQN = f"{CATALOG}.{SCHEMA}.{TABLE}"
print(f"Using table: {EMPLOYEE_TABLE_FQN}")
print(f"Serving endpoint: {SERVING_ENDPOINT}")

In [None]:
# COMMAND ----------
# (Optional) Create and seed the employees table directly from the notebook.
# Make sure the target catalog and schema exist, then make them current.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
spark.sql(f"USE CATALOG {CATALOG}")
spark.sql(f"USE {SCHEMA}")
# Identity columns must be BIGINT on Databricks; declaring the column as INT
# makes the CREATE TABLE statement fail.
spark.sql(
    f'''
    CREATE TABLE IF NOT EXISTS {EMPLOYEE_TABLE_FQN} (
      id BIGINT GENERATED ALWAYS AS IDENTITY,
      name STRING,
      department STRING,
      role STRING,
      start_date DATE
    )
    '''
)
# WARNING: removes every existing row so that re-running the cell does not
# duplicate the sample data.
spark.sql(f"DELETE FROM {EMPLOYEE_TABLE_FQN}")
# `id` is omitted from the column list because it is generated automatically.
spark.sql(
    f'''
    INSERT INTO {EMPLOYEE_TABLE_FQN} (name, department, role, start_date) VALUES
      ('Alice', 'Data Analytics', 'Engineer', DATE '2022-01-15'),
      ('Bob', 'Data Science', 'Analyst', DATE '2023-03-10'),
      ('Carol', 'IT', 'Support', DATE '2021-07-22')
    '''
)
# Show the seeded rows as a sanity check.
display(spark.sql(f"SELECT * FROM {EMPLOYEE_TABLE_FQN}"))

In [None]:
# COMMAND ----------
import json
from datetime import datetime
from typing import Dict, List

from mlflow.deployments import get_deploy_client

# MLflow deployments client targeting Databricks; `call_llm` uses it to query SERVING_ENDPOINT.
deploy_client = get_deploy_client("databricks")
# Plain-text description of the employees table that is embedded in the planner prompt.
# NOTE(review): it says "INT, identity primary key" — keep this in sync with the actual
# table definition; no PRIMARY KEY constraint is declared by the setup cell.
SQL_SCHEMA_DESCRIPTION = f"Table {EMPLOYEE_TABLE_FQN} columns: id (INT, identity primary key), name (STRING), department (STRING), role (STRING), start_date (DATE). Sample rows include Alice (Data Analytics Engineer), Bob (Data Science Analyst), and Carol (IT Support)."
# System prompt for the planning call: asks the model for a JSON object with the keys
# sql_query, reasoning, requires_clarification and clarification_prompt.
# NOTE(review): SQL_SCHEMA_DESCRIPTION already ends with a period, so the last fragment
# renders as "...(IT Support).. Provide only JSON." — harmless but worth tidying.
SQL_PLANNER_SYSTEM_PROMPT = (
    "You are an expert Databricks SQL planner. Given a user question and chat history, produce a safe SELECT query that only reads from the employees table."
    f" Use the fully qualified name {EMPLOYEE_TABLE_FQN}."
    " Return strict JSON with keys: sql_query (string), reasoning (string), requires_clarification (bool), clarification_prompt (string)."
    " If the answer cannot be determined, set requires_clarification=true and explain."
    " Never perform write operations."
    f" Table schema: {SQL_SCHEMA_DESCRIPTION}. Provide only JSON."
)
# System prompt for the answering call, which turns the query result into a conversational reply.
ANSWER_SYSTEM_PROMPT = (
    "You are an HR assistant who explains employee data retrieved from Databricks SQL. "
    "Given the question, SQL query, and rows, respond conversationally and cite available employee names. If the rows list is empty, say that no employees matched."
)
# Module-level conversation log; `plan_sql` sends it to the planner and
# `ask_employee_bot` appends to it. It lives for as long as this Python session does.
chat_history: List[Dict[str, str]] = []

def call_llm(messages: List[Dict[str, str]], temperature: float = 0.1, max_tokens: int = 400) -> str:
    """Query the serving endpoint and return the first choice's message content.

    Args:
        messages: Chat messages (``role``/``content`` dicts) to send.
        temperature: Sampling temperature forwarded to the endpoint.
        max_tokens: Completion token limit forwarded to the endpoint.

    Returns:
        The ``content`` field of the first entry in the response's ``choices``.
    """
    request_body = {
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    completion = deploy_client.predict(endpoint=SERVING_ENDPOINT, inputs=request_body)
    first_choice = completion['choices'][0]
    return first_choice['message']['content']

def _strip_code_fence(text: str) -> str:
    """Return ``text`` without a surrounding Markdown code fence, if it has one."""
    stripped = text.strip()
    if not stripped.startswith("```"):
        return stripped
    # The opening fence line may carry a language tag (```json); drop the whole
    # line. A single-line reply has no newline, so only the backticks go.
    first_line, newline, remainder = stripped.partition("\n")
    body = remainder if newline else first_line[3:]
    body = body.rstrip()
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


def plan_sql(question: str) -> Dict[str, str]:
    """Ask the planner model to translate ``question`` into a SQL plan.

    The question and the current ``chat_history`` are sent as a JSON payload
    together with ``SQL_PLANNER_SYSTEM_PROMPT``.

    Args:
        question: The user's natural-language question.

    Returns:
        The parsed JSON object produced by the planner.

    Raises:
        ValueError: If the reply is not valid JSON or is not a JSON object.
    """
    payload = {"question": question, "history": chat_history}
    messages = [
        {"role": "system", "content": SQL_PLANNER_SYSTEM_PROMPT},
        {"role": "user", "content": json.dumps(payload)}
    ]
    # temperature=0.0 keeps the generated SQL as repeatable as the endpoint allows.
    content = call_llm(messages, temperature=0.0)
    try:
        # Chat models frequently wrap JSON in ```json ... ``` despite being told not to.
        plan = json.loads(_strip_code_fence(content))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Planner returned non-JSON payload: {content}") from exc
    if not isinstance(plan, dict):
        # Valid JSON such as a list or string would break the caller's `plan.get(...)`.
        raise ValueError(f"Planner returned non-object JSON payload: {content}")
    return plan

# Statements that may follow a WITH clause and modify data.
_WRITE_KEYWORDS = frozenset({"INSERT", "UPDATE", "DELETE", "MERGE"})


def run_employee_query(sql_query: str) -> List[Dict[str, str]]:
    """Run a read-only SQL query and return its rows as dictionaries.

    The query text comes from a language model, so it is validated before
    execution: it must start with ``SELECT`` or ``WITH``, and a ``WITH`` query
    must not contain a data-modifying keyword. The check is deliberately
    conservative — a ``WITH`` query that merely mentions such a word (for
    example inside a string literal) is rejected as well.

    Args:
        sql_query: SQL text to execute.

    Returns:
        One dict per result row, produced with ``Row.asDict(recursive=True)``.

    Raises:
        ValueError: If the query is empty, not a string, or not read-only.
    """
    if not isinstance(sql_query, str) or not sql_query.strip():
        raise ValueError("Refusing to run an empty or non-string SQL query.")

    upper_sql = sql_query.lstrip().upper()
    if not upper_sql.startswith(("SELECT", "WITH")):
        raise ValueError(f"Only SELECT queries are allowed, got: {sql_query}")

    if upper_sql.startswith("WITH"):
        # A CTE can prefix a write statement (WITH ... INSERT INTO ...), so look
        # for write keywords as whole words anywhere in the statement.
        words = "".join(
            ch if ch.isalnum() or ch == "_" else " " for ch in upper_sql
        ).split()
        if _WRITE_KEYWORDS.intersection(words):
            raise ValueError(f"Only read-only queries are allowed, got: {sql_query}")

    df = spark.sql(sql_query)
    return [row.asDict(recursive=True) for row in df.collect()]

def craft_answer(question: str, sql_query: str, rows: List[Dict[str, str]]) -> str:
    """Ask the model for a conversational answer based on the query result.

    Args:
        question: The user's original question.
        sql_query: The SQL that was executed.
        rows: The result rows returned by that SQL.

    Returns:
        The model's reply with surrounding whitespace removed.
    """
    # default=str lets values json cannot encode natively (e.g. dates) go through as text.
    user_content = json.dumps(
        {"question": question, "sql_query": sql_query, "rows": rows},
        default=str,
    )
    conversation = [
        {"role": "system", "content": ANSWER_SYSTEM_PROMPT},
        {"role": "user", "content": user_content},
    ]
    reply = call_llm(conversation, temperature=0.2, max_tokens=350)
    return reply.strip()

def ask_employee_bot(question: str) -> Dict[str, object]:
    """Answer ``question`` about the employees table and log the exchange.

    The question is planned into SQL, the SQL is executed, and the rows are
    summarised by the model. When the planner asks for clarification, no SQL
    is run and the clarification prompt is returned as the answer.

    Every exchange — including clarification requests — is appended to
    ``chat_history`` as a user turn followed by an assistant turn, so the
    planner sees the original question on the follow-up.

    Args:
        question: The user's natural-language question.

    Returns:
        A dict with ``answer`` (str), ``sql_query`` (str, or ``None`` when
        clarification is needed), ``rows`` (list of row dicts) and ``plan``
        (the planner's parsed reply).

    Raises:
        ValueError: If the planner neither asks for clarification nor returns
            a ``sql_query``.
    """
    # Local import: the file's top-level import only brings in `datetime`.
    from datetime import timezone

    def _record_turn(reply: str) -> None:
        # Same naive-UTC ISO format `datetime.utcnow().isoformat()` produced,
        # without relying on the deprecated `utcnow`.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        chat_history.extend([
            {"role": "user", "content": question, "timestamp": stamp},
            {"role": "assistant", "content": reply, "timestamp": stamp},
        ])

    plan = plan_sql(question)
    if plan.get("requires_clarification"):
        # `or` also covers a present-but-empty clarification_prompt.
        clarification = plan.get("clarification_prompt") or "Could you clarify your request?"
        _record_turn(clarification)
        return {"answer": clarification, "sql_query": None, "rows": [], "plan": plan}

    sql_query = plan.get("sql_query")
    if not sql_query:
        raise ValueError(f"Planner did not return sql_query: {plan}")

    rows = run_employee_query(sql_query)
    answer = craft_answer(question, sql_query, rows)
    _record_turn(answer)
    return {"answer": answer, "sql_query": sql_query, "rows": rows, "plan": plan}

In [None]:
        # COMMAND ----------
        # Example chat interactions. Replace the sample questions with user input or drive from a UI widget.
        # Questions sent to the bot one after another; later questions see the
        # earlier exchanges through the shared chat history.
        sample_questions = [
            "Who works in Data Analytics?",
            "List employees hired after 2022.",
        ]

        for question in sample_questions:
            result = ask_employee_bot(question)
            # The leading "\n" separates consecutive question/answer groups with
            # a blank line. It must be an escape sequence: a literal line break
            # inside a single-quoted f-string is a syntax error.
            print(f"\nQuestion: {question}")
            print(f"SQL: {result['sql_query']}")
            print(f"Answer: {result['answer']}")

In [None]:
# COMMAND ----------
# Inspect the running conversation history (optional).
# `chat_history` is the module-level list of message dicts that `ask_employee_bot` appends to.
# NOTE(review): `display` is given a plain Python list here, not a DataFrame — confirm it
# renders as intended on the target runtime.
display(chat_history)