In [1]:
# ============================================================
# CELL 1: IMPORTS & SETUP
# ============================================================

# --- PATH SETUP (MUST BE FIRST) ---
import sys
from pathlib import Path
import uuid

# Get current directory (should be notebooks/)
current_dir = Path.cwd()
print(f"üìÅ Current directory: {current_dir}")
print(f"üìÅ Current directory name: {current_dir.name}")

# Go up one level to project root
project_root = current_dir.parent
print(f"üìÅ Project root: {project_root}")

# Verify database folder exists
database_path = project_root / 'database'
print(f"üìÅ Database path: {database_path}")
print(f"‚úÖ Database folder exists: {database_path.exists()}")

# Add project root to Python path
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))
    print(f"‚úÖ Added to Python path: {project_root}")
else:
    print(f"‚úÖ Already in Python path")

print(f"\nüìã Python path (first 3):")
for i, p in enumerate(sys.path[:3], 1):
    print(f"   {i}. {p}")


# --- STANDARD LIBRARY IMPORTS ---
import os
import json
import time
import sqlite3
import glob
from typing import TypedDict, Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
from typing import Optional

print("‚úÖ Standard library imports loaded")


# --- ENVIRONMENT & CONFIGURATION ---
from dotenv import load_dotenv

# Load environment variables from project root
env_path = project_root / '.env'
if env_path.exists():
    load_dotenv(env_path)
    print(f"‚úÖ Loaded .env from: {env_path}")
else:
    print(f"‚ö†Ô∏è  .env not found at: {env_path}")


# --- THIRD-PARTY ML/AI ---
import assemblyai as aai
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph
from pydantic import BaseModel, Field, field_validator
from langchain_core.messages import SystemMessage, HumanMessage  
import json
from langchain_core.prompts import PromptTemplate

print("‚úÖ Third-party imports loaded")


# --- DATABASE IMPORTS ---
from database.db_operations import db
from database.models import Conversation, ConversationCreate

print("‚úÖ Database imports loaded")
print("\nüéâ All imports loaded successfully!")

üìÅ Current directory: /home/manuel/Documents/tech/ai_content_ops/notebooks
üìÅ Current directory name: notebooks
üìÅ Project root: /home/manuel/Documents/tech/ai_content_ops
üìÅ Database path: /home/manuel/Documents/tech/ai_content_ops/database
‚úÖ Database folder exists: True
‚úÖ Added to Python path: /home/manuel/Documents/tech/ai_content_ops

üìã Python path (first 3):
   1. /home/manuel/Documents/tech/ai_content_ops
   2. /usr/lib/python313.zip
   3. /usr/lib/python3.13
‚úÖ Standard library imports loaded
‚úÖ Loaded .env from: /home/manuel/Documents/tech/ai_content_ops/.env
‚úÖ Third-party imports loaded
‚úÖ Database imports loaded

üéâ All imports loaded successfully!


In [2]:

# Cell 2 

# Test API key
assemblyai_key = os.getenv('ASSEMBLYAI_API_KEY')
print(f"AssemblyAI API Key loaded: {'‚úÖ' if assemblyai_key else '‚ùå'}")
print(f"Key starts with: {assemblyai_key[:10] if assemblyai_key else 'None'}...")


AssemblyAI API Key loaded: ‚úÖ
Key starts with: 972365f41d...


In [3]:
# ============================================================
# CELL 3: LANGSMITH TRACING SETUP (OPTIONAL)
# ============================================================

import os
from dotenv import load_dotenv

# Load environment variables if not already loaded
load_dotenv()

# Get LangSmith API key from environment
langsmith_api_key = os.getenv("LANGSMITH_API_KEY")

if langsmith_api_key:
    # Enable LangSmith tracing
    os.environ["LANGSMITH_API_KEY"] = langsmith_api_key
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_PROJECT"] = "ai_content_ops_n9"
    print("‚úÖ LangSmith tracing enabled")
    print(f"   Project: ai_content_ops")
else:
    print("‚ö†Ô∏è  LANGSMITH_API_KEY not found in .env")
    print("   LangSmith tracing disabled")
    print("   üí° Add LANGSMITH_API_KEY to your .env file to enable tracing")

‚úÖ LangSmith tracing enabled
   Project: ai_content_ops


In [4]:
# Cell 3: Database Connection Test

conn = sqlite3.connect("data/app.db")
cursor = conn.cursor()

# Get all table names
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()

print("üìä Tables in app.db:")
for table in tables:
    table_name = table[0]
    
    # Get column info for each table
    cursor.execute(f"PRAGMA table_info({table_name})")
    columns = cursor.fetchall()
    
    print(f"\nüîß {table_name}:")
    for col in columns:
        print(f"   - {col[1]} ({col[2]})")  # column_name (type)

conn.close()

üìä Tables in app.db:

üîß conversations:
   - id (INTEGER)
   - title (TEXT)
   - raw_text (TEXT)
   - source (TEXT)
   - word_count (INTEGER)
   - created_at (DATETIME)
   - status (TEXT)

üîß sqlite_sequence:
   - name ()
   - seq ()

üîß blog_post_ideas:
   - id (INTEGER)
   - conversation_id (INTEGER)
   - title (TEXT)
   - description (TEXT)
   - usefulness_potential (INTEGER)
   - fitwith_seo_strategy (INTEGER)
   - fitwith_content_strategy (INTEGER)
   - inspiration_potential (INTEGER)
   - collaboration_potential (INTEGER)
   - innovation (INTEGER)
   - difficulty (INTEGER)
   - total_score (INTEGER)
   - sent_to_prod (BOOLEAN)
   - raw_llm_response (TEXT)
   - created_at (DATETIME)

üîß processing_status:
   - id (INTEGER)
   - conversation_id (INTEGER)
   - stage (TEXT)
   - status (TEXT)
   - error_message (TEXT)
   - started_at (DATETIME)
   - completed_at (DATETIME)


In [41]:
## cell 4. Pydantic Model for Structured Output



class SpeakerRole(str, Enum):
    """Possible speaker roles in the conversation"""
    CLIENT = "client"
    INTERVIEWER = "interviewer"

class Speaker(BaseModel):
    """Information about a person speaking in the conversation"""
    name: Optional[str] = Field(default=None, description="Name of the speaker if mentioned")
    role: Optional[SpeakerRole] = Field(default=None, description="Role of the speaker in the conversation")
    company: Optional[str] = Field(default=None, description="Company they work for if mentioned")

class Challenge(BaseModel):
    """A challenge or problem mentioned in the conversation"""
    description: Optional[str] = Field(default=None, description="Description of the challenge")
    impact: Optional[str] = Field(default=None, description="How this challenge affects them")
    urgency: Optional[str] = Field(default=None, description="Low, Medium, or High urgency")

class CurrentSolution(BaseModel):
    """How they currently solve their problems"""
    solution: Optional[str] = Field(default=None, description="What they're currently doing")
    satisfaction_level: Optional[str] = Field(default=None, description="How satisfied they are: Very Satisfied, Satisfied, Neutral, Unsatisfied, Very Unsatisfied")
    limitations: Optional[List[str]] = Field(default=[], description="Limitations of current solution")

class Need(BaseModel):
    """A need identified using psychology frameworks like NVC"""
    need_category: Optional[str] = Field(default=None, description="Category of need (e.g., autonomy, efficiency, security, connection)")
    description: Optional[str] = Field(default=None, description="Specific need description")
    intensity: Optional[str] = Field(default=None, description="Low, Medium, or High intensity")

class ExtractedInsights(BaseModel):
    """Complete structured output from conversation analysis"""
    
    # Speakers
    speakers: Optional[List[Speaker]] = Field(default=[], description="People identified in the conversation")
    
    # What they care about
    core_values: Optional[List[str]] = Field(default=[], description="What this person/company cares about most")
    priorities: Optional[List[str]] = Field(default=[], description="Their current priorities and focus areas")
    
    # Challenges
    primary_challenges: Optional[List[Challenge]] = Field(default=[], description="Main problems they're facing")
    secondary_challenges: Optional[List[Challenge]] = Field(default=[], description="Secondary or related problems")
    
    # Current solutions
    current_solutions: Optional[List[CurrentSolution]] = Field(default=[], description="How they solve problems today")
    
    # Needs analysis
    psychological_needs: Optional[List[Need]] = Field(default=[], description="Underlying needs using NVC or similar frameworks")

class BlogPostStatus(Enum):
    """Publication status of the blog post"""
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"

class BlogPost(BaseModel):
    """A blog post that solves a problem from a human expert"""
    id: Optional[str] = Field(default=None, description="Unique identifier for the blog post (UUID)")
    title: str = Field(..., min_length=1, max_length=255, description="Title of the blog post")  # Required
    content: str = Field(..., description="Full content of the blog post")  # Required
    issue: str = Field(..., description="What issue is being discussed in this post")  # Required
    angle: str = Field(..., description="Where Bigkids is positioned on this issue; the angle of the post")  # Required
    single_message: str = Field(..., description="The single message we want to pass; one idea for the reader to retain")  # Required
    user_story: str = Field(..., description="User story in agile format (e.g., 'As a tinkerer I would like to...')")  # Required
    seed_keyword: str = Field(..., description="Primary SEO seed keyword")  # Required
    call_to_action: Optional[str] = Field(default=None, description="Call to action for readers")
    keywords: Optional[List[str]] = Field(default=None, description="Additional keywords present in the article")
    status: Optional[BlogPostStatus] = Field(default=None, description="Publication status of the blog post")
    published_date: Optional[str] = Field(default=None, description="Date when the blog post was published (YYYY-MM-DD)")
    created_at: Optional[str] = Field(default=None, description="Timestamp when the record was created (ISO 8601)")
    updated_at: Optional[str] = Field(default=None, description="Timestamp when the record was last updated (ISO 8601)")

  # Optional: Auto-generate UUID if not provided
    @field_validator('id', mode='before')  # Updated: field_validator with mode='before' (replaces pre=True)
    @classmethod
    def generate_uuid(cls, v):
        return v or str(uuid.uuid4())


class Plan(BaseModel):  # From your snippet
    who: str = Field(description="Target reader of the blog post.")
    why: str = Field(description="Why are we writing this blog post.")
    what: str = Field(description="Main topics to cover.")
    the_issue: str = Field(description="Main issue or problem addressed.")
    where_we_stand: str = Field(description="Current position on the issue.")
    single_message: str = Field(description="Single most important message.")
    qa_pairs: List[Dict[str, str]] = Field(  # NEW: Added for your Q&A skeleton
        default=[], description="List of dicts with 'question' and 'answer' to guide writer."
    )
    instructions: List[str] = Field(  # NEW: Added for writer instructions
        default=[], description="Instructions to keep the writer focused."
    )

    @property
    def plan(self) -> str:  # From your snippet, augmented
        base = f"Who: {self.who}\nWhy: {self.why}\nWhat: {self.what}\nIssue: {self.the_issue}\nWhere We Stand: {self.where_we_stand}\nSingle Message: {self.single_message}\n"
        qa_str = "\nQ&A Pairs:\n" + "\n".join([f"Q: {pair['question']} A: {pair['answer']}" for pair in self.qa_pairs])
        instr_str = "\nInstructions:\n" + "\n".join(self.instructions)
        return base + qa_str + instr_str
 


In [6]:
# Cell 5: Raw Blog Idea Model (Simple)
class RawBlogIdea(BaseModel):
    """Raw blog idea from creative agent"""
    title: str
    description: str
    target_audience: str
    content_angle: str
    business_value: str

print("‚úÖ Simple RawBlogIdea model ready")

‚úÖ Simple RawBlogIdea model ready


In [7]:
def validate_raw_blog_ideas(raw_ideas: List[Dict]) -> List[RawBlogIdea]:
    """Validate and convert raw JSON to Pydantic models"""
    validated_ideas = []
    
    for idea in raw_ideas:
        try:
            validated_idea = RawBlogIdea(**idea)
            validated_ideas.append(validated_idea)
        except Exception as e:
            print(f"‚ö†Ô∏è Invalid blog idea skipped: {e}")
    
    print(f"‚úÖ Validated {len(validated_ideas)} out of {len(raw_ideas)} raw ideas")
    return validated_ideas

print("‚úÖ RawBlogIdea model and validation ready")

‚úÖ RawBlogIdea model and validation ready


