## Persona Info Extraction Pipeline

This notebook demonstrates the **persona info extraction pipeline** using the refactored functions from `src/modules/persona_info_extraction/`.

### Overview

The pipeline runs a **two-phase conversational interview** built on LangGraph state machines, followed by a profile extraction step:

1. **Phase 1: Intent & Focus Discovery**
   - Collect basic info: name, age, location, languages
   - Determine user's intent and current focus (jobs, training, or awareness)
   - Identify top professional domain

2. **Phase 2: Detailed Extraction** (focus-dependent)
   - For job seekers: experience, education, work preferences, skills
   - For training seekers: current skills and learning motivations
   - For awareness: brief overview

3. **Profile Extraction & Validation**
   - Extract structured PersonaInfo from conversation transcript
   - Validate against domain/skill constraints
   - Save to personas map
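
The focus-dependent branching described above can be sketched in plain Python. This is only an illustration of the routing idea, not the pipeline's LangGraph implementation: `SketchFocus`, `PHASE2_TOPICS`, and `plan_interview` are hypothetical names, and the topic lists are copied from the bullets above rather than from `src.config`.

```python
from enum import Enum
from typing import Dict, List


class SketchFocus(str, Enum):
    """Hypothetical focus values; the real enum is CurrentFocus in src.config."""
    JOBS = "jobs"
    TRAINING = "training"
    AWARENESS = "awareness"


# Phase 1 is the same for everyone; Phase 2 depends on the detected focus.
PHASE1_TOPICS: List[str] = ["basic info", "intent and focus", "top domain"]
PHASE2_TOPICS: Dict[SketchFocus, List[str]] = {
    SketchFocus.JOBS: ["experience", "education", "work preferences", "skills"],
    SketchFocus.TRAINING: ["current skills", "learning motivations"],
    SketchFocus.AWARENESS: ["brief overview"],
}


def plan_interview(focus: SketchFocus) -> List[str]:
    """Return the ordered list of topics an interview would cover."""
    return PHASE1_TOPICS + PHASE2_TOPICS[focus]


print(plan_interview(SketchFocus.TRAINING))
```

In the actual pipeline this decision would presumably be a conditional edge between graph nodes; the dictionary lookup here only shows which branch is taken for each focus.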

### Features

- **Multi-phase interviews** with AWS persona API integration
- **Domain-aware validation** using allowed skills mappings
- **Carbon emissions tracking** based on token consumption
- **Incremental profile building** with resume capability
- **Comprehensive logging** and session summaries
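
As a rough illustration of what domain-aware validation against an allowed-skills mapping might involve, here is a minimal sketch. The mapping contents, the level names, and the function `check_skills` are all assumptions made for this example; the real rules live in `ALLOWED_SKILLS`, `SKILL_LEVELS`, and `validate_persona_profile`.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical stand-ins for ALLOWED_SKILLS and SKILL_LEVELS from src.config.
SKETCH_ALLOWED_SKILLS: Dict[str, Set[str]] = {
    "Food Industry": {"Food Safety", "Inventory Management"},
}
SKETCH_SKILL_LEVELS: Set[str] = {"Basic", "Intermediate", "Advanced"}


def check_skills(profile: dict) -> Tuple[bool, List[str]]:
    """Check that every skill belongs to the profile's domain and has a known level."""
    errors: List[str] = []
    domain = profile.get("top_domain")
    allowed = SKETCH_ALLOWED_SKILLS.get(domain)
    if allowed is None:
        # Without a known domain there is nothing to validate skills against.
        return False, [f"Unknown domain: {domain!r}"]

    for skill in profile.get("technical_skills", []):
        name, level = skill.get("name"), skill.get("level")
        if name not in allowed:
            errors.append(f"Skill {name!r} is not allowed for domain {domain!r}")
        if level not in SKETCH_SKILL_LEVELS:
            errors.append(f"Skill {name!r} has unknown level {level!r}")
    return (not errors), errors


example = {
    "top_domain": "Food Industry",
    "technical_skills": [{"name": "Food Safety", "level": "Basic"}],
}
print(check_skills(example))
```

Returning the full list of errors, rather than stopping at the first one, makes it easier to see everything wrong with an extracted profile in a single pass.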

### 1. Setup & Installation

In [None]:
# Install LangGraph for state machine management
!pip install -U langgraph

In [None]:
# Core imports
import json
import os
import sys
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List

# Add parent directory to path
sys.path.append('..')

# Environment setup
from dotenv import load_dotenv
load_dotenv("../.env")

# Import pipeline functions
from src.modules.persona_info_extraction.pipeline import (
    run_interview,
    main_interview_pipeline,
    load_all_personas,
    create_persona_summary,
    validate_persona_profile,
    extract_profile_from_transcript
)

# Import agent and configuration
from src.agents import get_agent
from src.config import (
    KNOWN_JOB_DOMAINS,
    EDU_LEVELS,
    SKILL_LEVELS,
    ALLOWED_SKILLS,
    CurrentFocus,
    WorkType
)

# Models for type checking
from src.modules.persona_info_extraction.models import PersonaInfo, InterviewState

print("✓ Imports complete")

In [None]:
# Display configuration
print(f"Known Job Domains: {len(KNOWN_JOB_DOMAINS)} domains")
print(f"Allowed Skills: {sum(len(skills) for skills in ALLOWED_SKILLS.values())} total skills across domains")
print(f"Education Levels: {len(EDU_LEVELS)} levels")
print(f"Skill Levels: {SKILL_LEVELS}")
print(f"\nExample domains: {list(KNOWN_JOB_DOMAINS)[:5]}")

### 2. PersonaInfo Schema

View the data model for extracted persona profiles:

In [None]:
# Display PersonaInfo schema
print(json.dumps(PersonaInfo.model_json_schema(), indent=2))

### 3. Run Single Persona Interview

Run a complete interview for a single persona using the main pipeline function:

In [None]:
# Configure interview parameters
PERSONA_ID = "persona_001"
MODEL_ID = "mistral-small-latest"
MAX_TURNS_P1 = 5  # Phase 1: Intent & Focus
MAX_TURNS_P2 = 5  # Phase 2: Detailed Extraction

# Run interview
print(f"Starting interview for {PERSONA_ID}...\n")

result = main_interview_pipeline(
    persona_id=PERSONA_ID,
    get_agent=get_agent,
    model_id=MODEL_ID,
    max_turns_p1=MAX_TURNS_P1,
    max_turns_p2=MAX_TURNS_P2
)

print("\n" + "="*80)
print("INTERVIEW COMPLETE")
print("="*80)

In [None]:
# Display results
if "error" in result:
    print(f"❌ Interview failed: {result['error']}")
else:
    profile = result["profile"]
    validation = result["validation"]
    
    print(f"✓ Profile extracted for: {profile.get('full_name', 'Unknown')}")
    print(f"✓ Saved to: {result['save_path']}")
    print(f"\nValidation: {'PASSED ✓' if validation['is_valid'] else 'FAILED ✗'}")
    
    if not validation['is_valid']:
        print("Validation errors:")
        for error in validation['errors']:
            print(f"  - {error}")
    
    if "summary" in result:
        print("\n" + "="*80)
        print("PERSONA SUMMARY")
        print("="*80)
        print(result["summary"])

### 4. View Conversation Transcript

In [None]:
# Display full conversation
if "error" not in result:
    conversation = result.get("conversation", [])
    
    print("="*80)
    print("CONVERSATION TRANSCRIPT")
    print("="*80 + "\n")
    
    for i, message in enumerate(conversation, 1):
        print(f"{i}. {message}")
        print()
    
    print(f"\nTotal messages: {len(conversation)}")

### 5. Detailed Profile Information

In [None]:
# Display full profile JSON
if "error" not in result:
    profile = result["profile"]
    
    print("="*80)
    print("EXTRACTED PROFILE (PersonaInfo)")
    print("="*80 + "\n")
    
    print(json.dumps(profile, indent=2, ensure_ascii=False))
    
    # Highlight key fields
    print("\n" + "="*80)
    print("KEY FIELDS")
    print("="*80)
    print(f"Name: {profile.get('full_name')}")
    print(f"Age: {profile.get('age')}")
    print(f"Location: {profile.get('location_city')}, {profile.get('location_country')}")
    print(f"Domain: {profile.get('top_domain')}")
    print(f"Focus: {profile.get('current_focus')}")
    print(f"Languages: {', '.join(profile.get('languages', []))}")
    print(f"Education: {profile.get('education_level')}")
    print(f"Experience: {profile.get('years_experience')} years")
    print(f"Work Type: {profile.get('preferred_work_type')}")
    print(f"Open to Relocate: {profile.get('open_to_relocate')}")
    
    # Technical skills
    tech_skills = profile.get('technical_skills', [])
    if tech_skills:
        print(f"\nTechnical Skills ({len(tech_skills)}):")
        for skill in tech_skills:
            print(f"  - {skill.get('name')} ({skill.get('level')})")
    
    # Training motivation
    training = profile.get('training_motivation', [])
    if training:
        print(f"\nTraining Motivation ({len(training)}):")
        for topic in training:
            print(f"  - {topic}")
    
    # Desired roles
    roles = profile.get('desired_job_roles', [])
    if roles:
        print(f"\nDesired Job Roles ({len(roles)}):")
        for role in roles:
            print(f"  - {role}")

### 6. Session Statistics & Emissions Tracking

In [None]:
# Display session summary
if "error" not in result:
    session_summary = result.get("session_summary", {})
    
    print("="*80)
    print("SESSION STATISTICS")
    print("="*80 + "\n")
    
    print(f"Persona ID: {session_summary.get('persona_id')}")
    print(f"Model: {session_summary.get('model')}")
    print(f"Total Turns: {session_summary.get('total_turns')}")
    print(f"Conversation IDs: {len(session_summary.get('conversation_ids', []))}")
    
    # Token usage
    meta = result.get("final_state", {}).get("meta", {})
    if meta:
        print("\n📊 Token Usage:")
        print(f"  Total Input Tokens: {meta.get('total_input_tokens', 0):,}")
        print(f"  Total Output Tokens: {meta.get('total_output_tokens', 0):,}")
        print(f"  Total Tokens: {meta.get('total_input_tokens', 0) + meta.get('total_output_tokens', 0):,}")
        
        # Emissions tracking
        if 'carbon_emissions_g' in meta:
            print("\n🌱 Carbon Emissions:")
            print(f"  CO2 Equivalent: {meta.get('carbon_emissions_g', 0):.4f} grams")
            print("  (Based on token consumption)")
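
The notebook reports emissions "based on token consumption" but does not show how the figure is derived. One simple possibility is a linear estimate, sketched below. Both the function `estimate_emissions_g` and the linear model are assumptions, and the conversion factor is left as a required argument because no factor is given here; the value in the usage line is a placeholder, not a measured figure.

```python
def estimate_emissions_g(
    input_tokens: int,
    output_tokens: int,
    grams_per_1k_tokens: float,
) -> float:
    """Estimate CO2-equivalent grams as a linear function of total tokens.

    grams_per_1k_tokens is a caller-supplied factor (hypothetical here);
    it depends on the model, hardware, and energy mix.
    """
    if input_tokens < 0 or output_tokens < 0:
        raise ValueError("token counts must be non-negative")
    total_tokens = input_tokens + output_tokens
    return total_tokens / 1000 * grams_per_1k_tokens


# Placeholder factor purely for illustration.
print(f"{estimate_emissions_g(1500, 500, grams_per_1k_tokens=0.5):.4f} g")
```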

### 7. Batch Processing Multiple Personas

Process multiple personas in batch:

In [None]:
# Batch process personas
PERSONA_IDS = ["persona_001", "persona_002", "persona_003"]  # Update with actual IDs
BATCH_RESULTS = []

print(f"Processing {len(PERSONA_IDS)} personas...\n")

for i, persona_id in enumerate(PERSONA_IDS, 1):
    print(f"[{i}/{len(PERSONA_IDS)}] Interviewing {persona_id}...", end=" ")
    
    result = main_interview_pipeline(
        persona_id=persona_id,
        get_agent=get_agent,
        model_id=MODEL_ID,          # reuse the settings from section 3
        max_turns_p1=MAX_TURNS_P1,
        max_turns_p2=MAX_TURNS_P2
    )
    
    BATCH_RESULTS.append(result)
    
    if "error" in result:
        print(f"❌ FAILED: {result['error']}")
    else:
        profile = result["profile"]
        name = profile.get("full_name", "Unknown")
        is_valid = result["validation"]["is_valid"]
        status = "✓" if is_valid else "✗"
        print(f"{status} {name}")

print("\nBatch processing complete!")
print(f"Success: {sum(1 for r in BATCH_RESULTS if 'error' not in r)}/{len(BATCH_RESULTS)}")
print(f"Failed: {sum(1 for r in BATCH_RESULTS if 'error' in r)}/{len(BATCH_RESULTS)}")
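
The loop above relies on the pipeline returning an `"error"` key on failure. If a call raises instead (for example on a network problem), the whole batch stops. A defensive wrapper along the following lines would keep the batch going and also allow already-processed personas to be skipped on a rerun. `run_batch`, `interview_fn`, and `already_done` are hypothetical names for this sketch, not part of the pipeline module.

```python
from typing import Callable, Dict, FrozenSet, Iterable, List


def run_batch(
    persona_ids: Iterable[str],
    interview_fn: Callable[[str], Dict],
    already_done: FrozenSet[str] = frozenset(),
) -> List[Dict]:
    """Run interview_fn for each persona, converting exceptions into error results."""
    results: List[Dict] = []
    for persona_id in persona_ids:
        if persona_id in already_done:
            continue  # resume support: skip personas processed earlier
        try:
            result = dict(interview_fn(persona_id))
        except Exception as exc:  # keep the batch alive on unexpected failures
            result = {"error": f"{type(exc).__name__}: {exc}"}
        # Record which persona each result belongs to, even for failures.
        result.setdefault("persona_id", persona_id)
        results.append(result)
    return results


def fake_interview(persona_id: str) -> Dict:
    """Stand-in for the real pipeline call, used only to exercise run_batch."""
    if persona_id == "persona_002":
        raise RuntimeError("simulated failure")
    return {"profile": {"full_name": persona_id}}


for r in run_batch(["persona_001", "persona_002"], fake_interview):
    print(r)
```

With the real pipeline, `interview_fn` could be a small lambda that calls `main_interview_pipeline` with the parameters from section 3.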

In [None]:
# Display batch summary
print("="*80)
print("BATCH SUMMARY")
print("="*80 + "\n")

for i, result in enumerate(BATCH_RESULTS, 1):
    if "error" not in result:
        profile = result["profile"]
        print(f"{i}. {profile.get('full_name')}")
        print(f"   Domain: {profile.get('top_domain')}")
        print(f"   Focus: {profile.get('current_focus')}")
        print(f"   Skills: {len(profile.get('technical_skills', []))}")
        print()

### 8. Load All Saved Personas

In [None]:
# Load all personas from today's session
all_personas = load_all_personas()

print(f"Loaded {len(all_personas)} personas from storage\n")

# Display summary
for persona_id, persona_data in list(all_personas.items())[:5]:  # Show first 5
    profile = persona_data.get("profile", {})
    print(f"ID: {persona_id}")
    print(f"  Name: {profile.get('full_name', 'Unknown')}")
    print(f"  Domain: {profile.get('top_domain', 'unspecified')}")
    print(f"  Focus: {profile.get('current_focus', 'unspecified')}")
    print()

### 9. Extract Profile from Existing Transcript

If you have an existing conversation transcript, you can extract the profile directly:

In [None]:
# Example: Extract profile from transcript
sample_transcript = """
Assistant: Hello! I'm here to help you find job opportunities and training programs. What's your full name?
User: My name is Maria Silva.
Assistant: Nice to meet you, Maria! How old are you?
User: I'm 28 years old.
Assistant: Great! What city are you currently in?
User: I'm in São Paulo, Brazil.
Assistant: What are your main goals right now? Are you looking for jobs, training, or just exploring?
User: I want to find a job and also get some training to improve my skills.
Assistant: Excellent! What professional domain are you most interested in?
User: I'm interested in the Food Industry.
"""

# Extract profile
extractor_agent = get_agent("extractor")
extracted_profile = extract_profile_from_transcript(
    transcript=sample_transcript,
    extractor_agent=extractor_agent,
    domain="Food Industry",
    intent="Find job and training",
    focus="jobs+trainings"
)

print("Extracted Profile:")
print(json.dumps(extracted_profile, indent=2, ensure_ascii=False))
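
Before sending a hand-written transcript to the extractor, it can help to check that it parses into clean speaker turns. The sketch below assumes the `Assistant:` / `User:` line prefixes used in the sample above; `parse_transcript` is a hypothetical helper, not a function from the pipeline module.

```python
import re
from typing import List, Tuple

# Matches lines such as "User: I'm 28 years old."
_TURN = re.compile(r"^(Assistant|User):\s*(.*)$")


def parse_transcript(transcript: str) -> List[Tuple[str, str]]:
    """Split a transcript into (speaker, text) turns.

    Lines without a speaker prefix are treated as a continuation
    of the previous turn; blank lines are ignored.
    """
    turns: List[Tuple[str, str]] = []
    for raw_line in transcript.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        match = _TURN.match(line)
        if match:
            turns.append((match.group(1), match.group(2)))
        elif turns:
            speaker, text = turns[-1]
            turns[-1] = (speaker, f"{text} {line}")
    return turns


demo = """
Assistant: What's your full name?
User: My name is
Maria Silva.
"""
for speaker, text in parse_transcript(demo):
    print(f"{speaker}: {text}")
```

A quick check such as counting `User` turns can reveal a transcript that was pasted with missing prefixes before any extraction tokens are spent.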

### 10. Profile Validation

Validate a persona profile manually:

In [None]:
# Validate a profile
if "error" not in result:
    profile_to_validate = result["profile"]
    
    is_valid, errors = validate_persona_profile(profile_to_validate)
    
    print("Profile Validation:")
    print(f"Status: {'VALID ✓' if is_valid else 'INVALID ✗'}")
    
    if not is_valid:
        print("\nErrors found:")
        for error in errors:
            print(f"  - {error}")
    else:
        print("\nProfile passed all validation checks!")

### 11. Export Results

Export personas for downstream processing (knowledge graph, recommendations):

In [None]:
# Export all personas to JSON
output_dir = Path("../processed_data/outputs")
output_dir.mkdir(parents=True, exist_ok=True)

export_file = output_dir / f"personas_extracted_{datetime.now().strftime('%Y-%m-%d')}.json"

# Collect all valid profiles
valid_personas = {}
for result in BATCH_RESULTS:
    if "error" not in result and result["validation"]["is_valid"]:
        persona_id = result["final_state"].get("persona_id")
        valid_personas[persona_id] = result["profile"]

# Save to file
with open(export_file, "w", encoding="utf-8") as f:
    json.dump(valid_personas, f, indent=2, ensure_ascii=False)

print(f"✓ Exported {len(valid_personas)} personas to: {export_file}")