In [None]:
# Cell 1: Setup and imports
import sys
import os
from typing import Dict, List, Any
from datetime import datetime

# Add the parent directory to sys.path if needed
sys.path.append(os.path.abspath(".."))

# Import the workflow functions
from workflows.marketing_automation.sms_generator import generate_sms
from tools.browser_utils import analyze_brand_from_url

In [None]:
# Website whose brand is analyzed by the direct analyze_brand_from_url call
# at the bottom of this cell.
brand_url = "https://amlgolabs.com"
def analyze_brand_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze the brand behind ``state["brand_url"]`` and record the result.

    Args:
        state: Workflow state dict. Must contain ``"brand_url"``. An
            ``"errors"`` list is created on demand if it is missing.

    Returns:
        The same ``state`` object, mutated in place. On success it gains a
        ``"brand_analysis"`` entry holding whatever ``analyze_brand_from_url``
        returned; on failure a message is appended to ``state["errors"]``
        and no exception propagates.
    """
    try:
        brand_url = state["brand_url"]
        state["brand_analysis"] = analyze_brand_from_url(brand_url)
    except Exception as e:
        # Best-effort node: record the failure instead of raising.
        # setdefault keeps this handler from raising KeyError itself (and
        # hiding the real error) when the caller did not seed "errors".
        state.setdefault("errors", []).append(f"Brand analysis failed: {e}")
    return state
        
# Run the analysis directly (bypassing analyze_brand_node) so that the
# following cells can reuse `brand_analysis` as a plain variable.
brand_analysis = analyze_brand_from_url(brand_url)
print(brand_analysis)

In [None]:
from typing import Dict, Any
from workflows.marketing_automation.planner import plan_campaign
def plan_campaign_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Plan the marketing campaign from the values held in ``state``.

    Args:
        state: Workflow state dict. Reads ``"brand_analysis"``,
            ``"campaign_type"``, ``"num_emails"``, ``"num_sms"``,
            ``"target_audience"`` and ``"tone"``; ``"custom_prompt"`` is
            optional and defaults to an empty string. An ``"errors"`` list
            is created on demand if it is missing.

    Returns:
        The same ``state`` object, mutated in place. On success it gains a
        ``"campaign_plan"`` entry holding whatever ``plan_campaign``
        returned; on failure a message is appended to ``state["errors"]``
        and no exception propagates.
    """
    try:
        state["campaign_plan"] = plan_campaign(
            brand_analysis=state["brand_analysis"],
            campaign_type=state["campaign_type"],
            custom_prompt=state.get("custom_prompt", ""),
            num_emails=state["num_emails"],
            num_sms=state["num_sms"],
            target_audience=state["target_audience"],
            tone=state["tone"],
        )
    except Exception as e:
        # Best-effort node: record the failure instead of raising.
        # setdefault keeps this handler from raising KeyError itself (and
        # hiding the real error) when the caller did not seed "errors".
        state.setdefault("errors", []).append(f"Campaign planning failed: {e}")
    return state
    
# Seed the workflow state from the brand analysis computed in an earlier cell.
state = dict(
    brand_analysis=brand_analysis,
    campaign_type="email",  # Example, change as needed
    num_emails=3,
    num_sms=2,
    target_audience=brand_analysis["target_audience"],
    tone=brand_analysis["tone"],
    # Optionally add a custom prompt:
    # custom_prompt="Some custom prompt text",
    errors=[],
)

# plan_campaign_node mutates and returns the dict it was given, so `result`
# refers to the same object as `state`.
result = plan_campaign_node(state)

# Show the generated plan (None if planning failed) followed by any errors.
for key in ("campaign_plan", "errors"):
    print(result.get(key))

In [None]:
from workflows.marketing_automation.email_generator import generate_emails    
def generate_emails_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Generate email content for the planned campaign.

    Args:
        state: Workflow state dict. Reads ``"brand_analysis"``,
            ``"campaign_plan"``, ``"num_emails"`` and ``"tone"``. An
            ``"errors"`` list is created on demand if it is missing.

    Returns:
        The same ``state`` object, mutated in place. On success it gains an
        ``"emails"`` entry holding whatever ``generate_emails`` returned; on
        failure a message is appended to ``state["errors"]`` and no
        exception propagates.
    """
    try:
        state["emails"] = generate_emails(
            brand_analysis=state["brand_analysis"],
            campaign_plan=state["campaign_plan"],
            num_emails=state["num_emails"],
            tone=state["tone"],
        )
    except Exception as e:
        # Best-effort node: record the failure instead of raising.
        # setdefault keeps this handler from raising KeyError itself (and
        # hiding the real error) when the caller did not seed "errors".
        state.setdefault("errors", []).append(f"Email generation failed: {e}")
    return state
tone = "Professional"
# Pass only the plan itself. `result` is the whole workflow state returned by
# plan_campaign_node, whereas generate_emails_node hands generate_emails the
# value stored under "campaign_plan". Indexing (rather than .get) fails loudly
# here if planning did not produce a plan.
text = generate_emails(
    brand_analysis=brand_analysis,
    campaign_plan=result["campaign_plan"],
    num_emails=1,
    tone=tone,
)
print(text)

In [None]:
from workflows.marketing_automation.sms_generator import generate_sms
def generate_sms_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Generate SMS content for the planned campaign, if any was requested.

    Args:
        state: Workflow state dict. Reads ``"num_sms"`` and, when it is
            greater than zero, ``"brand_analysis"``, ``"campaign_plan"`` and
            ``"tone"``. An ``"errors"`` list is created on demand if it is
            missing.

    Returns:
        The same ``state`` object, mutated in place. When ``num_sms`` is
        positive and generation succeeds it gains an ``"sms"`` entry holding
        whatever ``generate_sms`` returned; when ``num_sms`` is zero or
        negative the state is returned unchanged. On failure a message is
        appended to ``state["errors"]`` and no exception propagates.
    """
    try:
        if state["num_sms"] > 0:
            state["sms"] = generate_sms(
                brand_analysis=state["brand_analysis"],
                campaign_plan=state["campaign_plan"],
                num_sms=state["num_sms"],
                tone=state["tone"],
            )
    except Exception as e:
        # Best-effort node: record the failure instead of raising.
        # setdefault keeps this handler from raising KeyError itself (and
        # hiding the real error) when the caller did not seed "errors".
        state.setdefault("errors", []).append(f"SMS generation failed: {e}")
    return state
# Pass only the plan itself. `result` is the whole workflow state returned by
# plan_campaign_node, whereas generate_sms_node hands generate_sms the value
# stored under "campaign_plan". Indexing (rather than .get) fails loudly here
# if planning did not produce a plan.
sms = generate_sms(
    brand_analysis=brand_analysis,
    campaign_plan=result["campaign_plan"],
    num_sms=1,
    tone=tone,
)
print(sms)

In [None]:
from pprint import pprint
# Pretty-print the value returned by the direct generate_sms call.
pprint(sms)

In [None]:
from pprint import pprint
# Pretty-print the value returned by the direct analyze_brand_from_url call.
pprint(brand_analysis)

In [None]:
from pprint import pprint
# Pretty-print the workflow state dict returned by plan_campaign_node.
pprint(result)