In [37]:
import sys
import os
# sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from logger_config import logger

from pydantic import BaseModel, Field
from typing import Optional, Literal, TypedDict, List
from langchain_groq import ChatGroq
from langchain_core.messages import BaseMessage
from langchain.schema import SystemMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph, END 
from pprint import pprint

from dotenv import load_dotenv

# Load variables from a local .env file into the process environment before
# any API client is constructed further down.
load_dotenv()

True

In [28]:
# Chat model shared by the graph nodes below (each one calls
# llm.with_structured_output(...)).
# NOTE(review): ChatGroq presumably reads its API key from the environment
# populated by load_dotenv() above — confirm GROQ_API_KEY is present in .env.
llm = ChatGroq(model="llama-3.1-8b-instant")

In [29]:
class PrimaryFields(BaseModel):
    # Structured details for an email in the "primary" category (presumably the
    # target of a field-extraction step; no visible code uses it yet).
    # Every field defaults to None so absent details can simply be left out.
    # '#' comments are used instead of a docstring so the generated JSON schema
    # is not altered.
    sender_name: Optional[str] = Field(
        default=None,
        description="Name of the person or entity who sent the email",
    )
    topic: Optional[str] = Field(
        default=None,
        description="Main topic or subject discussed in the email",
    )
    action_required: Optional[bool] = Field(
        default=None,
        description="Whether the recipient is expected to take any action",
    )
    amount: Optional[str] = Field(
        default=None,
        description="Amount mentioned if related to a bill, receipt or payment",
    )
    due_date: Optional[str] = Field(
        default=None,
        description="Deadline for any required action (ISO format)",
    )
    location: Optional[str] = Field(
        default=None,
        description="Relevant location mentioned in the email, if any",
    )

class PromotionsFields(BaseModel):
    # Structured details for an email in the "promotions" category.
    # Every field is optional and defaults to None.
    product: Optional[str] = Field(
        default=None,
        description="Name or type of product being promoted",
    )
    discount: Optional[str] = Field(
        default=None,
        description="Discount amount or percentage mentioned",
    )
    valid_until: Optional[str] = Field(
        default=None,
        description="Expiration date of the promotion (ISO format)",
    )
    vendor: Optional[str] = Field(
        default=None,
        description="Vendor or brand offering the promotion",
    )
    promo_code: Optional[str] = Field(
        default=None,
        description="Promotional code provided, if any",
    )
    urgency: Optional[str] = Field(
        default=None,
        description="Time-sensitive language such as 'limited time' or 'today only'",
    )

class SocialFields(BaseModel):
    # Structured details for an email in the "social" category.
    # Every field is optional and defaults to None.
    platform: Optional[str] = Field(
        default=None,
        description="Social media platform (e.g., Facebook, Twitter)",
    )
    notification_type: Optional[str] = Field(
        default=None,
        description="Type of notification (e.g., friend request, comment)",
    )
    from_user: Optional[str] = Field(
        default=None,
        description="User who triggered the notification",
    )
    action_summary: Optional[str] = Field(
        default=None,
        description="Brief description of the social interaction",
    )

class UpdatesFields(BaseModel):
    # Structured details for an email in the "updates" category.
    # Every field is optional and defaults to None.
    entity: Optional[str] = Field(
        default=None,
        description="Entity involved in the update (e.g., bank, utility)",
    )
    amount: Optional[str] = Field(
        default=None,
        description="Amount billed or referenced in the update",
    )
    due_date: Optional[str] = Field(
        default=None,
        description="Due date for the payment or action (ISO format)",
    )
    statement_type: Optional[str] = Field(
        default=None,
        description="Type of statement or update (e.g., credit card, utility bill)",
    )
    account_ref: Optional[str] = Field(
        default=None,
        description="Account number or reference",
    )
    status: Optional[str] = Field(
        default=None,
        description="Current status or outcome (e.g., paid, overdue, closed)",
    )

class ReplyDraft(BaseModel):
    # A reply email proposed by the model: recipient, subject line and body.
    # All three parts are optional and default to None.
    to: Optional[str] = Field(
        default=None,
        description="Recipient email address",
    )
    subject: Optional[str] = Field(
        default=None,
        description="Subject of the reply email",
    )
    body: Optional[str] = Field(
        default=None,
        description="Main content of the reply email",
    )


class Email(BaseModel):
    # An incoming email as handed to the graph. Only ``sender`` is required;
    # the other parts default to None.
    sender: str = Field(
        ...,
        description="Email address of the sender",
    )
    date: Optional[str] = Field(
        default=None,
        description="Date the email was sent",
    )
    subject: Optional[str] = Field(
        default=None,
        description="Subject line of the email",
    )
    body: Optional[str] = Field(
        default=None,
        description="Full text or HTML content of the email body",
    )



