<a href="https://colab.research.google.com/github/varakugemini/hrcopilot.notifyauto.01/blob/main/AI_Agent_for_Notifications_Management.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# main.py

import os
import logging
import uuid

from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from .email_adapter import EmailSender, SMTPEmailSender
from .langchain_components import get_email_template, generate_email_content
from .langgraph_agent import (
    ClarificationTool,
    RetrieveContextTool,
    EmailGenerationTool,
    SendEmailTool,
    graph,
)
from .supabase_client import supabase, initialize_supabase
from .llm_client import llm, initialize_llm
from .config import settings
from .models import EmailRequest, GenerateEmailRequest
from .job_store import get_job, set_job_status

# --- Logging Setup ---
import structlog
from structlog.processors import JSONRenderer

# Configure structlog for JSON logs with request_id
def add_request_id(logger, method_name, event_dict):
    """Ensure the log event carries a ``request_id``.

    structlog processor. If the event already has a non-empty
    ``request_id`` (for example one bound on the logger for the current
    request), it is kept so that all lines of a request share the same ID.
    Otherwise a fresh UUID4 string is generated for this event.

    Args:
        logger: The wrapped logger (unused).
        method_name: Name of the log method that was called (unused).
        event_dict: The event dictionary being processed; mutated in place.

    Returns:
        The same ``event_dict``, guaranteed to contain ``request_id``.
    """
    # Do not overwrite an ID supplied by the caller: replacing it on every
    # event would make it impossible to correlate lines of one request.
    if not event_dict.get("request_id"):
        event_dict["request_id"] = str(uuid.uuid4())
    return event_dict

# Processor order matters: each processor receives the event dict produced by
# the previous one, and JSONRenderer must come last because it turns the dict
# into the final string.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        add_request_id,  # Add request ID to logs
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# `get_logger` is not imported by name in this module, so it has to be reached
# through the structlog package (a bare `get_logger()` raises NameError).
# NOTE(review): the stdlib `logging` module is imported but never configured
# here; confirm that handlers/levels are set up elsewhere so that info-level
# events are actually emitted.
logger = structlog.get_logger()

# --- Initialize FastAPI app ---
# Single application instance; the route and exception-handler decorators
# further down register themselves on it.
app = FastAPI()

# --- Security ---
# Bearer-token scheme object, consumed as a dependency by validate_api_key.
security = HTTPBearer()


