In [14]:
import pandas as pd

def load_csv(filepath: str):
    """
    Load a recipient CSV file and perform basic cleaning.

    Steps applied, in order:
    - Column names are stripped of whitespace and lowercased.
    - The file must include an 'email' column.
    - Leading/trailing whitespace is stripped from every string cell.
    - Rows whose email is missing or blank are dropped.
    - Rows with a duplicate email are dropped (the first occurrence is kept).

    Missing cells are left as missing values; they are never converted to the
    literal text "nan".

    Args:
        filepath: Path of the CSV file to read.

    Returns:
        A pandas DataFrame with the cleaned rows. The original row index
        labels are preserved.

    Raises:
        ValueError: If the file cannot be read or has no 'email' column.
    """
    try:
        df = pd.read_csv(filepath)
    except Exception as e:
        # Chain the original exception so the real cause stays in the traceback.
        raise ValueError(f"Error reading CSV: {e}") from e

    # Normalize columns to lowercase for safety (str() guards non-string labels)
    df.columns = [str(c).strip().lower() for c in df.columns]

    if "email" not in df.columns:
        raise ValueError("CSV is missing required column: 'email'")

    # Strip whitespace in all text columns. Only real strings are touched:
    # casting the whole column with astype(str) would turn missing cells into
    # the string "nan", which then slips past the empty-email filter below.
    for col in df.columns:
        series = df[col]
        if pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series):
            df[col] = series.map(lambda v: v.strip() if isinstance(v, str) else v)

    # Drop rows whose email is missing or blank
    email = df["email"]
    df = df[email.notna() & (email != "")]

    # Drop duplicate emails
    df = df.drop_duplicates(subset=["email"])

    print(f"Loaded CSV with {len(df)} valid rows.")
    return df

# Test helper (you will run this with your file)
def preview_csv(filepath: str):
    """Load the CSV via `load_csv`, display its first rows, and return the full frame."""
    loaded = load_csv(filepath)
    head_rows = loaded.head()
    display(head_rows)
    return loaded



In [15]:
# Smoke-test the loader on the sample file. preview_csv both displays the head
# and returns the frame, so this cell renders the table twice.
preview_csv("sample_emails.csv")

Loaded CSV with 1 valid rows.


Unnamed: 0,email,first_name,last_name
0,sbatool6678@gmail.com,sana,batool


Unnamed: 0,email,first_name,last_name
0,sbatool6678@gmail.com,sana,batool


In [16]:
from pydantic import BaseModel, EmailStr, ValidationError
from typing import Optional, List
import pandas as pd

# Define the schema for each recipient
class RecipientRow(BaseModel):
    """Schema for one recipient row; only `email` is required."""

    email: EmailStr               # Required, must be a valid email
    first_name: Optional[str] = None   # Optional; defaults to None
    last_name: Optional[str] = None    # Optional; defaults to None
    opt_out: Optional[bool] = False    # Optional flag; defaults to False



# Function to validate all rows in a DataFrame
def validate_recipients(df: pd.DataFrame):
    """
    Validate every row of a DataFrame against the RecipientRow schema.

    Missing cells (NaN / NA / NaT) are passed to the model as None, so an
    empty optional field is treated as "not provided" instead of failing
    type validation as a float NaN.

    Args:
        df: DataFrame whose columns are used as RecipientRow field names.

    Returns:
        A tuple ``(valid_rows, invalid_rows)``:
        - valid_rows: list of RecipientRow instances.
        - invalid_rows: list of dicts with keys "row_index" (the DataFrame
          index label), "errors" (pydantic error details) and "data" (the
          raw row as a dict).
    """
    valid_rows = []
    invalid_rows = []

    for idx, row in df.iterrows():
        raw = row.to_dict()
        # pandas marks missing cells with NaN/NA; translate those to None so
        # Optional fields accept them. is_scalar guards list-like cells, for
        # which pd.isna would return an array rather than a bool.
        record = {
            key: (None if pd.api.types.is_scalar(value) and pd.isna(value) else value)
            for key, value in raw.items()
        }
        try:
            # Validate the cleaned row using Pydantic
            recipient = RecipientRow(**record)
            valid_rows.append(recipient)
        except ValidationError as e:
            invalid_rows.append({
                "row_index": idx,
                "errors": e.errors(),
                "data": raw
            })

    print(f"✅ Valid rows: {len(valid_rows)}")
    print(f"❌ Invalid rows: {len(invalid_rows)}")

    return valid_rows, invalid_rows



