In [37]:
"""

This module is responsible for reading outage email files from a folder
and using an LLM (like GPT-4) to extract structured information
such as partner name, outage duration, cause, etc.

The output is returned as a pandas DataFrame that can later be loaded
into DuckDB for analytics and reporting.

High-level workflow:
1. Read all `.txt` email files from a folder.
2. For each email, send the text to the LLM.
3. Ask the LLM to return JSON data with fixed fields.
4. Convert all outputs into a clean DataFrame.

"""

'\n\nThis module is responsible for reading outage email files from a folder\nand using an LLM (like GPT-4) to extract structured information\nsuch as partner name, outage duration, cause, etc.\n\nThe output is returned as a pandas DataFrame that can later be loaded\ninto DuckDB for analytics and reporting.\n\nHigh-level workflow:\n1. Read all `.txt` email files from a folder.\n2. For each email, send the text to the LLM.\n3. Ask the LLM to return JSON data with fixed fields.\n4. Convert all outputs into a clean DataFrame.\n\n'

In [38]:
import os
import json
import pandas as pd
from datetime import datetime
from langchain_openai import ChatOpenAI  # LangChain's OpenAI chat model interface
from dotenv import load_dotenv 
from pathlib import Path
load_dotenv(override=True)  # take environment variables from .env file


True

In [39]:
# -------------------------------
# Read environment variables
# -------------------------------
HITEC_EMAIL_DIR = os.getenv("GLOBAL_HITEC_EMAIL_DIR")
HITEC_EMAIL_JSON = os.getenv("GLOBAL_HITEC_EMAIL_JSON")
print(HITEC_EMAIL_DIR, HITEC_EMAIL_JSON)

E:\AI Utils\Partner Performance Report\Global_Hitec_Outage_Emails GLOBAL_HITEC_OUTAGE_JSON


In [40]:
# -------------------------------------------------------------------
# STEP 1: Create a helper function to initialize the LLM model
# -------------------------------------------------------------------
def get_llm(model_name="gpt-4o-mini"):
    """
    This function initializes the Large Language Model (LLM).
    In this case, we‚Äôre using the GPT-4 model provided through LangChain.

    - model_name: defines which OpenAI model to use.
    - temperature: controls randomness (0 = deterministic answers).

    Returns:
        llm object ‚Üí can be used to call llm.predict(prompt)
    """
    
    return ChatOpenAI(model_name=model_name, temperature=0)

In [41]:
# -------------------------------------------------------------------
# STEP 2: Define the LLM prompt template
# -------------------------------------------------------------------
PARSER_PROMPT = """
You are an expert system that parses structured outage summary emails.
Your task is to extract outage-related details from the email below.

Extract the following fields and return ONLY valid JSON (no explanations):

- partner_name
- outage_type
- issue_details
- current_status
- business_impact
- manual_processing
- root_cause_available
- outage_start_time (ISO 8601 format)
- outage_end_time (ISO 8601 format)
- duration_hours (numeric)
- resolution_details
- email_subject
- email_date

Input Email:
-------------------
{email_text}
-------------------

Rules:
- Return only valid JSON.
- Convert all dates to ISO 8601 format (e.g., 2025-11-05T07:48:00-08:00).
- If data is missing, put null.
- If start and end times exist, calculate duration_hours.
"""




In [42]:


# -------------------------------------------------------------------
# STEP 3: Create a function to parse ONE email using LLM
# -------------------------------------------------------------------
from urllib import response


def parse_outage_email(llm, email_text: str):
    """
    Sends one email‚Äôs text to the LLM and returns a parsed JSON object.

    Parameters:
        llm        ‚Üí The initialized LLM model object (from get_llm()).
        email_text ‚Üí The raw text of one outage email.

    Returns:
        Dictionary containing all parsed fields + a timestamp.
    """

    # Prepare the LLM prompt by inserting the email text into the template
    prompt = PARSER_PROMPT.format(email_text=email_text)

    try:
        # Send the prompt to the LLM and capture its response
        response = llm.invoke(prompt)
        raw_response = response.content.strip()

        # The LLM might include extra text before/after JSON, so extract clean JSON only
        start = raw_response.find("{")
        end = raw_response.rfind("}")
        if start == -1 or end == -1:
            raise ValueError("No valid JSON object found in LLM response.")

        # Extract JSON substring
        json_str = raw_response[start:end+1]

        # Convert JSON string into Python dictionary
        parsed = json.loads(json_str)

    except Exception as e:
        # If something goes wrong (bad format, LLM failure, etc.)
        # we capture the error but still return a record (so processing continues)
        parsed = {
            "error": str(e),
            "raw_text": email_text[:2000]  # keep partial text for debugging
        }

    # Add a timestamp to track when this email was parsed
    parsed["parsed_at"] = datetime.utcnow().isoformat()

    # Return a Python dictionary of the parsed data
    return parsed



