# Practical Workflow - User Registration Pipeline

This notebook demonstrates a real-world data processing pipeline that validates, enriches, scores, and formats user registration data.

**What You'll Learn:**
- Building practical data validation pipelines
- Parallel processing with fan-out patterns
- Multi-step data enrichment workflows
- Combining results from parallel operations

**Pipeline Steps:**
1. **Validate** - Check required fields and data quality
2. **Enrich** - Add location data and company info (parallel)
3. **Score** - Calculate user quality score
4. **Format** - Prepare final output for database

## Business Scenario

**Problem:**
Your SaaS platform receives user registrations from multiple sources. Each registration needs to be:
- Validated for completeness and correctness
- Enriched with additional data (location, company info)
- Scored for lead quality
- Formatted for storage in your database

**Requirements:**
- Process registrations quickly (parallel enrichment)
- Maintain data quality (validation)
- Calculate lead scores for sales prioritization
- Ensure consistent data format

**Solution:**
A hexDAG pipeline that orchestrates these steps with parallel execution where possible.

## Step 1: Imports

Import the core hexDAG components for building and executing our pipeline:

In [None]:
import asyncio
from datetime import datetime

from hexdag.kernel.domain.dag import DirectedGraph, NodeSpec
from hexdag.kernel.orchestration.orchestrator import Orchestrator

## Step 2: Define Validation Function

The first step validates incoming registration data for required fields and format correctness:

In [None]:
async def validate_registration(input_data: dict) -> dict:
    """Validate user registration data.

    Checks:
    - Required fields (email, name, company) are present
    - Email contains an "@" (a basic format check)
    - Phone number, if provided, is long enough (warning only)
    """
    await asyncio.sleep(0.1)  # Simulate validation processing

    warnings = []

    # Check required fields
    required = ["email", "name", "company"]
    errors = [f"Missing required field: {field}" for field in required if not input_data.get(field)]

    # Basic email validation
    email = input_data.get("email", "")
    if email and "@" not in email:
        errors.append("Invalid email format")

    # Check phone if provided; count digits so "+", "-" and spaces don't inflate the length
    phone = input_data.get("phone")
    if phone and sum(ch.isdigit() for ch in phone) < 10:
        warnings.append("Phone number seems incomplete")

    return {
        "validated_data": input_data,
        "is_valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
        "validation_timestamp": datetime.now().isoformat(),
    }
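Both checks in `validate_registration` are deliberately loose. The sketch below shows two stricter checks you could swap in. `is_plausible_email` and `count_phone_digits` are hypothetical helpers, not part of hexDAG or this notebook. The regex is only a plausibility test (one "@", a dotted domain, no spaces) and does not implement RFC 5322 validation.

```python
import re

# Hypothetical helper pattern: local part, "@", and a domain with at least one dot.
EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def is_plausible_email(email: str) -> bool:
    """Return True if the whole string looks like name@domain.tld."""
    return EMAIL_PATTERN.fullmatch(email) is not None


def count_phone_digits(phone: str) -> int:
    """Count only digits, ignoring formatting characters such as '+', '-' and spaces."""
    return sum(ch.isdigit() for ch in phone)


print(is_plausible_email("sarah.johnson@techstartup.uk"))  # True
print(is_plausible_email("incomplete-user"))               # False
print(is_plausible_email("a@b"))                           # False
print(count_phone_digits("+44-20-1234-5678"))              # 12
print(count_phone_digits("123-456-78"))                    # 8 (yet the string is 10 characters long)
```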

## Step 3: Define Location Enrichment Function

Enriches registration data with geographic information inferred from the email domain:

In [None]:
async def enrich_location(input_data: dict) -> dict:
    """Enrich with location data.

    Simulates API call to location service.
    Runs in parallel with company enrichment.
    """
    await asyncio.sleep(0.2)  # Simulate API call

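    # Depending on how the orchestrator shapes single-dependency input, the
    # validation result may arrive keyed by node name or as-is; accept both.
    validation = input_data.get("validate_registration", input_data)
    validated_data = validation.get("validated_data", {})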
    email = validated_data.get("email", "")

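    # Simple location inference from the email domain. Only the part after "@"
    # is checked, so "ann.de@example.com" is not mistaken for a German address.
    domain = email.rpartition("@")[2].lower()
    if domain.endswith(".uk"):
        location = {"country": "UK", "region": "Europe", "timezone": "GMT"}
    elif domain.endswith(".de"):
        location = {"country": "Germany", "region": "Europe", "timezone": "CET"}
    elif domain.endswith(".jp"):
        location = {"country": "Japan", "region": "Asia", "timezone": "JST"}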
    else:
        location = {"country": "USA", "region": "North America", "timezone": "PST"}

    return {"location_data": location, "enrichment_source": "location_api", "confidence": 0.85}
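The if/elif chain needs one more branch for every country you add. One alternative is to keep the mapping in a table keyed by domain suffix and look at only the part after "@". `infer_location`, `TLD_LOCATIONS` and `DEFAULT_LOCATION` are hypothetical names. The country list is the same illustrative one used above, not a real geolocation source.

```python
# Hypothetical table-driven variant of the suffix checks above.
TLD_LOCATIONS = {
    ".uk": {"country": "UK", "region": "Europe", "timezone": "GMT"},
    ".de": {"country": "Germany", "region": "Europe", "timezone": "CET"},
    ".jp": {"country": "Japan", "region": "Asia", "timezone": "JST"},
}
DEFAULT_LOCATION = {"country": "USA", "region": "North America", "timezone": "PST"}


def infer_location(email: str) -> dict:
    """Guess a location from the email's domain suffix, falling back to the default."""
    # rpartition returns the whole string as the last part when there is no "@".
    domain = email.rpartition("@")[2].lower()
    for tld, location in TLD_LOCATIONS.items():
        if domain.endswith(tld):
            return dict(location)  # copy so callers can't mutate the shared table
    return dict(DEFAULT_LOCATION)


print(infer_location("sarah.johnson@techstartup.uk")["country"])  # UK
print(infer_location("ann.de@example.com")["country"])             # USA
```

With this design, supporting a new country means adding a table entry rather than a new branch.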

## Step 4: Define Company Enrichment Function

Enriches registration data with company information from external databases:

In [None]:
async def enrich_company(input_data: dict) -> dict:
    """Enrich with company data.

    Simulates API call to company database.
    Runs in parallel with location enrichment.
    """
    await asyncio.sleep(0.25)  # Simulate database query

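    # Depending on how the orchestrator shapes single-dependency input, the
    # validation result may arrive keyed by node name or as-is; accept both.
    validation = input_data.get("validate_registration", input_data)
    validated_data = validation.get("validated_data", {})
    company = validated_data.get("company", "")

    # Simulate company data lookup
    company_data = {
        "company_name": company,
        "industry": "Technology",
        "size": "50-200 employees",
        "founded": 2018,
        "is_verified": bool(company),  # nothing to verify if no company was given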
    }

    return {
        "company_data": company_data,
        "enrichment_source": "company_database",
        "last_updated": datetime.now().isoformat(),
    }
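Real enrichment APIs fail intermittently, and the next steps at the end of this notebook mention adding retry logic. Below is a minimal sketch of exponential backoff around any async call. `with_retries` and `flaky_lookup` are hypothetical names, not hexDAG APIs. The block runs as a standalone script via `asyncio.run`; in a notebook cell, `await` the call directly instead.

```python
import asyncio
import random


async def with_retries(call, attempts=3, base_delay=0.1, retry_on=(ConnectionError, TimeoutError)):
    """Await call() up to `attempts` times, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: surface the last error to the caller
            delay = base_delay * 2 ** (attempt - 1)  # base_delay, 2x, 4x, ...
            # A little random jitter keeps many clients from retrying in lockstep.
            await asyncio.sleep(delay + random.uniform(0, delay / 10))


call_count = 0


async def flaky_lookup() -> dict:
    """Stand-in for the company API: fails twice, then succeeds."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("simulated timeout")
    return {"company_name": "TechStartup Ltd", "is_verified": True}


retry_result = asyncio.run(with_retries(flaky_lookup, attempts=3, base_delay=0.01))
print(call_count, retry_result["is_verified"])  # 3 True
```

Only the listed exception types are retried, so a programming error such as a `KeyError` fails immediately instead of being masked.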

## Step 5: Define Scoring Function

Calculates a lead quality score based on validation results and enrichment data:

In [None]:
async def calculate_score(input_data: dict) -> dict:
    """Calculate user quality score.

    Scoring factors:
    - Data completeness (30%)
    - Company verification (40%)
    - Location confidence (30%)
    """
    await asyncio.sleep(0.1)  # Simulate scoring computation

    # Extract data from dependencies
    validation = input_data.get("validate_registration", {})
    location = input_data.get("enrich_location", {})
    company = input_data.get("enrich_company", {})

    score = 0
    factors = {}

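    # Data completeness score (0-30 points), measured against the fields a lead
    # should have rather than whichever keys happen to be present
    validated_data = validation.get("validated_data", {})
    expected_fields = ["email", "name", "company", "phone"]
    completeness = sum(1 for field in expected_fields if validated_data.get(field)) / len(expected_fields)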
    completeness_score = completeness * 30
    score += completeness_score
    factors["completeness"] = completeness_score

    # Company verification score (0-40 points)
    is_verified = company.get("company_data", {}).get("is_verified", False)
    verification_score = 40 if is_verified else 10
    score += verification_score
    factors["verification"] = verification_score

    # Location confidence score (0-30 points)
    confidence = location.get("confidence", 0)
    confidence_score = confidence * 30
    score += confidence_score
    factors["location_confidence"] = confidence_score

    # Determine quality tier
    if score >= 80:
        tier = "premium"
    elif score >= 60:
        tier = "standard"
    else:
        tier = "basic"

    return {
        "quality_score": round(score, 2),
        "quality_tier": tier,
        "score_factors": factors,
        "scored_at": datetime.now().isoformat(),
    }
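The scoring rule reduces to simple arithmetic. `lead_score` below is a hypothetical pure-function restatement of the weights and tier thresholds above, so you can check the tier boundaries without running the pipeline. The inputs are illustrative. A complete record with a verified company and 0.85 location confidence scores 30 + 40 + 25.5 = 95.5.

```python
def lead_score(completeness: float, is_verified: bool, confidence: float) -> tuple[float, str]:
    """Apply the notebook's weights: 30% completeness, 40% verification, 30% location."""
    score = completeness * 30 + (40 if is_verified else 10) + confidence * 30
    if score >= 80:
        tier = "premium"
    elif score >= 60:
        tier = "standard"
    else:
        tier = "basic"
    return round(score, 2), tier


print(lead_score(1.0, True, 0.85))    # (95.5, 'premium')
print(lead_score(0.75, False, 0.85))  # (58.0, 'basic')
```

An unverified company caps the score at 70, so such a lead can never reach the premium tier.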

## Step 6: Define Formatting Function

Formats all collected data into a consistent structure ready for database storage:

In [None]:
async def format_output(input_data: dict) -> dict:
    """Format final output for database storage.

    Combines all pipeline results into a single structured record.
    """
    await asyncio.sleep(0.05)  # Simulate formatting

    # Extract all dependency results
    validation = input_data.get("validate_registration", {})
    location = input_data.get("enrich_location", {})
    company = input_data.get("enrich_company", {})
    scoring = input_data.get("calculate_score", {})

    # Build final record
    validated_data = validation.get("validated_data", {})

    final_record = {
        "user_info": {
            "email": validated_data.get("email"),
            "name": validated_data.get("name"),
            "phone": validated_data.get("phone"),
        },
        "company_info": company.get("company_data", {}),
        "location_info": location.get("location_data", {}),
        "quality": {
            "score": scoring.get("quality_score"),
            "tier": scoring.get("quality_tier"),
        },
        "validation": {
            "is_valid": validation.get("is_valid"),
            "errors": validation.get("errors", []),
            "warnings": validation.get("warnings", []),
        },
        "metadata": {
            "processed_at": datetime.now().isoformat(),
            "pipeline_version": "1.0.0",
            "enrichment_sources": [
                location.get("enrichment_source"),
                company.get("enrichment_source"),
            ],
        },
    }

    return {
        "final_record": final_record,
        "ready_for_storage": validation.get("is_valid", False),
        "processing_complete": True,
    }
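Every timestamp in the record is already an ISO-8601 string, so the final record contains only JSON-native types. The sketch below assumes JSON is your storage format; the notebook does not name a specific database. `sample_record` is a hypothetical, trimmed version of `final_record`.

```python
import json
from datetime import datetime, timezone

# Hypothetical, trimmed record shaped like format_output's final_record.
sample_record = {
    "user_info": {"email": "sarah.johnson@techstartup.uk", "name": "Sarah Johnson", "phone": None},
    "quality": {"score": 95.5, "tier": "premium"},
    "metadata": {
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "pipeline_version": "1.0.0",
    },
}

# sort_keys gives a stable serialized form, which makes stored rows easier to diff.
payload = json.dumps(sample_record, sort_keys=True)

# Round-trip check: None becomes null and back, strings and numbers are unchanged.
restored = json.loads(payload)
print(restored == sample_record)  # True
```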

## Step 7: Build the DAG

Now we construct the directed acyclic graph that defines our pipeline structure:

```
validate_registration
         |
    +----+----+
    |         |
  location  company  (parallel enrichment)
    |         |
    +----+----+
         |
    calculate_score
         |
    format_output
```

In [None]:
print("📊 Building User Registration Pipeline...")
print()

# Create the graph
graph = DirectedGraph()

# Wave 1: Validation (entry point)
validate_node = NodeSpec("validate_registration", validate_registration)
graph += validate_node
print("✅ Wave 1: validate_registration")

# Wave 2: Parallel enrichment (fan-out pattern)
location_node = NodeSpec("enrich_location", enrich_location).after("validate_registration")
company_node = NodeSpec("enrich_company", enrich_company).after("validate_registration")

graph += [location_node, company_node]
print("✅ Wave 2: enrich_location, enrich_company (PARALLEL)")

# Wave 3: Scoring (fan-in pattern)
score_node = NodeSpec("calculate_score", calculate_score).after(
    "validate_registration", "enrich_location", "enrich_company"
)
graph += score_node
print("✅ Wave 3: calculate_score (waits for all enrichment)")

# Wave 4: Final formatting
format_node = NodeSpec("format_output", format_output).after(
    "validate_registration", "enrich_location", "enrich_company", "calculate_score"
)
graph += format_node
print("✅ Wave 4: format_output (final step)")

print()
print("🔍 Validating pipeline structure...")
graph.validate()
print("   ✅ Pipeline validation passed!")
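`graph.validate()` is presumably where structural mistakes, such as a misspelled dependency name or an accidental cycle, would surface. The sketch below does not reproduce hexDAG's actual rules. It is a hypothetical plain-Python version of the two checks any DAG validator needs, run against the same dependency structure. `DEPS` and `find_dag_problems` are names made up for this illustration.

```python
# Hypothetical sketch; hexDAG's graph.validate() may check more or different things.
DEPS = {
    "validate_registration": [],
    "enrich_location": ["validate_registration"],
    "enrich_company": ["validate_registration"],
    "calculate_score": ["validate_registration", "enrich_location", "enrich_company"],
    "format_output": [
        "validate_registration", "enrich_location", "enrich_company", "calculate_score",
    ],
}


def find_dag_problems(deps: dict[str, list[str]]) -> list[str]:
    """Return a list of problems; an empty list means the graph is a valid DAG."""
    problems = [
        f"{node} depends on unknown node {parent!r}"
        for node, parents in deps.items()
        for parent in parents
        if parent not in deps
    ]
    if problems:
        return problems

    # Kahn-style peeling: repeatedly remove nodes whose dependencies are all done.
    # Anything left once no node is ready sits on a cycle (or depends on one).
    remaining = {node: set(parents) for node, parents in deps.items()}
    done: set[str] = set()
    while ready := [n for n, parents in remaining.items() if parents <= done]:
        for n in ready:
            done.add(n)
            del remaining[n]
    if remaining:
        problems.append(f"cycle among: {sorted(remaining)}")
    return problems


print(find_dag_problems(DEPS))                      # []
print(find_dag_problems({"a": ["b"], "b": ["a"]}))  # ["cycle among: ['a', 'b']"]
```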

## Step 8: Visualize Execution Waves

Let's examine how the pipeline will execute in waves:

In [None]:
print("🌊 Execution Wave Analysis:")
print()
waves = graph.waves()
for i, wave in enumerate(waves, 1):
    if len(wave) == 1:
        print(f"   Wave {i}: {wave[0]} (sequential)")
    else:
        print(f"   Wave {i}: {', '.join(wave)} (PARALLEL)")

print()
print(f"⚡ Total waves: {len(waves)}")
print(f"   🚀 Wave 2 processes {len(waves[1])} nodes in parallel!")
print("   ⏱️  Nodes in a wave are independent, so the wave takes only as long as its slowest node")
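The four waves above follow from a simple rule: a node's wave is one more than the latest wave among its dependencies (longest-path layering). The sketch below reproduces that grouping in plain Python. `compute_waves` is hypothetical, and hexDAG's own algorithm may differ in details such as ordering within a wave.

```python
# Hypothetical re-derivation of graph.waves() for this pipeline's dependencies.
DEPS = {
    "validate_registration": [],
    "enrich_location": ["validate_registration"],
    "enrich_company": ["validate_registration"],
    "calculate_score": ["validate_registration", "enrich_location", "enrich_company"],
    "format_output": [
        "validate_registration", "enrich_location", "enrich_company", "calculate_score",
    ],
}


def compute_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group nodes into waves; assumes the graph is already known to be acyclic."""
    level: dict[str, int] = {}

    def wave_of(node: str) -> int:
        if node not in level:
            # Entry nodes (no dependencies) land in wave 1.
            level[node] = 1 + max((wave_of(p) for p in deps[node]), default=0)
        return level[node]

    for node in deps:
        wave_of(node)

    waves: list[list[str]] = [[] for _ in range(max(level.values(), default=0))]
    for node in deps:  # definition order keeps each wave's contents stable
        waves[level[node] - 1].append(node)
    return waves


for i, wave in enumerate(compute_waves(DEPS), 1):
    print(f"Wave {i}: {', '.join(wave)}")
```

Nodes that share a wave never depend on one another, which is why an orchestrator can safely start them at the same time.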

## Step 9: Prepare Sample Registration Data

Create realistic test data representing a new user registration:

In [None]:
# Sample registration data
sample_registration = {
    "email": "sarah.johnson@techstartup.uk",
    "name": "Sarah Johnson",
    "company": "TechStartup Ltd",
    "phone": "+44-20-1234-5678",
    "role": "Engineering Manager",
    "source": "website_signup",
}

print("📝 Sample Registration Data:")
print()
for key, value in sample_registration.items():
    print(f"   {key}: {value}")

## Step 10: Execute the Pipeline

Run the complete pipeline with our sample data:

In [None]:
import time

print("🚀 Executing User Registration Pipeline...")
print()

# Create orchestrator and execute
orchestrator = Orchestrator()

# perf_counter is a monotonic, high-resolution clock suited to measuring elapsed time
start_time = time.perf_counter()
results = await orchestrator.run(graph, sample_registration)
end_time = time.perf_counter()

execution_time = end_time - start_time
print("✅ Pipeline execution complete!")
print(f"⏱️  Total time: {execution_time:.3f} seconds")
print()
print("   Note: Wave 2 runs both enrichments concurrently, so it costs ~0.25s of simulated latency instead of ~0.45s.")

## Step 11: Examine Validation Results

Let's look at the validation step output:

In [None]:
validation_result = results["validate_registration"]

print("🔍 Validation Results:")
print("=" * 60)
print(f"Is Valid: {validation_result['is_valid']}")
print(f"Errors: {validation_result['errors'] if validation_result['errors'] else 'None'}")
print(f"Warnings: {validation_result['warnings'] if validation_result['warnings'] else 'None'}")
print(f"Timestamp: {validation_result['validation_timestamp']}")

## Step 12: Examine Enrichment Results

View the parallel enrichment data (location and company):

In [None]:
location_result = results["enrich_location"]
company_result = results["enrich_company"]

print("🌍 Location Enrichment:")
print("=" * 60)
for key, value in location_result["location_data"].items():
    print(f"{key}: {value}")
print(f"Confidence: {location_result['confidence']}")
print()

print("🏢 Company Enrichment:")
print("=" * 60)
for key, value in company_result["company_data"].items():
    print(f"{key}: {value}")
print()
print("💡 These two enrichments ran in parallel (Wave 2)!")

## Step 13: Examine Scoring Results

View the calculated quality score and breakdown:

In [None]:
score_result = results["calculate_score"]

print("📊 Quality Scoring:")
print("=" * 60)
print(f"Overall Score: {score_result['quality_score']}/100")
print(f"Quality Tier: {score_result['quality_tier'].upper()}")
print()
print("Score Breakdown:")
for factor, value in score_result["score_factors"].items():
    print(f"  {factor}: {value:.2f} points")
print()
print(f"Scored at: {score_result['scored_at']}")

## Step 14: View Final Formatted Output

This is the complete, structured record ready for database storage:

In [None]:
final_result = results["format_output"]

print("📦 Final Formatted Record:")
print("=" * 60)
print()

record = final_result["final_record"]

print("👤 User Info:")
for key, value in record["user_info"].items():
    print(f"  {key}: {value}")
print()

print("🏢 Company Info:")
for key, value in record["company_info"].items():
    print(f"  {key}: {value}")
print()

print("🌍 Location Info:")
for key, value in record["location_info"].items():
    print(f"  {key}: {value}")
print()

print("⭐ Quality:")
for key, value in record["quality"].items():
    print(f"  {key}: {value}")
print()

print(f"Ready for Storage: {final_result['ready_for_storage']}")
print(f"Processing Complete: {final_result['processing_complete']}")

## Step 15: Test with Invalid Data

Let's see how the pipeline handles incomplete registration data:

In [None]:
# Incomplete registration - missing company
invalid_registration = {
    "email": "incomplete-user",  # Invalid format
    "name": "Test User",
    # Missing: company
    "phone": "123",  # Too short
}

print("🔴 Testing with Invalid Data:")
print()
for key, value in invalid_registration.items():
    print(f"   {key}: {value}")

print()
print("🚀 Executing pipeline...")
invalid_results = await orchestrator.run(graph, invalid_registration)
print("   ✅ Execution complete (with validation errors)")

## Step 16: View Validation Errors

Examine how validation caught the data quality issues:

In [None]:
invalid_validation = invalid_results["validate_registration"]
invalid_final = invalid_results["format_output"]

print("❌ Validation Issues Found:")
print("=" * 60)
print(f"Is Valid: {invalid_validation['is_valid']}")
print()

if invalid_validation["errors"]:
    print("Errors:")
    for error in invalid_validation["errors"]:
        print(f"  - {error}")
print()

if invalid_validation["warnings"]:
    print("Warnings:")
    for warning in invalid_validation["warnings"]:
        print(f"  - {warning}")
print()

print(f"Ready for Storage: {invalid_final['ready_for_storage']}")
print()
print("💡 Pipeline processed the data but flagged it as invalid!")

## Step 17: Compare Quality Scores

See how data quality affects the calculated score:

In [None]:
valid_score = results["calculate_score"]
invalid_score = invalid_results["calculate_score"]

print("📊 Quality Score Comparison:")
print("=" * 60)
print()

print("✅ Valid Registration:")
print(f"   Score: {valid_score['quality_score']}/100")
print(f"   Tier: {valid_score['quality_tier'].upper()}")
print()

print("❌ Invalid Registration:")
print(f"   Score: {invalid_score['quality_score']}/100")
print(f"   Tier: {invalid_score['quality_tier'].upper()}")
print()

score_diff = valid_score["quality_score"] - invalid_score["quality_score"]
print(f"📉 Score difference: {score_diff:.2f} points")
print()
print("💡 Data quality directly impacts lead scoring!")

## Step 18: Performance Analysis

Understand the performance benefits of parallel execution:

In [None]:
print("⚡ Performance Analysis:")
print("=" * 60)
print()

# Idealized timings from the simulated asyncio.sleep calls (ignores orchestrator overhead)
sequential_time = 0.1 + 0.2 + 0.25 + 0.1 + 0.05  # All steps one after another: ~0.70s
parallel_time = 0.1 + max(0.2, 0.25) + 0.1 + 0.05  # Enrichments overlap: ~0.50s

print(f"If executed sequentially: ~{sequential_time:.2f}s")
print(f"With parallel enrichment: ~{parallel_time:.2f}s")
print()

speedup = sequential_time / parallel_time
time_saved = (1 - parallel_time / sequential_time) * 100
print(f"⚡ Speedup: ~{speedup:.2f}x (~{time_saved:.1f}% less wall-clock time)")
print()

print("🌊 Execution Wave Breakdown:")
print("   Wave 1: validate_registration (0.1s)")
print("   Wave 2: enrich_location + enrich_company (0.25s, parallel)")
print("   Wave 3: calculate_score (0.1s)")
print("   Wave 4: format_output (0.05s)")
print()
print("💡 Running Wave 2 in parallel hides the 0.2s location lookup behind the 0.25s company lookup!")
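The 0.50s figure above is an idealized estimate. The standalone sketch below uses plain asyncio (no hexDAG) to measure the same effect. It awaits two simulated enrichment calls back to back, then runs them together with `asyncio.gather`. `fake_call` is a hypothetical stand-in for an I/O-bound API call. The block uses `asyncio.run`; in a notebook cell, use `await` instead.

```python
import asyncio
import time


async def fake_call(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)  # stand-in for waiting on a network response
    return name


async def run_sequentially() -> float:
    start = time.perf_counter()
    await fake_call("location", 0.2)
    await fake_call("company", 0.25)
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    # Both coroutines are scheduled at once; total time tracks the slower one.
    await asyncio.gather(fake_call("location", 0.2), fake_call("company", 0.25))
    return time.perf_counter() - start


seq_t = asyncio.run(run_sequentially())
par_t = asyncio.run(run_concurrently())
print(f"sequential: {seq_t:.2f}s, concurrent: {par_t:.2f}s")
```

This only helps because the calls spend their time waiting on I/O. CPU-bound steps inside one event loop would not overlap this way.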

## Step 19: Examine Pipeline Metadata

Review the processing metadata included in the final output:

In [None]:
metadata = results["format_output"]["final_record"]["metadata"]

print("📋 Pipeline Metadata:")
print("=" * 60)
print(f"Processed At: {metadata['processed_at']}")
print(f"Pipeline Version: {metadata['pipeline_version']}")
print()
print("Enrichment Sources:")
for source in metadata["enrichment_sources"]:
    print(f"  - {source}")
print()
print("💡 Metadata enables audit trails and debugging!")
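The metadata above uses `datetime.now()`, which returns naive local time. If you want records that can be compared across machines and time zones, a timezone-aware UTC timestamp plus a random per-run ID is easier to correlate. The sketch below assumes that goal. `build_metadata` and the `run_id` field are hypothetical additions, not part of the notebook's record format.

```python
import uuid
from datetime import datetime, timezone


def build_metadata(pipeline_version: str, sources: list) -> dict:
    """Hypothetical metadata builder with a UTC timestamp and a unique run ID."""
    return {
        "run_id": str(uuid.uuid4()),  # random ID to correlate logs for one run
        "processed_at": datetime.now(timezone.utc).isoformat(),  # ends in +00:00
        "pipeline_version": pipeline_version,
        # Drop None entries, e.g. when an enrichment result had no source tag.
        "enrichment_sources": [s for s in sources if s is not None],
    }


meta = build_metadata("1.0.0", ["location_api", "company_database", None])
print(meta["enrichment_sources"])  # ['location_api', 'company_database']
```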

## Step 20: Summary and Key Takeaways

Review what we've learned from this practical pipeline:

In [None]:
print("🎯 Key Concepts Demonstrated:")
print("=" * 60)
print()

print("✅ Pipeline Patterns:")
print("   • Sequential validation before enrichment")
print("   • Fan-out: One validation → Multiple parallel enrichments")
print("   • Fan-in: Multiple enrichments → Single scoring step")
print("   • Final aggregation: All results → Formatted output")
print()

print("✅ Parallel Execution:")
print("   • Location and company enrichment run concurrently")
print("   • A wave takes as long as its slowest node, not the sum of its nodes")
print("   • Nodes in the same wave have no dependencies on each other")
print()

print("✅ Data Quality:")
print("   • Validation flags problems before the record is stored")
print("   • Quality scoring prioritizes leads")
print("   • Invalid records still flow through, marked ready_for_storage=False")
print()

print("✅ Real-World Features:")
print("   • Metadata for audit trails")
print("   • Structured output for database storage")
print("   • Validation errors and warnings carried into the final record")
print("   • Multi-source data enrichment")
print()

print("🔗 Next Steps:")
print("   • Add conditional logic (e.g., skip enrichment if invalid)")
print("   • Integrate with real APIs (location, company databases)")
print("   • Add retry logic for failed enrichments")
print("   • Convert to YAML pipeline for declarative configuration")
print("   • Add LLM-based data extraction or classification")
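The first next step, skipping enrichment for invalid records, can be sketched in plain asyncio without any orchestrator. This is a hypothetical illustration; the `sketch_*` names and the simplified validity rule are made up for it. It does not use or describe hexDAG's own conditional features. The block uses `asyncio.run`; in a notebook cell, `await sketch_process(...)` directly.

```python
import asyncio


async def sketch_validate(data: dict) -> bool:
    """Simplified validity rule: an email containing '@' and a non-empty company."""
    return "@" in data.get("email", "") and bool(data.get("company"))


async def sketch_locate(email: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for the location API
    return {"country": "UK" if email.endswith(".uk") else "USA"}


async def sketch_lookup_company(company: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for the company database
    return {"company_name": company, "is_verified": True}


async def sketch_process(data: dict) -> dict:
    if not await sketch_validate(data):
        # Short-circuit: invalid records never trigger the slow enrichment calls.
        return {"is_valid": False, "enriched": None}
    # Valid records fan out to both enrichments concurrently.
    location, company = await asyncio.gather(
        sketch_locate(data["email"]), sketch_lookup_company(data["company"])
    )
    return {"is_valid": True, "enriched": {"location": location, "company": company}}


good = asyncio.run(sketch_process({"email": "sarah@techstartup.uk", "company": "TechStartup Ltd"}))
bad = asyncio.run(sketch_process({"email": "incomplete-user", "name": "Test User"}))
print(good["enriched"]["location"])  # {'country': 'UK'}
print(bad)                           # {'is_valid': False, 'enriched': None}
```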