In [17]:
# Load the sample recipients into the notebook-level `df` (later cells read
# this global) and validate each row against the RecipientRow schema.
df = preview_csv("sample_emails.csv")
valid_recipients, invalid_recipients = validate_recipients(df)


Loaded CSV with 1 valid rows.


Unnamed: 0,email,first_name,last_name
0,sbatool6678@gmail.com,sana,batool


✅ Valid rows: 1
❌ Invalid rows: 0


In [18]:
from jinja2 import Template, meta, Environment

# Function to render template for one recipient
def render_email_template(template_str: str, recipient: dict):
    """
    Render a Jinja2 template string with one recipient's data.

    Raises ValueError if the template references a variable that is not a key
    of `recipient`; otherwise returns the rendered text.
    """
    jinja_env = Environment()
    compiled = jinja_env.from_string(template_str)

    # Collect the variable names the template refers to
    referenced = meta.find_undeclared_variables(jinja_env.parse(template_str))

    missing = [name for name in referenced if name not in recipient]
    if not missing:
        return compiled.render(**recipient)

    raise ValueError(f"Missing placeholder(s) in recipient data: {missing}")


In [19]:
# Demo: render one template for a single recipient whose dict supplies every
# placeholder used by the template.
template = "Hello {{first_name}}, your company {{company}} is amazing!"
recipient = {"first_name": "Alice", "company": "Acme Inc."}

rendered_body = render_email_template(template, recipient)
print(rendered_body)
# Output: "Hello Alice, your company Acme Inc. is amazing!"


Hello Alice, your company Acme Inc. is amazing!


In [20]:
def generate_personalized_emails(template_str: str, recipients: list, mode: str = "personalized"):
    """
    Render emails for recipients based on mode.

    Args:
        template_str: Jinja2 template text for the email body.
        recipients: List of dicts; each is used as template context and its
            "email" key (if any) is copied to the output.
        mode: "personalized" renders the template once per recipient;
            "single" renders it once, using the first recipient as context,
            and reuses that body for everyone.

    Returns:
        A list of dicts with keys "email" and "rendered_body", one per
        recipient, in input order. An empty recipient list yields [] in
        both modes.

    Raises:
        ValueError: If `mode` is not "personalized" or "single", or if the
            template needs a placeholder the context does not provide
            (raised by render_email_template).
    """
    if mode not in ["personalized", "single"]:
        raise ValueError("Mode must be 'personalized' or 'single'")

    # Nothing to render. Without this guard, "single" mode would render against
    # an empty context and fail on the first placeholder.
    if not recipients:
        return []

    if mode == "single":
        # Render once using the first recipient as context
        shared_body = render_email_template(template_str, recipients[0])
        bodies = [shared_body] * len(recipients)
    else:  # personalized
        bodies = [render_email_template(template_str, recipient) for recipient in recipients]

    return [
        {"email": recipient.get("email"), "rendered_body": body}
        for recipient, body in zip(recipients, bodies)
    ]


In [21]:
# Demo: the same template and recipients rendered in both modes.
template = "Hi {{first_name}}, welcome to {{company}}!"
recipients = [
    {"email": "alice@example.com", "first_name": "Alice", "company": "Acme Inc"},
    {"email": "bob@example.com", "first_name": "Bob", "company": "Beta LLC"}
]

# Personalized mode: each recipient gets a body rendered from their own data
emails = generate_personalized_emails(template, recipients, mode="personalized")
print(emails)

# Single email for all: the body is rendered from the first recipient only
emails_single = generate_personalized_emails(template, recipients, mode="single")
print(emails_single)