In [8]:
# Cell 6: Update AudioPipelineState for 5-Node Pipeline
class AudioPipelineState(TypedDict):
    # File info
    file_path: str
    filename: str
    
    # Processing results
    transcript_text: Optional[str]
    conversation_id: Optional[int]
    extracted_insights: Optional[ExtractedInsights]
    raw_blog_ideas: Optional[List[Dict]]        # From creative agent (Node 4)
    scored_blog_ideas: Optional[List[Dict]]     # From analyst agent (Node 5) 
    saved_idea_ids: Optional[List[int]]         # From database saver (Node 6) 
    selected_idea_id: Optional[int]  # NEW: Human-selected idea ID from HITL
    selected_idea: Optional[Dict]    # NEW: The actual selected idea dict (for convenience)
    strategy_context: Optional[Dict[str, str]]  # NEW: Loaded company/SEO/content strategies
    blog_plan: Optional[Plan]       # NEW: Generated plan (Pydantic-structured)
    # For future writing node
    blog_post: Optional[BlogPost]       # NEW: e.g., {"title": str, "content": str}
    
    
    # Status & error handling
    status: str
    error: Optional[str]

print("‚úÖ Updated AudioPipelineState for 6-node pipeline")

‚úÖ Updated AudioPipelineState for 6-node pipeline


In [9]:


def planning_agent_node(state: AudioPipelineState) -> AudioPipelineState:
    """Planning agent: Generates a blog post plan using strategies, insights, and selected idea."""
    # Load strategy context (from your snippet)
    strategy_context = load_company_strategy_context()
    state["strategy_context"] = strategy_context
    
    # Access state data
    insights = state.get("extracted_insights")
    transcript = state.get("transcript_text", "No transcript available")
    selected_idea = state.get("selected_idea", {})  # Assumed populated post-HITL
    insights_json = insights.model_dump() if insights else {}
    
    # Prompt template inspired by your plan_instructions, augmented with pipeline data
    plan_instructions = """You are tasked with creating a plan for a professional blog post for Big Kids Automation Agency. The plan is a skeleton with questions/answers and instructions to guide the writer.

Follow these instructions carefully:

1. Review the company business strategy: {company_strategy_content}

2. Ensure the plan fits the company content strategy: {content_strategy_content}

3. Ensure the plan fits the company SEO strategy: {seo_strategy_content}

4. Review the extracted insights from the interview (pains, challenges, etc.): {insights_json}

5. Review the interview transcript: {transcript}

6. Base the plan on this human-selected blog idea: {selected_idea}

7. Create a plan with: who, why, what, the_issue, where_we_stand, single_message. Add qa_pairs (5-10 Q&A from insights, e.g., 'question': 'What is the main challenge?', 'answer': '[From pains]') and instructions (e.g., 'Stay focused on automation benefits', 'Incorporate SEO keywords').

8. If there's human feedback, incorporate it: {human_analyst_feedback}
"""
    
    # Format the prompt (add human feedback if you extend state for it)
    formatted_instructions = plan_instructions.format(
        company_strategy_content=strategy_context.get('company_strategy', ''),
        content_strategy_content=strategy_context.get('content_strategy', ''),
        seo_strategy_content=strategy_context.get('seo_strategy', ''),
        insights_json=json.dumps(insights_json),
        transcript=transcript[:2000],  # Truncate if needed
        selected_idea=json.dumps(selected_idea),
        human_analyst_feedback=state.get('human_analyst_feedback', 'No feedback')  # Optional extension
    )
    
    # Set up structured LLM (inspired by your snippet)
    structured_llm = llm.with_structured_output(Plan)
    
    # Invoke (now with proper message classes available)
    plan = structured_llm.invoke([SystemMessage(content=formatted_instructions), HumanMessage(content="Generate the blog post plan.")])
    
    # Update state
    state["blog_plan"] = plan
    return state

In [10]:


class SpeakerRole(Enum):
    CLIENT = 'client'
    INTERVIEWER = 'interviewer'

class Speaker(BaseModel):
    name: str
    role: SpeakerRole  # Enforces the Enum

class Challenge(BaseModel):
    description: str
    impact: Optional[str] = None  # Adjust fields as per your real model

class CurrentSolution(BaseModel):
    method: str
    effectiveness: Optional[str] = None

class Need(BaseModel):
    need: str
    details: Optional[str] = None

# Mock state with raw dicts for sub-models (Pydantic will coerce them)
#mock_state = {
#    "extracted_insights": ExtractedInsights(
#        speakers=[{"name": "John Doe", "role": "client"}],  # Raw dict; use 'client' or 'interviewer' for role
#        core_values=["Innovation", "Efficiency"],  # Simple list of str (no sub-model needed)
#        priorities=["Scaling automation", "Reducing costs"],
#        primary_challenges=[{"description": "Data integration issues", "impact": "High"}],  # Raw dict for Challenge
#        secondary_challenges=[{"description": "Team training", "impact": "Medium"}],
#        current_solutions=[{"method": "Manual workflows", "effectiveness": "Low"}],
#        psychological_needs=[{"need": "Security", "details": "Stable processes"}]
#    ),
#    "transcript_text": "Sample transcript: Interviewee discussed pains in automation, like integration challenges.",
#    "selected_idea": {"title": "Test Idea: Overcoming Automation Pains", "description": "Blog on solutions from interview"},
    # Add other fields if needed for your node (e.g., "human_analyst_feedback": "Incorporate more SEO")
#}

# Run the node
#result = planning_agent_node(mock_state)

# Print the result (adjust based on your Plan model)
#print("Generated Blog Plan:")
#if "blog_plan" in result and result["blog_plan"]:
#    print(result["blog_plan"].plan)  # Uses the @property from Plan
#else:
#    print("No plan generated‚Äîcheck for errors in the node.")

In [11]:
def writing_agent_node(state: AudioPipelineState) -> AudioPipelineState:
    """Writing agent: Drafts a blog post implementing the plan, using strategies."""
    # Load strategy context
    strategy_context = load_company_strategy_context()
    state["strategy_context"] = strategy_context  # Optional: Save to state for traceability
    
    # Access plan and other relevant state
    blog_plan = state.get("blog_plan")
    if not blog_plan:
        state["error"] = "No blog plan available for writing"
        return state
    transcript = state.get("transcript_text", "No transcript available")  # Optional context
    insights = state.get("extracted_insights")  # Optional
    insights_json = insights.model_dump() if insights else {}
    
    # Prompt template: Instruct LLM to read strategies, plan, and generate per schema
    writing_instructions = """You are a professional blog writer for Big Kids Automation Agency. Draft a complete blog post that implements the provided plan and solves a problem from a human expert interview.

Follow these instructions carefully:

1. Review the company business strategy: {company_strategy_content}

2. Ensure the post fits the company content strategy: {content_strategy_content}

3. Ensure the post fits the company SEO strategy: {seo_strategy_content} (e.g., incorporate the seed_keyword and keywords naturally).

4. Read and implement the blog plan: {blog_plan}. This includes who, why, what, the_issue, where_we_stand, single_message, qa_pairs, and instructions.

5. Optionally, reference the interview transcript: {transcript} and extracted insights: {insights_json} for authentic details (e.g., quotes from speakers or challenges).

6. Generate a blog post matching this exact schema:
   - id: Auto-generated UUID
   - title: Engaging title based on the plan
   - content: Full, well-written post (800-1500 words, engaging, with sections implementing qa_pairs)
   - issue: From the plan's the_issue
   - angle: From the plan's where_we_stand
   - single_message: From the plan's single_message
   - user_story: Agile-style story based on the plan's who and why
   - seed_keyword: Primary SEO keyword (align with SEO strategy)
   - call_to_action: Optional, e.g., "Contact us for automation solutions"
   - keywords: List of 5-10 additional keywords
   - status: Set to "draft"
   - published_date: Leave as null or set to today's date (YYYY-MM-DD)
   - created_at and updated_at: Set to current ISO timestamp (e.g., 2023-10-01T12:00:00Z)

Make the post professional, focused on automation benefits, and true to the plan.
"""
    
    # Format the prompt
    formatted_instructions = writing_instructions.format(
        company_strategy_content=strategy_context.get('company_strategy', ''),
        content_strategy_content=strategy_context.get('content_strategy', ''),
        seo_strategy_content=strategy_context.get('seo_strategy', ''),
        blog_plan=blog_plan.plan,  # Use the formatted string property for readability
        transcript=transcript[:2000],  # Truncate if too long
        insights_json=json.dumps(insights_json)
    )
    
    # Set up structured LLM
    structured_llm = llm.with_structured_output(BlogPost)
    
    # Invoke
    blog_post = structured_llm.invoke([SystemMessage(content=formatted_instructions), HumanMessage(content="Draft the blog post implementing the plan.")])
    
    # Update state
    state["blog_post"] = blog_post
    return state

In [12]:
def idea_selection_hitl(state: AudioPipelineState) -> AudioPipelineState:
    """No-op node for human-in-the-loop idea selection. Interrupt here to choose an idea."""
    # In dev: Print ideas for human to see
    print("HITL: Scored ideas:", state.get("scored_blog_ideas", []))
    print("Saved idea IDs:", state.get("saved_idea_ids", []))
    # The graph interrupts before this runs; human updates state externally (e.g., via checkpointer)
    return state

In [13]:
# Cell: Individual Strategy Loader Function

def load_company_strategy():
    """Load company strategy document"""
    try:
        company_strategy_path = "../data/processed/company_strategy.mkd"
        if os.path.exists(company_strategy_path):
            with open(company_strategy_path, "r", encoding="utf-8") as f:
                content = f.read()
            print(f"‚úÖ Loaded company strategy ({len(content)} chars)")
            return content
        else:
            print("‚ö†Ô∏è Company strategy document not found")
            return "Company strategy document not available."
    except Exception as e:
        print(f"‚ùå Error loading company strategy: {e}")
        return "Company strategy document not available."

def load_seo_strategy():
    """Load SEO strategy document"""
    try:
        seo_strategy_path = "../data/processed/seo_strategy.mkd"
        if os.path.exists(seo_strategy_path):
            with open(seo_strategy_path, "r", encoding="utf-8") as f:
                content = f.read()
            print(f"‚úÖ Loaded SEO strategy ({len(content)} chars)")
            return content
        else:
            print("‚ö†Ô∏è SEO strategy document not found")
            return "SEO strategy document not available."
    except Exception as e:
        print(f"‚ùå Error loading SEO strategy: {e}")
        return "SEO strategy document not available."

def load_content_strategy():
    """Load content strategy document"""
    try:
        content_strategy_path = "../data/processed/content_strategy.mkd"
        if os.path.exists(content_strategy_path):
            with open(content_strategy_path, "r", encoding="utf-8") as f:
                content = f.read()
            print(f"‚úÖ Loaded content strategy ({len(content)} chars)")
            return content
        else:
            print("‚ö†Ô∏è Content strategy document not found")
            return "Content strategy document not available."
    except Exception as e:
        print(f"‚ùå Error loading content strategy: {e}")
        return "Content strategy document not available."

def prepare_strategy_context_for_scoring():
    """Prepare full strategy context for scoring (used by analyst agent)"""
    return {
        'company_strategy_summary': load_company_strategy(),
        'seo_strategy_summary': load_seo_strategy(),
        'content_strategy_summary': load_content_strategy()
    }

print("‚úÖ Individual strategy loader functions ready")
print("‚úÖ prepare_strategy_context_for_scoring() ready")

‚úÖ Individual strategy loader functions ready
‚úÖ prepare_strategy_context_for_scoring() ready


In [14]:
# Cell 7: Updated Company Strategy Context Loader (3 Documents)
def load_company_strategy_context():
    """Load company strategy, SEO strategy, and content strategy for context"""
    
    strategy_context = {}
    
    try:
        # Load company strategy
        company_strategy_path = "../data/processed/company_strategy.mkd"
        if os.path.exists(company_strategy_path):
            with open(company_strategy_path, "r", encoding="utf-8") as f:
                strategy_context["company_strategy"] = f.read()
            print(f"‚úÖ Loaded company strategy ({len(strategy_context['company_strategy'])} chars)")
        else:
            strategy_context["company_strategy"] = "Company strategy document not available."
            print("‚ö†Ô∏è Company strategy document not found")
        
        # Load SEO strategy
        seo_strategy_path = "../data/processed/seo_strategy.mkd"
        if os.path.exists(seo_strategy_path):
            with open(seo_strategy_path, "r", encoding="utf-8") as f:
                strategy_context["seo_strategy"] = f.read()
            print(f"‚úÖ Loaded SEO strategy ({len(strategy_context['seo_strategy'])} chars)")
        else:
            strategy_context["seo_strategy"] = "SEO strategy document not available."
            print("‚ö†Ô∏è SEO strategy document not found")
        
        # Load content strategy (NEW)
        content_strategy_path = "../data/processed/content_strategy.mkd"
        if os.path.exists(content_strategy_path):
            with open(content_strategy_path, "r", encoding="utf-8") as f:
                strategy_context["content_strategy"] = f.read()
            print(f"‚úÖ Loaded content strategy ({len(strategy_context['content_strategy'])} chars)")
        else:
            strategy_context["content_strategy"] = "Content strategy document not available."
            print("‚ö†Ô∏è Content strategy document not found")
            
    except Exception as e:
        print(f"‚ùå Error loading strategy documents: {e}")
        strategy_context = {
            "company_strategy": "Strategy document not available",
            "seo_strategy": "SEO strategy document not available", 
            "content_strategy": "Content strategy document not available"
        }
    
    return strategy_context

