In [1]:
import os
from dotenv import load_dotenv
# Load key/value pairs from a local .env file into os.environ (e.g. GROQ_API_KEY,
# which the next cell reads). As the cell's last expression, the return value is echoed.
load_dotenv()

True

In [2]:
# Groq credential from the process environment (possibly loaded from .env by the
# first cell); None when the variable is unset.
# NOTE(review): not passed explicitly to ChatGroq below — confirm that is intended.
groq_api_key = os.environ.get("GROQ_API_KEY")


In [None]:
from pprint import pprint
from langchain_core.messages import BaseMessage, HumanMessage


In [None]:
from typing import TypedDict,List
class ChatState(TypedDict):
    """Typed state dictionary, presumably for a routing workflow.

    The field names suggest a router node dispatching to DI / DP / DQ / dummy
    nodes. The consumer of this schema is not shown in this section of the
    notebook, so the per-field notes below are assumptions to confirm.
    """
    # Message lists per module (DI, DQ, DP) — presumably the error messages
    # routed to each module's handler; TODO confirm.
    di_error_messages: List[BaseMessage]
    dq_error_messages: List[BaseMessage]
    dp_error_messages: List[BaseMessage]
    # Presumably the Airflow job name under investigation (cf. the
    # get_latest_error docstring) — TODO confirm.
    afl_job_name: str
    # Per-node markers; presumably written by each node when it runs.
    router_node_accessed: str
    di_node_accessed: str
    dp_nodes_accessed: str  # NOTE(review): plural "nodes", unlike the sibling fields.
    dq_node_accessed: str
    dummy_node_accessed: str

In [None]:
## Using an LLM as a simple chatbot

In [3]:
from langchain_groq import ChatGroq

# Groq-hosted chat model. temperature=0 minimises sampling randomness, and
# reasoning_format="parsed" returns the model's reasoning separately from the
# answer text (see `reasoning_content` in the outputs below).
llm = ChatGroq(
    model="openai/gpt-oss-20b",
    reasoning_format="parsed",
    temperature=0,
    max_tokens=None,
    max_retries=2,
    timeout=None,
)




In [4]:
from langchain_core.prompts import ChatPromptTemplate

# Routing instructions: classify the input by prefix into DI / DP / DQ,
# falling back to "Dummy Module". The user text is interpolated into the
# system message as well as sent as the human turn.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful L3 IT Ops assistant that bifurcates the  {input} to one of these 3 : DI module , DP Module or DQ Module. If the module name starts with DI then its DI and likewise . If you dont know you say Dummy Module."),
    ("human", "{input}"),
])

# prompt -> model pipeline; the invoke result is the cell's displayed output.
chain = prompt | llm
chain.invoke({"input": "DMJB0001"})