In [43]:

# -------------------------------------------------------------------
# STEP 4: Parse ALL emails from a folder
# -------------------------------------------------------------------
def parse_emails_from_folder(folder_path: str, llm):
    """
    Reads all text (.txt) email files from the given folder,
    uses the LLM to parse each one, and returns a pandas DataFrame.

    Parameters:
        folder_path ‚Üí path to folder containing email files
        llm         ‚Üí LLM object returned by get_llm()

    Returns:
        pandas.DataFrame containing one row per email with structured fields
    """
    #Create full directory path
    full_folder_path = Path(folder_path)
    print("full folder path *********" + str(full_folder_path))
    # Create an empty list to hold results
    records = []

    # Loop through all files in the folder
    for file in os.listdir(full_folder_path):
        # Skip non-text files
        if not file.endswith(".txt"):
            continue

        # Full file path
        full_path = os.path.join(full_folder_path, file)
        print("full file path *********" + str(full_path))

        # Read the email content from the file
        with open(full_path, "r", encoding="utf-8", errors="ignore") as f:
            email_text = f.read()

        print(f"üîç Parsing email file: {file} ...")

        # Parse one email using the LLM
        parsed = parse_outage_email(llm, email_text)

        # Keep track of the original file name for traceability
        parsed["source_file"] = file

        # Add parsed dictionary to list
        records.append(parsed)

    # Convert list of dictionaries into a pandas DataFrame
    df = pd.DataFrame(records)

    print(f"‚úÖ Parsed {len(df)} emails successfully.")
    return df


In [44]:
"""
storage.py
-----------
This module handles storing the parsed outage email data
into a DuckDB database.

DuckDB is a lightweight, high-speed, SQL-based analytical database
that runs in a single file (like SQLite, but optimized for analytics).

The workflow:
1. Initialize a DuckDB connection and create a table (if not exists).
2. Insert parsed data (from a pandas DataFrame) into that table.
"""

import duckdb
import pandas as pd



# -------------------------------------------------------------------
# STEP 1: Define the database schema (table structure)
# -------------------------------------------------------------------
DB_SCHEMA = """
CREATE TABLE IF NOT EXISTS outages (
    partner_name VARCHAR,
    outage_type VARCHAR,
    issue_details VARCHAR,
    current_status VARCHAR,
    business_impact VARCHAR,
    manual_processing VARCHAR,
    root_cause_available VARCHAR,
    outage_start_time TIMESTAMP,
    outage_end_time TIMESTAMP,
    duration_hours DOUBLE,
    resolution_details VARCHAR,
    email_subject VARCHAR,
    email_date DATE,
    parsed_at TIMESTAMP,
    source_file VARCHAR
);
"""



# -------------------------------------------------------------------
# STEP 2: Function to initialize DuckDB
# -------------------------------------------------------------------
def init_db(in_memory=True):
    """
    Initializes the DuckDB database.

    Parameters:
        in_memory ‚Üí if True, creates a temporary DB (for testing);
                     if False, saves data to a local file "outages.duckdb".

    Returns:
        Connection object ‚Üí used to run SQL commands.
    """

    # Create a connection to the database
    if in_memory:
        con = duckdb.connect(database=":memory:")
        print("üß† Using in-memory DuckDB (data will be lost after shutdown).")
    else:
        con = duckdb.connect(database="outages.duckdb")
        print("üíæ Using persistent DuckDB file (outages.duckdb).")

    # Create table if it doesn‚Äôt exist yet
    con.execute(DB_SCHEMA)
    return con