# Test loading all three documents
strategy_context = load_company_strategy_context()
print(f"üìä Strategy context keys: {list(strategy_context.keys())}")
print(f"üìä Total context size: {sum(len(v) for v in strategy_context.values() if isinstance(v, str))} chars")

‚ö†Ô∏è Company strategy document not found
‚úÖ Loaded SEO strategy (1120 chars)
‚úÖ Loaded content strategy (4473 chars)
üìä Strategy context keys: ['company_strategy', 'seo_strategy', 'content_strategy']
üìä Total context size: 5633 chars


In [15]:
# Batch Processing Function (Updated with Full Insights Display)
def process_audio_batch(audio_files: List[Path], pipeline) -> dict:
    """Process all audio files in batch with detailed insights display"""
    
    if not audio_files:
        print("‚ùå No files to process")
        return {"processed": [], "failed": [], "total": 0}
    
    print(f"\nüöÄ STARTING BATCH PROCESSING - {len(audio_files)} files")
    print("=" * 60)
    
    processed_files = []
    failed_files = []
    results = []
    
    for i, file_path in enumerate(audio_files, 1):
        print(f"\nüìÇ Processing {i}/{len(audio_files)}: {file_path.name}")
        print("-" * 40)
        
        # Create initial state
        initial_state = {
            "file_path": str(file_path),
            "filename": file_path.name,
            "transcript_text": None,
            "conversation_id": None,
            "extracted_insights": None,  
            "error": None,
            "status": "processing"
        }
        
        try:
            # Run through pipeline
            result = pipeline.invoke(initial_state)
            
            if result["status"] in ["completed", "insights_extracted"]:
                print(f"‚úÖ SUCCESS: {file_path.name}")
                print(f"   Conversation ID: {result['conversation_id']}")
                print(f"   Transcript preview: {result['transcript_text'][:100]}...")
                
                # FULL INSIGHTS DISPLAY
                if result.get('extracted_insights'):
                    insights = result['extracted_insights']
                    print(f"\nüß† === EXTRACTED INSIGHTS FOR: {file_path.name} ===")
                    print("=" * 50)
                    
                    # Speakers
                    if insights.speakers:
                        print("üë• SPEAKERS:")
                        for speaker in insights.speakers:
                            print(f"   ‚Ä¢ Name: {speaker.name or 'Unknown'}")
                            print(f"     Role: {speaker.role or 'Unknown'}")  
                            print(f"     Company: {speaker.company or 'Unknown'}")
                    
                    # Core Values
                    if insights.core_values:
                        print("üíé CORE VALUES:")
                        for value in insights.core_values:
                            print(f"   ‚Ä¢ {value}")
                    
                    # Priorities
                    if insights.priorities:
                        print("üéØ PRIORITIES:")
                        for priority in insights.priorities:
                            print(f"   ‚Ä¢ {priority}")
                    
                    # Primary Challenges
                    if insights.primary_challenges:
                        print("üî• PRIMARY CHALLENGES:")
                        for challenge in insights.primary_challenges:
                            print(f"   ‚Ä¢ Challenge: {challenge.description}")
                            print(f"     Impact: {challenge.impact}")
                            print(f"     Urgency: {challenge.urgency}")
                    
                    # Secondary Challenges
                    if insights.secondary_challenges:
                        print("‚ö†Ô∏è  SECONDARY CHALLENGES:")
                        for challenge in insights.secondary_challenges:
                            print(f"   ‚Ä¢ Challenge: {challenge.description}")
                            print(f"     Impact: {challenge.impact}")
                            print(f"     Urgency: {challenge.urgency}")
                    
                    # Current Solutions
                    if insights.current_solutions:
                        print("üîß CURRENT SOLUTIONS:")
                        for solution in insights.current_solutions:
                            print(f"   ‚Ä¢ Solution: {solution.solution}")
                            print(f"     Satisfaction: {solution.satisfaction_level}")
                            if solution.limitations:
                                print(f"     Limitations: {', '.join(solution.limitations)}")
                    
                    # Psychological Needs
                    if insights.psychological_needs:
                        print("üßò PSYCHOLOGICAL NEEDS:")
                        for need in insights.psychological_needs:
                            print(f"   ‚Ä¢ {need.description}")
                            print(f"     Category: {need.need_category}")
                            print(f"     Intensity: {need.intensity}")
                    
                    print("üß† === END INSIGHTS ===")
                    print("-" * 50)
                
                processed_files.append(file_path)
            else:
                print(f"‚ùå FAILED: {file_path.name}")
                print(f"   Status: {result.get('status', 'Unknown')}")
                print(f"   Error: {result.get('error', 'Unknown error')}")
                failed_files.append(file_path)
            
            results.append(result)
            
        except Exception as e:
            print(f"‚ùå PIPELINE ERROR: {file_path.name}")
            print(f"   Exception: {str(e)}")
            failed_files.append(file_path)
            
            results.append({
                **initial_state,
                "error": str(e),
                "status": "pipeline_error"
            })
    
    # Final Summary
    print(f"\nüìä BATCH PROCESSING COMPLETE!")
    print("=" * 60)
    print(f"‚úÖ Successfully processed: {len(processed_files)}")
    print(f"‚ùå Failed: {len(failed_files)}")
    print(f"üìÅ Total files: {len(audio_files)}")
    
    if failed_files:
        print(f"\n‚ùå Failed files:")
        for failed_file in failed_files:
            print(f"   - {failed_file.name}")
    
    return {
        "processed": processed_files,
        "failed": failed_files,
        "total": len(audio_files),
        "results": results
    }

print("‚úÖ Batch processing function ready with full insights display")

‚úÖ Batch processing function ready with full insights display


In [16]:
# Cell 5: Batch File Discovery and Management


def find_audio_files_in_temp(temp_folder: Path = None) -> List[Path]:
    """Find all audio files in temp folder"""
    
    # Use default temp folder if not specified
    if temp_folder is None:
        temp_folder = project_root / 'data' / 'temp'
    
    # Ensure folder exists
    temp_folder.mkdir(parents=True, exist_ok=True)
    
    # Check if folder exists
    if not temp_folder.exists():
        print(f"‚ùå Temp folder not found: {temp_folder}")
        return []
    
    # Find audio files
    audio_extensions = ['*.wav', '*.mp3', '*.m4a']
    audio_files = []
    
    for ext in audio_extensions:
        files = list(temp_folder.glob(ext))
        audio_files.extend(files)
    
    return sorted(audio_files)


def display_batch_info(audio_files: List[Path]) -> bool:
    """Display information about the batch of files"""
    
    if not audio_files:
        print("‚ùå No audio files found in temp folder!")
        print("üí° TIP: Add .wav files to data/temp/ folder")
        return False
    
    total_size_mb = sum(f.stat().st_size for f in audio_files) / (1024 * 1024)
    
    print(f"üìä BATCH INFO:")
    print(f"   Files to process: {len(audio_files)}")
    print(f"   Total size: {total_size_mb:.1f} MB")
    print(f"\nüìÅ Files found:")
    
    for i, file_path in enumerate(audio_files, 1):
        size_mb = file_path.stat().st_size / (1024 * 1024)
        print(f"   {i}. {file_path.name} ({size_mb:.1f} MB)")
    
    return True


print("‚úÖ File management functions defined")

‚úÖ File management functions defined


In [17]:
# Cell: Extract Insights Function - ENHANCED WITH JSON REPAIR
def extract_insights_from_transcript(transcript: str) -> ExtractedInsights:
    """Extract structured insights using Anthropic Claude - ENHANCED WITH JSON REPAIR"""
    
    prompt = f"""
    Analyze this conversation transcript and extract structured insights:
    
    Transcript: {transcript}
    
    IMPORTANT: For speaker roles, use ONLY these exact values:
    - "client" for the person being interviewed/consulted (CTO, CEO, Manager, business owner, etc.)
    - "interviewer" for the person asking questions or conducting the interview
    
    Extract the following information in JSON format:
    - speakers: List of people mentioned with name, role (client/interviewer only), company
    - core_values: What they care about most  
    - priorities: Current focus areas
    - primary_challenges: Main problems they face with description, impact, urgency
    - secondary_challenges: Secondary problems
    - current_solutions: How they solve problems now with satisfaction level
    - psychological_needs: Underlying needs with category, description, intensity
    
    Return ONLY valid JSON in this exact structure - no markdown, no code blocks:
    {{
        "speakers": [
            {{"name": "Manuel", "role": "client", "company": "Drone flytech"}}
        ],
        "core_values": ["efficiency", "transparency"],
        "priorities": ["improving processes"],
        "primary_challenges": [
            {{
                "description": "Tracking payment issues",
                "impact": "Creates confusion in processes", 
                "urgency": "High"
            }}
        ],
        "secondary_challenges": [
            {{
                "description": "Secondary challenge",
                "impact": "Secondary impact",
                "urgency": "Medium"
            }}
        ],
        "current_solutions": [
            {{
                "solution": "Current approach",
                "satisfaction_level": "Neutral",
                "limitations": ["limitation1", "limitation2"]
            }}
        ],
        "psychological_needs": [
            {{
                "need_category": "security",
                "description": "Need for confidence",
                "intensity": "High"
            }}
        ]
    }}
    
    Remember: 
    - Use "client" for the interviewee (even if they're CTO/CEO)
    - Use "interviewer" for the person asking questions
    - Use exact urgency values: "Low", "Medium", "High"
    - Use exact satisfaction levels: "Very Satisfied", "Satisfied", "Neutral", "Unsatisfied", "Very Unsatisfied"
    - Use exact intensity values: "Low", "Medium", "High"
    - Ensure all strings are properly closed with quotes
    - Do not truncate the response - complete all JSON structures
    """
    
    try:
        # Use the Claude LLM you already set up
        response = llm.invoke(prompt)
        
        print(f"üìù Raw response length: {len(response.content)} chars")
        print(f"üìù Response starts with: {response.content[:50]}...")
        
        # Clean markdown code blocks
        content = response.content.strip()
        if content.startswith('```json'):
            print("üîß Removing JSON markdown blocks...")
            content = content.replace('```json', '').replace('```', '').strip()
        elif content.startswith('```'):
            print("üîß Removing generic markdown blocks...")
            content = content.replace('```', '').strip()
        
        # Extract JSON boundaries
        first_brace = content.find('{')
        if first_brace > 0:
            content = content[first_brace:]
        
        last_brace = content.rfind('}')
        if last_brace > 0 and last_brace < len(content) - 1:
            content = content[:last_brace + 1]
        
        print(f"üîß Cleaned content starts with: {content[:50]}...")
        
        # === ENHANCED: Try parsing with auto-repair ===
        try:
            insights_data = json.loads(content)
            print("‚úÖ JSON parsed successfully")
            
        except json.JSONDecodeError as e:
            print(f"‚ö†Ô∏è  JSON parsing error: {e}")
            print(f"   Error at position: {e.pos}")
            
            # Show context around error
            start = max(0, e.pos - 80)
            end = min(len(content), e.pos + 80)
            print(f"   Context: ...{content[start:end]}...")
            
            print("\nüîß Attempting JSON auto-repair...")
            
            import re
            
            # Repair strategy based on error type
            repaired = content
            
            if "Unterminated string" in str(e):
                print("   ‚Üí Fixing unterminated string...")
                # Add closing quote at error position
                repaired = content[:e.pos] + '"'
                
                # Close any open structures after the fix
                partial = content[:e.pos]
                open_braces = partial.count('{')
                close_braces = partial.count('}')
                open_brackets = partial.count('[')
                close_brackets = partial.count(']')
                
                if open_brackets > close_brackets:
                    repaired += ']' * (open_brackets - close_brackets)
                    print(f"   ‚Üí Added {open_brackets - close_brackets} closing bracket(s)")
                
                if open_braces > close_braces:
                    repaired += '}' * (open_braces - close_braces)
                    print(f"   ‚Üí Added {open_braces - close_braces} closing brace(s)")
            
            else:
                # Generic repairs
                # Fix 1: Remove trailing commas
                repaired = re.sub(r',\s*}', '}', repaired)
                repaired = re.sub(r',\s*]', ']', repaired)
                
                # Fix 2: Balance brackets
                open_braces = repaired.count('{')
                close_braces = repaired.count('}')
                open_brackets = repaired.count('[')
                close_brackets = repaired.count(']')
                
                if open_brackets > close_brackets:
                    repaired += ']' * (open_brackets - close_brackets)
                    print(f"   ‚Üí Added {open_brackets - close_brackets} closing bracket(s)")
                
                if open_braces > close_braces:
                    repaired += '}' * (open_braces - close_braces)
                    print(f"   ‚Üí Added {open_braces - close_braces} closing brace(s)")
            
            # Try parsing repaired JSON
            try:
                insights_data = json.loads(repaired)
                print("‚úÖ Auto-repair successful!")
                
            except json.JSONDecodeError as e2:
                print(f"‚ùå Auto-repair failed: {e2}")
                
                # Last resort: Extract partial valid data
                print("\nüîß Last resort: Extracting partial data...")
                
                # Find last complete object before error
                try:
                    safe_end = content[:e.pos].rfind('}')
                    if safe_end > 0:
                        partial = content[:safe_end + 1]
                        
                        # Balance remaining brackets
                        open_braces = partial.count('{')
                        close_braces = partial.count('}')
                        if open_braces > close_braces:
                            partial += '}' * (open_braces - close_braces)
                        
                        insights_data = json.loads(partial)
                        print("‚úÖ Partial extraction successful!")
                    else:
                        raise ValueError("No valid JSON found")
                        
                except Exception as e3:
                    print(f"‚ùå Partial extraction failed: {e3}")
                    print(f"\nüìù Problematic response (first 1000 chars):")
                    print(content[:1000])
                    raise e  # Re-raise original error
        
        # === Validate and fill missing fields ===
        required_fields = {
            'speakers': [],
            'core_values': [],
            'priorities': [],
            'primary_challenges': [],
            'secondary_challenges': [],
            'current_solutions': [],
            'psychological_needs': []
        }
        
        for field, default in required_fields.items():
            if field not in insights_data:
                print(f"‚ö†Ô∏è  Missing field '{field}', using default: {default}")
                insights_data[field] = default
        
        # Fix speaker roles (ensure only 'client' or 'interviewer')
        for speaker in insights_data.get('speakers', []):
            if 'role' not in speaker or speaker['role'] not in ['interviewer', 'client']:
                print(f"‚ö†Ô∏è  Invalid role for {speaker.get('name', 'unknown')}, defaulting to 'client'")
                speaker['role'] = 'client'
        
        # Convert to Pydantic model
        result = ExtractedInsights(**insights_data)
        
        print(f"‚úÖ Successfully extracted insights!")
        print(f"   Speakers: {len(result.speakers)}")
        print(f"   Challenges: {len(result.primary_challenges)}")
        print(f"   Needs: {len(result.psychological_needs)}")
        print(f"   Values: {len(result.core_values)}")
        
        return result
        
    except json.JSONDecodeError as e:
        print(f"‚ùå Final JSON parsing error: {e}")
        print(f"üìù Raw response (first 500 chars): {response.content[:500]}...")
        raise
    except Exception as e:
        print(f"‚ùå Error in extraction: {e}")
        import traceback
        traceback.print_exc()
        raise