def validate_api_key(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Validate the API key sent in the ``Authorization`` header.

    Args:
        credentials: Scheme and token extracted by the ``security``
            dependency.

    Returns:
        ``True`` when the scheme is Bearer and the token equals
        ``settings.api_key``.

    Raises:
        HTTPException: 401 when the scheme or the token does not match.
    """
    # Function-scope import so this block does not depend on the module's
    # import section.
    from secrets import compare_digest

    # HTTP authentication scheme names are case-insensitive, so "bearer"
    # must be accepted as well as "Bearer".
    scheme_ok = credentials.scheme.lower() == "bearer"

    # Constant-time comparison avoids leaking how many leading characters of
    # the key matched. Comparing bytes keeps compare_digest from raising
    # TypeError on non-ASCII input.
    key_ok = compare_digest(
        credentials.credentials.encode("utf-8"),
        settings.api_key.encode("utf-8"),
    )

    if not (scheme_ok and key_ok):
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return True


# Initialize clients
# NOTE(review): `supabase` and `llm` are imported by name at the top of this
# module, before these calls run. The initialize_* functions rebind the
# globals inside their own modules, so names imported earlier with
# `from ... import supabase` / `from ... import llm` may still refer to the
# initial None -- confirm.
initialize_supabase()
initialize_llm()

# Initialize email sender
# Module-level sender instance; the send-email endpoint schedules its
# send_email method as a background task.
email_sender: EmailSender = SMTPEmailSender()

# --- FastAPI Endpoints ---


@app.post(
    "/v1/send-email/",  # Versioned route
    response_class=JSONResponse,
    dependencies=[Depends(validate_api_key)],
)
async def send_email_endpoint(
    request: EmailRequest,
    background_tasks: BackgroundTasks,
):
    """Queue an email for delivery and return the ID of the tracking job.

    Steps, in order: create a job ID, run the LangGraph agent with the
    request fields, insert a ``pending`` row into the ``jobs`` table, and
    schedule ``email_sender.send_email`` as a background task.

    Args:
        request: Recipient, subject, body and optional template ID.
        background_tasks: FastAPI background-task queue for this request.

    Returns:
        JSON with a confirmation message and the ``job_id``.

    Raises:
        HTTPException: 500 when any step above raises.
    """
    try:
        job_id = str(uuid.uuid4())

        # NOTE(review): the graph wired in langgraph_agent ends in a "send"
        # node, and a send is also queued below -- confirm this does not
        # deliver the message twice. The graph's return value is not used.
        agent_input = {
            "to_email": request.recipient_email,
            "subject": request.subject,
            "body": request.body,
            "template_id": request.template_id,
        }
        graph.run(initial_input=agent_input)

        # Record the job before delivery starts so it can be polled.
        job_row = {"id": job_id, "status": "pending"}
        supabase.table("jobs").insert(job_row).execute()

        # Delivery happens after the response has been sent.
        background_tasks.add_task(
            email_sender.send_email,
            request.recipient_email,
            request.subject,
            request.body,
            request.template_id,
            job_id,
        )

        response_body = {
            "message": "Email task added to background.",
            "job_id": job_id,
        }
        return JSONResponse(response_body)
    except Exception as e:
        logger.error("Error processing email", error=str(e))
        raise HTTPException(status_code=500, detail="Failed to send email")


@app.post(
    "/v1/generate-email/",  # Versioned route
    response_class=JSONResponse,
    dependencies=[Depends(validate_api_key)],
)
async def generate_email_endpoint(request: GenerateEmailRequest):
    """Generate email text from a stored template and the request fields.

    Args:
        request: Template ID plus the fields forwarded to the generator.

    Returns:
        JSON of the form ``{"content": <generated text>}``.

    Raises:
        HTTPException: 500 when template lookup or generation raises.
    """
    try:
        template = get_email_template(request.template_id)
        # Every request field (template_id included) is forwarded as a
        # keyword argument.
        fields = request.dict()
        generated = generate_email_content(template, **fields)
        return JSONResponse({"content": generated})
    except Exception as e:
        logger.error("Error generating email", error=str(e))
        raise HTTPException(status_code=500, detail="Failed to generate email")


@app.get(
    "/v1/status/{job_id}",  # Versioned route
    response_class=JSONResponse,
    dependencies=[Depends(validate_api_key)],
)
async def get_job_status(job_id: str):
    """Return the stored status of a job.

    Args:
        job_id: Identifier returned by the send-email endpoint.

    Returns:
        JSON of the form ``{"job_id": ..., "status": ...}``.

    Raises:
        HTTPException: 404 when no job with this ID exists; 500 when the
            lookup itself fails.
    """
    try:
        status = get_job(job_id)
        if status:
            return JSONResponse({"job_id": job_id, "status": status["status"]})
        raise HTTPException(status_code=404, detail="Job not found")
    except HTTPException:
        # Let the 404 above through unchanged; without this clause the
        # generic handler below would turn it into a 500.
        raise
    except Exception as e:
        logger.error("Error fetching job status", error=str(e))
        raise HTTPException(status_code=500,
                            detail="Failed to fetch job status") from e


# --- Error Handling ---
@app.exception_handler(Exception)  # Catch all unhandled exceptions
async def global_exception_handler(request, exc):
    """Log an otherwise unhandled exception and answer with a generic 500.

    Args:
        request: The request being processed when the error occurred (unused).
        exc: The exception that was raised.

    Returns:
        A JSON response with status 500 and a generic detail message.
    """
    # Pass the exception object explicitly so the stack trace is logged.
    logger.exception("Unhandled exception", exc_info=exc)
    error_body = {"detail": "Internal server error"}
    return JSONResponse(content=error_body, status_code=500)


# Script entry point: start a uvicorn server for `app` when this module is
# executed directly. uvicorn is imported here rather than at the top of the
# module. Host 0.0.0.0 listens on all network interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

ModuleNotFoundError: No module named 'fastapi'

In [None]:
# email_adapter.py

from abc import ABC, abstractmethod
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from tenacity import retry, stop_after_attempt, wait_exponential

from .supabase_client import supabase
from .config import settings
from .job_store import set_job_status
from structlog import get_logger

logger = get_logger()


class EmailSender(ABC):
    """Abstract interface for objects that can deliver an email."""

    @abstractmethod
    def send_email(self, to_email, subject, body, template_id=None, job_id=None):
        """Deliver one email; concrete subclasses must override this.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: Message body.
            template_id: Optional ID of the template the message came from.
            job_id: Optional ID of the job tracking this delivery.
        """


class SMTPEmailSender(EmailSender):
    """EmailSender that delivers over SMTP with STARTTLS.

    The outcome of each delivery is written to the ``notifications`` table
    and mirrored onto the job via ``set_job_status``.
    """

    # Only the SMTP conversation is retried. reraise=True makes the last
    # SMTP error surface in send_email once the attempts are exhausted.
    @retry(stop=stop_after_attempt(3),
           wait=wait_exponential(multiplier=1, min=4, max=10),
           reraise=True)
    def _deliver(self, to_email, message):
        """Open an SMTP session, authenticate and send ``message``."""
        with smtplib.SMTP(settings.smtp_server,
                          settings.smtp_port) as server:
            server.starttls()
            server.login(settings.sender_email, settings.sender_password)
            server.sendmail(settings.sender_email, to_email,
                            message.as_string())

    def _record_outcome(self, to_email, subject, template_id, job_id,
                        status, error=None):
        """Write the delivery outcome to Supabase and update the job."""
        row = {
            "type": "email",
            "recipient": to_email,
            "subject": subject,
            "template_id": template_id,
            "status": status,
            "job_id": job_id,
        }
        if error is not None:
            row["error"] = error
        try:
            supabase.table("notifications").insert(row).execute()
            set_job_status(job_id, "success" if status == "sent" else "failed")
        except Exception as e:
            # A bookkeeping failure must not be reported as a delivery
            # failure, and must not cause the message to be sent again.
            logger.error("Error recording email outcome",
                         to_email=to_email,
                         error=str(e),
                         job_id=job_id)

    def send_email(self,
                   to_email,
                   subject,
                   body,
                   template_id=None,
                   job_id=None):
        """Send an email and record whether delivery succeeded.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: Plain-text body; an HTML-escaped copy is sent as the HTML
                alternative.
            template_id: Optional template ID stored with the notification.
            job_id: Optional job ID whose status is updated.

        Returns:
            None. Delivery errors are logged and recorded as ``failed``
            rather than raised.
        """
        # Function-scope import so this block does not depend on the module's
        # import section.
        from html import escape

        # "alternative" tells mail clients the parts are two renderings of
        # the same content; the default "mixed" shows both.
        message = MIMEMultipart("alternative")
        message['From'] = settings.sender_email
        message['To'] = to_email
        message['Subject'] = subject

        # Add both plain text and HTML versions. The body is escaped so that
        # markup in caller-supplied text is shown literally, not interpreted.
        message.attach(MIMEText(body, 'plain'))
        message.attach(
            MIMEText(f"<html><body>{escape(body)}</body></html>", 'html'))

        try:
            self._deliver(to_email, message)
        except Exception as e:
            logger.error("Error sending email",
                         to_email=to_email,
                         error=str(e),
                         job_id=job_id)
            self._record_outcome(to_email, subject, template_id, job_id,
                                 "failed", error=str(e))
            return

        logger.info("Email sent successfully",
                    to_email=to_email,
                    job_id=job_id)
        self._record_outcome(to_email, subject, template_id, job_id, "sent")

In [None]:
# langchain_components.py

from typing import Dict, Any

from langchain import PromptTemplate, LLMChain

from .llm_client import llm
from .supabase_client import supabase
from structlog import get_logger

logger = get_logger()


def get_email_template(template_id: int) -> Dict[str, Any]:
    """Fetch one row from the ``email_templates`` table by ID.

    Args:
        template_id: Value matched against the ``id`` column.

    Returns:
        The first matching row.

    Raises:
        HTTPException: 500 when the query fails or no row matches; the
            underlying error is attached as ``__cause__``.
    """
    # This module's import block does not bring HTTPException into scope, so
    # import it here; otherwise the error path below raises NameError.
    from fastapi import HTTPException

    try:
        response = supabase.table("email_templates").select("*").eq(
            "id", template_id).execute()
        if not response.data:
            raise ValueError(f"Template with ID {template_id} not found.")
        return response.data[0]
    except Exception as e:
        logger.error("Error fetching email template", error=str(e))
        raise HTTPException(status_code=500,
                            detail="Failed to fetch email template") from e


def generate_email_content(template: Dict[str, Any], **kwargs) -> str:
    """Produce email text by running an LLM chain over a prompt read from disk.

    Args:
        template: Template row. It is not read by this function; the prompt
            text comes from ``prompts/email_generation_prompt.txt``.
        **kwargs: Values handed to the chain as its inputs.

    Returns:
        The chain's output, or a fixed fallback message when anything in the
        process raises.
    """
    try:
        # The prompt text lives in an external file.
        with open("prompts/email_generation_prompt.txt", "r") as prompt_file:
            prompt_text = prompt_file.read()

        prompt_spec = PromptTemplate(
            input_variables=["topic", "recipient_name", "context"],
            template=prompt_text,
        )
        chain = LLMChain(llm=llm, prompt=prompt_spec)
        return chain.run(kwargs)
    except Exception as e:
        logger.error("Error generating email content", error=str(e))
        # Fall back to a fixed message instead of propagating the error.
        return "Error generating email content. Please try again later."

In [None]:
# langgraph_agent.py

from langgraph import Graph, Node, Tool

from .email_adapter import email_sender
from .langchain_components import get_email_template, generate_email_content
from .llm_client import llm
from .supabase_client import supabase


# --- LangGraph Tools ---
class ClarificationTool(Tool):
    """Tool that fills in a missing recipient name by asking the LLM."""

    def __init__(self, llm):
        """Store the LLM used to answer clarifying questions."""
        super().__init__()
        self.llm = llm

    def run(self, topic: str, recipient_name: str = None,
            **kwargs) -> dict:
        """Return the inputs with ``recipient_name`` guaranteed to be set.

        Args:
            topic: Subject matter of the email.
            recipient_name: Recipient, if already known.
            **kwargs: Extra fields passed through unchanged.

        Returns:
            Dict with ``topic``, ``recipient_name`` and all extra fields.
        """
        resolved_name = recipient_name
        if not resolved_name:
            # No recipient supplied: ask the model who it should be.
            question = f"Who should the email about {topic} be sent to?"
            resolved_name = self.llm.predict(question)

        result = {"topic": topic, "recipient_name": resolved_name}
        result.update(kwargs)
        return result


class RetrieveContextTool(Tool):
    """Tool that attaches context to the inputs (placeholder for now)."""

    def __init__(self, llm, supabase):
        """Store the LLM and Supabase client for later retrieval logic."""
        super().__init__()
        self.llm = llm
        self.supabase = supabase

    def run(self, topic: str, recipient_name: str, **kwargs) -> dict:
        """Return the inputs extended with a ``context`` entry.

        Args:
            topic: Subject matter of the email.
            recipient_name: Recipient of the email.
            **kwargs: Extra fields passed through unchanged.

        Returns:
            Dict with ``topic``, ``recipient_name``, ``context`` and all
            extra fields.
        """
        # Placeholder value; no retrieval is performed yet.
        placeholder_context = "Sample retrieved context from Supabase..."

        result = {
            "topic": topic,
            "recipient_name": recipient_name,
            "context": placeholder_context,
        }
        result.update(kwargs)
        return result


class EmailGenerationTool(Tool):
    """Tool that turns a template ID and fields into email text."""

    def run(self, template_id: int, **kwargs) -> str:
        """Look up the template, then generate content with the other fields.

        Args:
            template_id: ID passed to ``get_email_template``.
            **kwargs: Fields forwarded to ``generate_email_content``.

        Returns:
            The generated email text.
        """
        return generate_email_content(get_email_template(template_id),
                                      **kwargs)


class SendEmailTool(Tool):
    """Tool that hands a finished email to an email sender."""

    def __init__(self, email_sender):
        """Store the sender used for delivery."""
        super().__init__()
        self.email_sender = email_sender

    def run(self,
            to_email: str,
            subject: str,
            body: str,
            template_id: int = None):
        """Send the email through the configured sender.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: Message body.
            template_id: Optional template ID forwarded to the sender.

        Returns:
            A fixed confirmation string; the sender's own result is not
            inspected.
        """
        sender = self.email_sender
        sender.send_email(to_email, subject, body, template_id)
        return "Email sent successfully!"


# Initialize LangGraph tools
# One module-level instance per tool, created when this module is imported.
# NOTE(review): `llm` and `supabase` are imported by name and start out as
# None in their own modules; if this module is imported before the
# initialize_* functions run, the tools may be holding None -- confirm.
clarification_tool = ClarificationTool(llm)
retrieve_context_tool = RetrieveContextTool(llm, supabase)
email_generation_tool = EmailGenerationTool()
send_email_tool = SendEmailTool(email_sender)

# --- LangGraph Workflow ---
# Linear pipeline: clarify -> retrieve_context -> generate_content -> send.
# NOTE(review): confirm that Graph / Node / Tool and the edge API used here
# match the installed langgraph version.
graph = Graph()

clarify_node = Node(name="clarify", tool=clarification_tool)
retrieve_context_node = Node(name="retrieve_context",
                             tool=retrieve_context_tool)
generate_node = Node(name="generate_content", tool=email_generation_tool)
send_node = Node(name="send", tool=send_email_tool)

# Wire the nodes in execution order
graph.add_edge(
    clarify_node, retrieve_context_node
)  # Step 1 -> 2: clarify the input (e.g., missing recipient), then retrieve context
graph.add_edge(
    retrieve_context_node, generate_node
)  # Step 2 -> 3: hand the retrieved context to content generation
graph.add_edge(
    generate_node, send_node
)  # Step 3 -> 4: pass the generated content to the send step

In [None]:
# supabase_client.py

from supabase import create_client, Client

from .config import settings
from structlog import get_logger

logger = get_logger()

supabase: Client = None


def initialize_supabase():
    """Create the Supabase client and store it in the module-level ``supabase``.

    Raises:
        Exception: Whatever ``create_client`` raised, after it is logged.
    """
    global supabase
    try:
        client = create_client(settings.supabase_url,
                               settings.supabase_api_key)
    except Exception as exc:
        logger.error("Failed to create Supabase client", error=str(exc))
        # Propagate the failure to the caller after logging it.
        raise exc
    supabase = client

In [None]:
# llm_client.py

from langchain.llms import OpenAI

from .config import settings
from structlog import get_logger

logger = get_logger()

llm = None


def initialize_llm():
    """Create the OpenAI LLM and store it in the module-level ``llm``.

    Construction errors are logged and not re-raised, so ``llm`` keeps its
    previous value when initialisation fails.
    """
    global llm
    try:
        # cache=True turns on response caching for the model.
        client = OpenAI(openai_api_key=settings.openai_api_key, cache=True)
    except Exception as exc:
        logger.error("Failed to initialize LLM", error=str(exc))
        return
    llm = client

In [None]:
# config.py

import os

from pydantic import BaseSettings, validator

class Settings(BaseSettings):
    """Application configuration: Supabase, OpenAI, SMTP and API-key values."""

    # Supabase
    supabase_url: str
    supabase_api_key: str

    # OpenAI
    openai_api_key: str

    # Outgoing mail
    sender_email: str
    sender_password: str
    smtp_server: str
    smtp_port: int

    # Key that API clients must present
    api_key: str

    @validator("smtp_port")
    def validate_smtp_port(cls, value):
        """Reject port numbers outside the range 1-65535."""
        if value < 1 or value > 65535:
            raise ValueError("Invalid SMTP port number")
        return value

    class Config:
        # The env file name depends on the ENV environment variable and
        # falls back to "development" when it is not set.
        env_file = f".env.{os.environ.get('ENV', 'development')}"

# Shared settings instance, created when this module is imported.
# NOTE(review): presumably fails here if a required value is missing --
# confirm against the pydantic version in use.
settings = Settings()

In [None]:
# models.py

from enum import Enum
from typing import Optional, Dict, Any

from pydantic import BaseModel

class NotificationType(str, Enum):
    """Kinds of notification; members are also ``str`` instances."""
    EMAIL = "email"
    SMS = "sms"
    # ... add more notification types

class EmailRequest(BaseModel):
    """Request body accepted by the send-email endpoint."""
    recipient_email: str  # address the message is sent to
    subject: str  # subject line
    body: str  # message body
    template_id: Optional[int] = None  # optional template reference
    context: Optional[Dict[str, Any]] = None  # optional extra data

class GenerateEmailRequest(BaseModel):
    """Request body accepted by the generate-email endpoint."""
    template_id: int  # ID of the template to look up
    topic: str  # what the email is about
    recipient_name: Optional[str] = None  # optional recipient name
    context: Optional[str] = None  # optional context text

In [None]:
# job_store.py

from .supabase_client import supabase
from structlog import get_logger

logger = get_logger()

def get_job(job_id: str):
    """Look up a job row in the ``jobs`` table by its ID.

    Args:
        job_id: Value matched against the ``id`` column.

    Returns:
        The first matching row, or ``None`` when there is no match.

    Raises:
        Exception: Any error from the query, re-raised after being logged.
    """
    try:
        query = supabase.table("jobs").select("*").eq("id", job_id)
        rows = query.execute().data
        return rows[0] if rows else None
    except Exception as exc:
        logger.error("Error retrieving job from Supabase", error=str(exc))
        raise  # Or handle the error appropriately

def set_job_status(job_id: str, status: str):
    """Updates the status of a job in Supabase.

    Args:
        job_id: Value matched against the ``id`` column of ``jobs``.
        status: New value for the ``status`` column.

    Raises:
        Exception: Any error from the update, re-raised after being logged.
    """
    try:
        # The original statement was cut off after `.update(...)`; the row
        # filter and execute() below follow the pattern used by get_job.
        supabase.table("jobs").update({"status": status}).eq(
            "id", job_id).execute()
    except Exception as e:
        logger.error("Error updating job status in Supabase", error=str(e))
        raise

<div class="md-recitation">
  Sources
  <ol>
  <li><a href="https://limkopi.me/a-better-logging-with-python-logging-structlog-and-pythonjsonlogger/">https://limkopi.me/a-better-logging-with-python-logging-structlog-and-pythonjsonlogger/</a></li>
  <li><a href="https://snyk.io/advisor/python/structlog/functions/structlog.processors.StackInfoRenderer">https://snyk.io/advisor/python/structlog/functions/structlog.processors.StackInfoRenderer</a></li>
  <li><a href="https://blog.hszofficial.site/TutorialForPython/%E8%BE%93%E5%85%A5%E8%BE%93%E5%87%BA%E7%AF%87/%E6%8E%A5%E5%8F%A3%E6%9C%8D%E5%8A%A1/RPC%E6%9C%8D%E5%8A%A1/ZERORPC%E6%8E%A5%E5%8F%A3%E6%9C%8D%E5%8A%A1.html">https://blog.hszofficial.site/TutorialForPython/%E8%BE%93%E5%85%A5%E8%BE%93%E5%87%BA%E7%AF%87/%E6%8E%A5%E5%8F%A3%E6%9C%8D%E5%8A%A1/RPC%E6%9C%8D%E5%8A%A1/ZERORPC%E6%8E%A5%E5%8F%A3%E6%9C%8D%E5%8A%A1.html</a></li>
  <li><a href="https://github.com/Jnnamchi/soar-platform-api">https://github.com/Jnnamchi/soar-platform-api</a></li>
  </ol>
</div>