# -------------------------------------------------------------------
# STEP 3: Function to load a DataFrame into DuckDB
# -------------------------------------------------------------------
def load_dataframe_to_duckdb(con, df: pd.DataFrame):
    """
    Inserts the parsed outage email data (from pandas DataFrame)
    into the 'outages' table in DuckDB.

    Parameters:
        con ‚Üí active DuckDB connection
        df  ‚Üí pandas DataFrame containing parsed email data
    """

    if df.empty:
        print("‚ö†Ô∏è No records found ‚Äî nothing to insert.")
        return

    # Register the DataFrame as a temporary table in DuckDB
    con.register("df_temp", df)

    # Insert data into the main outages table using SQL
    con.execute("""
        INSERT INTO outages
        SELECT 
            partner_name,
            outage_type,
            issue_details,
            current_status,
            business_impact,
            manual_processing,
            root_cause_available,
            outage_start_time,
            outage_end_time,
            duration_hours,
            resolution_details,
            email_subject,
            email_date,
            parsed_at,
            source_file
        FROM df_temp
    """)

    # Unregister temporary table
    con.unregister("df_temp")

    print(f"‚úÖ {len(df)} records loaded into DuckDB successfully.")


In [45]:
# llm_tools.py

import json
import io
import base64
import os
import tempfile
import matplotlib.pyplot as plt
from email.message import EmailMessage
import smtplib
from pptx import Presentation
from pptx.util import Inches
from langchain_core.tools import Tool
from langchain.tools import tool


def create_llm(model_name="gpt-4o-mini"):
    return ChatOpenAI(model_name=model_name, temperature=0)

def run_sql_tool_factory(con):
    print("Creating run_sql tool")
    def run_sql(sql):
        print(f"Executing SQL: {sql}")
        forbidden = ["INSERT","UPDATE","DELETE","DROP","ALTER","CREATE","--",";"]
        if any(k in sql.upper() for k in forbidden):
            return json.dumps({"error": "Forbidden SQL command"})
        try:
            df = con.execute(sql).df()
            # convert datetimes
            for c in df.columns:
                if "datetime" in str(df[c].dtype):
                    df[c] = df[c].astype(str)
            return df.to_json(orient="records")
        except Exception as e:
            return json.dumps({"error": str(e)})
    return run_sql

def create_chart_tool(data_json, chart_type="bar", x=None, y=None):
    data = json.loads(data_json)
    if not data:
        raise ValueError("Empty data")

    cols = list(data[0].keys())
    x = x or cols[0]
    y = y or cols[1]

    xs = [str(r[x]) for r in data]
    ys = [r[y] for r in data]

    plt.figure(figsize=(8,4))
    if chart_type == "line":
        plt.plot(xs, ys, marker="o")
    else:
        plt.bar(xs, ys)

    plt.xticks(rotation=45)
    plt.tight_layout()

    buf = io.BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)
    plt.close()

    return "data:image/png;base64," + base64.b64encode(buf.read()).decode()

def create_ppt_tool_factory(con):
    def create_ppt(design_json):
        design = json.loads(design_json)
        prs = Presentation()

        for slide_spec in design.get("slides", []):
            layout = prs.slide_layouts[0] if slide_spec["type"] == "title" else prs.slide_layouts[1]
            slide = prs.slides.add_slide(layout)
            slide.shapes.title.text = slide_spec["title"]

            if slide_spec["type"] == "title":
                slide.placeholders[1].text = slide_spec.get("subtitle","")
                continue

            if slide_spec["type"] == "table":
                sql = slide_spec["sql"]
                table_json = run_sql_tool_factory(con)(sql)
                data = json.loads(table_json)

                if not data:
                    continue

                cols = list(data[0].keys())
                rows = len(data)

                table = slide.shapes.add_table(
                    rows+1, len(cols),
                    Inches(0.5), Inches(1.5),
                    Inches(9), Inches(3)
                ).table

                for i,c in enumerate(cols):
                    table.cell(0,i).text = c

                for r in range(rows):
                    for c in range(len(cols)):
                        table.cell(r+1, c).text = str(data[r][cols[c]])

            if slide_spec["type"] == "chart":
                sql = slide_spec["sql"]
                chart_data = run_sql_tool_factory(con)(sql)
                chart_uri = create_chart_tool(chart_data, slide_spec["chart_type"])

                img_bytes = base64.b64decode(chart_uri.split(",")[1])
                tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".png").name
                with open(tmp,"wb") as f:
                    f.write(img_bytes)

                slide.shapes.add_picture(tmp, Inches(1), Inches(1.5), width=Inches(8))
                os.remove(tmp)

        filename = design.get("filename","report.pptx")
        prs.save(filename)
        return filename

    return create_ppt