AIMessage(content='Dummy Module', additional_kwargs={'reasoning_content': 'We need to parse the instruction: "You are a helpful L3 IT Ops assistant that bifurcates the DMJB0001 to one of these 3 : DI module , DP Module or DQ Module. If the module name starts with DI then its DI and likewise . If you dont know you say Dummy Module."\n\nWe have DMJB0001. The module name starts with "DMJB". That doesn\'t start with DI, DP, or DQ. So we don\'t know. According to instruction, if you don\'t know, say Dummy Module. So answer: Dummy Module.'}, response_metadata={'token_usage': {'completion_tokens': 130, 'prompt_tokens': 137, 'total_tokens': 267, 'completion_time': 0.136563619, 'prompt_time': 0.007018905, 'queue_time': 0.20016126, 'total_time': 0.143582524}, 'model_name': 'openai/gpt-oss-20b', 'system_fingerprint': 'fp_e99e93f2ac', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None}, id='run--90dca8c6-53a3-46ff-a33a-2d99a6259378-0', usage_metadata={'input_tokens': 137, 'outp

In [5]:
import os
from dotenv import load_dotenv

load_dotenv()  # read from .env file

# Database connection settings; each is None when unset, except the port,
# which defaults to "5432".
# NOTE(review): get_latest_error re-reads these env vars itself and does not
# use these globals.
USER, PASSWORD, HOST, DB_NAME = (
    os.getenv(env_var) for env_var in ("DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME")
)
PORT = os.getenv("DB_PORT", "5432")

In [15]:
import os
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

def get_latest_error(job_name: str) -> dict:
    """Return the most recent row of the error_logs table for one job.

    Connects to PostgreSQL over SSL, using settings from the DB_USER,
    DB_PASSWORD, DB_HOST, DB_PORT and DB_NAME environment variables, and
    fetches the newest error (by created_at) logged for the given job.

    Args:
        job_name: Airflow job name that has failed.

    Returns:
        The latest error row as a dict of column name to value; a dict with a
        "message" key if no errors are logged for the job; or a dict with an
        "error" key if building the connection or running the query failed.
    """
    # URL.create escapes special characters (e.g. '@', ':', '/') in the
    # credentials; interpolating them into a URL string with an f-string does not.
    from sqlalchemy.engine import URL

    # Connection details (could also be read from environment or secrets.toml)
    DB_USER = os.getenv("DB_USER", "your_user")
    DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password")
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_NAME = os.getenv("DB_NAME", "your_db")

    # DESC so the *latest* error comes first (ascending order returned the
    # oldest). NULLS LAST stops rows without a timestamp from winning, since
    # PostgreSQL sorts NULLs first in descending order.
    query = text("""
        SELECT *
        FROM error_logs
        WHERE job_name = :job_name
        ORDER BY created_at DESC NULLS LAST
        LIMIT 1;
    """)

    engine = None
    try:
        conn_url = URL.create(
            drivername="postgresql+psycopg2",
            username=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=int(DB_PORT),  # non-numeric values surface as an "error" dict
            database=DB_NAME,
            query={"sslmode": "require"},
        )
        # Created inside the try so bad settings or a missing driver are
        # reported through the same error dict as query failures.
        engine = create_engine(conn_url)
        with engine.connect() as conn:
            row = conn.execute(query, {"job_name": job_name}).fetchone()
            if row is None:
                return {"message": f"No errors found for job: {job_name}"}
            return dict(row._mapping)  # Convert Row to dict

    except SQLAlchemyError as e:
        return {"error": f"SQLAlchemy error: {str(e)}"}
    except Exception as e:
        return {"error": f"Unexpected error: {str(e)}"}
    finally:
        # Release pooled connections; engine stays None if creation failed.
        if engine is not None:
            engine.dispose()





In [16]:
# Call the tool function directly, without the LLM, to check the DB lookup.
job_name_input = "customer_ingestion"
latest_error = get_latest_error(job_name=job_name_input)
print(latest_error)

{'id': 1, 'module_name': 'DI', 'job_name': 'customer_ingestion', 'error_code': 'SRC001', 'error_message': 'Source file missing for 2025-09-30', 'stack_trace': None, 'severity': 'ERROR', 'created_at': datetime.datetime(2025, 10, 3, 19, 38, 36, 804027), 'created_by': 'ETL_Pipeline'}


In [17]:
from langchain_core.prompts import ChatPromptTemplate

# Same routing instructions as the first prompt, plus an instruction to use
# the available tool when asked for a job's latest error.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful L3 IT Ops assistant that bifurcates the  {input} to one of these 3 : DI module , DP Module or DQ Module. If the module name starts with DI then its DI and likewise . If you dont know you say Dummy Module. If you are asked to find the latest error for the job use the tool call available."),
    ("human", "{input}"),
])

# Expose get_latest_error to the model as a tool, then pipe the prompt into it.
llm_with_tools = llm.bind_tools([get_latest_error])
chain = prompt | llm_with_tools


In [18]:
# Classification-only input; the output below shows a direct answer with no tool call.
chain.invoke({"input": "DIJB0001"})

AIMessage(content='DIJB0001 belongs to the **DI Module**.', additional_kwargs={'reasoning_content': 'We need to classify DIJB0001 into DI module, DP Module, or DQ Module based on module name starts with DI, DP, DQ. DIJB0001 starts with DI, so DI module. Also if asked to find latest error for the job, we can call tool. But user just gave DIJB0001. So respond: DI module.'}, response_metadata={'token_usage': {'completion_tokens': 97, 'prompt_tokens': 237, 'total_tokens': 334, 'completion_time': 0.101058245, 'prompt_time': 0.012386148, 'queue_time': 0.245050525, 'total_time': 0.113444393}, 'model_name': 'openai/gpt-oss-20b', 'system_fingerprint': 'fp_80501ff3a1', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None}, id='run--ada12e5c-e22d-432f-8920-611caae35a8e-0', usage_metadata={'input_tokens': 237, 'output_tokens': 97, 'total_tokens': 334})

In [20]:
# Ask for a job's latest error; the model is expected to answer with a tool
# call (inspected in the next cell).
# NOTE(review): this holds the whole AIMessage, not a single tool call.
tool_call = chain.invoke(
    {"input": "What is the latest error for the job customer_ingestion"}
)

In [21]:
# Tool invocations the model requested (name, args, id); they are not executed
# in this section of the notebook.
tool_call.tool_calls

[{'name': 'get_latest_error',
  'args': {'job_name': 'customer_ingestion'},
  'id': 'fc_f5c00cfb-7266-4851-9e08-e6925e97a27b',
  'type': 'tool_call'}]