[{'email': 'alice@example.com', 'rendered_body': 'Hi Alice, welcome to Acme Inc!'}, {'email': 'bob@example.com', 'rendered_body': 'Hi Bob, welcome to Beta LLC!'}]
[{'email': 'alice@example.com', 'rendered_body': 'Hi Alice, welcome to Acme Inc!'}, {'email': 'bob@example.com', 'rendered_body': 'Hi Alice, welcome to Acme Inc!'}]


In [22]:
import os, asyncio, gradio as gr
from typing import TypedDict
from langgraph.graph import StateGraph, END

from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
import pandas as pd

In [None]:


# -------------------------------
# 1. Pydantic Email Model
# -------------------------------
class EmailOutput(BaseModel):
    """Structured email with a subject line and a body."""

    subject: str  # Email subject line
    body: str     # Email body text

# Parser that turns model output text into an EmailOutput instance
parser = PydanticOutputParser(pydantic_object=EmailOutput)

# -------------------------------
# 2. LLM
# -------------------------------
# Chat model client pointed at the OpenRouter endpoint. The API key is read
# from the OPENAI_API_KEY environment variable (None if it is not set).
llm = ChatOpenAI(
    model="openai/gpt-oss-20b:free",
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="https://openrouter.ai/api/v1"
)

# -------------------------------
# 3. SYSTEM PROMPT (cleaned)
# -------------------------------
# System prompt sent as the first message of every conversation.
# The parser's format instructions are appended because send_email_button
# parses the assistant's reply with `parser`; without them the model is never
# told to answer in the JSON shape (subject/body) that the parser expects.
SYSTEM_PROMPT = """You generate personalized cold emails based on the subject and details provided by the user.
Write clear, concise, human-sounding emails that feel natural and tailored to the recipient.
Use a simple structure: hook → value → relevance → CTA.
Keep it short and avoid fluff, clichés, and generic templates.
Do NOT hallucinate facts, do NOT invent recipient details, and do NOT mention AI or internal reasoning.
Adapt tone based on user instructions (professional, friendly, casual, direct).
Your output must be a complete cold email ready to send""" + "\n\n" + parser.get_format_instructions()


# -------------------------------
# 4. LangGraph State
# -------------------------------
class State(TypedDict):
    """State passed through the graph: the conversation's message list."""

    messages: list  # Chat messages, oldest first; llm_node appends the model reply


# -------------------------------
# 5. LangGraph Node
# -------------------------------
async def llm_node(state: State):
    """Send the current messages to the LLM and return them with the reply appended."""
    conversation = state["messages"]
    reply = await llm.ainvoke(conversation)
    return {"messages": conversation + [reply]}


# -------------------------------
# 6. Build graph
# -------------------------------
# Single-node graph: the "llm" node is the entry point and leads straight to END.
graph = StateGraph(State)
graph.add_node("llm", llm_node)
graph.set_entry_point("llm")
graph.add_edge("llm", END)
# Compiled graph invoked by run_graph
app = graph.compile()


# -------------------------------
# 7. Graph Runner
# -------------------------------
async def run_graph(user_message, history):
    """
    Run one chat turn through the compiled graph and return the reply text.

    `history` is a sequence of dicts with "role" and "content" keys; entries
    whose role is neither "user" nor "assistant" are ignored.
    """
    role_to_message = {"user": HumanMessage, "assistant": AIMessage}

    # System prompt first, then the prior turns, then the new user message
    conversation = [SystemMessage(content=SYSTEM_PROMPT)]
    for entry in history:
        message_cls = role_to_message.get(entry["role"])
        if message_cls is not None:
            conversation.append(message_cls(content=entry["content"]))
    conversation.append(HumanMessage(content=user_message))

    final_state = await app.ainvoke({"messages": conversation})

    # The last message in the returned state is the model's reply
    return final_state["messages"][-1].content


# -------------------------------
# 8. Sync wrapper for Gradio
# -------------------------------
def chatbot(user_message, history):
    """Synchronous entry point for Gradio: runs `run_graph` to completion and returns its reply."""
    pending_reply = run_graph(user_message, history)
    return asyncio.run(pending_reply)


# -------------------------------
# 9. SendGrid Email Sender (your function)
# -------------------------------

