# Planning and Decomposition

In [1]:
from agentic_patterns.core.agents import get_agent, run_agent
from agentic_patterns.core.agents.utils import nodes_to_message_history

## The Problem

The task: build a data pipeline that analyzes website traffic logs and produces a daily report.

This problem benefits from planning because:
- It involves multiple distinct phases (data loading, cleaning, analysis, reporting)
- Steps have dependencies (analysis cannot start before cleaning finishes)
- The overall structure of the solution matters as much as any individual step
- An explicit plan can be reviewed before execution begins
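
The dependency point can be made concrete with a small sketch. It assumes four hypothetical phase names that mirror the list above and uses the standard library's `graphlib` (Python 3.9+) to derive an order in which every phase runs after the phases it depends on. It illustrates the idea only and is not part of the notebook's pipeline.

```python
from graphlib import TopologicalSorter

# Hypothetical phase names (not from the notebook). Each phase maps to the
# set of phases that must finish before it can start.
dependencies = {
    "load": set(),
    "clean": {"load"},
    "analyze": {"clean"},
    "report": {"analyze"},
}

# static_order() yields each phase only after all of its prerequisites;
# it raises graphlib.CycleError if the dependencies form a cycle.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because an explicit plan states its dependencies, a valid execution order (or a cycle) can be found before any step is implemented.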

In [2]:
task = """Build a data pipeline to analyze website traffic logs.

Context:
- Log files are in JSON format, one event per line
- Each event has: timestamp, user_id, page_url, referrer, user_agent
- Files are stored in an S3 bucket, organized by date (logs/YYYY/MM/DD/)
- Need to produce a daily report showing:
  * Total page views and unique visitors
  * Top 10 most visited pages
  * Traffic sources breakdown (direct, search, social, referral)
  * Hourly traffic pattern

Design and implement this pipeline."""
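
To make the input format concrete, here is a minimal sketch of reading JSON-lines events and separating well-formed records from rejected ones. The five field names come from the task description. The sample events, the timestamp format, and the helper name `parse_events` are assumptions for illustration.

```python
import json

# Field names taken from the task description.
REQUIRED_FIELDS = {"timestamp", "user_id", "page_url", "referrer", "user_agent"}

def parse_events(lines):
    """Split JSON-lines input into valid events and rejected raw lines."""
    valid, rejected = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            rejected.append(line)
            continue
        # Keep only JSON objects that carry every required field.
        if isinstance(event, dict) and REQUIRED_FIELDS.issubset(event):
            valid.append(event)
        else:
            rejected.append(line)
    return valid, rejected

# Hypothetical sample input: one complete event, one with missing fields,
# and one line that is not JSON at all.
sample_lines = [
    '{"timestamp": "2024-01-15T09:30:00+00:00", "user_id": "u1", '
    '"page_url": "/home", "referrer": "", "user_agent": "Mozilla/5.0"}',
    '{"timestamp": "2024-01-15T09:31:00+00:00", "user_id": "u2", "page_url": "/pricing"}',
    "not json",
]
valid, rejected = parse_events(sample_lines)
print(len(valid), len(rejected))
```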

## Step 1: Generate the Plan

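Ask the model for an explicit plan before any implementation. The planner's system prompt forbids code, so the output stays at the level of steps, inputs, outputs, and dependencies.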

In [3]:
system_prompt_planner = """You are a software architect. Your job is to create implementation plans.
Do NOT write code. Create a structured plan that a developer can follow."""

prompt_plan = f"""{task}

Create a detailed implementation plan. For each step:
1. Give it a clear name
2. Describe what it accomplishes
3. List inputs it requires
4. List outputs it produces
5. Note any dependencies on other steps

Format as a numbered list. Do not write code."""

agent_planner = get_agent(config_name="fast", system_prompt=system_prompt_planner)
agent_run_plan, nodes_plan = await run_agent(agent_planner, prompt_plan)

print("Implementation Plan:")
print(agent_run_plan.result.output)

Implementation Plan:
Here's a detailed implementation plan for the website traffic logs analysis pipeline:

1. Log File Collection & Validation
   - Purpose: Gather daily log files and verify data integrity
   - Inputs: Raw JSON log files from S3 (logs/YYYY/MM/DD/)
   - Outputs: Validated JSON files, error report for malformed records
   - Dependencies: None

2. Data Parsing & Enrichment
   - Purpose: Parse JSON, standardize fields, add derived data
   - Inputs: Validated JSON files
   - Outputs: Parquet files with enriched data including:
     * Parsed timestamp with hour extraction
     * Classified traffic source (using referrer analysis)
     * Normalized URLs (removing query parameters)
     * Parsed user agent information
   - Dependencies: Step 1

3. Daily Aggregation Processing
   - Purpose: Calculate core metrics for the day
   - Inputs: Enriched Parquet files
   - Outputs: Four separate analytical tables:
     * Daily visitor counts (total and unique)
     * Page view counts 
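
A plan in this shape can also be held as data rather than free text. The sketch below is one possible representation, not something the notebook or its library produces. The `PlanStep` class and `ready_steps` helper are hypothetical, and the two entries are transcribed by hand from the first two steps of the output above.

```python
from dataclasses import dataclass, field

@dataclass
class PlanStep:
    """One plan step, with the fields the planner prompt asks for."""
    number: int
    name: str
    purpose: str
    inputs: list[str]
    outputs: list[str]
    depends_on: list[int] = field(default_factory=list)

# Hand-transcribed from the first two steps of the printed plan.
plan = [
    PlanStep(
        1,
        "Log File Collection & Validation",
        "Gather daily log files and verify data integrity",
        ["Raw JSON log files from S3"],
        ["Validated JSON files", "Error report for malformed records"],
    ),
    PlanStep(
        2,
        "Data Parsing & Enrichment",
        "Parse JSON, standardize fields, add derived data",
        ["Validated JSON files"],
        ["Parquet files with enriched data"],
        depends_on=[1],
    ),
]

def ready_steps(plan, completed):
    """Return steps not yet done whose dependencies are all completed."""
    return [
        step
        for step in plan
        if step.number not in completed
        and all(dep in completed for dep in step.depends_on)
    ]

print([step.name for step in ready_steps(plan, completed=set())])
print([step.name for step in ready_steps(plan, completed={1})])
```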

## Step 2: Validate the Plan

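Before implementation, have the planner review its own plan for completeness and feasibility. The earlier exchange is passed back as message history, so the review sees the full plan.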

In [4]:
message_history = nodes_to_message_history(nodes_plan)

prompt_validate = """Review the plan you created. Check for:

1. Completeness: Does the plan cover all requirements?
2. Dependencies: Are step dependencies correctly ordered?
3. Feasibility: Are there any steps that seem unclear or underspecified?
4. Missing steps: Is anything needed that wasn't included?

If issues are found, provide an updated plan. Otherwise confirm the plan is ready."""

agent_run_validate, nodes_validate = await run_agent(
    agent_planner, prompt_validate, message_history=message_history
)

print("Plan Validation:")
print(agent_run_validate.result.output)

Plan Validation:
Let me review the plan systematically:

COMPLETENESS CHECK:
✓ Page views and unique visitors (Step 3)
✓ Top 10 pages (Step 4)
✓ Traffic sources (Steps 2 & 3)
✓ Hourly patterns (Steps 2 & 5)

IDENTIFIED GAPS:

1. Data Testing/Quality Assurance is underspecified
2. No backup/recovery procedures
3. Schema management not addressed
4. Initial setup/deployment process missing

Here's an updated plan with new steps (additions marked with *):

*0. Initial Setup & Configuration
   - Purpose: Configure infrastructure and define schemas
   - Inputs: Configuration parameters, schema definitions
   - Outputs: Deployed infrastructure, validated schemas
   - Dependencies: None

1-8. [Previous steps remain the same]

*9. Data Quality Validation
   - Purpose: Verify data completeness and accuracy
   - Inputs: 
     * Raw and processed datasets
     * Expected value ranges
     * Historical patterns
   - Outputs:
     * Data quality scorecard
     * Validation report
     * Data quality
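
A model review can be complemented by a deterministic check on the plan's structure. The sketch below assumes the dependencies have already been extracted into a mapping from step number to prerequisite step numbers. That extraction is not shown, and the function name and the deliberately broken example are hypothetical.

```python
def find_dependency_problems(dependencies: dict[int, list[int]]) -> list[str]:
    """Report dependencies that point at unknown steps or at later steps."""
    problems = []
    for step, deps in sorted(dependencies.items()):
        for dep in deps:
            if dep not in dependencies:
                problems.append(f"step {step} depends on unknown step {dep}")
            elif dep >= step:
                problems.append(
                    f"step {step} depends on step {dep}, which does not come earlier"
                )
    return problems

# Steps 1 and 2 as printed in the plan: step 2 depends on step 1.
print(find_dependency_problems({1: [], 2: [1]}))

# A deliberately broken, made-up plan to show what the check reports.
for problem in find_dependency_problems({1: [], 2: [3], 3: [7]}):
    print(problem)
```

A check like this only covers ordering. Whether the plan meets every requirement still needs the kind of review shown in the output above.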

## Step 3: Decompose a Complex Step

Some steps are too coarse to implement directly. Here the planner breaks the 'Traffic Sources Classification' step into atomic sub-tasks, continuing from the validation conversation.

In [5]:
message_history = nodes_to_message_history(nodes_validate)

prompt_decompose = """The 'Traffic Sources Classification' step is complex.
Decompose it into smaller sub-tasks:

1. List each sub-task needed
2. Explain the logic for each classification category
3. Describe how to handle edge cases (unknown referrers, missing data)

Keep the sub-tasks atomic and implementable."""

agent_run_decompose, nodes_decompose = await run_agent(
    agent_planner, prompt_decompose, message_history=message_history
)

print("Decomposed Sub-Tasks:")
print(agent_run_decompose.result.output)

Decomposed Sub-Tasks:
Here's a detailed breakdown of the Traffic Sources Classification step:

SUB-TASKS:

1. Referrer URL Parsing
   - Extract domain from referrer URL
   - Handle URL encoding/decoding
   - Normalize domains (remove www., standardize format)
   - Output: Clean referrer domain

2. UTM Parameter Processing
   - Extract utm_source
   - Extract utm_medium
   - Extract utm_campaign
   - Handle URL-encoded parameters
   - Output: Structured UTM data

3. Search Engine Detection
   - Match against search engine domain list
   - Extract search query parameters
   - Handle different search engine URL patterns
   - Output: Boolean is_search_engine + engine_name

4. Social Media Classification
   ```python
   social_platforms = {
       'facebook.com': 'Facebook',
       'twitter.com': 'Twitter',
       't.co': 'Twitter',
       'linkedin.com': 'LinkedIn',
       'instagram.com': 'Instagram'
       # etc.
   }
   ```
   - Match against social domain list
   - Handle short URLs (t
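
The referrer-based part of this decomposition can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the model's implementation: the domain sets are short, hand-picked examples, a missing or unparseable referrer is treated as `direct`, UTM parameters are ignored, and country-specific domains such as `google.co.uk` are not handled.

```python
from urllib.parse import urlparse

# Small illustrative domain lists (assumptions, far from exhaustive).
SEARCH_DOMAINS = {"google.com", "bing.com", "yahoo.com"}
SOCIAL_DOMAINS = {"facebook.com", "twitter.com", "t.co", "linkedin.com", "instagram.com"}

def referrer_domain(referrer):
    """Return the lowercased host without a leading 'www.', or '' if absent."""
    if not referrer:
        return ""
    try:
        host = urlparse(referrer.strip()).hostname or ""
    except ValueError:  # e.g. a malformed IPv6 host
        return ""
    return host[4:] if host.startswith("www.") else host

def matches(domain, known):
    """True if domain equals a known domain or is a subdomain of one."""
    return any(domain == k or domain.endswith("." + k) for k in known)

def classify_referrer(referrer):
    """Map a referrer URL to direct, search, social, or referral."""
    domain = referrer_domain(referrer)
    if not domain:
        return "direct"  # missing or unparseable referrer
    if matches(domain, SEARCH_DOMAINS):
        return "search"
    if matches(domain, SOCIAL_DOMAINS):
        return "social"
    return "referral"  # any other external site

for ref in [None, "https://www.google.com/search?q=logs", "https://t.co/abc123",
            "https://blog.example.org/post"]:
    print(ref, "->", classify_referrer(ref))
```

Matching on the exact domain or a dot-prefixed suffix keeps `m.facebook.com` in the social bucket while leaving lookalikes such as `notgoogle.com` as plain referrals.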

## Step 4: Implement From the Plan

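Now hand the validated, decomposed plan to a second agent with a developer system prompt. The planning conversation is passed as message history so the implementation can follow the plan.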
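
One common shape for code that follows a plan is one function per step plus a `main()` that calls them in dependency order. The sketch below shows that shape with in-memory events standing in for S3 and only the standard library. The function names, the sample events, and the ISO 8601 timestamp format are assumptions, and traffic-source classification is left out.

```python
from collections import Counter
from datetime import datetime
from urllib.parse import urlsplit

def clean_events(events):
    """Drop events without a user or page, and strip query strings from URLs."""
    cleaned = []
    for event in events:
        if not event.get("user_id") or not event.get("page_url"):
            continue
        cleaned.append({**event, "page_url": urlsplit(event["page_url"]).path})
    return cleaned

def aggregate(events, top_n=10):
    """Compute page views, unique visitors, top pages, and views per hour."""
    pages = Counter(e["page_url"] for e in events)
    hours = Counter(datetime.fromisoformat(e["timestamp"]).hour for e in events)
    return {
        "page_views": len(events),
        "unique_visitors": len({e["user_id"] for e in events}),
        "top_pages": pages.most_common(top_n),
        "hourly": dict(sorted(hours.items())),
    }

def main(events):
    """Run the steps in dependency order: clean first, then aggregate."""
    return aggregate(clean_events(events))

# Hypothetical sample events; the last one has no user_id and is dropped.
events = [
    {"timestamp": "2024-01-15T09:30:00+00:00", "user_id": "u1", "page_url": "/home?ref=ad"},
    {"timestamp": "2024-01-15T09:45:00+00:00", "user_id": "u2", "page_url": "/home"},
    {"timestamp": "2024-01-15T14:05:00+00:00", "user_id": "u1", "page_url": "/pricing"},
    {"timestamp": "2024-01-15T14:06:00+00:00", "user_id": "", "page_url": "/pricing"},
]
report = main(events)
print(report)
```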

In [6]:
system_prompt_implementer = """You are a Python developer. Implement code following the given plan.
Write clean, well-structured code. Follow the plan exactly."""

message_history = nodes_to_message_history(nodes_decompose)

prompt_implement = """Implement the data pipeline following the plan.

Create Python code that:
1. Follows the step structure from the plan
2. Implements each step as a separate function
3. Includes type hints and docstrings
4. Has a main() function that orchestrates the pipeline

Use boto3 for S3, pandas for data processing."""

agent_implementer = get_agent(
    config_name="fast", system_prompt=system_prompt_implementer
)
agent_run_impl, _ = await run_agent(
    agent_implementer, prompt_implement, message_history=message_history
)

print("Implementation (from plan):")
print(agent_run_impl.result.output)

Implementation (from plan):
Here's an implementation of the traffic sources classification pipeline:

```python
from dataclasses import dataclass
from typing import Dict, Optional, List
import pandas as pd
import boto3
from urllib.parse import urlparse, parse_qs
import logging
from datetime import datetime

# Type definitions
@dataclass
class UTMParameters:
    source: Optional[str]
    medium: Optional[str]
    campaign: Optional[str]

@dataclass
class TrafficSource:
    category: str
    source: str
    medium: str
    campaign: Optional[str]
    metadata: Dict

# Constants
SEARCH_ENGINES = {
    'google.com': {'param': 'q', 'name': 'Google'},
    'bing.com': {'param': 'q', 'name': 'Bing'},
    'yahoo.com': {'param': 'p', 'name': 'Yahoo'}
}

SOCIAL_PLATFORMS = {
    'facebook.com': 'Facebook',
    'twitter.com': 'Twitter',
    't.co': 'Twitter',
    'linkedin.com': 'LinkedIn',
    'instagram.com': 'Instagram'
}

class TrafficSourceClassifier:
    def __init__(self):
        self.s3_c