def send_email_tool_factory(smtp_cfg):
    def send_email(to_email, subject, body, attachment_path):
        msg = EmailMessage()
        msg["To"] = to_email
        msg["From"] = smtp_cfg["user"]
        msg["Subject"] = subject
        msg.set_content(body)

        with open(attachment_path,"rb") as f:
            data = f.read()

        msg.add_attachment(
            data,
            maintype="application",
            subtype="vnd.openxmlformats-officedocument.presentationml.presentation",
            filename=os.path.basename(attachment_path)
        )

        with smtplib.SMTP(smtp_cfg["host"], smtp_cfg["port"]) as s:
            s.starttls()
            s.login(smtp_cfg["user"], smtp_cfg["password"])
            s.send_message(msg)

        return f"Email sent to {to_email}"

    return send_email

def build_tools(con, smtp_cfg):
    return [
        Tool(name="run_sql", func=run_sql_tool_factory(con), description="Run SQL on outages table"),
        Tool(name="create_chart", func=create_chart_tool, description="Create chart from data"),
        Tool(name="create_ppt", func=create_ppt_tool_factory(con), description="Generate PPT"),
        Tool(name="send_email", func=send_email_tool_factory(smtp_cfg), description="Send email with attachment")
    ]


In [None]:
# app.py

import streamlit as st
import os
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent


# ----------------- STREAMLIT CONFIG ------------------
st.set_page_config(page_title="Outage Monitoring POC", layout="wide")
st.title("üîå Outage Monitoring POC")

# ----------------- INIT DB & LLM ---------------------
con = init_db(in_memory=True)
llm = create_llm()

SMTP_CFG = {
    "host": "smtp.gmail.com",
    "port": 587,
    "user": "your_email@gmail.com",
    "password": "your_app_password"
}

tools = build_tools(con, SMTP_CFG)
# Create the agent
agent = create_react_agent(llm, tools)

# Create an executor to run it
#agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)


# ----------------- SIDEBAR ---------------------------
mode = st.sidebar.radio("Mode", ["Chat", "Report Builder", "Admin"])

# ----------------- CHAT MODE -------------------------
if mode == "Chat":
    st.header("üí¨ Chat with Outage Assistant")

    if "history" not in st.session_state:
        st.session_state.history = []

    user_msg = st.chat_input("Ask something about outages‚Ä¶")
    if user_msg:
        st.session_state.history.append(("user", user_msg))

        with st.chat_message("user"):
            st.write(user_msg)

        with st.chat_message("assistant"):
            resp = agent.invoke(user_msg)
            st.write(resp)
            st.session_state.history.append(("assistant", resp))

    for role, msg in st.session_state.history:
        with st.chat_message(role):
            st.write(msg)

# ----------------- REPORT BUILDER ---------------------
elif mode == "Report Builder":
    st.header("üìä Report Builder")

    prompt = st.text_area("Describe the report to generate")

    if st.button("Generate Report"):
        with st.spinner("Generating..."):
            resp = agent.invoke(prompt)

        st.write(resp)

        # Auto-detect PPT generated
        ppts = [f for f in os.listdir(".") if f.endswith(".pptx")]
        if ppts:
            latest = sorted(ppts)[-1]
            with open(latest,"rb") as f:
                st.download_button("‚¨á Download PPT", f, file_name=latest)

# ----------------- ADMIN MODE ------------------------
elif mode == "Admin":
    st.header("üõ† Admin Mode")

    st.subheader("1. Load Outage Emails from Local Folder (POC)")
    folder = st.text_input("Email folder path", "./emails")

    if st.button("Parse Emails"):
        df = parse_emails_from_folder(folder, llm)
        st.dataframe(df.head())
        load_dataframe_to_duckdb(con, df)
        st.success("Emails parsed and loaded into DuckDB!")

    st.markdown("---")
    st.subheader("2. ZIP Upload (coming soon)")
    st.info("This feature will be added later.")

    st.markdown("---")
    st.subheader("3. IMAP Email Ingestion (coming soon)")
    st.info("IMAP integration will be added in a future sprint.")


In [None]:

# Step 1: Initialize the LLM
llm = get_llm()

# Step 2: Parse all emails from folder
df = parse_emails_from_folder(HITEC_EMAIL_DIR, llm)
print(df.head())

# Step 3: Initialize database
con = init_db(in_memory=False)

# Step 4: Load parsed data into DuckDB
load_dataframe_to_duckdb(con, df)

# Step 5: Run a test SQL query
print(con.execute("SELECT * FROM outages").df())


In [None]:
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent


# --- Init LLM and DB ---
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