print("‚úÖ Enhanced extract_insights_from_transcript with JSON repair")

‚úÖ Enhanced extract_insights_from_transcript with JSON repair


In [18]:
# Cell: Fixed Creative Agent Function
def generate_blog_ideas_from_insights(insights: ExtractedInsights, strategy_context: dict) -> List[Dict]:
    """
    Fixed creative agent that handles Claude's markdown JSON response
    """
    
    creative_prompt = f"""
    You are a creative content strategist for Big Kids Automation, a company that helps businesses implement AI and automation solutions.
    
    COMPANY CONTEXT:
    {strategy_context.get('company_strategy', 'Strategy not available')[:1000]}...
    
    SEO STRATEGY:
    {strategy_context.get('seo_strategy', 'SEO strategy not available')[:500]}...
    
    CONVERSATION INSIGHTS TO WORK FROM:
    
    Speakers: {[f"{s.name} ({s.role}) from {s.company}" for s in insights.speakers] if insights.speakers else "Unknown speakers"}
    
    Core Values: {", ".join(insights.core_values) if insights.core_values else "None identified"}
    
    Priorities: {", ".join(insights.priorities) if insights.priorities else "None identified"}
    
    Primary Challenges:
    {chr(10).join([f"- {c.description} (Impact: {c.impact}, Urgency: {c.urgency})" for c in insights.primary_challenges]) if insights.primary_challenges else "None identified"}
    
    Current Solutions:
    {chr(10).join([f"- {s.solution} (Satisfaction: {s.satisfaction_level})" for s in insights.current_solutions]) if insights.current_solutions else "None identified"}
    
    Psychological Needs:
    {chr(10).join([f"- {n.description} ({n.need_category}, {n.intensity} intensity)" for n in insights.psychological_needs]) if insights.psychological_needs else "None identified"}
    
    TASK:
    Generate 4-5 creative blog post ideas that:
    1. Address the challenges and needs identified in this conversation
    2. Align with Big Kids Automation's mission to help businesses with AI/automation
    3. Provide value to potential clients facing similar challenges
    4. Support our SEO and content marketing strategy
    5. Are actionable and practical, not just theoretical
    
    For each blog post idea, provide:
    - title: Clear, engaging title that includes relevant keywords
    - description: 2-3 sentence description of what the post will cover
    - target_audience: Who this post is primarily for
    - content_angle: The unique angle or approach this post takes
    - business_value: How this post helps our business goals
    
    IMPORTANT: Return ONLY the JSON array, no markdown formatting, no code blocks, no explanatory text.
    
    Format:
    [
        {{
            "title": "How AI Proposal Systems Balance Speed with Brand Differentiation",
            "description": "A practical guide showing how modern AI-powered proposal systems solve the common problem of maintaining company uniqueness while leveraging automation. Includes real case studies and implementation steps.",
            "target_audience": "Business development directors and proposal managers at consulting firms",
            "content_angle": "Problem-solution with real case studies",
            "business_value": "Attracts prospects struggling with proposal automation while maintaining differentiation"
        }}
    ]
    """
    
    try:
        # Generate ideas using Claude
        response = llm.invoke(creative_prompt)
        raw_content = response.content.strip()
        
        print(f"üìù Raw response length: {len(raw_content)} chars")
        print(f"üìù Response starts with: {raw_content[:50]}...")
        
        # Handle markdown code blocks
        if raw_content.startswith('```'):
            print("üîß Removing markdown code blocks...")
            # Remove ```json and ``` wrappers
            lines = raw_content.split('\n')
            # Remove first line if it's ```json or ```
            if lines[0].startswith('```'):
                lines = lines[1:]
            # Remove last line if it's ```
            if lines and lines[-1].strip() == '```':
                lines = lines[:-1]
            raw_content = '\n'.join(lines).strip()
            print(f"üîß Cleaned content starts with: {raw_content[:50]}...")
        
        # Parse JSON response
        blog_ideas = json.loads(raw_content)
        
        print(f"‚úÖ Creative agent successfully parsed {len(blog_ideas)} blog ideas")
        return blog_ideas
        
    except json.JSONDecodeError as e:
        print(f"‚ùå JSON parsing error in creative agent: {e}")
        print(f"üìù Cleaned content: {raw_content[:500]}...")
        return []
    except Exception as e:
        print(f"‚ùå Error in creative agent: {e}")
        return []

print("‚úÖ Fixed creative agent function ready")

‚úÖ Fixed creative agent function ready


In [19]:
# Cell 17: Updated Scoring Engine with Content Strategy Context
def score_blog_idea_with_llm(idea: dict, strategy_context: dict, conversation_context: str = "") -> dict:
    """Score a single blog idea using LLM with all three strategy contexts"""
    
    scoring_prompt = f"""
    You are an expert content strategist for Big Kids Automation. Score this blog post idea on a 1-10 scale using our strategic context.
    
    COMPANY STRATEGY:
    {strategy_context.get('company_strategy_summary', 'Not available')}
    
    SEO STRATEGY:
    {strategy_context.get('seo_strategy_summary', 'Not available')}
    
    CONTENT STRATEGY:
    {strategy_context.get('content_strategy_summary', 'Not available')}
    
    BLOG IDEA TO SCORE:
    Title: {idea.get('title', 'No title')}
    Description: {idea.get('description', 'No description')}
    Target Audience: {idea.get('target_audience', 'Unknown')}
    Business Value: {idea.get('business_value', 'Unknown')}
    Content Angle: {idea.get('content_angle', 'Unknown')}
    
    CONVERSATION CONTEXT:
    {conversation_context[:300] if conversation_context else 'No context available'}...
    
    SCORING INSTRUCTIONS:
    Rate each criterion from 1-10 (10 = excellent, 1 = poor):
    
    1. usefulness_potential: How useful will this be to readers with real problems?
    2. fitwith_seo_strategy: How well does this align with our SEO keywords and strategy?
    3. fitwith_content_strategy: How well does this fit our content strategy, voice, and approach?
    4. inspiration_potential: How likely to inspire readers to take meaningful action?
    5. collaboration_potential: How likely to generate leads/prospects who contact us?
    6. innovation: How unique is this topic compared to existing content?
    7. difficulty: How complex/time-consuming will this be to write? (1=very hard, 10=easy)
    
    Return ONLY valid JSON with your scores and brief reasoning:
    {{
        "usefulness_potential": 8,
        "fitwith_seo_strategy": 7,
        "fitwith_content_strategy": 9,
        "inspiration_potential": 6,
        "collaboration_potential": 8,
        "innovation": 7,
        "difficulty": 4,
        "reasoning": "This idea scores well because it aligns with our content strategy focus on..."
    }}
    """
    
    # ... rest of the function stays the same
    try:
        response = llm.invoke(scoring_prompt)
        
        content = response.content.strip()
        if content.startswith('```json'):
            content = content.replace('```json', '').replace('```', '').strip()
        
        scores = json.loads(content)
        
        # Validate scores are in range
        for criterion in ['usefulness_potential', 'fitwith_seo_strategy', 'fitwith_content_strategy', 
                         'inspiration_potential', 'collaboration_potential', 'innovation', 'difficulty']:
            if criterion in scores:
                scores[criterion] = max(1, min(10, scores[criterion]))
        
        # Calculate total score
        total_score = sum([
            scores.get('usefulness_potential', 5),
            scores.get('fitwith_seo_strategy', 5),
            scores.get('fitwith_content_strategy', 5),
            scores.get('inspiration_potential', 5),
            scores.get('collaboration_potential', 5),
            scores.get('innovation', 5),
            scores.get('difficulty', 5)
        ])
        
        scores['total_score'] = total_score
        return scores
        
    except Exception as e:
        print(f"‚ùå Error scoring idea: {e}")
        return {
            "usefulness_potential": 5, "fitwith_seo_strategy": 5, "fitwith_content_strategy": 5,
            "inspiration_potential": 5, "collaboration_potential": 5, "innovation": 5,
            "difficulty": 5, "total_score": 35, "reasoning": f"Default scores due to error: {str(e)}"
        }

print("‚úÖ Updated LLM scoring engine with content strategy context")

‚úÖ Updated LLM scoring engine with content strategy context


In [20]:
# Cell 6: Define LangGraph Nodes
def transcription_node(state: AudioPipelineState) -> AudioPipelineState:
    """Node 1: Transcribe audio file with AssemblyAI"""
    try:
        print(f"üéôÔ∏è Transcribing: {state['filename']}")
        
        # Configure transcriber
        transcriber = aai.Transcriber()
        
        # Transcribe the file
        transcript = transcriber.transcribe(state['file_path'])
        
        if transcript.status == aai.TranscriptStatus.error:
            return {
                **state,
                "error": f"AssemblyAI error: {transcript.error}",
                "status": "transcription_failed"
            }
        
        return {
            **state,
            "transcript_text": transcript.text,
            "status": "transcribed"
        }
        
    except Exception as e:
        return {
            **state,
            "error": f"Transcription error: {str(e)}",
            "status": "transcription_failed"
        }

def database_saver_node_conversations(state: AudioPipelineState) -> AudioPipelineState:
    """Node 2: Save transcript to database"""
    try:
        print(f"üíæ Saving to database: {state['filename']}")
        
        # Create conversation object
        conversation = ConversationCreate(
            title=f"Audio: {state['filename']}",
            raw_text=state['transcript_text'],
            source="transcribed"
        )
        
        # Save to database
        conversation_id = db.create_conversation(conversation)
        
        return {
            **state,
            "conversation_id": conversation_id,
            "status": "completed"
        }
        
    except Exception as e:
        return {
            **state,
            "error": f"Database error: {str(e)}",
            "status": "database_failed"
        }

print("‚úÖ LangGraph nodes defined")

‚úÖ LangGraph nodes defined