In [30]:
# Graph state definition
class EmailGraphState(TypedDict, total=False):
    """State dictionary handed from node to node in the email graph.

    ``total=False`` makes every key optional, so a run can start with only
    ``email`` present; the remaining keys are filled in by the nodes.

    A TypedDict is a plain ``dict`` at runtime and is always mutable, so no
    pydantic-style ``model_config`` is needed (or meaningful) here.
    """

    # The message being processed; the only key in the initial state.
    email: Email
    # Category chosen by the intent classifier and the model's justification.
    intent: Optional[Literal["primary", "promotions", "social", "updates"]]
    intent_reason: Optional[str]
    extracted_fields: Optional[BaseModel]
    summary: Optional[str]
    is_spam: Optional[bool]
    spam_reason: Optional[str]
    # Reply decision, its justification, and the current draft (if any).
    reply_needed: Optional[bool]
    # Written by generate_reply_node; declared here so it is a known state key.
    reply_reason: Optional[str]
    reply_draft: Optional[ReplyDraft]
    event_present: Optional[bool]
    calendar_event: Optional[dict]
    # Conversation history; human_review_node appends the reviewer's feedback.
    messages: Optional[List[BaseMessage]]



In [31]:
class IntentOutput(BaseModel):
    # Structured-output schema passed to the LLM by classify_intent_node.
    # Both fields are required. '#' comments are used instead of a docstring so
    # the schema sent to the model is unchanged.
    intent: Literal["primary", "promotions", "social", "updates"] = Field(
        ...,
        description="Email intent category: primary, promotions, social, or updates",
    )
    reason: str = Field(
        ...,
        description="Justification or explanation for the selected intent",
    )


def classify_intent_node(state: EmailGraphState) -> EmailGraphState:
    """Classify the email in ``state`` into one of the four intent categories.

    The email's sender, date, subject and body are sent to the module-level
    ``llm`` together with a fixed system prompt, using ``IntentOutput`` as the
    structured-output schema.

    Args:
        state: Current graph state; must contain ``email``.

    Returns:
        A new dict holding everything in ``state`` plus ``intent`` and
        ``intent_reason`` taken from the model's response. ``state`` itself is
        not modified.

    Raises:
        KeyError: If ``state`` has no ``email`` key.
    """
    email = state["email"]
    logger.info("state: ")
    logger.info(state)
    human_message = HumanMessage(content=(
        f"From: {email.sender}\n"
        f"Date: {email.date or ''}\n"
        f"Subject: {email.subject or ''}\n"
        f"Body:\n{email.body or ''}\n"
    ))

    system_message = SystemMessage(content=(
        """
        You are an email classification assistant. Your task is to classify incoming emails into one of the following categories:

        1. **Primary** - Personal conversations, bills, receipts, and important updates from services used.
        2. **Promotions** - Marketing emails, discount offers, sales, newsletters, and product recommendations.
        3. **Social** - Notifications from social networks like LinkedIn, Facebook, Twitter (X), Instagram, etc.
        4. **Updates** - Service-related updates, confirmations, statements, invoices, or other automated system messages.

        Return a structured JSON with the intent and a short reason.

        **Important Note** - Use only the content provided to you. Do not hallucinate values.
        """
    ))
    response = llm.with_structured_output(IntentOutput).invoke([system_message, human_message])
    logger.info(f"intent classification response: {response}")

    # Merge with dict unpacking instead of EmailGraphState(**state, intent=...):
    # the keyword form raises "TypeError: got multiple values for keyword
    # argument" as soon as ``state`` already carries ``intent`` or
    # ``intent_reason`` (e.g. an initial state that sets them to None).
    return {
        **state,
        "intent": response.intent,
        "intent_reason": response.reason,
    }

In [32]:
# Small hand-written email used to exercise the graph end to end.
sample_email = Email(
    sender="jobs@example.com",
    date="2025-05-20",
    subject="Interview Invitation",
    body="We are pleased to invite you...",
)

# Only ``email`` is supplied up front; the other state keys are added by the nodes.
initial_state = dict(email=sample_email)

In [33]:
class ReplyOutput(BaseModel):
    # Structured-output schema passed to the LLM by generate_reply_node.
    # ``reply_needed`` is required; ``reason`` and ``draft`` default to None.
    # '#' comments are used instead of a docstring so the schema sent to the
    # model is unchanged.
    reply_needed: bool = Field(
        ...,
        description="Whether a reply is needed to this email",
    )
    reason: Optional[str] = Field(
        default=None,
        description="Justification for reply decision",
    )
    draft: Optional[ReplyDraft] = Field(
        default=None,
        description="Structured draft to be used if a reply is needed",
    )