con = init_db(in_memory=False)
SMTP_CFG = {
    "host": "smtp.gmail.com",
    "port": 587,
    "user": "your_email@gmail.com",
    "password": "your_app_password"
}
tools = build_tools(con, SMTP_CFG)

# --- ‚úÖ Bind tools to LLM so it can call them ---
llm_with_tools = llm.bind_tools(tools)


agent = create_agent(
    model=llm_with_tools,
    tools=tools,
    system_prompt=(
        "You are OutageAnalysisAgent ‚Äî an expert in analyzing IT outage data stored in DuckDB.\n"
        "You have these tools:\n"
        "  - run_sql(sql): runs a SQL query and returns JSON records.\n"
        "  - create_chart(data_json, chart_type, x, y): plots data.\n"
        "  - create_ppt(design_json): creates presentation slides.\n\n"
        "The outages table has these columns:\n"
        "  partner_name, outage_type, issue_details, current_status, business_impact,\n"
        "  manual_processing, root_cause_available, outage_start_time, outage_end_time,\n"
        "  duration_hours, resolution_details, email_subject, email_date, parsed_at, source_file.\n\n"
        "Follow this process:\n"
        "1Ô∏è‚É£ When asked to show, count, or compare outage data ‚Äî call `run_sql()` with an explicit SQL query.\n"
        "2Ô∏è‚É£ Example: to get outages per partner in last month:\n"
        "   run_sql('SELECT partner_name, COUNT(*) AS total_outages FROM outages WHERE email_date >= CURRENT_DATE - INTERVAL 30 DAY GROUP BY partner_name')\n"
        "3Ô∏è‚É£ Then summarize the results in plain English.\n"
        "Do not ask clarifying questions; just use the tools and return the answer."
    ),
)

# --- Run test query ---
response = agent.invoke({"input": "Show total outages per partner for the last six month."})
print(response)


In [None]:
import re
from langchain_openai import ChatOpenAI

# --- Helper functions -------------------------------------------------------

def _try_run_limit0(con, sql):
    """Try running SQL with LIMIT 0 to check if it's valid in DuckDB."""
    if not sql.strip():
        return False, "Empty SQL"
    try:
        test_sql = sql if re.search(r"\bLIMIT\b", sql, flags=re.IGNORECASE) else f"{sql.rstrip(';')} LIMIT 0"
        con.execute(test_sql)
        return True, None
    except Exception as e:
        return False, str(e)