In [21]:
# Cell: Fixed pain_extractor_node (minimal change)
def pain_extractor_node(state: AudioPipelineState) -> AudioPipelineState:
    """
    LangGraph node: Extract structured insights from conversation transcript
    FIXED: Falls back to database if transcript_text not in state
    """
    print("üß† Starting pain extraction...")
    
    try:
        # Try to get transcript from state first
        transcript = state.get('transcript_text')
        
        # ADDED: If not in state, get from database using conversation_id
        if not transcript:
            conversation_id = state.get('conversation_id')
            if conversation_id:
                print(f"   üìù Transcript not in state, loading from database (conversation {conversation_id})...")
                conv = db.get_conversation(conversation_id)
                if conv:
                    # Try raw_text field (your database schema)
                    transcript = get_conv_attribute(conv, 'raw_text', None)
                    if transcript:
                        print(f"   ‚úÖ Loaded transcript from database ({len(transcript)} chars)")
                    else:
                        print(f"   ‚ö†Ô∏è  No raw_text found in conversation")
        
        # If still no transcript, fail
        if not transcript:
            print("‚ùå No transcript available")
            return {
                **state,
                "error": "No transcript available for pain extraction",
                "status": "error"
            }
        
        # Extract insights using OpenAI structured output
        insights = extract_insights_from_transcript(transcript)
        
        if insights:
            print(f"‚úÖ Extracted insights: {len(insights.primary_challenges)} primary challenges, {len(insights.speakers)} speakers")
            
            return {
                **state,
                "extracted_insights": insights,
                "status": "insights_extracted"
            }
        else:
            return {
                **state,
                "error": "Failed to extract insights from transcript",
                "status": "error"
            }
            
    except Exception as e:
        print(f"‚ùå Pain extraction failed: {e}")
        import traceback
        traceback.print_exc()
        return {
            **state,
            "error": f"Pain extraction error: {str(e)}",
            "status": "error"
        }

print("‚úÖ pain_extractor_node fixed (now checks database for transcript)")

‚úÖ pain_extractor_node fixed (now checks database for transcript)


In [22]:
# Cell: Creative Agent Node - FORCE RELOAD
def creative_agent_node(state: AudioPipelineState) -> AudioPipelineState:
    """Creative agent that generates raw blog ideas"""
    
    try:
        print("üé® Starting creative blog idea generation...")
        
        insights = state.get('extracted_insights')
        if not insights:
            return {**state, "error": "No insights available", "status": "error"}
        
        print(f"üìä Working with insights: {len(insights.primary_challenges)} challenges")
        
        # Load strategy context
        strategy_context = load_company_strategy_context()
        
        # Generate ideas (returns JSON list)
        raw_ideas_json = generate_blog_ideas_from_insights(insights, strategy_context)
        
        if not raw_ideas_json:
            return {**state, "error": "No ideas generated", "status": "error"}
        
        # Convert to Pydantic for validation
        validated_ideas = []
        for idea_json in raw_ideas_json:
            try:
                idea = RawBlogIdea(**idea_json)
                validated_ideas.append(idea)
            except Exception as e:
                print(f"‚ö†Ô∏è Skipping invalid idea: {e}")
        
        if validated_ideas:
            print(f"üéâ Generated {len(validated_ideas)} valid blog ideas")
            
            # Convert back to dict for state storage
            ideas_as_dicts = [idea.model_dump() for idea in validated_ideas]
            
            return {
                **state,
                "raw_blog_ideas": ideas_as_dicts,
                "status": "ideas_generated"
            }
        else:
            return {**state, "error": "No valid ideas after validation", "status": "error"}
            
    except Exception as e:
        print(f"‚ùå Creative agent error: {e}")
        import traceback
        traceback.print_exc()
        return {**state, "error": str(e), "status": "error"}

print("‚úÖ Creative agent node RELOADED")

‚úÖ Creative agent node RELOADED


In [23]:
def writing_agent_node(state: AudioPipelineState) -> AudioPipelineState:
    """Future node: Generate blog post from plan."""
    plan = state.get("blog_plan")
    if not plan:
        return state  # Skip if no plan
    instructions = f"Write a full blog post for Big Kids Automation Agency using this plan: {plan.plan}"
    # Structured LLM (inspired by your snippet)
    structured_llm = llm.with_structured_output(BlogPost)  # Use your BlogPost model
    blog_post = structured_llm.invoke([SystemMessage(content=instructions), HumanMessage(content="Write the post.")])
    
    # Updated: Store the FULL BlogPost instance (not just a dict)
    state["blog_post"] = blog_post  # Now includes id, issue, angle, etc.
    
    return state


In [24]:
# Cell 19: Analyst Agent Node - FIXED for Pydantic Objects
def analyst_agent_node(state: AudioPipelineState) -> AudioPipelineState:
    """
    LangGraph node that scores blog ideas using company strategy context
    Input: state["raw_blog_ideas"] 
    Output: state["scored_blog_ideas"]
    """
    
    try:
        print("üîç Starting analyst agent - scoring blog ideas...")
        
        # Check current status
        current_status = state.get('status', '')
        print(f"üìä Input status: {current_status}")
        
        # Check if we have raw blog ideas to score
        raw_ideas = state.get('raw_blog_ideas')
        if not raw_ideas:
            return {
                **state,
                "error": "No raw blog ideas available for scoring",
                "status": "error"
            }
        
        print(f"üìä Found {len(raw_ideas)} blog ideas to score")
        
        # Load strategy context for scoring
        print("üìö Loading strategy context...")
        strategy_context = prepare_strategy_context_for_scoring()
        
        # Get conversation context for better scoring
        conversation_context = state.get('transcript_text', '')
        
        # Score each blog idea
        scored_ideas = []
        for i, idea in enumerate(raw_ideas, 1):
            # FIXED: Handle both Pydantic objects and dicts properly
            if hasattr(idea, 'title'):
                # It's a Pydantic object - convert to dict first
                idea_dict = idea.model_dump() if hasattr(idea, 'model_dump') else idea.__dict__
                title_preview = idea.title[:50]
            else:
                # It's already a dict
                idea_dict = idea
                title_preview = idea.get('title', 'No title')[:50]
            
            print(f"üîç Scoring idea {i}/{len(raw_ideas)}: {title_preview}...")
            
            # Score the idea (now always working with dict)
            scores = score_blog_idea_with_llm(idea_dict, strategy_context, conversation_context)
            
            # Combine original idea with scores
            scored_idea = {
                **idea_dict,  # Original idea data (now definitely a dict)
                **scores      # Scoring data
            }
            
            scored_ideas.append(scored_idea)
            
            print(f"   ‚úÖ Scored: {scores.get('total_score', 0)}/70 points")
        
        # Sort by total score (highest first)
        scored_ideas.sort(key=lambda x: x.get('total_score', 0), reverse=True)
        
        print(f"\nüéâ Analyst agent completed scoring!")
        print(f"üìä Scored {len(scored_ideas)} ideas")
        
        if scored_ideas:
            print(f"üèÜ Top idea: '{scored_ideas[0].get('title', 'Unknown')[:50]}...' ({scored_ideas[0].get('total_score', 0)}/70)")
            print(f"üìâ Lowest idea: '{scored_ideas[-1].get('title', 'Unknown')[:50]}...' ({scored_ideas[-1].get('total_score', 0)}/70)")
        
        return {
            **state,
            "scored_blog_ideas": scored_ideas,
            "status": "ideas_scored"
        }
        
    except Exception as e:
        print(f"‚ùå Error in analyst agent node: {e}")
        import traceback
        traceback.print_exc()
        
        return {
            **state,
            "error": f"Analyst agent error: {str(e)}",
            "status": "error"
        }

print("‚úÖ Analyst agent node FIXED for Pydantic objects")

‚úÖ Analyst agent node FIXED for Pydantic objects


In [25]:
# Cell 4: Test AssemblyAI Connection
# Configure AssemblyAI
aai.settings.api_key = os.getenv('ASSEMBLYAI_API_KEY')

# Test with a simple transcription (we'll use a file from temp folder)
def test_assemblyai_connection():
    """Test if AssemblyAI is working"""
    try:
        # Just test the API key is valid
        transcriber = aai.Transcriber()
        print("‚úÖ AssemblyAI connection successful")
        return True
    except Exception as e:
        print(f"‚ùå AssemblyAI connection failed: {e}")
        return False

test_assemblyai_connection()

‚úÖ AssemblyAI connection successful


True

In [26]:
# Cell: Database Saver Node for BLOG IDEAS (Node 6) - FIXED RETURN
from database.models import BlogPostIdeaCreate

def database_saver_node(state: AudioPipelineState) -> AudioPipelineState:
    """
    LangGraph node that saves scored blog ideas to database
    Input: state["scored_blog_ideas"]
    Output: state["saved_idea_ids"]
    """
    
    try:
        print("üíæ Starting database saver - saving scored blog ideas...")
        
        scored_ideas = state.get('scored_blog_ideas')
        conversation_id = state.get('conversation_id')
        
        if not scored_ideas:
            print("‚ùå No scored blog ideas available to save")
            return {
                **state,
                "error": "No scored blog ideas available to save",
                "status": "error"
            }
        
        if not conversation_id:
            print("‚ùå No conversation_id available for linking ideas")
            return {
                **state,
                "error": "No conversation_id available for linking ideas",
                "status": "error"
            }
        
        print(f"üìä Found {len(scored_ideas)} scored ideas to save")
        print(f"üîó Linking ideas to conversation_id: {conversation_id}")
        
        saved_idea_ids = []
        failed_count = 0
        
        for i, scored_idea in enumerate(scored_ideas, 1):
            try:
                # Calculate total_score if not present
                if 'total_score' not in scored_idea:
                    scored_idea['total_score'] = sum([
                        scored_idea.get('usefulness_potential', 0),
                        scored_idea.get('fitwith_seo_strategy', 0),
                        scored_idea.get('fitwith_content_strategy', 0),
                        scored_idea.get('inspiration_potential', 0),
                        scored_idea.get('collaboration_potential', 0),
                        scored_idea.get('innovation', 0),
                        scored_idea.get('difficulty', 0)
                    ])
                
                blog_idea = BlogPostIdeaCreate(
                    conversation_id=conversation_id,
                    title=scored_idea.get('title', 'Untitled'),
                    description=scored_idea.get('description', ''),
                    usefulness_potential=scored_idea.get('usefulness_potential', 5),
                    fitwith_seo_strategy=scored_idea.get('fitwith_seo_strategy', 5),
                    fitwith_content_strategy=scored_idea.get('fitwith_content_strategy', 5),
                    inspiration_potential=scored_idea.get('inspiration_potential', 5),
                    collaboration_potential=scored_idea.get('collaboration_potential', 5),
                    innovation=scored_idea.get('innovation', 5),
                    difficulty=scored_idea.get('difficulty', 5),
                    sent_to_prod=False,
                    raw_llm_response=scored_idea.get('reasoning', None)
                )
                
                idea_id = db.create_blog_post_idea(blog_idea)
                saved_idea_ids.append(idea_id)
                
                print(f"   ‚úÖ Saved idea {i}: '{scored_idea.get('title', 'Unknown')[:50]}...' (ID: {idea_id})")
                
            except Exception as e:
                print(f"   ‚ùå Failed to save idea {i}: {e}")
                failed_count += 1
        
        if saved_idea_ids:
            print(f"\nüéâ Database saver completed!")
            print(f"‚úÖ Successfully saved: {len(saved_idea_ids)} ideas")
            if failed_count > 0:
                print(f"‚ö†Ô∏è  Failed to save: {failed_count} ideas")
            
            # FIXED: Explicitly return saved_idea_ids in the state
            return {
                **state,
                "saved_idea_ids": saved_idea_ids,  # ‚Üê This is the critical line
                "status": "ideas_saved_to_db"
            }
        else:
            print("‚ùå Failed to save any ideas to database")
            return {
                **state,
                "saved_idea_ids": [],  # Return empty list instead of None
                "error": "Failed to save any ideas to database",
                "status": "error"
            }
            
    except Exception as e:
        print(f"‚ùå Error in database saver node: {e}")
        import traceback
        traceback.print_exc()
        
        return {
            **state,
            "saved_idea_ids": [],  # Return empty list on error
            "error": f"Database saver error: {str(e)}",
            "status": "error"
        }

print("‚úÖ Database saver node (blog ideas) ready - FIXED RETURN")

‚úÖ Database saver node (blog ideas) ready - FIXED RETURN


In [27]:
# Cell 21: Updated Pipeline Builder - Now with 8 Nodes
from langgraph.checkpoint.memory import MemorySaver

