In [1]:
{
  "workflow_name": "OutboundLeadGeneration",
  "description": "End-to-end AI agent workflow to find, contact, and qualify leads",
  "config": {
    "scoring": {
      "revenue_weight": 0.4,
      "employee_weight": 0.3,
      "signals_weight": 0.3
    }
  },
  "steps": [
    {
      "id": "prospect_search",
      "agent": "ProspectSearchAgent",
      "inputs": {
        "icp": {
          "industry": "SaaS",
          "location": "USA",
          "employee_count": { "min": 100, "max": 1000 },
          "revenue": { "min": 20000000, "max": 200000000 }
        },
        "signals": ["recent_funding", "hiring_for_sales"]
      },
      "instructions": "Use Clay and Apollo APIs to search for company and contact data matching ICP. Return structured leads.",
      "tools": [
        {
          "name": "ClayAPI",
          "config": { "api_key": "{{CLAY_API_KEY}}", "endpoint": "https://api.clay.com/search" }
        },
        {
          "name": "ApolloAPI",
          "config": { "api_key": "{{APOLLO_API_KEY}}", "endpoint": "https://api.apollo.io/v1/mixed_search" }
        }
      ],
      "output_schema": {
        "leads": [
          { "company": "string", "contact_name": "string", "email": "string", "linkedin": "string", "signal": "string", "employee_count": "int", "revenue": "int" }
        ]
      }
    },
    {
      "id": "enrichment",
      "agent": "DataEnrichmentAgent",
      "inputs": { "leads": "{{prospect_search.output.leads}}" },
      "instructions": "Enrich lead data using Clearbit/PeopleDataLabs.",
      "tools": [
        { "name": "Clearbit", "config": { "api_key": "{{CLEARBIT_KEY}}" } }
      ],
      "output_schema": {
        "enriched_leads": [
          { "company": "string", "contact": "string", "role": "string", "technologies": "array", "linkedin": "string" }
        ]
      }
    },
    {
      "id": "scoring",
      "agent": "ScoringAgent",
      "inputs": {
        "enriched_leads": "{{enrichment.output.enriched_leads}}",
        "scoring_criteria": "{{config.scoring}}"
      },
      "instructions": "Score leads based on configurable ICP scoring function.",
      "tools": [],
      "output_schema": { "ranked_leads": "array" }
    },
    {
      "id": "outreach_content",
      "agent": "OutreachContentAgent",
      "inputs": {
        "ranked_leads": "{{scoring.output.ranked_leads}}",
        "persona": "SDR",
        "tone": "friendly"
      },
      "instructions": "Generate personalized outreach messages using LLM and prospect context.",
      "tools": [{ "name": "OpenAI", "config": { "api_key": "{{OPENAI_KEY}}" } }],
      "output_schema": {
        "messages": [{ "lead": "string", "email_body": "string", "subject": "string" }]
      }
    },
    {
      "id": "send",
      "agent": "OutreachExecutorAgent",
      "inputs": { "messages": "{{outreach_content.output.messages}}" },
      "instructions": "Send emails using SendGrid or Apollo API and log delivery.",
      "tools": [
        { "name": "SendGrid", "config": { "api_key": "{{SENDGRID_API_KEY}}" } }
      ],
      "output_schema": { "sent_status": "array", "campaign_id": "string" }
    },
    {
      "id": "response_tracking",
      "agent": "ResponseTrackerAgent",
      "inputs": { "campaign_id": "{{send.output.campaign_id}}" },
      "instructions": "Monitor email responses and meeting bookings using Apollo API.",
      "tools": [{ "name": "ApolloAPI", "config": { "api_key": "{{APOLLO_API_KEY}}" } }],
      "output_schema": { "responses": "array" }
    },
    {
      "id": "feedback_trainer",
      "agent": "FeedbackTrainerAgent",
      "inputs": { "responses": "{{response_tracking.output.responses}}", "sent_status": "{{send.output.sent_status}}" },
      "instructions": "Analyze open/click/reply data, suggest new configs and write to Google Sheets. Await human approval before applying updates.",
      "tools": [{ "name": "GoogleSheets", "config": { "sheet_id": "{{SHEET_ID}}", "credentials": "{{GS_CREDENTIALS_JSON}}" } }],
      "output_schema": { "recommendations": "array" }
    }
  ]
}