# The SendGrid API key is read from the SENDGRID_API_KEY environment variable
# inside send_bulk_emails; do not hardcode it here.
# SENDGRID_API_KEY = "sendgrid_api_key"
# Sender address used for every outgoing message.
FROM_EMAIL = "mohsinamir6789@gmail.com"


def send_bulk_emails(df, subject, body):
    """
    Send the same plain-text email to every address in `df["email"]`.

    Rows whose email is missing (NaN/None), not a string, or blank are
    skipped. A failure for one recipient is printed and does not stop the
    remaining sends.

    Args:
        df: DataFrame of recipients; each row's "email" value is the address.
        subject: Subject line used for every message.
        body: Plain-text body used for every message.

    Returns:
        None. Progress and errors are reported via print.
    """
    import sendgrid
    from sendgrid.helpers.mail import Mail

    api_key = os.getenv("SENDGRID_API_KEY")
    if not api_key:
        # Without a key every request would fail; report it once instead of
        # once per row.
        print("SENDGRID_API_KEY is not set; no emails were sent.")
        return

    sg = sendgrid.SendGridAPIClient(api_key=api_key)

    for _, row in df.iterrows():
        email = row.get("email")

        # A missing cell is NaN, which is truthy, so a plain `not email` check
        # would let it through. Require a non-blank string instead.
        if not isinstance(email, str) or not email.strip():
            print("Skipping empty row")
            continue
        email = email.strip()

        message = Mail(
            from_email=FROM_EMAIL,
            to_emails=email,
            subject=subject,
            plain_text_content=body
        )

        try:
            response = sg.send(message)
            print(f"Sent to {email}: {response.status_code}")
        except Exception as e:
            # Best effort: log the failure and continue with the next recipient
            print(f"Error sending to {email}: {e}")


# -------------------------------
# 10. Send Email Button Function (FIXED)
# -------------------------------
def send_email_button(history):
    """
    Gradio click handler: send the most recent AI-generated email to all recipients.

    Looks up the latest assistant message in the chat history, parses it into
    an EmailOutput (subject/body) with `parser`, and sends it to every address
    in the notebook-level `df` via send_bulk_emails.

    Args:
        history: Chat history as a list of dicts with "role" and "content"
            keys. May be empty or None.

    Returns:
        A status string for the UI: a confirmation with the subject on
        success, or an explanation when there is no draft or it cannot be
        parsed.
    """
    # Use the latest assistant message rather than blindly taking history[-1],
    # which fails on an empty history and may be a user message.
    draft = next(
        (
            entry.get("content")
            for entry in reversed(history or [])
            if isinstance(entry, dict) and entry.get("role") == "assistant"
        ),
        None,
    )
    if not draft:
        return "No generated email to send yet. Ask the assistant for a draft first."

    # Parse AI JSON; report a failure in the status box instead of crashing
    try:
        email = parser.parse(draft)
    except Exception as exc:
        return (
            "Could not parse the last AI message as an email "
            f"(expected JSON with 'subject' and 'body'): {exc}"
        )

    # Use the global df loaded earlier
    send_bulk_emails(df, email.subject, email.body)

    return f"Emails sent! Subject: {email.subject}"



# -------------------------------
# 11. Gradio UI
# -------------------------------
# Assemble the UI: a chat panel, a read-only view of the recipients, and a
# button that sends the latest generated email.
# NOTE: relies on the notebook-level `df` defined by an earlier cell; this cell
# fails on a fresh kernel unless that cell has been run first.
with gr.Blocks() as ui:
    chatbot_ui = gr.ChatInterface(chatbot, type="messages")

    # Show loaded CSV in UI
    df_input = gr.Dataframe(
        value=df,
        headers=list(df.columns),
        interactive=False,
        label="Recipient List"
    )

    send_button = gr.Button("Send Email")
    status = gr.Textbox()

    # Pass the chat history state to the handler; its return string is shown
    # in the status textbox.
    send_button.click(
        send_email_button,
        inputs=[chatbot_ui.chatbot_state],
        outputs=status
    )

ui.launch()



* Running on local URL:  http://127.0.0.1:7864

To create a public link, set `share=True` in `launch()`.