def generate_reply_node(state: EmailGraphState) -> EmailGraphState:
    """Decide whether the email needs a reply and draft (or revise) one.

    On the first pass the model is asked whether a reply is needed and, if so,
    to draft it. When the most recent entry in ``state["messages"]`` is a human
    message other than "approved", that text is treated as reviewer feedback
    and the model is asked to revise the current draft (or to write a new one
    if none exists) according to it.

    Args:
        state: Current graph state; must contain ``email``. ``messages`` and
            ``reply_draft`` may be absent or None.

    Returns:
        A new dict holding everything in ``state`` plus ``reply_needed``,
        ``reply_reason`` and ``reply_draft`` (None when the model says no
        reply is needed). ``state`` itself is not modified.

    Raises:
        KeyError: If ``state`` has no ``email`` key.
    """
    logger.info("state: ")
    logger.info(state)
    email = state["email"]

    # ``messages`` is declared Optional, so guard against an explicit None as
    # well as a missing key.
    messages = state.get("messages") or []
    old_draft = state.get("reply_draft")

    # Check for latest human feedback (if any)
    feedback = None
    if messages and messages[-1].type == "human":
        content = messages[-1].content.strip()
        logger.info(f"feedback message from user: {content}")
        if content.lower() != "approved":
            feedback = content

    original_email = (
        f"From: {email.sender}\n"
        f"Date: {email.date or ''}\n"
        f"Subject: {email.subject or ''}\n"
        f"Body:\n{email.body or ''}"
    )

    if feedback:
        system_msg = SystemMessage(content="""
        You are an email assistant. Revise the draft format (if available)
        with 'to', 'subject', and 'body' based on the feedback given. 
        If there is no draft available, generate a new draft based on feedback given by user
        """)

        if old_draft:
            # The "or ''" fallbacks must sit inside the braces; outside them
            # they are sent to the model as literal text.
            draft_text = (
                f"To: {old_draft.to or ''}\n"
                f"Subject: {old_draft.subject or ''}\n"
                f"Body:\n{old_draft.body or ''}"
            )
        else:
            draft_text = "(no draft available)"

        # The feedback itself has to be part of the prompt, otherwise the
        # model has nothing to revise against.
        human_msg = HumanMessage(content=(
            "Original Email:\n"
            f"{original_email}\n"
            "\n"
            "Current Draft:\n"
            f"{draft_text}\n"
            "\n"
            "Feedback:\n"
            f"{feedback}"
        ))
    else:
        system_msg = SystemMessage(content="""
        You are an email assistant. Determine if the email requires a reply.
        If a reply is needed, generate a response using the ReplyDraft format
        with 'to', 'subject', and 'body'. Otherwise, just say that no reply is needed.
        """)
        human_msg = HumanMessage(content=original_email)

    # Ask LLM for structured output
    response = llm.with_structured_output(ReplyOutput).invoke([system_msg, human_msg])
    logger.info(f"reply generator response: {response}")

    # Dict unpacking (rather than EmailGraphState(**state, reply_draft=...))
    # lets the new values override keys already present in ``state`` on a
    # second pass instead of raising TypeError. The decision is stored under
    # ``reply_needed``, the key the state schema declares.
    return {
        **state,
        "reply_needed": response.reply_needed,
        "reply_reason": response.reason,
        "reply_draft": response.draft if response.reply_needed else None,
    }



In [34]:
def human_review_node(state: EmailGraphState) -> EmailGraphState:
    """Show the current reply draft and record the reviewer's feedback.

    Blocks on ``input()`` until the reviewer types feedback (or 'approved'),
    then appends that text as a ``HumanMessage`` to the message history.

    Args:
        state: Current graph state. ``reply_draft`` and ``messages`` may be
            absent or None.

    Returns:
        A new dict holding everything in ``state`` with ``messages`` replaced
        by the previous history plus the new feedback message. ``state``
        itself, and the list it holds, are not modified.
    """
    logger.info("state: ")
    logger.info(state)

    # Read the draft into a local first: reusing double quotes inside a
    # double-quoted f-string is a SyntaxError before Python 3.12, and .get()
    # avoids a KeyError when no draft has been produced.
    draft = state.get("reply_draft")
    logger.info(f"Reply Draft: {draft}")

    feedback = input("Enter feedback on the draft (or 'approved'): ").strip()

    # Start from the existing history when there is one; a missing key and an
    # explicit None both mean "no messages yet". Copying keeps the caller's
    # list untouched.
    history = list(state.get("messages") or [])
    history.append(HumanMessage(content=feedback))

    updated = {**state, "messages": history}
    logger.info(f"state: {updated}")
    return updated