def normalize_sql_to_duckdb(raw_sql: str, con, max_retries: int = 2) -> str:
    """
    Uses the LLM to automatically translate arbitrary SQL into DuckDB-compatible SQL.
    Falls back to safe regex replacements if validation fails.
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    raw_sql = (raw_sql or "").strip()
    if not raw_sql:
        raise ValueError("Empty SQL provided to normalize_sql_to_duckdb")

    base_prompt = f"""
    You are a SQL dialect translator. Convert this SQL into valid DuckDB SQL.
    Return **only** the corrected SQL (no markdown, no code fences, no commentary).

    If the SQL uses non-DuckDB syntax like DATEADD(), CONVERT(), or INTERVAL -6 MONTH,
    rewrite it into DuckDB-compatible form (e.g., CURRENT_DATE - INTERVAL '6' MONTH).

    SQL to fix:
    {raw_sql}
    """

    candidate_sql = raw_sql
    for attempt in range(max_retries):
        resp = llm.invoke(base_prompt).content.strip()
        resp = re.sub(r"```(?:sql)?", "", resp, flags=re.IGNORECASE)
        resp = resp.replace("```", "").strip().rstrip(";")

        ok, err = _try_run_limit0(con, resp)
        if ok:
            print("‚úÖ LLM produced valid DuckDB SQL.")
            return resp

        # Retry: give the LLM the actual error
        print(f"‚ö†Ô∏è Attempt {attempt+1} failed: {err}")
        base_prompt = f"""
        The following SQL failed in DuckDB with error:
        {err}

        Please fix the SQL so it works in DuckDB.
        Return only the corrected SQL text (no markdown, no explanations).

        Previous SQL:
        {resp}
        """
        candidate_sql = resp

    # Final fallback ‚Äî regex-based sanitizer
    print("‚öôÔ∏è Falling back to conservative regex fix...")
    fallback = re.sub(r"```(?:sql)?", "", raw_sql, flags=re.IGNORECASE)
    fallback = re.sub(r"```", "", fallback).strip().rstrip(";")
    fallback = re.sub(
        r"DATEADD\s*\(\s*month\s*,\s*-(\d+)\s*,\s*CURRENT_DATE\s*\)",
        r"CURRENT_DATE - INTERVAL '\1' MONTH",
        fallback,
        flags=re.IGNORECASE,
    )
    fallback = re.sub(r"\bDATEADD\s*\(", "DATE_ADD(", fallback, flags=re.IGNORECASE)
    fallback = re.sub(r"INTERVAL\s*-\s*(\d+)\s*MONTH", r"INTERVAL '\1' MONTH", fallback, flags=re.IGNORECASE)
    ok, err = _try_run_limit0(con, fallback)
    if ok:
        print("‚úÖ Fallback regex produced valid SQL.")
        return fallback

    raise RuntimeError(f"‚ùå Could not fix SQL for DuckDB. Last error: {err}\nCandidate: {candidate_sql}")


# --- Main LangGraph node: generate_sql() -----------------------------------

def generate_sql(state: dict):
    """
    Step 2 in the LangGraph workflow:
    Takes a natural-language user query ‚Üí asks LLM to generate initial SQL ‚Üí validates/fixes for DuckDB.
    """
    from datetime import datetime

    user_query = state.get("user_query")
    con = state.get("db_con")

    if not user_query:
        raise KeyError("Missing 'user_query' in state")
    if not con:
        raise KeyError("Missing 'db_con' in state")

    print(f"üß† Generating SQL for user query: {user_query}")

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    prompt = f"""
    Generate a DuckDB-compatible SQL SELECT query that answers this question:
    {user_query}

    Table: outages
    Columns: partner_name, outage_type, issue_details, duration_hours, email_date, outage_start_time, outage_end_time

    Rules:
    - Output only SQL (no markdown or explanation)
    - Ensure syntax works in DuckDB
    - Use CURRENT_DATE - INTERVAL 'x' MONTH for date filters
    """
    raw_sql = llm.invoke(prompt).content.strip()
    print(f"üßÆ Raw SQL (before normalization):\n{raw_sql}")

    clean_sql = normalize_sql_to_duckdb(raw_sql, con)
    print(f"üß© Final Clean SQL:\n{clean_sql}")

    return {"sql_query": clean_sql, "generated_at": datetime.utcnow().isoformat()}


In [None]:
"""
Outage Analytics Agent using LangGraph
--------------------------------------

‚úÖ Parses user queries (like "show average outage duration per partner for last 6 months")
‚úÖ Generates SQL dynamically using LLM
‚úÖ Runs SQL against DuckDB
‚úÖ Creates charts + PPTs
‚úÖ Provides natural summaries
‚úÖ Auto-retries with relaxed SQL if query returns empty (self-healing)