{'workflow_name': 'OutboundLeadGeneration',
 'description': 'End-to-end AI agent workflow to find, contact, and qualify leads',
 'config': {'scoring': {'revenue_weight': 0.4,
   'employee_weight': 0.3,
   'signals_weight': 0.3}},
 'steps': [{'id': 'prospect_search',
   'agent': 'ProspectSearchAgent',
   'inputs': {'icp': {'industry': 'SaaS',
     'location': 'USA',
     'employee_count': {'min': 100, 'max': 1000},
     'revenue': {'min': 20000000, 'max': 200000000}},
    'signals': ['recent_funding', 'hiring_for_sales']},
   'instructions': 'Use Clay and Apollo APIs to search for company and contact data matching ICP. Return structured leads.',
   'tools': [{'name': 'ClayAPI',
     'config': {'api_key': '{{CLAY_API_KEY}}',
      'endpoint': 'https://api.clay.com/search'}},
    {'name': 'ApolloAPI',
     'config': {'api_key': '{{APOLLO_API_KEY}}',
      'endpoint': 'https://api.apollo.io/v1/mixed_search'}}],
   'output_schema': {'leads': [{'company': 'string',
      'contact_name': 'str

In [None]:
#!/usr/bin/env python3
"""
LangGraph builder & executor.
Reads workflow.json, dynamically imports and runs agents in order.
Resolves simple templated references like {{step_id.output.key}} or {{config.key}}.
"""
import json
import os
import importlib
import logging
from typing import Any, Dict
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
HERE = Path(__file__).parent

def load_workflow(path="workflow.json"):
    with open(path, "r") as f:
        return json.load(f)

def resolve_template(value: Any, state: Dict):
    """
    If value is a template string like "{{prospect_search.output.leads}}",
    return the resolved data from state. For non-strings or non-template strings, return value.
    Supports nested dict/list templates by recursing.
    """
    if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
        path = value[2:-2].strip()
        parts = path.split(".")
        node = state
        try:
            for p in parts:
                if p == "":
                    continue
                if p.isdigit():
                    node = node[int(p)]
                else:
                    node = node[p]
            return node
        except Exception as e:
            logging.warning(f"Failed to resolve template '{value}': {e}")
            return None
    elif isinstance(value, dict):
        return {k: resolve_template(v, state) for k, v in value.items()}
    elif isinstance(value, list):
        return [resolve_template(v, state) for v in value]
    else:
        return value

def resolve_inputs(inputs: Dict, state: Dict):
    return {k: resolve_template(v, state) for k, v in inputs.items()}

def build_and_run(workflow):
    state = {"config": workflow.get("config", {})}
    steps = workflow["steps"]
    outputs = {}
    for step in steps:
        step_id = step["id"]
        agent_name = step["agent"]
        logging.info(f"Running step {step_id} -> agent {agent_name}")
        # dynamically import agent
        try:
            module = importlib.import_module(f"agents.{agent_name.lower()}")
            AgentClass = getattr(module, agent_name)
        except Exception as e:
            logging.error(f"Could not import agent {agent_name}: {e}")
            raise

        # Resolve inputs using state (state contains prior step outputs keyed by step id)
        state_context = {"config": workflow.get("config", {}), **{f"{k}": {"output": outputs[k]} for k in outputs}}
        resolved_inputs = resolve_inputs(step.get("inputs", {}), state_context)

        # instantiate agent with tools config if provided
        tools = step.get("tools", [])
        agent = AgentClass(tools_config=tools, instructions=step.get("instructions", ""))

        # run agent (agents should return dict with 'output' and optional 'reasoning' or 'logs')
        result = agent.run(resolved_inputs)
        if not isinstance(result, dict):
            logging.warning(f"Agent {agent_name} returned non-dict result, wrapping it as 'output'")
            result = {"output": result}
        outputs[step_id] = result.get("output", {})
        # store logs in outputs too
        outputs[f"{step_id}__meta"] = {"reasoning": result.get("reasoning", []), "logs": result.get("logs", [])}
        logging.info(f"Step {step_id} completed. Output keys: {list(outputs[step_id].keys())}")
    return outputs

def main():
    wf = load_workflow()
    outputs = build_and_run(wf)
    print("\n===== FINAL OUTPUTS =====")
    print(json.dumps(outputs, indent=2, default=str))

if __name__ == "__main__":
    main()


In [3]:
# agents package
import logging
from typing import Dict, Any, List

class AgentBase:
    def __init__(self, tools_config: list = None, instructions: str = ""):
        self.tools_config = tools_config or []
        self.instructions = instructions
        self.logs: List[str] = []
        self.reasoning: List[str] = []

    def log(self, msg: str):
        logging.info(msg)
        self.logs.append(msg)

    def reason(self, msg: str):
        # small ReAct style trace
        logging.debug("REASON: " + msg)
        self.reasoning.append(msg)

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError("Agent must implement run()")


In [4]:
import logging
from typing import Dict, Any, List

class AgentBase:
    def __init__(self, tools_config: list = None, instructions: str = ""):
        self.tools_config = tools_config or []
        self.instructions = instructions
        self.logs: List[str] = []
        self.reasoning: List[str] = []

    def log(self, msg: str):
        logging.info(msg)
        self.logs.append(msg)

    def reason(self, msg: str):
        # small ReAct style trace
        logging.debug("REASON: " + msg)
        self.reasoning.append(msg)

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError("Agent must implement run()")


In [None]:
from .base_agent import AgentBase
import random

class DataEnrichmentAgent(AgentBase):
    """
    Enrich leads using Clearbit/PeopleDataLabs. If API not configured, run heuristics to amplify.
    """
    def run(self, inputs):
        leads = inputs.get("leads", [])
        enriched = []
        for l in leads:
            self.reason(f"Enriching {l.get('email')}")
            # Mock enrichment — in real use, call Clearbit or PeopleDataLabs
            enriched_lead = {
                "company": l.get("company"),
                "contact": l.get("contact_name"),
                "role": random.choice(["VP Sales", "Head of Sales", "Director of Growth", "CEO"]),
                "technologies": random.sample(["Salesforce", "HubSpot", "Segment", "AWS", "GCP", "Stripe"], 2),
                "linkedin": l.get("linkedin"),
                "employee_count": l.get("employee_count"),
                "revenue": l.get("revenue"),
                "email": l.get("email")
            }
            enriched.append(enriched_lead)
            self.log(f"Enriched {l.get('email')} -> role {enriched_lead['role']}")
        return {"output": {"enriched_leads": enriched}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
from .base_agent import AgentBase

class ScoringAgent(AgentBase):
    """
    Scores leads using configurable weights in scoring_criteria.
    """
    def score_item(self, lead, criteria):
        # Simple scoring: normalized revenue + employee + signals presence
        r_weight = criteria.get("revenue_weight", 0.4)
        e_weight = criteria.get("employee_weight", 0.3)
        s_weight = criteria.get("signals_weight", 0.3)
        # Normalize naive: revenue / 200M, employees / 2000
        revenue_score = min(lead.get("revenue", 0) / 200_000_000, 1.0)
        emp_score = min(lead.get("employee_count", 0) / 2000, 1.0)
        # signal bonus
        signal_score = 1.0 if lead.get("signal") and lead.get("signal") != "none" else 0.0
        total = r_weight * revenue_score + e_weight * emp_score + s_weight * signal_score
        return round(total, 4)

    def run(self, inputs):
        leads = inputs.get("enriched_leads", [])
        criteria = inputs.get("scoring_criteria", {})
        ranked = []
        for l in leads:
            s = self.score_item(l, criteria)
            out = dict(l)
            out["score"] = s
            ranked.append(out)
            self.log(f"Lead {l.get('email')} score {s}")
        ranked_sorted = sorted(ranked, key=lambda x: x["score"], reverse=True)
        return {"output": {"ranked_leads": ranked_sorted}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
from .base_agent import AgentBase

class ScoringAgent(AgentBase):
    """
    Scores leads using configurable weights in scoring_criteria.
    """
    def score_item(self, lead, criteria):
        # Simple scoring: normalized revenue + employee + signals presence
        r_weight = criteria.get("revenue_weight", 0.4)
        e_weight = criteria.get("employee_weight", 0.3)
        s_weight = criteria.get("signals_weight", 0.3)
        # Normalize naive: revenue / 200M, employees / 2000
        revenue_score = min(lead.get("revenue", 0) / 200_000_000, 1.0)
        emp_score = min(lead.get("employee_count", 0) / 2000, 1.0)
        # signal bonus
        signal_score = 1.0 if lead.get("signal") and lead.get("signal") != "none" else 0.0
        total = r_weight * revenue_score + e_weight * emp_score + s_weight * signal_score
        return round(total, 4)

    def run(self, inputs):
        leads = inputs.get("enriched_leads", [])
        criteria = inputs.get("scoring_criteria", {})
        ranked = []
        for l in leads:
            s = self.score_item(l, criteria)
            out = dict(l)
            out["score"] = s
            ranked.append(out)
            self.log(f"Lead {l.get('email')} score {s}")
        ranked_sorted = sorted(ranked, key=lambda x: x["score"], reverse=True)
        return {"output": {"ranked_leads": ranked_sorted}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
import os
from .base_agent import AgentBase

try:
    from openai import OpenAI
except Exception:
    OpenAI = None

class OutreachContentAgent(AgentBase):
    """
    Generates personalized email message per lead. Uses OpenAI if API key is present, otherwise templates.
    """
    def run(self, inputs):
        ranked = inputs.get("ranked_leads", [])
        persona = inputs.get("persona", "SDR")
        tone = inputs.get("tone", "friendly")
        messages = []
        openai_key = None
        # Tools config may contain OpenAI key, but we also read env for convenience
        for t in self.tools_config:
            if t.get("name") == "OpenAI":
                openai_key = t.get("config", {}).get("api_key") or os.getenv("OPENAI_KEY")
        if not openai_key and os.getenv("OPENAI_KEY"):
            openai_key = os.getenv("OPENAI_KEY")

        for lead in ranked:
            self.reason(f"Creating message for {lead.get('email')}")
            subject = f"Quick question about {lead.get('company')}"
            if openai_key and OpenAI:
                # NOTE: example usage — users should adapt to their OpenAI SDK version
                try:
                    client = OpenAI(api_key=openai_key)
                    prompt = (
                        f"You are a helpful {persona}. Write a short {tone} outreach email to {lead.get('contact')}, "
                        f"who is {lead.get('role')} at {lead.get('company')}. Mention they use {', '.join(lead.get('technologies', []))} "
                        f"and include a call to action to book a 15 minute call."
                    )
                    resp = client.responses.create(model="gpt-4o-mini", input=prompt)
                    body = resp.output_text if hasattr(resp, "output_text") else resp.get("output", {}).get("text", "")
                except Exception as e:
                    body = f"Hi {lead.get('contact')},\n\nI noticed {lead.get('company')} uses {', '.join(lead.get('technologies', []))}. Would you be open to a 15-min call?\n\nThanks!"
                    self.log(f"OpenAI generation failed: {e}")
            else:
                # Fallback templated message
                body = (
                    f"Hi {lead.get('contact')},\n\n"
                    f"I work with B2B SaaS companies like {lead.get('company')} to improve outbound sales. "
                    f"I noticed you use {', '.join(lead.get('technologies', []))}. Would you be open to a 15-minute chat?\n\n"
                    f"— SDR at Analytos.ai"
                )
            messages.append({"lead": lead.get("email"), "subject": subject, "email_body": body})
            self.log(f"Generated message for {lead.get('email')}")
        return {"output": {"messages": messages}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
import os
from .base_agent import AgentBase
import uuid

class OutreachExecutorAgent(AgentBase):
    """
    Sends messages (SendGrid mock if no key). Returns sent_status and campaign_id.
    """
    def run(self, inputs):
        messages = inputs.get("messages", [])
        sent_status = []
        sendgrid_key = None
        for t in self.tools_config:
            if t.get("name") == "SendGrid":
                sendgrid_key = t.get("config", {}).get("api_key") or os.getenv("SENDGRID_API_KEY")
        if not sendgrid_key and os.getenv("SENDGRID_API_KEY"):
            sendgrid_key = os.getenv("SENDGRID_API_KEY")

        campaign_id = str(uuid.uuid4())
        for m in messages:
            to = m.get("lead")
            # If we had SendGrid configured we'd call it here. We'll mock success.
            if sendgrid_key:
                status = {"to": to, "status": "sent", "provider": "SendGrid", "message_id": str(uuid.uuid4())}
            else:
                status = {"to": to, "status": "mock_sent", "provider": "mock", "message_id": str(uuid.uuid4())}
            sent_status.append(status)
            self.log(f"Sent to {to}: {status['status']}")
        return {"output": {"sent_status": sent_status, "campaign_id": campaign_id}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
import random
from .base_agent import AgentBase

class ResponseTrackerAgent(AgentBase):
    """
    Track responses for campaign_id. In real deployment, poll Apollo or SendGrid events.
    """
    def run(self, inputs):
        campaign_id = inputs.get("campaign_id")
        self.reason(f"Tracking responses for campaign {campaign_id}")
        # Mock responses: some emails open/click/reply
        responses = []
        for i in range(3):
            r = {
                "email": f"contact{i+1}@company{i+1}.com",
                "event": random.choice(["open", "click", "reply", "none"]),
                "timestamp": None
            }
            responses.append(r)
            self.log(f"Observed event {r['event']} for {r['email']}")
        return {"output": {"responses": responses}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
import os
from .base_agent import AgentBase

class FeedbackTrainerAgent(AgentBase):
    """
    Analyze campaign outcomes and propose recommendations.
    Writes to Google Sheets if credentials provided (mock otherwise).
    """
    def run(self, inputs):
        responses = inputs.get("responses", [])
        sent_status = inputs.get("sent_status", [])
        self.reason("Analyzing campaign performance")
        opens = sum(1 for r in responses if r.get("event") == "open")
        replies = sum(1 for r in responses if r.get("event") == "reply")
        total_sent = len(sent_status)
        open_rate = round((opens / total_sent) * 100, 2) if total_sent else 0
        reply_rate = round((replies / total_sent) * 100, 2) if total_sent else 0
        self.log(f"Open rate: {open_rate}%, Reply rate: {reply_rate}%")
        recommendations = []
        if open_rate < 20:
            recommendations.append({"type": "subject_line", "suggestion": "Test shorter subject lines with personalization (company name)."})
        if reply_rate < 2:
            recommendations.append({"type": "intro", "suggestion": "Try a one-line intro referencing a specific metric or recent event for the company."})
        recommendations.append({"type": "icp_tweak", "suggestion": "Prioritize companies with >50% revenue growth in last 12 months."})

        # Mock writing to Google Sheets or use real API if provided in tools_config
        gs_config = None
        for t in self.tools_config:
            if t.get("name") == "GoogleSheets":
                gs_config = t.get("config")
        if gs_config and os.getenv("GSHEET_WRITE") == "1":
            self.log("Would write recommendations to Google Sheets (not implemented here).")
            # Real implementation: use googleapiclient to write rows
        else:
            self.log("Google Sheets not configured; skipping write (mock).")

        return {"output": {"recommendations": recommendations, "metrics": {"open_rate": open_rate, "reply_rate": reply_rate}}, "reasoning": self.reasoning, "logs": self.logs}


In [None]:
#import os
from .base_agent import AgentBase

class FeedbackTrainerAgent(AgentBase):
    """
    Analyze campaign outcomes and propose recommendations.
    Writes to Google Sheets if credentials provided (mock otherwise).
    """
    def run(self, inputs):
        responses = inputs.get("responses", [])
        sent_status = inputs.get("sent_status", [])
        self.reason("Analyzing campaign performance")
        opens = sum(1 for r in responses if r.get("event") == "open")
        replies = sum(1 for r in responses if r.get("event") == "reply")
        total_sent = len(sent_status)
        open_rate = round((opens / total_sent) * 100, 2) if total_sent else 0
        reply_rate = round((replies / total_sent) * 100, 2) if total_sent else 0
        self.log(f"Open rate: {open_rate}%, Reply rate: {reply_rate}%")
        recommendations = []
        if open_rate < 20:
            recommendations.append({"type": "subject_line", "suggestion": "Test shorter subject lines with personalization (company name)."})
        if reply_rate < 2:
            recommendations.append({"type": "intro", "suggestion": "Try a one-line intro referencing a specific metric or recent event for the company."})
        recommendations.append({"type": "icp_tweak", "suggestion": "Prioritize companies with >50% revenue growth in last 12 months."})

        # Mock writing to Google Sheets or use real API if provided in tools_config
        gs_config = None
        for t in self.tools_config:
            if t.get("name") == "GoogleSheets":
                gs_config = t.get("config")
        if gs_config and os.getenv("GSHEET_WRITE") == "1":
            self.log("Would write recommendations to Google Sheets (not implemented here).")
            # Real implementation: use googleapiclient to write rows
        else:
            self.log("Google Sheets not configured; skipping write (mock).")

        return {"output": {"recommendations": recommendations, "metrics": {"open_rate": open_rate, "reply_rate": reply_rate}}, "reasoning": self.reasoning, "logs": self.logs}
