In [None]:
import os
from typing import List, TypedDict
from typing_extensions import TypedDict
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain.schema import Document
from langgraph.graph import END, StateGraph

In [2]:
# Initialize LLM
GROQ_LLM = ChatGroq(model="llama3-70b-8192")

In [3]:
# Utility Functions
def write_markdown_file(content: str, filename: str) -> None:
    """Writes content to a markdown file."""
    with open(f"{filename}.md", "w") as f:
        f.write(content)

In [4]:
# State Definition
class GraphState(TypedDict):
    """State management for the email processing workflow."""
    initial_email: str
    email_category: str
    draft_email: str
    final_email: str
    research_info: List[str]
    info_needed: bool
    num_steps: int
    draft_email_feedback: dict

#### Prompts

In [5]:
EMAIL_CATEGORIZER_PROMPT = PromptTemplate(
    template="""You are an AI Email Analyst with expertise in natural language understanding and intent classification.

TASK: Analyze the email and categorize it into exactly ONE category.

Available Categories:
1. price_inquiry    - Questions about pricing, costs, subscription fees, or payment terms
2. customer_complaint - Issues, bugs, problems, or dissatisfaction with service/product
3. product_inquiry   - Questions about features, capabilities, integrations, or how-to
4. customer_feedback - Suggestions, appreciation, general comments, or feature requests
5. off_topic        - Spam, marketing, or unrelated to our products/services

Analysis Rules:
- Identify the PRIMARY intent (ignore secondary intentions)
- Look for specific keywords and context clues
- Consider the overall tone and urgency
- When in doubt, choose product_inquiry

Email Content:
{initial_email}

IMPORTANT: Output ONLY the category name without any explanation or additional text.""",
    input_variables=["initial_email"]
)

In [6]:
RESEARCH_ROUTER_PROMPT = PromptTemplate(
    template="""You are an AI Email Triage Specialist making binary routing decisions.

CONTEXT:
Email Category: {email_category}
Email Content: {initial_email}

TASK: Determine if this email needs research before response.

Choose "research_info" if the email:
- Contains technical questions requiring verification
- Asks about specific product capabilities or limitations
- References past interactions or specific cases
- Needs factual information or documentation
- Requires competitive analysis or market data

Choose "draft_email" if the email:
- Is a simple thank you or acknowledgment
- Contains basic questions with standard answers
- Provides feedback without specific questions
- Is a general update or notification
- Can be answered with existing knowledge

CRITICAL: Return ONLY this exact JSON format:
{{"router_decision": "research_info"}} or {{"router_decision": "draft_email"}}""",
    input_variables=["initial_email", "email_category"]
)

In [7]:
SEARCH_KEYWORDS_PROMPT = PromptTemplate(
    template="""You are an AI Search Query Optimizer specializing in technical keyword extraction.

INPUT:
Email: {initial_email}
Category: {email_category}

TASK: Extract 1-3 highly specific search keywords.

Extraction Rules:
1. Prioritize:
   - Technical terms
   - Product names
   - Feature names
   - Error messages
   - Specific concepts

2. Avoid:
   - Generic words (help, issue, problem)
   - Common verbs (is, can, will)
   - Greeting terms
   - Personal names
   - Basic adjectives

3. Optimize for:
   - Searchability
   - Technical accuracy
   - Relevance to category
   - Specific over general

CRITICAL: Return ONLY this exact JSON format:
{{"keywords": ["specific_term1", "specific_term2", "specific_term3"]}}""",
    input_variables=["initial_email", "email_category"]
)

In [8]:
EMAIL_DRAFT_PROMPT = PromptTemplate(
    template="""You are an AI Email Communication Specialist crafting professional, context-aware responses.

INPUT DATA:
Original Email: {initial_email}
Category: {email_category}
Research Info: {research_info}

RESPONSE REQUIREMENTS:

1. Category-Specific Tone:
   price_inquiry → Direct, transparent, value-focused
   customer_complaint → Empathetic, solution-oriented, urgent
   product_inquiry → Helpful, detailed, educational
   customer_feedback → Appreciative, engaging, encouraging
   off_topic → Polite, brief, redirecting

2. Required Structure:
   a) Personal greeting using recipient's name
   b) Clear acknowledgment of their message
   c) Direct response to main points
   d) Specific next steps or call to action
   e) Professional closing
   f) Signature: "Bhavik Jikadara, AI/ML Engineer"

3. Writing Guidelines:
   - Maximum 3-4 paragraphs
   - Short, clear sentences
   - Bullet points for multiple items
   - Include specific details from research
   - One clear call-to-action
   - Professional but friendly tone

4. Must Include:
   - Solution or direct answer
   - Relevant links or resources
   - Follow-up contact method
   - Timeline if applicable

CRITICAL: Return ONLY this exact JSON format:
{{"email_draft": "YOUR_COMPLETE_EMAIL_TEXT_HERE"}}""",
    input_variables=["initial_email", "email_category", "research_info"]
)