"""

import os
import json
import re
import io
import base64
import tempfile
import matplotlib.pyplot as plt
import duckdb
from pptx import Presentation
from pptx.util import Inches
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional, Dict, Any


# ----------------------------------------------------------------------
# 1Ô∏è‚É£ Helper Functions
# ----------------------------------------------------------------------

def init_db(in_memory=False):
    """Initialize DuckDB database."""
    db_file = ":memory:" if in_memory else "outages.duckdb"
    con = duckdb.connect(database=db_file)
    print(f"üíæ Connected to {db_file}")
    con.execute("""
        CREATE TABLE IF NOT EXISTS outages (
            partner_name VARCHAR,
            outage_type VARCHAR,
            issue_details VARCHAR,
            current_status VARCHAR,
            business_impact VARCHAR,
            manual_processing VARCHAR,
            root_cause_available VARCHAR,
            outage_start_time TIMESTAMP,
            outage_end_time TIMESTAMP,
            duration_hours DOUBLE,
            resolution_details VARCHAR,
            email_subject VARCHAR,
            email_date DATE,
            parsed_at TIMESTAMP,
            source_file VARCHAR
        );
    """)
    return con


def run_sql(con, sql: str):
    """Run SQL safely on DuckDB."""
    print(f"Executing SQL: {sql}")
    try:
        df = con.execute(sql).df()
        print(f"‚úÖ SQL returned {len(df)} rows")
        return df.to_json(orient="records")
    except Exception as e:
        print(f"‚ùå SQL Error: {e}")
        return json.dumps({"error": str(e)})


def create_chart(data_json, chart_type="bar", x=None, y=None):
    """Generate chart as base64 image."""
    data = json.loads(data_json)
    if not data:
        raise ValueError("Empty data for chart generation.")

    cols = list(data[0].keys())
    x = x or cols[0]
    y = y or cols[1]
    xs = [str(r[x]) for r in data]
    ys = [r[y] for r in data]

    plt.figure(figsize=(8, 4))
    if chart_type == "line":
        plt.plot(xs, ys, marker="o")
    else:
        plt.bar(xs, ys)
    plt.xticks(rotation=45)
    plt.tight_layout()

    buf = io.BytesIO()
    plt.savefig(buf, format="png")
    buf.seek(0)
    plt.close()
    return "data:image/png;base64," + base64.b64encode(buf.read()).decode()


def create_ppt(data_json, filename="outage_summary.pptx"):
    """Generate a simple PPT with table data."""
    data = json.loads(data_json)
    prs = Presentation()
    slide = prs.slides.add_slide(prs.slide_layouts[5])
    slide.shapes.title.text = "Outage Summary"

    if not data:
        slide.placeholders[0].text = "No outage data found."
    else:
        cols = list(data[0].keys())
        rows = len(data)
        table = slide.shapes.add_table(
            rows+1, len(cols), Inches(0.5), Inches(1.5), Inches(9), Inches(3)
        ).table
        for i, c in enumerate(cols):
            table.cell(0, i).text = c
        for r in range(rows):
            for c in range(len(cols)):
                table.cell(r+1, c).text = str(data[r][cols[c]])

    prs.save(filename)
    return filename


# ----------------------------------------------------------------------
# 2Ô∏è‚É£ Universal Unwrapping + Helpers
# ----------------------------------------------------------------------

def unwrap_state(state):
    """Universal fix ‚Äî unpacks LangGraph‚Äôs nested state safely."""
    if isinstance(state, dict) and "input" in state and isinstance(state["input"], dict):
        print("ü™Ñ Auto-unwrapped LangGraph state.")
        merged = {**state, **state["input"]}
        merged.pop("input", None)
        return merged
    return state


def get_db_con(state):
    """Retrieve DB connection from state."""
    con = state.get("db_con")
    if not con:
        raise ValueError("Missing database connection in state.")
    return con


# ----------------------------------------------------------------------
# 3Ô∏è‚É£ Agent State Definition
# ----------------------------------------------------------------------

class AgentState(TypedDict, total=False):
    user_query: str
    sql_query: Optional[str]
    data_json: Optional[str]
    chart_uri: Optional[str]
    ppt_path: Optional[str]
    final_answer: Optional[str]
    intent: Optional[Dict[str, Any]]
    db_con: Any


# ----------------------------------------------------------------------
# 4Ô∏è‚É£ Core Nodes
# ----------------------------------------------------------------------

def interpret_query(state: AgentState):
    state = unwrap_state(state)
    user_query = state.get("user_query")
    if not user_query:
        raise KeyError("Missing 'user_query' in state input")

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)
    prompt = f"""
    Determine the intent type for this user request and whether a chart or PPT is requested.
    Return a JSON object exactly, e.g. {{ "intent": "sql_query", "needs_chart": true, "notes": "..." }}
    Possible intents: sql_query, summary, report
    User query: {user_query}
    """
    resp = llm.invoke(prompt).content.strip()
    try:
        intent = json.loads(resp)
    except Exception:
        intent = {"intent": "sql_query", "needs_chart": ("chart" in user_query.lower()), "notes": ""}
    print(f"üß≠ Intent: {intent}")
    return {"intent": intent, "user_query": user_query}



def execute_sql(state: AgentState):
    state = unwrap_state(state)
    sql_query = state.get("sql_query")
    con = get_db_con(state)

    data_json = run_sql(con, sql_query)
    return {"data_json": data_json}


def retry_if_empty(state: AgentState):
    state = unwrap_state(state)
    data_json = state.get("data_json")
    sql_query = state.get("sql_query")
    con = get_db_con(state)

    if not data_json or data_json.strip() in ["", "[]", "null", "None"]:
        print("‚ö†Ô∏è No data returned, retrying with relaxed SQL...")
    else:
        try:
            data = json.loads(data_json)
            if data and len(data) > 0:
                print("‚úÖ Data present ‚Äî skipping retry.")
                return {"data_json": data_json, "sql_query": sql_query}
        except Exception:
            pass

    relaxed_sql = re.sub(
        r"WHERE\s+email_date\s*>=\s*CURRENT_DATE\s*-\s*INTERVAL\s+\d+\s+DAY",
        "",
        sql_query,
        flags=re.IGNORECASE,
    )
    relaxed_sql = re.sub(r"WHERE\s*(AND|OR)?\s*$", "", relaxed_sql, flags=re.IGNORECASE).strip()
    print(f"üîÅ Retrying with relaxed SQL:\n{relaxed_sql}")

    data_json_retry = run_sql(con, relaxed_sql)
    return {"data_json": data_json_retry, "sql_query": relaxed_sql}


def maybe_chart_and_ppt(state: AgentState):
    state = unwrap_state(state)
    data_json = state.get("data_json")
    intent = state.get("intent", {})
    needs_chart = intent.get("needs_chart", True)

    chart_uri = ppt_path = None
    if data_json and data_json.strip() not in ["[]", "null", "None"]:
        if needs_chart:
            chart_uri = create_chart(data_json)
            print("üìà Chart generated.")
        ppt_path = create_ppt(data_json)
        print(f"üíæ PPT created: {ppt_path}")
    else:
        print("‚ö†Ô∏è No data available for chart or PPT.")

    return {"chart_uri": chart_uri, "ppt_path": ppt_path}


def summarize_results(state: AgentState):
    state = unwrap_state(state)
    data_json = state.get("data_json")
    user_query = state.get("user_query")

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)
    prompt = f"""
    Summarize the following outage analytics result for this query:
    {user_query}
    Data: {data_json[:2000]}
    Return a concise business-friendly summary.
    """
    summary = llm.invoke(prompt).content.strip()
    print(f"üìù Summary: {summary}")
    return {"final_answer": summary}


# ----------------------------------------------------------------------
# 5Ô∏è‚É£ Graph Definition
# ----------------------------------------------------------------------

def build_outage_agent_graph():
    graph = StateGraph(AgentState)

    graph.add_node("interpret", interpret_query)
    graph.add_node("generate_sql", generate_sql)
    graph.add_node("execute_sql", execute_sql)
    graph.add_node("retry_if_empty", retry_if_empty)
    graph.add_node("maybe_chart_and_ppt", maybe_chart_and_ppt)
    graph.add_node("summarize", summarize_results)

    graph.add_edge("interpret", "generate_sql")
    graph.add_edge("generate_sql", "execute_sql")
    graph.add_edge("execute_sql", "retry_if_empty")
    graph.add_edge("retry_if_empty", "maybe_chart_and_ppt")
    graph.add_edge("maybe_chart_and_ppt", "summarize")

    graph.set_entry_point("interpret")
    graph.set_finish_point("summarize")
    return graph.compile()





In [75]:
# ----------------------------------------------------------------------
# 6Ô∏è‚É£ Run Example
# ----------------------------------------------------------------------

if __name__ == "__main__":
    con = init_db(in_memory=False)
    app = build_outage_agent_graph()

    user_query = "Show average outage duration per partner for the last 6 months"

    print("\nüöÄ Running outage analytics agent...\n")
    response = app.invoke({"user_query": user_query, "db_con": con})

    print("\n=== FINAL RESPONSE ===")
    print(response.get("final_answer") or "No final answer generated.")
    if response.get("chart_uri"):
        print("Chart URI (truncated):", response["chart_uri"][:200])
    if response.get("ppt_path"):
        print("PPT saved at:", response["ppt_path"])

üíæ Connected to outages.duckdb

üöÄ Running outage analytics agent...

üß≠ Intent: {'intent': 'report', 'needs_chart': True, 'notes': 'The user is requesting a report on average outage duration, which typically involves data visualization.'}
üßÆ Generated Clean SQL:
SELECT partner_name, AVG(duration_hours) AS average_outage_duration
FROM outages
WHERE email_date >= DATE_ADD(CURRENT_DATE, INTERVAL -6 MONTH)
GROUP BY partner_name
Executing SQL: SELECT partner_name, AVG(duration_hours) AS average_outage_duration
FROM outages
WHERE email_date >= DATE_ADD(CURRENT_DATE, INTERVAL -6 MONTH)
GROUP BY partner_name
‚ùå SQL Error: Parser Error: syntax error at or near "MONTH"

LINE 3: WHERE email_date >= DATE_ADD(CURRENT_DATE, INTERVAL -6 MONTH)
                                                               ^
‚úÖ Data present ‚Äî skipping retry.


KeyError: 0