In [None]:
# AW (Agentic Workflows) Demo

## Overview

**AW** is a package for building AI agents that prepare data using the **ReAct pattern** (Reason-Act-Observe). It provides a modular, extensible framework for creating multi-step agentic workflows that can iteratively solve data preparation problems.

### Why Agentic Workflows?

Traditional data pipelines are rigid: a small change in the input format can break the whole pipeline. **Agentic workflows** address this by:

1. **Reasoning about the problem**: Analyzing data characteristics and requirements
2. **Acting with generated code**: Creating transformation logic on the fly
3. **Observing results**: Validating outputs and learning from failures
4. **Iterating until success**: Automatically retrying with refined approaches

This makes data preparation more **robust** and **adaptable** to varied inputs.

### Core Concepts

- **AgenticStep Protocol**: Common interface for all agents
- **ReAct Pattern**: Reason → Act → Observe loop with retries
- **Three Validation Flavors**: Schema-based, info-dict, and functional
- **Context Management**: Shared state between workflow steps
- **Orchestration**: Chain multiple agents into workflows
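
The Reason → Act → Observe loop with retries can be sketched in plain Python. This is a minimal illustration under stated assumptions, not AW's implementation: `react_loop`, `propose`, `run`, and `validate` are hypothetical names, and the "reasoning" step is a hard-coded list of strategies rather than an LLM call.

```python
from typing import Any, Callable


def react_loop(
    propose: Callable[[list[str]], Any],           # Reason: pick an action given past errors
    run: Callable[[Any], Any],                     # Act: execute the chosen action
    validate: Callable[[Any], tuple[bool, dict]],  # Observe: check the result
    max_retries: int = 3,
) -> tuple[Any, dict]:
    """Hypothetical Reason-Act-Observe loop that retries until validation passes."""
    errors: list[str] = []
    for attempt in range(1, max_retries + 1):
        action = propose(errors)
        try:
            result = run(action)
            ok, info = validate(result)
        except Exception as exc:
            # A crash while acting or observing counts as a failed attempt.
            ok, info, result = False, {"error": str(exc)}, None
        if ok:
            return result, {"success": True, "num_attempts": attempt}
        errors.append(str(info.get("error", "validation failed")))
    return None, {"success": False, "num_attempts": max_retries, "errors": errors}


# Toy usage: parse "1;2;3" by trying delimiters until every field is an integer.
delimiters = [",", ";"]
result, meta = react_loop(
    propose=lambda errs: delimiters[len(errs)],  # next delimiter after each failure
    run=lambda sep: [int(tok) for tok in "1;2;3".split(sep)],
    validate=lambda xs: (len(xs) == 3, {"error": f"expected 3 fields, got {len(xs)}"}),
    max_retries=len(delimiters),
)
print(result, meta)  # [1, 2, 3] {'success': True, 'num_attempts': 2}
```

The first attempt fails because `int("1;2;3")` raises, the error is recorded, and the second attempt succeeds with the next delimiter.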

In [None]:
# Core imports
import pandas as pd
import numpy as np
from pathlib import Path
import tempfile

# AW package imports
from aw import (
    # Core abstractions
    Context,
    StepConfig,
    # Agents
    LoadingAgent,
    PreparationAgent,
    # Validation
    schema_validator,
    info_dict_validator,
    functional_validator,
    all_validators,
    is_type,
    has_attributes,
    # Orchestration
    AgenticWorkflow,
    create_data_prep_workflow,
    load_and_prepare,
)

In [None]:
## Part 1: Validation - The Three Flavors

AW provides **three distinct validation approaches**, each suited to different scenarios:

### 1. Schema Validation

**Use when**: You have a precise structure to validate against (types, field constraints).

**Pattern**: Define a Pydantic model or JSON schema, validate the artifact against it.

**Best for**: APIs, structured data exchanges, type safety.

In [None]:
from pydantic import BaseModel, validator

# Define a schema for a data point
class DataPoint(BaseModel):
    x: float
    y: float
    label: str
    
    # Note: Pydantic v2 deprecates `validator` in favor of `field_validator`
    @validator('x', 'y')
    def check_non_negative(cls, v):
        if v < 0:
            raise ValueError('coordinates must be non-negative')
        return v

# Create a validator from the schema
validate_point = schema_validator(DataPoint)

# Test with valid data
valid_data = {'x': 1.5, 'y': 2.3, 'label': 'point_A'}
is_valid, info = validate_point(valid_data)
print(f"Valid: {is_valid}")
print(f"Validated object: {info['validated']}")

# Test with invalid data
invalid_data = {'x': -1.0, 'y': 2.3, 'label': 'point_B'}
is_valid, info = validate_point(invalid_data)
print(f"\nValid: {is_valid}")
print(f"Error: {info.get('error', 'N/A')[:100]}...")  # Truncate long error

In [None]:
### 2. Info-Dict Validation

**Use when**: You need to compute statistics/properties and check them against requirements.

**Pattern**: `compute_info(artifact) → check_info(info) → (bool, info)`

**Best for**: Data quality checks, statistical validation, multi-step analysis.
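
The `compute_info(artifact) → check_info(info) → (bool, info)` pattern amounts to function composition. The sketch below shows one way it could be wired up; `make_info_dict_validator` is a hypothetical stand-in and may differ from how AW's `info_dict_validator` actually behaves (for example in how it reports errors raised while computing the info).

```python
from typing import Any, Callable


def make_info_dict_validator(
    compute_info: Callable[[Any], dict],
    check_info: Callable[[dict], tuple[bool, dict]],
) -> Callable[[Any], tuple[bool, dict]]:
    """Hypothetical helper: compute statistics first, then check them."""

    def validate(artifact: Any) -> tuple[bool, dict]:
        try:
            info = compute_info(artifact)
        except Exception as exc:
            # Assumption: failing to compute the info counts as a validation failure.
            return False, {"error": str(exc)}
        return check_info(info)

    return validate


# Toy usage on a plain list instead of a DataFrame.
validate_numbers = make_info_dict_validator(
    compute_info=lambda xs: {"n": len(xs), "n_missing": sum(x is None for x in xs)},
    check_info=lambda info: (info["n_missing"] == 0, {"info": info}),
)
print(validate_numbers([1, 2, 3]))     # (True, {'info': {'n': 3, 'n_missing': 0}})
print(validate_numbers([1, None, 3]))  # (False, {'info': {'n': 3, 'n_missing': 1}})
```

Separating the two functions keeps the statistics reusable: the same `compute_info` can feed several different requirement checks.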

In [None]:
# Create sample DataFrame
df = pd.DataFrame({
    'x': [1.0, 2.0, 3.0, 4.0, 5.0],
    'y': [2.0, 4.0, 6.0, 8.0, 10.0],
    'category': ['A', 'B', 'A', 'B', 'A']
})

# Define compute function (extract statistics)
def compute_df_info(df):
    return {
        'n_rows': len(df),
        'n_cols': len(df.columns),
        # Cast NumPy scalars to plain Python types so the printed info stays readable
        'null_count': int(df.isnull().sum().sum()),
        'numeric_cols': list(df.select_dtypes(include='number').columns),
        'memory_mb': float(df.memory_usage(deep=True).sum() / 1024**2)
    }

# Define check function (validate computed info)
def check_df_requirements(info):
    errors = []
    
    if info['n_rows'] < 3:
        errors.append('Need at least 3 rows')
    
    if info['null_count'] > 0:
        errors.append(f"Found {info['null_count']} null values")
    
    if len(info['numeric_cols']) < 2:
        errors.append('Need at least 2 numeric columns')
    
    if errors:
        return False, {'errors': errors, 'info': info}
    
    return True, {'status': 'passed', 'info': info}

# Create and use the validator
validate_df = info_dict_validator(compute_df_info, check_df_requirements)

is_valid, result = validate_df(df)
print(f"Valid: {is_valid}")
print(f"Info: {result['info']}")
print(f"Errors: {result.get('errors', [])}")

### 3. Functional Validation

**Use when**: The best test is whether you can actually **use** the artifact for its intended purpose.

**Pattern**: Try the actual use case (e.g., visualization, model training) and catch any errors.

**Best for**: Integration testing, end-to-end validation, "does it actually work?"
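
The "try it and catch errors" idea can be sketched as a small wrapper. `make_functional_validator` is a hypothetical name used only for illustration, and the shape of the returned info dict (`result` and `error` keys) is an assumption rather than AW's documented output.

```python
from typing import Any, Callable


def make_functional_validator(
    use_artifact: Callable[[Any], Any],
) -> Callable[[Any], tuple[bool, dict]]:
    """Hypothetical helper: an artifact is valid if `use_artifact` runs without raising."""

    def validate(artifact: Any) -> tuple[bool, dict]:
        try:
            outcome = use_artifact(artifact)
        except Exception as exc:
            return False, {"error": f"{type(exc).__name__}: {exc}"}
        return True, {"result": outcome}

    return validate


# Toy purpose test: can we compute a mean from this data?
validate_averageable = make_functional_validator(lambda xs: sum(xs) / len(xs))
print(validate_averageable([2, 4, 6]))  # (True, {'result': 4.0})
print(validate_averageable([]))         # (False, {'error': 'ZeroDivisionError: division by zero'})
```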

In [None]:
# Example: Validate that we can actually plot the data
def try_to_plot(df):
    """Attempt to create a scatter plot - this is the 'purpose' test."""
    import matplotlib.pyplot as plt
    
    numeric_cols = df.select_dtypes(include='number').columns
    
    # Check before creating the figure so a failed check does not leave one open
    if len(numeric_cols) < 2:
        raise ValueError(f"Need at least 2 numeric columns, got {len(numeric_cols)}")
    
    fig, ax = plt.subplots(figsize=(6, 4))
    try:
        # Plotting raises if the columns cannot be drawn
        ax.scatter(df[numeric_cols[0]], df[numeric_cols[1]])
        ax.set_xlabel(numeric_cols[0])
        ax.set_ylabel(numeric_cols[1])
    finally:
        plt.close(fig)  # Always close to avoid display
    
    return {'plotted': True, 'x_col': numeric_cols[0], 'y_col': numeric_cols[1]}

# Create functional validator
validate_plotable = functional_validator(try_to_plot)

# Test with good data
is_valid, result = validate_plotable(df)
print(f"Valid: {is_valid}")
print(f"Result: {result}")

# Test with bad data (no numeric columns)
bad_df = pd.DataFrame({'name': ['Alice', 'Bob'], 'city': ['NYC', 'LA']})
is_valid, result = validate_plotable(bad_df)
print(f"\nBad data - Valid: {is_valid}")
print(f"Error: {result.get('error', 'N/A')[:100]}...")

### Combining Validators

Validators can be **composed** for multi-stage validation:

In [None]:
# Combine: must be DataFrame AND must be non-empty AND must be plottable
from aw import is_not_empty

combined_validator = all_validators(
    is_type(pd.DataFrame),
    is_not_empty(),
    validate_plotable
)

# Test
is_valid, result = combined_validator(df)
print(f"All checks passed: {is_valid}")
print(f"Details: {result}")

# Test with empty DataFrame
empty_df = pd.DataFrame()
is_valid, result = combined_validator(empty_df)
print(f"\nEmpty df - Valid: {is_valid}")
print(f"Failed validators: {[k for k, v in result.items() if not v.get('success', True)]}")
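
To make the composition idea concrete, here is a minimal sketch of chaining validators that each return `(bool, info)`. The helper `all_of` is hypothetical; in particular, stopping at the first failure and keying results by function name are assumptions, and AW's `all_validators` may report results differently.

```python
from typing import Any, Callable

Validator = Callable[[Any], tuple[bool, dict]]


def all_of(*validators: Validator) -> Validator:
    """Hypothetical combinator: passes only if every validator passes."""

    def validate(artifact: Any) -> tuple[bool, dict]:
        results: dict[str, dict] = {}
        for index, validator in enumerate(validators):
            name = getattr(validator, "__name__", f"validator_{index}")
            ok, info = validator(artifact)
            results[name] = {"success": ok, "info": info}
            if not ok:
                # Assumption: stop early so later checks never see bad input.
                return False, results
        return True, results

    return validate


def is_list(x):
    return isinstance(x, list), {}


def is_non_empty(x):
    return len(x) > 0, {"length": len(x)}


check = all_of(is_list, is_non_empty)
print(check([1, 2]))  # (True, {...}) with one entry per validator
print(check([]))      # (False, {...}) with 'is_non_empty' marked as failed
```

Ordering matters with short-circuiting: cheap type checks go first so that expensive or fragile checks only run on plausible input.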

## Part 2: Context - Shared State Across Steps

The **Context** object is a mutable mapping that stores artifacts and metadata as they flow through the workflow.

### Why Context?

- **State sharing**: Later steps can access artifacts from earlier steps
- **History tracking**: Automatically tracks all state updates
- **Debugging**: Snapshot the context at any point to understand what happened
- **Mapping interface**: Works like a dict but with extra capabilities
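
A dict-like object with history tracking and snapshots can be built on `collections.abc.MutableMapping`. The class below is a hypothetical sketch of the idea, not AW's `Context`; the name `HistoryDict` and the choice to record `(key, value)` pairs are assumptions made for illustration.

```python
import copy
from collections.abc import MutableMapping
from typing import Any, Iterator


class HistoryDict(MutableMapping):
    """Hypothetical dict-like store that logs every assignment."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self.history: list[tuple[str, Any]] = []

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value
        self.history.append((key, value))  # record each top-level update

    def __delitem__(self, key: str) -> None:
        del self._data[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)

    def snapshot(self) -> dict[str, Any]:
        """Return a deep copy that later mutations cannot affect."""
        return copy.deepcopy(self._data)


demo_ctx = HistoryDict()
demo_ctx["loading"] = {"rows": 5}
demo_ctx["loading"] = {"rows": 4}
snap = demo_ctx.snapshot()
demo_ctx["loading"]["rows"] = 0  # nested mutation: changes live state, not the snapshot
print(len(demo_ctx.history), snap["loading"]["rows"], demo_ctx["loading"]["rows"])  # 2 4 0
```

Note that only top-level assignments pass through `__setitem__`, so the nested mutation on the last line is not recorded in `history`.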

In [None]:
# Create a context
ctx = Context()

# Store results from a "loading" step
ctx['loading'] = {
    'df': df,
    'metadata': {'source': 'demo.csv', 'rows': len(df)}
}

# Store results from a "preparing" step
ctx['preparing'] = {
    'df': df.copy(),
    'metadata': {'transformations': ['dropna', 'normalize']}
}

# Access like a dict
print(f"Loading metadata: {ctx['loading']['metadata']}")

# View history
print(f"\nHistory (length {len(ctx.history)}):") 
for key, value in ctx.history:
    print(f"  - Set '{key}' with {len(str(value))} characters")

# Take a snapshot
snapshot = ctx.snapshot()
print(f"\nSnapshot keys: {list(snapshot.keys())}")

## Part 3: The LoadingAgent - Data Ingestion with Intelligence

The **LoadingAgent** automatically loads data from various sources into pandas DataFrames.

### How it Works (ReAct Pattern)

1. **Sample**: Read file metadata (extension, first 1 KB)
2. **Reason**: Infer the loader function and parameters
3. **Act**: Generate and execute loading code
4. **Observe**: Validate that the result is a non-empty DataFrame
5. **Retry**: If loading fails, analyze the error and try a different approach

### Why Not Just `pd.read_csv()`?

- Handles multiple formats (CSV, Excel, JSON, Parquet, etc.)
- Automatically detects encoding issues
- Tries different delimiters/parameters on failure
- Provides rich metadata about the loading process
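
The sample, infer, load, validate, retry sequence can be illustrated with the standard library alone. This sketch is not how `LoadingAgent` is implemented: `sniff_and_load` is a hypothetical function, it handles only delimited text already held in memory, and it uses `csv.Sniffer` where an agent would reason about the file.

```python
import csv
import io


def sniff_and_load(text: str, sample_size: int = 1024) -> tuple[list[dict], dict]:
    """Hypothetical loader: guess the delimiter from a sample, fall back on failure."""
    sample = text[:sample_size]  # Sample: look only at the first ~1 KB
    candidates: list[str] = []
    try:
        # Reason: let the Sniffer propose the most likely delimiter
        candidates.append(csv.Sniffer().sniff(sample, delimiters=",;\t|").delimiter)
    except csv.Error:
        pass  # no confident guess; rely on the fallbacks below
    candidates += [d for d in ",;\t|" if d not in candidates]

    for attempt, delimiter in enumerate(candidates, start=1):
        # Act: parse the text with the current candidate
        rows = list(csv.DictReader(io.StringIO(text), delimiter=delimiter))
        # Observe: accept only a non-empty table with more than one column
        if rows and len(rows[0]) > 1:
            return rows, {"success": True, "delimiter": delimiter, "num_attempts": attempt}
    return [], {"success": False, "num_attempts": len(candidates)}


rows, meta = sniff_and_load("id;x;y\n1;0.5;2.0\n2;1.5;3.0\n")
print(meta["success"], repr(meta["delimiter"]), rows[0])
```

The "more than one column" check is what catches a wrong delimiter: parsing semicolon-separated text with a comma yields a single wide column, which fails validation and triggers the next candidate.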

In [None]:
# Create some test data files
temp_dir = Path(tempfile.mkdtemp())

# Simple CSV
csv_path = temp_dir / 'data.csv'
test_df = pd.DataFrame({
    'id': range(1, 11),
    'x': np.random.randn(10),
    'y': np.random.randn(10),
    'category': np.random.choice(['A', 'B', 'C'], 10)
})
test_df.to_csv(csv_path, index=False)

print(f"Created test CSV at: {csv_path}")
print(f"Original data shape: {test_df.shape}")
print(f"\nFirst few rows:")
print(test_df.head(3))

In [None]:
# Create a LoadingAgent
loading_agent = LoadingAgent()

# Create context
context = Context()

# Execute loading
loaded_df, metadata = loading_agent.execute(str(csv_path), context)

# Check results
print(f"Success: {metadata['success']}")
print(f"Loaded shape: {loaded_df.shape if loaded_df is not None else 'None'}")
print(f"Number of attempts: {metadata.get('num_attempts', 'N/A')}")

# View DataFrame info from metadata
if 'df_info' in metadata:
    print(f"\nDataFrame info:")
    for key, value in metadata['df_info'].items():
        print(f"  {key}: {value}")

# Check what's in context
print(f"\nContext keys: {list(context.keys())}")
if 'loading' in context:
    print(f"Loading context keys: {list(context['loading'].keys())}")

## Part 4: The PreparationAgent - Intelligent Data Transformation

The **PreparationAgent** transforms data to meet specific requirements using functional validation.

### How it Works

1. **Analyze**: Compute initial data statistics
2. **Reason**: Determine what transformations are needed
3. **Act**: Generate and execute transformation code
4. **Observe**: Apply the **target validator** (functional test)
5. **Retry**: If validation fails, analyze the error and try a different transformation

### Target-Driven Design

Instead of specifying exact transformations, you specify the **target** (e.g., 'cosmo-ready', 'ml-ready') and a **validator** that checks if the data meets that target's requirements.
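
As a rough sketch of target-driven preparation, the snippet below maps a target name to a validator and tries candidate transformations until one passes. Everything here is hypothetical (`TARGET_VALIDATORS`, `prepare_for`, the `'complete'` target, and the fixed list of transformations); the agent described above generates transformation code rather than choosing from a predefined list.

```python
from typing import Callable

Rows = list[dict]
Validator = Callable[[Rows], tuple[bool, dict]]


def no_missing_values(rows: Rows) -> tuple[bool, dict]:
    missing = sum(value is None for row in rows for value in row.values())
    return missing == 0, {"missing": missing}


# Hypothetical registry: target name -> validator that defines the target
TARGET_VALIDATORS: dict[str, Validator] = {"complete": no_missing_values}


def identity(rows: Rows) -> Rows:
    return rows


def drop_incomplete(rows: Rows) -> Rows:
    return [row for row in rows if None not in row.values()]


def prepare_for(rows: Rows, target: str, transforms=(identity, drop_incomplete)):
    """Try each transformation in order until the target's validator passes."""
    validate = TARGET_VALIDATORS[target]
    info: dict = {}
    for attempt, transform in enumerate(transforms, start=1):
        candidate = transform(rows)
        ok, info = validate(candidate)
        if ok:
            return candidate, {"success": True, "transform": transform.__name__, "num_attempts": attempt}
    return None, {"success": False, "validation_result": info}


raw = [{"x": 1.0, "y": 2.0}, {"x": None, "y": 3.0}]
prepared, prep_info = prepare_for(raw, "complete")
print(prepared, prep_info)
# [{'x': 1.0, 'y': 2.0}] {'success': True, 'transform': 'drop_incomplete', 'num_attempts': 2}
```

The caller states only what "done" looks like (the validator); which transformation gets there is left to the search.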

In [None]:
# Create a custom validator for "plotting-ready" data
def validate_plot_ready(df):
    """Data is plot-ready if it has at least 2 numeric columns with no nulls."""
    try:
        numeric_cols = df.select_dtypes(include='number').columns
        
        if len(numeric_cols) < 2:
            return False, {
                'error': f'Need at least 2 numeric columns, found {len(numeric_cols)}',
                'numeric_cols': list(numeric_cols)
            }
        
        # Check for nulls in numeric columns
        null_counts = df[numeric_cols].isnull().sum()
        if null_counts.sum() > 0:
            return False, {
                'error': 'Found null values in numeric columns',
                'null_counts': null_counts.to_dict()
            }
        
        return True, {
            'status': 'plot-ready',
            'numeric_cols': list(numeric_cols),
            'shape': df.shape
        }
    except Exception as e:
        return False, {'error': str(e)}

# Create PreparationAgent with this validator
config = StepConfig(validator=validate_plot_ready, max_retries=2)
prep_agent = PreparationAgent(config=config, target='plot-ready')

# Use the loaded DataFrame from previous step
# (In a real workflow, this would come from context)
prepared_df, prep_metadata = prep_agent.execute(loaded_df, context)

print(f"Success: {prep_metadata['success']}")
print(f"Number of attempts: {prep_metadata.get('num_attempts', 'N/A')}")

if 'validation_result' in prep_metadata:
    print(f"\nValidation result:")
    for key, value in prep_metadata['validation_result'].items():
        print(f"  {key}: {value}")

if prepared_df is not None:
    print(f"\nPrepared data shape: {prepared_df.shape}")
    print(f"Columns: {list(prepared_df.columns)}")

## Part 5: Workflows - Orchestrating Multiple Agents

**AgenticWorkflow** chains multiple agents together, managing context and artifact passing.

### Why Workflows?

- **Modularity**: Each agent focuses on one task
- **Reusability**: Mix and match agents for different pipelines
- **Observability**: Track execution of each step
- **Failure handling**: Know exactly where/why things failed
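
The orchestration idea can be sketched as a loop that feeds each step's artifact into the next and records per-step results. `SequentialWorkflow` and `FunctionStep` are hypothetical classes written for this illustration; the report layout (`steps`, `failed_step`) is an assumption and may not match the metadata `AgenticWorkflow` produces.

```python
from typing import Any, Callable


class FunctionStep:
    """Hypothetical step that wraps a plain function behind execute()."""

    def __init__(self, fn: Callable[[Any], Any]) -> None:
        self.fn = fn

    def execute(self, input_data: Any, context: dict) -> tuple[Any, dict]:
        try:
            return self.fn(input_data), {"success": True}
        except Exception as exc:
            return None, {"success": False, "error": str(exc)}


class SequentialWorkflow:
    """Hypothetical runner: pass each step's artifact to the next step."""

    def __init__(self) -> None:
        self.steps: list[tuple[str, Any]] = []
        self.context: dict[str, Any] = {}

    def add_step(self, name: str, agent: Any) -> None:
        self.steps.append((name, agent))

    def run(self, initial_input: Any) -> tuple[Any, dict]:
        artifact = initial_input
        report: dict[str, Any] = {"success": True, "steps": []}
        for name, agent in self.steps:
            artifact, metadata = agent.execute(artifact, self.context)
            ok = bool(metadata.get("success", False))
            report["steps"].append({"name": name, "success": ok})
            if not ok:
                # Stop at the first failing step and say which one it was.
                report["success"] = False
                report["failed_step"] = name
                return None, report
        return artifact, report


wf = SequentialWorkflow()
wf.add_step("parse", FunctionStep(lambda s: [int(tok) for tok in s.split(",")]))
wf.add_step("total", FunctionStep(sum))
print(wf.run("1,2,3")[0])                 # 6
print(wf.run("1,x,3")[1]["failed_step"])  # parse
```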

In [None]:
# Manual workflow construction
workflow = AgenticWorkflow()

# Add loading step
workflow.add_step('loading', LoadingAgent())

# Add preparing step
prep_config = StepConfig(validator=validate_plot_ready, max_retries=2)
workflow.add_step(
    'preparing', 
    PreparationAgent(config=prep_config, target='plot-ready')
)

# Execute workflow
final_artifact, workflow_metadata = workflow.run(str(csv_path))

print(f"Workflow success: {workflow_metadata.get('success', False)}")
print(f"\nSteps executed:")
for step_info in workflow_metadata['steps']:
    print(f"  - {step_info['name']}: {step_info.get('success', 'N/A')}")

# Final result
if final_artifact is not None:
    print(f"\nFinal artifact shape: {final_artifact.shape}")
    print(f"Ready to plot!")

### Workflow Factories

For common patterns, use **factory functions** for convenience:

In [None]:
# Build a reusable workflow with the factory function
workflow = create_data_prep_workflow(target='plot-ready')

# Or load and prepare in a single call with the convenience function
result_df, result_metadata = load_and_prepare(
    str(csv_path),
    target='plot-ready',
    validator=validate_plot_ready,
    max_retries=2
)

print(f"Success: {result_metadata.get('success', False)}")
if result_df is not None:
    print(f"Result shape: {result_df.shape}")
    print(f"\nFirst few rows:")
    print(result_df.head())

## Part 6: Real-World Example - Preparing for Cosmograph

**Cosmograph** is a visualization library that requires specific data formats. Let's use AW to automatically prepare data for it.

### Cosmograph Requirements

- At least 2 numeric columns (for x/y coordinates)
- No null values in coordinate columns
- Optional: labels, colors, sizes from other columns

AW provides specialized validators for this:

In [None]:
from aw.cosmo import basic_cosmo_validator, infer_cosmo_params
from aw import load_for_cosmo

# Create test data with some issues
messy_csv = temp_dir / 'messy_data.csv'
messy_df = pd.DataFrame({
    'id': range(1, 21),
    'feature_1': np.random.randn(20),
    'feature_2': np.random.randn(20),
    'feature_3': np.random.randn(20),
    'label': np.random.choice(['A', 'B', 'C'], 20),
    'notes': ['note_' + str(i) for i in range(20)]
})
# Add some nulls
messy_df.loc[2, 'feature_1'] = None
messy_df.loc[5, 'feature_2'] = None

messy_df.to_csv(messy_csv, index=False)

print(f"Created messy data with nulls:")
print(f"Shape: {messy_df.shape}")
print(f"Null counts:\n{messy_df.isnull().sum()}")

In [None]:
# Use the cosmo-specific workflow
# Note: this uses basic_cosmo_validator, which checks that the data structure is
# correct without calling cosmograph itself (to avoid the dependency)

cosmo_df, cosmo_metadata = load_for_cosmo(str(messy_csv), max_retries=3)

print(f"Success: {cosmo_metadata.get('success', False)}")

if cosmo_df is not None:
    print(f"\nCleaned data shape: {cosmo_df.shape}")
    print(f"Null counts: {cosmo_df.isnull().sum().sum()}")
    print(f"\nNumeric columns: {list(cosmo_df.select_dtypes(include='number').columns)}")
    
    # Show the cosmograph parameters suggested by the validation step, if any
    if 'preparing' in cosmo_metadata:
        validation_result = cosmo_metadata['preparing']['metadata'].get('validation_result', {})
        if 'params' in validation_result:
            print(f"\nSuggested cosmograph params:")
            for key, value in validation_result['params'].items():
                print(f"  {key}: {value}")

## Part 7: Advanced - Custom Agents

You can build your own agents by following the **AgenticStep protocol**:

```python
def execute(self, input_data: Any, context: MutableMapping[str, Any]) -> tuple[Artifact, dict]:
    ...
```
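
One way to express such a signature as a structural type is `typing.Protocol`. The sketch below is an assumption about how a protocol like this could be declared, not AW's actual definition; `StepLike` and `UppercaseStep` are hypothetical names.

```python
from collections.abc import MutableMapping
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StepLike(Protocol):
    """Hypothetical structural type mirroring the execute() signature."""

    def execute(
        self, input_data: Any, context: MutableMapping[str, Any]
    ) -> tuple[Any, dict]: ...


class UppercaseStep:
    """Satisfies StepLike without inheriting from it."""

    def execute(self, input_data, context):
        result = str(input_data).upper()
        context["uppercase"] = {"result": result}
        return result, {"success": True}


step = UppercaseStep()
print(isinstance(step, StepLike))      # True: it has an `execute` method
print(isinstance(object(), StepLike))  # False
print(step.execute("abc", {}))         # ('ABC', {'success': True})
```

With `runtime_checkable`, `isinstance` only verifies that the method exists, not that its signature matches, so static type checking remains the stronger guarantee.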

Let's create a simple **OutlierFilterAgent** that removes outliers using the IQR method:

In [None]:
from typing import Any
from collections.abc import MutableMapping

class OutlierFilterAgent:
    """Simple agent that removes outliers using IQR method."""
    
    def __init__(self, threshold: float = 1.5):
        self.threshold = threshold
    
    def execute(
        self, 
        input_df: pd.DataFrame, 
        context: MutableMapping[str, Any]
    ) -> tuple[pd.DataFrame, dict[str, Any]]:
        """Remove outliers from numeric columns."""
        
        df = input_df.copy()
        initial_count = len(df)
        
        removed_per_column = {}
        
        # For each numeric column
        for col in df.select_dtypes(include='number').columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            # Define outlier bounds
            lower_bound = Q1 - self.threshold * IQR
            upper_bound = Q3 + self.threshold * IQR
            
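            # Count outliers among the rows that remain after earlier columns
            # (cast to int so the metadata holds plain Python numbers)
            outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
            removed_per_column[col] = int(outliers.sum())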
            
            # Remove outliers
            df = df[~outliers]
        
        final_count = len(df)
        
        metadata = {
            'success': True,
            'initial_rows': initial_count,
            'final_rows': final_count,
            'removed_total': initial_count - final_count,
            'removed_per_column': removed_per_column,
            'threshold': self.threshold
        }
        
        # Store in context
        context['outlier_filtering'] = {
            'df': df,
            'metadata': metadata
        }
        
        return df, metadata

# Test the custom agent
test_data = pd.DataFrame({
    'x': [1, 2, 3, 4, 5, 100],  # 100 is outlier
    'y': [2, 4, 6, 8, 10, 12],
    'z': [-50, 5, 6, 7, 8, 9]   # -50 is outlier
})

print("Before filtering:")
print(test_data)

filter_agent = OutlierFilterAgent(threshold=1.5)
ctx = Context()

filtered_df, filter_meta = filter_agent.execute(test_data, ctx)

print(f"\nAfter filtering:")
print(filtered_df)
print(f"\nRemoved {filter_meta['removed_total']} outliers")
print(f"Per column: {filter_meta['removed_per_column']}")
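
To see why `100` is flagged in column `x`, the IQR bounds can be worked out by hand: Q1 = 2.25 and Q3 = 4.75, so IQR = 2.5 and the bounds are 2.25 − 1.5 × 2.5 = −1.5 and 4.75 + 1.5 × 2.5 = 8.5. The standalone check below reproduces this with the standard library; `iqr_bounds` is a hypothetical helper, and `method="inclusive"` is chosen because it interpolates linearly between data points, which for this data gives the same quartiles as pandas' default.

```python
from statistics import quantiles


def iqr_bounds(values, threshold=1.5):
    """Return (lower, upper) outlier bounds using the IQR rule."""
    q1, _median, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - threshold * iqr, q3 + threshold * iqr


x = [1, 2, 3, 4, 5, 100]
lower, upper = iqr_bounds(x)
print(lower, upper)                                # -1.5 8.5
print([v for v in x if not lower <= v <= upper])   # [100]
```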

### Integrate Custom Agent into Workflow

Any object with a compatible `execute` method can be added to a workflow alongside the built-in agents:

In [None]:
# Create a workflow with custom agent
custom_workflow = AgenticWorkflow()

# Step 1: Load
custom_workflow.add_step('loading', LoadingAgent())

# Step 2: Filter outliers (custom!)
custom_workflow.add_step('filtering', OutlierFilterAgent(threshold=2.0))

# Step 3: Prepare for visualization
prep_config = StepConfig(validator=validate_plot_ready, max_retries=2)
custom_workflow.add_step(
    'preparing',
    PreparationAgent(config=prep_config, target='plot-ready')
)

# Run complete workflow
final_result, final_metadata = custom_workflow.run(str(csv_path))

print(f"Workflow success: {final_metadata.get('success', False)}")
print(f"\nSteps:")
for step in final_metadata['steps']:
    print(f"  {step['name']}: {step.get('success', 'N/A')}")

# Check what the custom step did (the agent writes to its own 'outlier_filtering' key)
if 'outlier_filtering' in custom_workflow.context:
    filter_info = custom_workflow.context['outlier_filtering']['metadata']
    print(f"\nFiltering removed {filter_info['removed_total']} rows")

if final_result is not None:
    print(f"\nFinal result shape: {final_result.shape}")

## Summary: Key Takeaways

### 1. **Three Validation Flavors**
   - **Schema**: Strict type/structure validation
   - **Info-dict**: Compute stats → check requirements
   - **Functional**: Try the actual use case

### 2. **ReAct Pattern**
   - Reason → Act → Observe → Retry
   - Makes pipelines robust to varied inputs
   - Automatic error analysis and recovery

### 3. **Modular Agents**
   - Each agent has a single responsibility
   - Easy to test, reuse, and compose
   - Follow `AgenticStep` protocol

### 4. **Context Management**
   - Shared state across workflow steps
   - History tracking for debugging
   - Dict-like interface

### 5. **Orchestration**
   - Chain agents into workflows
   - Factory functions for common patterns
   - Detailed execution metadata

### Next Steps

- Explore the `aw.cosmo` module for cosmograph integration
- Build custom agents for your domain
- Integrate with real LLMs (OpenAI, Anthropic) for more sophisticated reasoning
- Add human-in-the-loop review with `InteractiveWorkflow`
- Extend with more validation patterns

### Documentation

See the full README and test suite for more examples and patterns!