In [9]:
# Initialize Chains
email_category_chain = EMAIL_CATEGORIZER_PROMPT | GROQ_LLM | StrOutputParser()
research_router_chain = RESEARCH_ROUTER_PROMPT | GROQ_LLM | JsonOutputParser()
search_keyword_chain = SEARCH_KEYWORDS_PROMPT | GROQ_LLM | JsonOutputParser()
draft_writer_chain = EMAIL_DRAFT_PROMPT | GROQ_LLM | JsonOutputParser()

#### Core Processing Functions

In [10]:
def categorize_email(state: GraphState) -> dict:
    """Categorizes the incoming email."""
    print("🔍 Categorizing Email...")
    email_category = email_category_chain.invoke({"initial_email": state['initial_email']})
    write_markdown_file(email_category, "email_category")
    return {
        "email_category": email_category, 
        "num_steps": state['num_steps'] + 1
    }

In [11]:
def research_info_search(state: GraphState) -> dict:
    """Performs web research based on email content."""
    print("🔎 Researching Additional Information...")
    keywords = search_keyword_chain.invoke({
        "initial_email": state["initial_email"],
        "email_category": state["email_category"]
    })
    
    web_search_tool = TavilySearchResults(k=1)
    search_results = []
    
    for keyword in keywords['keywords'][:1]:
        results = web_search_tool.invoke({"query": keyword})
        content = "\n".join([d["content"] for d in results])
        search_results.append(Document(page_content=content))
    
    return {
        "research_info": search_results, 
        "num_steps": state['num_steps'] + 1
    }

In [12]:
def draft_email_writer(state: GraphState) -> dict:
    """Generates initial email draft."""
    print("✍️ Writing Draft Email...")
    draft_result = draft_writer_chain.invoke({
        "initial_email": state["initial_email"],
        "email_category": state["email_category"],
        "research_info": state["research_info"]
    })
    
    write_markdown_file(draft_result['email_draft'], "draft_email")
    return {
        "draft_email": draft_result['email_draft'], 
        "num_steps": state['num_steps'] + 1
    }

# Workflow Graph Setup

In [13]:
workflow = StateGraph(GraphState)

In [14]:
# Add Nodes
workflow.add_node("categorize_email", categorize_email)
workflow.add_node("research_info_search", research_info_search)
workflow.add_node("draft_email_writer", draft_email_writer)

<langgraph.graph.state.StateGraph at 0x24a14fd9970>

In [15]:
# Add Edges
workflow.set_entry_point("categorize_email")
workflow.add_conditional_edges(
    "categorize_email",
    lambda state: "research_info" if research_router_chain.invoke({
        "initial_email": state["initial_email"],
        "email_category": state["email_category"]
    })['router_decision'] == 'research_info' else "draft_email",
    {
        "research_info": "research_info_search",
        "draft_email": "draft_email_writer",
    }
)
workflow.add_edge("research_info_search", "draft_email_writer")
workflow.add_edge("draft_email_writer", END)

<langgraph.graph.state.StateGraph at 0x24a14fd9970>

In [16]:
# Compile and Create Application
app = workflow.compile()

In [17]:
def process_email(email_content: str) -> str:
    """Main function to process an email and generate response."""
    inputs = {
        "initial_email": email_content,
        "research_info": None,
        "num_steps": 0
    }
    
    output = app.invoke(inputs)
    return output.get('draft_email', 'Unable to process email')

In [None]:
# Example Usage
if __name__ == "__main__":
    sample_email = """
    Dear Support,
    I'm interested in learning more about your AI solutions.
    What pricing options do you offer for enterprise customers?
    Best regards,
    John
    """
    
    response = process_email(sample_email)
    print("\nGenerated Response:")
    print(response)

🔍 Categorizing Email...


OutputParserException: Invalid json output: Based on the email content and decision criteria, I would make the following decision:

{"router_decision": "draft_email"}

Reasoning: The email is a basic price inquiry, which is a standard request that can be addressed with a standard response. There are no complex technical questions, specific product details, or historical context required to answer this question. A simple acknowledgment with a general pricing overview or a pointer to a relevant webpage should be sufficient.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/OUTPUT_PARSING_FAILURE 