def build_pipeline():
    workflow = StateGraph(AudioPipelineState)
    # Existing nodes 
    workflow.add_node("transcribe", transcription_node)
    workflow.add_node("save_to_db", database_saver_node_conversations)  
    workflow.add_node("extract_insights", pain_extractor_node)
    workflow.add_node("creative_agent", creative_agent_node)
    workflow.add_node("analyst_agent", analyst_agent_node)
    workflow.add_node("save_ideas", database_saver_node)
    workflow.add_node("idea_selection_hitl", idea_selection_hitl)
    workflow.add_node("planning_agent", planning_agent_node)
    workflow.add_node("writing_agent", writing_agent_node)
    
    # Existing edges
    workflow.add_edge("transcribe", "save_to_db")
    workflow.add_edge("save_to_db", "extract_insights")
    workflow.add_edge("extract_insights", "creative_agent")
    workflow.add_edge("creative_agent", "analyst_agent")    
    workflow.add_edge("analyst_agent", "save_ideas")
    workflow.add_edge("save_ideas", "idea_selection_hitl")
    workflow.add_edge("idea_selection_hitl", "planning_agent")
    workflow.add_edge("planning_agent", "writing_agent")
    
    workflow.set_entry_point("transcribe")
    workflow.set_finish_point("writing_agent")
    
    # Compile with checkpointer for HITL persistence
    memory = MemorySaver()
    return workflow.compile(checkpointer=memory, interrupt_before=["idea_selection_hitl"])


# In builder: Add node and edge

#
#workflow.set_finish_point("writing_agent")

# Rebuild
pipeline = build_pipeline()
print("‚úÖ Pipeline compiled (8 nodes, with HITL interrupt)")

‚úÖ Pipeline compiled (8 nodes, with HITL interrupt)


In [28]:
# Cell 7: Setup Anthropic LLM for Insights Extraction (FIXED)


# Initialize Anthropic with correct model name
anthropic_key = os.getenv('ANTHROPIC_API_KEY')
if not anthropic_key:
    print("‚ö†Ô∏è  ANTHROPIC_API_KEY not found in .env file")
    print("Please add: ANTHROPIC_API_KEY=your_key_here")
else:
    llm = ChatAnthropic(
        model="claude-haiku-4-5",  # ‚Üê Updated model name
        api_key=anthropic_key,
        temperature=0.7,
        max_tokens=8192,
    )
    print("‚úÖ Anthropic LSLM initialized with Claude Haiku 4.5")

‚úÖ Anthropic LSLM initialized with Claude Haiku 4.5


In [29]:
## 3. PainExtractor Node Implementation

# System prompt
PAIN_EXTRACTOR_SYSTEM_PROMPT = """
You are a UX researcher and business analyst for BigKids Automation. Your job is listening to transcripts from interviews with users and potential clients. 

You pay special attention to problems that users have regarding how their company is automating, using web apps and AI to save time and move towards a more ethical and sovereign tech infrastructure.

You will be given the transcript of an interview with a user or potential client.

Your task is to extract structured information about:
- Who is speaking and their role
- What this person cares about (values, priorities)
- Their main primary and secondary challenges
- How they are solving problems today
- Are there AI agents that can assist them?
- Their underlying psychological needs (using frameworks like NVC - Non-Violent Communication)

Focus on automation, web apps, AI, time-saving, ethical tech, and sovereign infrastructure themes.

Be thorough but concise. 

IMPORTANT: Only extract information that is explicitly mentioned in the transcript. 
If information is not clearly stated, leave the field empty/null rather than guessing or inferring.
Do not hallucinate or make assumptions about missing information.
"""

In [30]:
# Cell 16: Enhanced Strategy Context for Scoring (Updated for 3 Documents)
def prepare_strategy_context_for_scoring():
    """Prepare strategy context for scoring using all three strategy documents"""
    
    # Load all three strategy documents
    strategy_context = load_company_strategy_context()
    
    # Add scoring guidelines
    strategy_context["scoring_guidelines"] = """
    SCORING CRITERIA (1-10 scale):
    
    1. usefulness_potential: How useful will this post be to readers with problems?
    2. fitwith_seo_strategy: How well does this align with our SEO strategy and keywords?
    3. fitwith_content_strategy: How well does this fit our content strategy and voice?
    4. inspiration_potential: How likely is this to inspire readers to take action?
    5. collaboration_potential: How likely is this to encourage prospects to contact us?
    6. innovation: How unique/differentiated is this topic (10 = very unique)?
    7. difficulty: How complex is this to write (1 = very complex, 10 = easy)?
    """
    
    # Create summaries for LLM prompt efficiency (all three documents)
    if strategy_context.get('company_strategy'):
        strategy_context["company_strategy_summary"] = strategy_context['company_strategy'][:800] + "..."
    
    if strategy_context.get('seo_strategy'):
        strategy_context["seo_strategy_summary"] = strategy_context['seo_strategy'][:600] + "..."
    
    if strategy_context.get('content_strategy'):  # NEW
        strategy_context["content_strategy_summary"] = strategy_context['content_strategy'][:600] + "..."
    
    print(f"‚úÖ Enhanced strategy context for scoring with 3 documents")
    print(f"   Company strategy: {len(strategy_context.get('company_strategy', ''))} chars")
    print(f"   SEO strategy: {len(strategy_context.get('seo_strategy', ''))} chars")
    print(f"   Content strategy: {len(strategy_context.get('content_strategy', ''))} chars")
    
    return strategy_context

# Test the enhanced context
enhanced_context = prepare_strategy_context_for_scoring()

‚ö†Ô∏è Company strategy document not found
‚úÖ Loaded SEO strategy (1120 chars)
‚úÖ Loaded content strategy (4473 chars)
‚úÖ Enhanced strategy context for scoring with 3 documents
   Company strategy: 40 chars
   SEO strategy: 1120 chars
   Content strategy: 4473 chars


In [31]:
# Cell: Fixed Test Function (No Duplicate Loading)
def test_three_document_loading():
    """Test loading all three strategy documents (optimized)"""
    
    print("üß™ Testing three-document strategy loading...")
    
    # Load documents once and enhance
    enhanced = prepare_strategy_context_for_scoring()  # This calls load_company_strategy_context() internally
    
    print(f"\nüìä DOCUMENT SUMMARY:")
    for doc_type in ['company_strategy', 'seo_strategy', 'content_strategy']:
        if doc_type in enhanced:
            length = len(enhanced[doc_type]) if enhanced[doc_type] else 0
            status = "‚úÖ Loaded" if length > 100 else "‚ö†Ô∏è Missing/Short"
            print(f"   {doc_type}: {status} ({length} chars)")
    
    return enhanced

# Test with no duplicates
test_context = test_three_document_loading()

üß™ Testing three-document strategy loading...
‚ö†Ô∏è Company strategy document not found
‚úÖ Loaded SEO strategy (1120 chars)
‚úÖ Loaded content strategy (4473 chars)
‚úÖ Enhanced strategy context for scoring with 3 documents
   Company strategy: 40 chars
   SEO strategy: 1120 chars
   Content strategy: 4473 chars

üìä DOCUMENT SUMMARY:
   company_strategy: ‚ö†Ô∏è Missing/Short (40 chars)
   seo_strategy: ‚úÖ Loaded (1120 chars)
   content_strategy: ‚úÖ Loaded (4473 chars)


In [32]:
# ============================================================
# CELL 22: EXECUTE COMPLETE 9-NODE PIPELINE TEST (WITH HITL)
# ============================================================

import uuid  # For generating unique thread_ids

def test_complete_9_node_pipeline():
    """Test the complete 8-node pipeline: Audio ‚Üí Transcribe ‚Üí Save ‚Üí Insights ‚Üí Ideas ‚Üí Scoring ‚Üí Save Ideas ‚Üí HITL ‚Üí Planning ‚Üí (Optional) Writing"""
    print("üß™ EXECUTING COMPLETE 9-NODE PIPELINE TEST...")
    print("Audio ‚Üí Transcribe ‚Üí Save ‚Üí Insights ‚Üí Ideas ‚Üí Scoring ‚Üí Save Ideas ‚Üí HITL ‚Üí Planning ‚Üí (Optional) Writing")
    print("-" * 60)
    
    # Find audio files
    audio_files = find_audio_files_in_temp()
    if not audio_files:
        print("‚ùå No audio files found in data/temp/")
        print("üí° Add .wav files to data/temp/ and run this cell again")
        return None
    print(f"üìÅ Found {len(audio_files)} audio file(s)")
    print(f"üéØ Testing with: {audio_files[0].name}")
    print(f"üìä File size: {audio_files[0].stat().st_size / 1024:.1f} KB")
    
    # Create initial state for 8-node pipeline (extended from your 6-node version)
    initial_state = {
        "file_path": str(audio_files[0]),
        "filename": audio_files[0].name,
        "transcript_text": None,
        "conversation_id": None,
        "extracted_insights": None,
        "raw_blog_ideas": None,
        "scored_blog_ideas": None,
        "saved_idea_ids": None,
        "selected_idea_id": None,     # NEW: For HITL
        "selected_idea": None,        # NEW: For HITL
        "strategy_context": None,     # NEW: For planning
        "blog_plan": None,            # NEW: For planning
        "blog_post": None,            # NEW: For future writing (optional)
        "error": None,
        "status": "processing"
    }
    
    try:
        print("\nüé¨ STARTING COMPLETE PIPELINE EXECUTION...")
        print("=" * 60)
        
        # Generate unique thread_id for persistence
        thread_id = str(uuid.uuid4())
        config = {"configurable": {"thread_id": thread_id}}
        
        # Run the pipeline up to the interrupt (after Node 6)
        print("üèÉ Running up to HITL interrupt...")
        pipeline.invoke(initial_state, config=config)
        
        # Simulate HITL: Get current state, prompt for selection, update, and resume
        print("\nü§ù HITL SIMULATION: Paused for idea selection")
        current_state = pipeline.get_state(config).values
        scored_ideas = current_state.get("scored_blog_ideas", [])
        saved_ids = current_state.get("saved_idea_ids", [])
        print(f"   Available Ideas (Saved IDs): {saved_ids}")
        print(f"   Scored Ideas Preview: {[idea.get('title', 'No title') for idea in scored_ideas]}")
        
        # Prompt for human input (in dev; replace with UI/API in prod)
        selected_id = int(input("Enter selected idea ID (from saved_ids): "))  # Or hardcoded for auto-test: e.g., saved_ids[0]
        selected_idea = next((idea for idea in scored_ideas if idea.get("id") == selected_id), {})  # Adjust key if needed
        
        # Update state with selection
        updated_state = current_state.copy()
        updated_state["selected_idea_id"] = selected_id
        updated_state["selected_idea"] = selected_idea
        pipeline.update_state(config, updated_state)
        print(f"   ‚úÖ HITL Complete: Selected Idea ID {selected_id}")
        
        # Resume the pipeline (runs HITL node, planning, and optional writing)
        print("üèÉ Resuming pipeline from HITL...")
        final_state = pipeline.invoke(None, config=config)  # None as input to resume
        
        print(f"\nüìä COMPLETE PIPELINE RESULTS:")
        print("=" * 60)
        print(f"üéØ Final Status: {final_state.get('status')}")
        print(f"üìù Conversation ID: {final_state.get('conversation_id')}")
        print(f"üíæ Saved Blog Idea IDs: {final_state.get('saved_idea_ids')}")
        
        # Check all pipeline stages (extended for new nodes)
        print(f"\nüìã STAGE RESULTS:")
        stages = [
            ("üéôÔ∏è  Transcription", final_state.get('transcript_text')),
            ("üíæ Database Save (Conversation)", final_state.get('conversation_id')),
            ("üß† Insights Extraction", final_state.get('extracted_insights')),
            ("üé® Blog Ideas Generation", final_state.get('raw_blog_ideas')),
            ("üîç Blog Ideas Scoring", final_state.get('scored_blog_ideas')),
            ("üíæ Database Save (Ideas)", final_state.get('saved_idea_ids')),
            ("ü§ù HITL Idea Selection", final_state.get('selected_idea_id') is not None),  
            ("üìù Planning", final_state.get('blog_plan')),  
            ("‚úçÔ∏è Writing", final_state.get('blog_post'))  
        ]
        all_passed = True
        for stage_name, stage_data in stages:
            status = "‚úÖ" if stage_data else "‚ùå"
            print(f"   {stage_name}: {status}")
            if not stage_data:
                all_passed = False
        
        # Show detailed results if all stages passed
        if all_passed and final_state.get('scored_blog_ideas') and final_state.get('saved_idea_ids'):
            scored_ideas = final_state['scored_blog_ideas']
            saved_ids = final_state['saved_idea_ids']
            print(f"\nüéâ COMPLETE SUCCESS! Pipeline generated, scored, saved, selected, and planned {len(saved_ids)} blog ideas")
            print("=" * 80)
            
            # Existing sections (conversation, insights, DB summary, top ideas) -- unchanged from your code
            # (Insert your existing print blocks here for conversation details, insights summary, DB save summary, and top scored ideas)
            
            # NEW: Show HITL and planning results
            print(f"\nü§ù HITL Selection:")
            print(f"   Selected Idea ID: {final_state.get('selected_idea_id')}")
            print(f"   Selected Idea Title: {final_state.get('selected_idea', {}).get('title', 'N/A')}")
            
            print(f"\nüìù Generated Blog Plan:")
            blog_plan = final_state.get('blog_plan')
            if blog_plan:
                print(blog_plan.plan)  # Uses the @property from Plan
                print(f"   Q&A Pairs Count: {len(blog_plan.qa_pairs)}")
                print(f"   Instructions Count: {len(blog_plan.instructions)}")
            else:
                print("   No plan generated")
            
            # NEW: If writing node is enabled
            # blog_post = final_state.get('blog_post')
            # if blog_post:
            #     print(f"\n‚úçÔ∏è Generated Blog Post:")
            #     print(f"   Title: {blog_post.get('title')}")
            #     print(f"   Content Preview: {blog_post.get('content')[:200]}...")
            
            print("=" * 80)
            print("üéâ COMPLETE 9-NODE PIPELINE: SUCCESS!")
            print("‚úÖ All stages completed successfully")
            print(f"üíæ Conversation saved (ID: {final_state.get('conversation_id')})")
            print(f"üíæ {len(saved_ids)} blog ideas saved to database")
            print(f"üîó Ideas linked to conversation for traceability")
            
            # Extended NEXT STEPS
            print(f"\nüí° NEXT STEPS:")
            print(f"   ‚Ä¢ Query saved ideas: db.get_blog_post_ideas_by_conversation({final_state.get('conversation_id')})")
            print(f"   ‚Ä¢ View conversation: db.get_conversation({final_state.get('conversation_id')})")
            print(f"   ‚Ä¢ Access specific idea: db.get_blog_post_idea({saved_ids[0]})")
            print(f"   ‚Ä¢ Review plan: Access final_state['blog_plan']")
        
        else:
            # Something failed (unchanged from your code)
            print("\n‚ùå PIPELINE INCOMPLETE")
            print("=" * 50)
            if final_state.get('error'):
                print(f"‚ùå Error: {final_state.get('error')}")
            else:
                print("‚ùå Pipeline stopped but no error message provided")
            # Extended DEBUG INFO
            print(f"\nüîç DEBUG INFO:")
            print(f"   Transcript exists: {bool(final_state.get('transcript_text'))}")
            if final_state.get('transcript_text'):
                print(f"   Transcript preview: {final_state.get('transcript_text')[:100]}...")
            print(f"   Conversation ID: {final_state.get('conversation_id')}")
            print(f"   Insights exist: {bool(final_state.get('extracted_insights'))}")
            print(f"   Raw ideas exist: {bool(final_state.get('raw_blog_ideas'))}")
            if final_state.get('raw_blog_ideas'):
                print(f"   Raw ideas count: {len(final_state.get('raw_blog_ideas'))}")
            print(f"   Scored ideas exist: {bool(final_state.get('scored_blog_ideas'))}")
            if final_state.get('scored_blog_ideas'):
                print(f"   Scored ideas count: {len(final_state.get('scored_blog_ideas'))}")
            print(f"   Saved idea IDs exist: {bool(final_state.get('saved_idea_ids'))}")
            if final_state.get('saved_idea_ids'):
                print(f"   Saved ideas count: {len(final_state.get('saved_idea_ids'))}")
            # NEW: Debug for new fields
            print(f"   Selected Idea ID: {final_state.get('selected_idea_id')}")
            print(f"   Blog Plan exists: {bool(final_state.get('blog_plan'))}")
            if final_state.get('blog_post'):
                blog_post = final_state['blog_post']
                print(f"\n‚úçÔ∏è Generated Blog Post:")
                print(f"   Title: {blog_post.title}")
                print(f"   Issue: {blog_post.issue}")
                print(f"   Single Message: {blog_post.single_message}")
                print(f"   Content Preview: {blog_post.content[:200]}...")  # Truncate for brevity
                print(f"   Status: {blog_post.status}")
        
        return final_state
    
    except Exception as e:
        print(f"\n‚ùå COMPLETE PIPELINE FAILED WITH EXCEPTION:")
        print(f"   {str(e)}")
        import traceback
        traceback.print_exc()
        return None