In [None]:
def handle_feedback_condition(state: EmailGraphState) -> str:
    """Pick the route to follow after the human review step.

    Looks at the most recent entry in ``state["messages"]``. If its text,
    trimmed and lower-cased, is exactly "approved" the route is "end";
    any other text routes back to "generate_reply".

    Args:
        state: Current graph state; ``messages`` must hold at least one entry.

    Returns:
        Either "end" or "generate_reply".
    """
    logger.info("state: ")
    logger.info(state)

    latest = state["messages"][-1]
    verdict = latest.content.strip().lower()
    if verdict == "approved":
        return "end"
    return "generate_reply"


In [38]:
# === Define the graph ===
def _build_email_graph() -> StateGraph:
    """Assemble the (uncompiled) email workflow.

    Flow: classify_intent -> generate_reply -> human_review, after which
    handle_feedback_condition either ends the run or loops back to
    generate_reply.
    """
    builder = StateGraph(EmailGraphState)

    node_table = (
        ("classify_intent", classify_intent_node),
        ("generate_reply", generate_reply_node),
        ("human_review", human_review_node),
    )
    for node_name, node_fn in node_table:
        builder.add_node(node_name, node_fn)

    builder.set_entry_point("classify_intent")

    for upstream, downstream in (
        ("classify_intent", "generate_reply"),
        ("generate_reply", "human_review"),
    ):
        builder.add_edge(upstream, downstream)

    # Map each label returned by handle_feedback_condition to its target.
    review_routes = {
        "end": END,
        "generate_reply": "generate_reply",
    }
    builder.add_conditional_edges("human_review", handle_feedback_condition, review_routes)
    return builder


graph = _build_email_graph()

# Compile the app
app = graph.compile()

In [39]:
# Emit the compiled graph as Mermaid source. pprint shows the repr of the
# string, which is why the output appears as quoted fragments with escaped
# \n and \t; print(...) would give paste-ready Mermaid text instead.
pprint(app.get_graph().draw_mermaid())

("%%{init: {'flowchart': {'curve': 'linear'}}}%%\n"
 'graph TD;\n'
 '\t__start__([<p>__start__</p>]):::first\n'
 '\tclassify_intent(classify_intent)\n'
 '\tgenerate_reply(generate_reply)\n'
 '\thuman_review(human_review)\n'
 '\t__end__([<p>__end__</p>]):::last\n'
 '\t__start__ --> classify_intent;\n'
 '\tclassify_intent --> generate_reply;\n'
 '\tgenerate_reply --> human_review;\n'
 '\thuman_review -. &nbsp;end&nbsp; .-> __end__;\n'
 '\thuman_review -.-> generate_reply;\n'
 '\tclassDef default fill:#f2f0ff,line-height:1.2\n'
 '\tclassDef first fill-opacity:0\n'
 '\tclassDef last fill:#bfb6fc\n')


In [41]:
# Plain-text rendering of the compiled graph's nodes and edges.
app.get_graph().print_ascii()

   +-----------+     
   | __start__ |     
   +-----------+     
          *          
          *          
          *          
+-----------------+  
| classify_intent |  
+-----------------+  
          *          
          *          
          *          
+----------------+   
| generate_reply |   
+----------------+   
          .          
          .          
          .          
  +--------------+   
  | human_review |   
  +--------------+   
          .          
          .          
          .          
    +---------+      
    | __end__ |      
    +---------+      


In [42]:
# Run the compiled graph on the sample state. human_review_node calls input(),
# so this cell blocks until feedback is typed.
# NOTE(review): the "KeyError: 'extract_event'" recorded in this cell's output
# names a route that no function defined in this notebook returns, and the
# handle_feedback_condition cell is marked "In [None]" (never executed) —
# presumably the kernel was still holding an older definition. Restart the
# kernel and run all cells before trusting this output.
response = app.invoke(initial_state)
logger.info(response)

2025-05-21 15:39:53 | 1914452209.py | classify_intent_node | Line: 8 | INFO | state: 
2025-05-21 15:39:53 | 1914452209.py | classify_intent_node | Line: 9 | INFO | {'email': Email(sender='jobs@example.com', date='2025-05-20', subject='Interview Invitation', body='We are pleased to invite you...')}
2025-05-21 15:39:53 | 1914452209.py | classify_intent_node | Line: 32 | INFO | intent classification response: intent='primary' reason='Invitation to an event, likely a personal conversation'
2025-05-21 15:39:53 | 3221661697.py | generate_reply_node | Line: 8 | INFO | state: 
2025-05-21 15:39:53 | 3221661697.py | generate_reply_node | Line: 9 | INFO | {'email': Email(sender='jobs@example.com', date='2025-05-20', subject='Interview Invitation', body='We are pleased to invite you...'), 'intent': 'primary', 'intent_reason': 'Invitation to an event, likely a personal conversation'}
2025-05-21 15:39:54 | 3221661697.py | generate_reply_node | Line: 64 | INFO | reply generator response: reply_needed

KeyError: 'extract_event'