# ============================================================
# RUN THE TEST
# ============================================================

print("üöÄ Ready to test complete 9-node pipeline")
print("üí° Run the cell below to execute the test\n")

# Uncomment the line below to run automatically, or run it manually
# test_result = test_complete_8_node_pipeline()

üöÄ Ready to test complete 9-node pipeline
üí° Run the cell below to execute the test



In [42]:
# Uncomment the line below to run automatically, or run it manually
test_result = test_complete_9_node_pipeline()

üß™ EXECUTING COMPLETE 9-NODE PIPELINE TEST...
Audio ‚Üí Transcribe ‚Üí Save ‚Üí Insights ‚Üí Ideas ‚Üí Scoring ‚Üí Save Ideas ‚Üí HITL ‚Üí Planning ‚Üí (Optional) Writing
------------------------------------------------------------
üìÅ Found 1 audio file(s)
üéØ Testing with: blog_record (2026-01-07 15_11_23).wav
üìä File size: 133727.3 KB

üé¨ STARTING COMPLETE PIPELINE EXECUTION...
üèÉ Running up to HITL interrupt...
üéôÔ∏è Transcribing: blog_record (2026-01-07 15_11_23).wav
üíæ Saving to database: blog_record (2026-01-07 15_11_23).wav
üß† Starting pain extraction...
üìù Raw response length: 6715 chars
üìù Response starts with: ```json
{
    "speakers": [
        {
            ...
üîß Removing JSON markdown blocks...
üîß Cleaned content starts with: {
    "speakers": [
        {
            "name": ...
‚úÖ JSON parsed successfully
‚úÖ Successfully extracted insights!
   Speakers: 2
   Challenges: 4
   Needs: 7
   Values: 7
‚úÖ Extracted insights: 4 primary challenges, 2 

In [43]:
# Assuming you ran: test_result = test_complete_8_node_pipeline()
# If not, re-run it now with assignment

if test_result and 'blog_plan' in test_result:
    blog_plan = test_result['blog_plan']  # Access the Plan instance
    
    # Option 1: Print the formatted plan (using the @property)
    print("Formatted Blog Plan:\n")
    print(blog_plan.plan)  # This outputs the nice string with sections, Q&A, and instructions
    
    # Option 2: Print individual fields for detailed review
    print("\nIndividual Fields:")
    print(f"Who: {blog_plan.who}")
    print(f"Why: {blog_plan.why}")
    print(f"What: {blog_plan.what}")
    print(f"The Issue: {blog_plan.the_issue}")
    print(f"Where We Stand: {blog_plan.where_we_stand}")
    print(f"Single Message: {blog_plan.single_message}")
    print("\nQ&A Pairs:")
    for pair in blog_plan.qa_pairs:
        print(f"  Q: {pair['question']}  A: {pair['answer']}")
    print("\nInstructions:")
    for instr in blog_plan.instructions:
        print(f"  - {instr}")
    
    # Option 3: Convert to dict/JSON for export or further processing
    plan_dict = blog_plan.model_dump()  # Pydantic method to get a dict
    print("\nPlan as Dict:")
    print(plan_dict)
    
    # Bonus: Save to file (e.g., for external review)
    import json
    with open("blog_plan.json", "w") as f:
        json.dump(plan_dict, f, indent=4)
    print("\nSaved to blog_plan.json")
else:
    print("No blog_plan found‚Äîcheck if the test completed successfully or re-run it.")

Formatted Blog Plan:

Who: SME owners and managers, particularly those running mission-driven organizations (like NGOs and activist groups) who are frustrated with their current tech stack, concerned about data privacy and GDPR compliance, and seeking alternatives to Big Tech solutions. Specifically, decision-makers who feel trapped between operational necessity and ethical values.
Why: To demonstrate how automation and GenAI can help mission-driven SMEs reduce dependency on untrustworthy tech giants, improve data privacy compliance, and free up their limited technical resources to focus on their core mission. This positions Big Kids Automation Agency as a trusted partner that understands the unique challenges of values-aligned organizations seeking sustainable, ethical tech infrastructure.
What: 1) The data privacy crisis facing SMEs relying on Big Tech platforms (Google Suite, Facebook, etc.)
2) GDPR compliance challenges and risks for organizations handling sensitive stakeholder dat

In [44]:
# Cell: Simple list_conversations - No helper functions needed
def list_conversations():
    """
    List all conversations in the database
    Simple version that works without helper functions
    """
    
    conversations = db.get_all_conversations()
    
    if not conversations:
        print("‚ö†Ô∏è  No conversations found in database")
        return
    
    print("\n" + "=" * 100)
    print("üí¨ ALL CONVERSATIONS IN DATABASE")
    print("=" * 100)
    
    for conv in conversations:
        # Handle both dict and Pydantic model
        if isinstance(conv, dict):
            conv_id = conv.get('id', 'Unknown')
            title = conv.get('title', 'Untitled')
            raw_text = conv.get('raw_text', '')
            created = conv.get('created_at', 'Unknown')
            word_count = conv.get('word_count', 0)
        else:
            # It's a Pydantic model - use model_dump()
            data = conv.model_dump()
            conv_id = data.get('id', 'Unknown')
            title = data.get('title', 'Untitled')
            raw_text = data.get('raw_text', '')
            created = data.get('created_at', 'Unknown')
            word_count = data.get('word_count', 0)
        
        transcript_len = len(raw_text) if raw_text else 0
        
        # Get idea count for this conversation
        ideas = db.get_ideas_by_conversation(conv_id)
        idea_count = len(ideas) if ideas else 0
        
        # Calculate average score if ideas exist
        if ideas and idea_count > 0:
            total_scores = []
            for idea in ideas:
                if isinstance(idea, dict):
                    score = sum([
                        idea.get('usefulness_potential', 0),
                        idea.get('fitwith_seo_strategy', 0),
                        idea.get('fitwith_content_strategy', 0),
                        idea.get('inspiration_potential', 0),
                        idea.get('collaboration_potential', 0),
                        idea.get('innovation', 0),
                        idea.get('difficulty', 0)
                    ])
                else:
                    data = idea.model_dump() if hasattr(idea, 'model_dump') else idea.__dict__
                    score = sum([
                        data.get('usefulness_potential', 0),
                        data.get('fitwith_seo_strategy', 0),
                        data.get('fitwith_content_strategy', 0),
                        data.get('inspiration_potential', 0),
                        data.get('collaboration_potential', 0),
                        data.get('innovation', 0),
                        data.get('difficulty', 0)
                    ])
                total_scores.append(score)
            
            avg_score = sum(total_scores) / len(total_scores)
            score_info = f"Avg Score: {avg_score:.1f}/70"
        else:
            score_info = "No ideas yet"
        
        print(f"\nüìÅ ID: {conv_id}")
        print(f"   üìù Title: {title}")
        print(f"   üìÑ Transcript: {transcript_len} chars ({word_count} words)")
        print(f"   üí° Ideas: {idea_count} | {score_info}")
        print(f"   üìÖ Created: {created}")
        print(f"   üîç View: quick_view({conv_id})")
    
    print("\n" + "=" * 100)
    print(f"üí° Total Conversations: {len(conversations)}")
    print("=" * 100 + "\n")

print("‚úÖ list_conversations() ready (standalone version)")

‚úÖ list_conversations() ready (standalone version)


In [45]:
# Or just list them first
list_conversations()


üí¨ ALL CONVERSATIONS IN DATABASE

üìÅ ID: 41
   üìù Title: Audio: blog_record (2026-01-07 15_11_23).wav
   üìÑ Transcript: 10553 chars (1910 words)
   üí° Ideas: 5 | Avg Score: 48.2/70
   üìÖ Created: 2026-01-08 09:59:41
   üîç View: quick_view(41)

üìÅ ID: 40
   üìù Title: Audio: blog_record (2026-01-07 15_11_23).wav
   üìÑ Transcript: 10553 chars (1910 words)
   üí° Ideas: 5 | Avg Score: 43.6/70
   üìÖ Created: 2026-01-08 09:43:49
   üîç View: quick_view(40)

üìÅ ID: 39
   üìù Title: Audio: blog_ecord (2025-12-24 10_13_55).wav
   üìÑ Transcript: 1704 chars (268 words)
   üí° Ideas: 5 | Avg Score: 48.4/70
   üìÖ Created: 2025-12-24 09:22:03
   üîç View: quick_view(39)

üìÅ ID: 38
   üìù Title: Audio: blog_ecord (2025-12-24 10_13_55).wav
   üìÑ Transcript: 1704 chars (268 words)
   üí° Ideas: 5 | Avg Score: 47.2/70
   üìÖ Created: 2025-12-24 09:18:42
   üîç View: quick_view(38)

üìÅ ID: 37
   üìù Title: Audio: blog_ecord (2025-12-01 19_47_21).wav
   üìÑ Tr

In [46]:
# Cell: Terminal Dashboard - Using Existing DB Methods
def show_ideas_dashboard(conversation_id=None, top_n=10):
    """
    Beautiful terminal dashboard showing scored blog ideas
    Uses existing db.get_all_ideas() or db.get_ideas_by_conversation()
    
    Args:
        conversation_id: Show ideas for specific conversation (None = all ideas)
        top_n: Number of top ideas to show
    """
    
    print("\n" + "=" * 100)
    print("üìä BLOG IDEAS DASHBOARD")
    print("=" * 100)
    
    # Get ideas using existing methods
    if conversation_id:
        ideas = db.get_ideas_by_conversation(conversation_id)
        print(f"üìÅ Showing ideas from Conversation ID: {conversation_id}")
    else:
        ideas = db.get_all_ideas()
        print(f"üìÅ Showing ALL ideas from database")
    
    if not ideas:
        print("‚ö†Ô∏è  No ideas found in database")
        return
    
    # Calculate scores and sort
    scored_ideas = []
    for idea in ideas:
        # Handle both dict and object formats
        if isinstance(idea, dict):
            total = sum([
                idea.get('usefulness_potential', 0),
                idea.get('fitwith_seo_strategy', 0),
                idea.get('fitwith_content_strategy', 0),
                idea.get('inspiration_potential', 0),
                idea.get('collaboration_potential', 0),
                idea.get('innovation', 0),
                idea.get('difficulty', 0)
            ])
        else:
            total = sum([
                idea.usefulness_potential,
                idea.fitwith_seo_strategy,
                idea.fitwith_content_strategy,
                idea.inspiration_potential,
                idea.collaboration_potential,
                idea.innovation,
                idea.difficulty
            ])
        scored_ideas.append((idea, total))
    
    # Sort by total score (highest first)
    scored_ideas.sort(key=lambda x: x[1], reverse=True)
    
    # Show summary stats
    all_scores = [s[1] for s in scored_ideas]
    avg_score = sum(all_scores) / len(all_scores)
    
    print(f"\nüìà SUMMARY STATISTICS")
    print(f"   Total Ideas: {len(scored_ideas)}")
    print(f"   Average Score: {avg_score:.1f}/70 ({avg_score/70*100:.1f}%)")
    print(f"   Highest Score: {scored_ideas[0][1]}/70 ({scored_ideas[0][1]/70*100:.1f}%)")
    print(f"   Lowest Score: {scored_ideas[-1][1]}/70 ({scored_ideas[-1][1]/70*100:.1f}%)")
    
    # Show score distribution
    high_scores = sum(1 for s in all_scores if s >= 60)
    medium_scores = sum(1 for s in all_scores if 50 <= s < 60)
    low_scores = sum(1 for s in all_scores if s < 50)
    
    print(f"\nüìä SCORE DISTRIBUTION")
    print(f"   üü¢ High (60-70):  {high_scores} ideas ({high_scores/len(all_scores)*100:.1f}%)")
    print(f"   üü° Medium (50-59): {medium_scores} ideas ({medium_scores/len(all_scores)*100:.1f}%)")
    print(f"   üî¥ Low (<50):     {low_scores} ideas ({low_scores/len(all_scores)*100:.1f}%)")
    
    # Show top ideas
    display_count = min(top_n, len(scored_ideas))
    print(f"\nüèÜ TOP {display_count} IDEAS")
    print("-" * 100)
    
    for rank, (idea, total_score) in enumerate(scored_ideas[:top_n], 1):
        # Handle both dict and object formats
        if isinstance(idea, dict):
            idea_id = idea.get('id')
            title = idea.get('title', 'Untitled')
            usefulness = idea.get('usefulness_potential', 0)
            seo = idea.get('fitwith_seo_strategy', 0)
            content = idea.get('fitwith_content_strategy', 0)
            inspiration = idea.get('inspiration_potential', 0)
            collaboration = idea.get('collaboration_potential', 0)
            innovation = idea.get('innovation', 0)
            difficulty = idea.get('difficulty', 0)
            created_at = idea.get('created_at', 'Unknown')
            conv_id = idea.get('conversation_id', 'N/A')
            sent_to_prod = idea.get('sent_to_prod', False)
        else:
            idea_id = idea.id
            title = idea.title
            usefulness = idea.usefulness_potential
            seo = idea.fitwith_seo_strategy
            content = idea.fitwith_content_strategy
            inspiration = idea.inspiration_potential
            collaboration = idea.collaboration_potential
            innovation = idea.innovation
            difficulty = idea.difficulty
            created_at = idea.created_at
            conv_id = idea.conversation_id
            sent_to_prod = idea.sent_to_prod
        
        # Create score bar
        bar_length = 35
        percentage = total_score / 70
        filled = int(percentage * bar_length)
        bar = "‚ñà" * filled + "‚ñë" * (bar_length - filled)
        
        # Medal emoji for top 3
        medal = {1: "ü•á", 2: "ü•à", 3: "ü•â"}.get(rank, f"{rank:2d}.")
        
        # Color indicator based on score
        if total_score >= 60:
            indicator = "üü¢"  # High score
        elif total_score >= 50:
            indicator = "üü°"  # Medium score
        else:
            indicator = "üî¥"  # Low score
        
        print(f"\n{medal} {indicator} ID: {idea_id} | Score: {total_score}/70 ({percentage*100:.1f}%)")
        print(f"   üìù {title}")
        print(f"   üìä [{bar}] {total_score}/70")
        print(f"   üí° Breakdown:")
        print(f"      ‚Ä¢ Usefulness: {usefulness}/10 | SEO Fit: {seo}/10 | Content Fit: {content}/10")
        print(f"      ‚Ä¢ Inspiration: {inspiration}/10 | Collaboration: {collaboration}/10")
        print(f"      ‚Ä¢ Innovation: {innovation}/10 | Difficulty (ease): {difficulty}/10")
        print(f"   üìÖ Created: {created_at}")
        print(f"   üîó Conversation: {conv_id}")
        
        if sent_to_prod:
            print(f"   ‚úÖ STATUS: SENT TO PRODUCTION")
        else:
            print(f"   üìù STATUS: Draft")
    
    print("\n" + "=" * 100)
    print("üí° USAGE TIPS:")
    print("   show_ideas_dashboard()              # Show all ideas")
    print("   show_ideas_dashboard(28)            # Show ideas from conversation 28")
    print("   show_ideas_dashboard(28, top_n=3)   # Show top 3 ideas only")
    print("=" * 100 + "\n")

print("‚úÖ Terminal dashboard ready (using db.get_all_ideas and db.get_ideas_by_conversation)")

‚úÖ Terminal dashboard ready (using db.get_all_ideas and db.get_ideas_by_conversation)


In [47]:
# Cell: Quick View - Compact Dashboard
def quick_view(conversation_id=None):
    """Quick compact view of scored ideas"""
    
    ideas = db.get_ideas_by_conversation(conversation_id) if conversation_id else db.get_all_ideas()
    
    if not ideas:
        print("‚ö†Ô∏è  No ideas found")
        return
    
    # Score and sort
    scored = []
    for idea in ideas:
        if isinstance(idea, dict):
            total = sum([idea.get('usefulness_potential', 0), idea.get('fitwith_seo_strategy', 0),
                        idea.get('fitwith_content_strategy', 0), idea.get('inspiration_potential', 0),
                        idea.get('collaboration_potential', 0), idea.get('innovation', 0), idea.get('difficulty', 0)])
            scored.append((idea, total))
        else:
            total = sum([idea.usefulness_potential, idea.fitwith_seo_strategy, idea.fitwith_content_strategy,
                        idea.inspiration_potential, idea.collaboration_potential, idea.innovation, idea.difficulty])
            scored.append((idea, total))
    
    scored.sort(key=lambda x: x[1], reverse=True)
    
    print(f"\n{'='*100}")
    print(f"üìä {'CONVERSATION ' + str(conversation_id) if conversation_id else 'ALL IDEAS'} | Total: {len(scored)} ideas | Avg: {sum(s[1] for s in scored)/len(scored):.1f}/70")
    print(f"{'='*100}\n")
    
    for rank, (idea, score) in enumerate(scored, 1):
        if isinstance(idea, dict):
            idea_id, title = idea.get('id'), idea.get('title', 'Untitled')
        else:
            idea_id, title = idea.id, idea.title
        
        medal = {1: "ü•á", 2: "ü•à", 3: "ü•â"}.get(rank, f"{rank:2d}.")
        indicator = "üü¢" if score >= 60 else "üü°" if score >= 50 else "üî¥"
        
        print(f"{medal} {indicator} [{idea_id:3d}] {score:2d}/70 | {title[:75]}")
    
    print(f"\n{'='*100}\n")

print("‚úÖ Quick view function ready")

‚úÖ Quick view function ready


In [48]:
# Test 1: Full dashboard for conversation 37
print("üß™ Test 1: Full dashboard for conversation 39")
show_ideas_dashboard(39)

# Test 2: Quick view for all ideas
#print("\nüß™ Test 2: Quick view for all ideas")
#quick_view()

# Test 3: Quick view for conversation 33
#print("\nüß™ Test 3: Quick view for conversation 33")
#quick_view(33)

üß™ Test 1: Full dashboard for conversation 39

üìä BLOG IDEAS DASHBOARD
üìÅ Showing ideas from Conversation ID: 39

üìà SUMMARY STATISTICS
   Total Ideas: 5
   Average Score: 48.4/70 (69.1%)
   Highest Score: 54/70 (77.1%)
   Lowest Score: 41/70 (58.6%)

üìä SCORE DISTRIBUTION
   üü¢ High (60-70):  0 ideas (0.0%)
   üü° Medium (50-59): 2 ideas (40.0%)
   üî¥ Low (<50):     3 ideas (60.0%)

üèÜ TOP 5 IDEAS
----------------------------------------------------------------------------------------------------

ü•á üü° ID: 58 | Score: 54/70 (77.1%)
   üìù AI Implementation for Financial Independence: How to Evaluate Solutions That Respect Your Autonomy
   üìä [‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñë‚ñë‚ñë‚ñë‚ñë‚ñë‚ñë‚ñë] 54/70
   üí° Breakdown:
      ‚Ä¢ Usefulness: 9/10 | SEO Fit: 6/10 | Content Fit: 9/10
      ‚Ä¢ Inspiration: 8/10 | Collaboration: 9/10
      ‚Ä¢ Innovation: 8/10 | Difficulty (ease): 5/10
   üìÖ Created: 2025-12-

In [49]:
if test_result and "blog_post" in test_result:
    post = test_result["blog_post"]  # Now a full BlogPost instance
    print("Generated Blog Post (Full Schema):\n")
    print(f"ID: {post.id}")
    print(f"Title: {post.title}")
    print(f"Issue: {post.issue}")
    print(f"Angle: {post.angle}")
    print(f"Single Message: {post.single_message}")
    print(f"User Story: {post.user_story}")
    print(f"Seed Keyword: {post.seed_keyword}")
    print(f"Call to Action: {post.call_to_action}")
    print(f"Keywords: {', '.join(post.keywords) if post.keywords else 'None'}")
    print(f"Status: {post.status}")
    print(f"Published Date: {post.published_date}")
    print(f"Created At: {post.created_at}")
    print(f"Updated At: {post.updated_at}")
    print("\nFull Content:\n")
    print(post.content)  # The complete blog text
else:
    print("No blog_post found.")

Generated Blog Post (Full Schema):

ID: None
Title: Breaking Free from Big Tech: How Mission-Driven SMEs Can Build Ethical, GDPR-Compliant Infrastructure
Issue: Mission-driven SMEs (NGOs, activist groups, social enterprises) face an impossible choice: use Big Tech platforms that compromise their values and expose stakeholder data to privacy risks, or struggle with limited alternatives that lack reach and visibility. They lack the technical expertise and financial resources to build sustainable, GDPR-compliant, values-aligned technology infrastructure. Single individuals often become bottlenecks, creating organizational fragility.
Angle: Big Kids Automation Agency positions custom automation and thoughtful tech solutions as a path for mission-driven SMEs to break free from Big Tech dependency, achieve GDPR compliance, and build organizational autonomy‚Äîwithout requiring massive investment or technical expertise. We emphasize that this is achievable, values-aligned, and ultimately less 