# Quick Start Guide

This guide will help you get started with AI-Graph in just a few minutes. AI-Graph is a framework for building and managing AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.

## What You'll Learn

- How to create custom pipeline steps
- How to build and run pipelines
- How to chain multiple steps together
- How to work with collections using ForEach steps
- Error handling best practices
- Real-world examples

## Setup and Imports

First, let's import the necessary components from the AI-Graph framework:

In [1]:
# Import the core components from AI-Graph
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep

# Additional imports for our examples
import json
import time
import re
from typing import Dict, Any

print("AI-Graph framework imported successfully!")

AI-Graph framework imported successfully!


## Your First Pipeline

Let's create a simple pipeline step that processes numerical data. In AI-Graph, all steps work with dictionary data structures and inherit from `BasePipelineStep`:

In [2]:
class DoubleStep(BasePipelineStep):
    """A step that doubles the input value."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Double the 'value' key in the input data."""
        if 'value' in data:
            data['value'] = data['value'] * 2
        return data

# Create and run the pipeline
pipeline = Pipeline(name="DoubleValuePipeline")
pipeline.add_step(DoubleStep())

# Input data as a dictionary
input_data = {"value": 5}
result = pipeline.process(input_data)
print(f"Input: {input_data}")
print(f"Result: {result}")

Input: {'value': 10}
Result: {'value': 10}


## Multiple Steps in a Pipeline

You can chain multiple steps together. Each step processes the data and passes it to the next step:

In [3]:
class AddStep(BasePipelineStep):
    def __init__(self, add_value: int, name: str = None):
        super().__init__(name or f"AddStep_{add_value}")
        self.add_value = add_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] + self.add_value
        return data

class MultiplyStep(BasePipelineStep):
    def __init__(self, multiply_value: int, name: str = None):
        super().__init__(name or f"MultiplyStep_{multiply_value}")
        self.multiply_value = multiply_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] * self.multiply_value
        return data

# Create pipeline with multiple steps
pipeline = Pipeline(name="MathPipeline")
pipeline.add_step(AddStep(10))       # Add 10
pipeline.add_step(MultiplyStep(2))   # Multiply by 2

input_data = {"value": 5}
result = pipeline.process(input_data)
print(f"Input: {input_data}")
print(f"Result: {result}")
print(f"Calculation: (5 + 10) * 2 = {result['value']}")

Input: {'value': 30}
Result: {'value': 30}
Calculation: (5 + 10) * 2 = 30


## Using Built-in Steps

AI-Graph provides several built-in steps for common operations. Let's use `AddKeyStep` and `DelKeyStep`:

In [None]:
# Create a pipeline that adds metadata and processes data
pipeline = Pipeline(name="MetadataPipeline")
pipeline.add_step(AddKeyStep("timestamp", "2025-07-16T10:00:00Z"))
pipeline.add_step(AddKeyStep("processed_by", "AI-Graph"))
pipeline.add_step(DoubleStep())  # Our custom step from before
pipeline.add_step(DelKeyStep("timestamp"))  # Remove timestamp after processing

input_data = {"value": 42, "user_id": "user123"}
result = pipeline.process(input_data)
print(f"Input: {input_data}")
print(f"Result: {result}")

## Working with Collections using ForEach

Use ForEach steps to process collections of items. The ForEach step creates a sub-pipeline that processes each item:

In [4]:
class SquareStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Square the current item in a ForEach iteration."""
        if '_current_item' in data:
            # ForEach puts the current item in '_current_item'
            current_value = data['_current_item']
            data['_current_item'] = current_value ** 2
        return data

# Create main pipeline with ForEach
pipeline = Pipeline(name="SquareNumbersPipeline")

# Create ForEach step that will process each number in the 'numbers' key
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="squared_numbers"
)
foreach_step.add_sub_step(SquareStep())

pipeline.add_step(foreach_step)

input_data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(input_data)

print(f"Input numbers: {input_data['numbers']}")
print(f"Squared numbers: {[item['_current_item'] for item in result['squared_numbers']]}")

Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 48998.88item/s]

Input numbers: [1, 2, 3, 4, 5]
Squared numbers: [1, 4, 9, 16, 25]





## Error Handling

AI-Graph provides built-in error handling. Let's create a step that might fail and see how to handle it:

In [5]:
class DivideStep(BasePipelineStep):
    def __init__(self, divisor: float, name: str = None):
        super().__init__(name or f"DivideStep_{divisor}")
        self.divisor = divisor

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if self.divisor == 0:
            raise ValueError("Cannot divide by zero")
        
        if 'value' in data:
            data['value'] = data['value'] / self.divisor
        return data

# Test with a safe divisor first
pipeline = Pipeline(name="SafeDivisionPipeline")
pipeline.add_step(DivideStep(2))

input_data = {"value": 10}
result = pipeline.process(input_data)
print(f"Safe division - Input: {input_data}, Result: {result}")

# Now test with zero (this will raise an error)
pipeline_with_error = Pipeline(name="ErrorPipeline")
pipeline_with_error.add_step(DivideStep(0))

try:
    input_data = {"value": 10}
    result = pipeline_with_error.process(input_data)
except ValueError as e:
    print(f"Caught expected error: {e}")

Safe division - Input: {'value': 5.0}, Result: {'value': 5.0}
Caught expected error: Cannot divide by zero


## Real-World Example: Text Processing Pipeline

Here's a more practical example for text processing that demonstrates multiple concepts:

In [6]:
class CleanTextStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean text by removing extra whitespace and converting to lowercase."""
        if '_current_item' in data:
            text = data['_current_item']
            # Remove extra whitespace and convert to lowercase
            cleaned = re.sub(r'\s+', ' ', text.strip().lower())
            data['_current_item'] = cleaned
        return data

class CountWordsStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Count words in the text."""
        if '_current_item' in data:
            text = data['_current_item']
            word_count = len(text.split())
            data['word_count'] = word_count
            data['text'] = text  # Keep the cleaned text
        return data

class FilterLongTextsStep(BasePipelineStep):
    def __init__(self, min_words: int = 5, name: str = None):
        super().__init__(name or f"FilterLongTexts_{min_words}")
        self.min_words = min_words

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Filter out texts with fewer than min_words."""
        if 'word_count' in data and data['word_count'] < self.min_words:
            data['filtered_out'] = True
        else:
            data['filtered_out'] = False
        return data

# Create the text processing sub-pipeline
foreach_step = ForEachStep(
    items_key="texts",
    results_key="processed_texts"
)
foreach_step.add_sub_step(CleanTextStep())
foreach_step.add_sub_step(CountWordsStep())
foreach_step.add_sub_step(FilterLongTextsStep(min_words=3))

# Create main pipeline
main_pipeline = Pipeline(name="TextProcessingPipeline")
main_pipeline.add_step(foreach_step)

# Test data
input_data = {
    "texts": [
        "Hello World!",
        "This is a longer text with many words",
        "Short",
        "AI-Graph makes pipeline processing easy and efficient"
    ]
}

result = main_pipeline.process(input_data)

print("Text Processing Results:")
print("=" * 40)
for i, processed_item in enumerate(result['processed_texts']):
    original_text = input_data['texts'][i]
    cleaned_text = processed_item.get('text', 'N/A')
    word_count = processed_item.get('word_count', 0)
    filtered = processed_item.get('filtered_out', False)
    
    print(f"Original: '{original_text}'")
    print(f"Cleaned:  '{cleaned_text}'")
    print(f"Words:    {word_count}")
    print(f"Filtered: {'Yes' if filtered else 'No'}")
    print("-" * 40)

# Extract only non-filtered results
valid_results = [
    item for item in result['processed_texts'] 
    if not item.get('filtered_out', False)
]
print(f"\nValid texts (>= 3 words): {len(valid_results)} out of {len(input_data['texts'])}")

Processing ForEachStep: 100%|██████████| 4/4 [00:00<00:00, 25930.78item/s]

Text Processing Results:
Original: 'Hello World!'
Cleaned:  'hello world!'
Words:    2
Filtered: Yes
----------------------------------------
Original: 'This is a longer text with many words'
Cleaned:  'this is a longer text with many words'
Words:    8
Filtered: No
----------------------------------------
Original: 'Short'
Cleaned:  'short'
Words:    1
Filtered: Yes
----------------------------------------
Original: 'AI-Graph makes pipeline processing easy and efficient'
Cleaned:  'ai-graph makes pipeline processing easy and efficient'
Words:    7
Filtered: No
----------------------------------------

Valid texts (>= 3 words): 2 out of 4





## Progress Tracking with ForEach

The ForEach step automatically shows progress bars for long-running operations. Let's see this in action:

In [8]:
class SlowProcessingStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate slow processing."""
        time.sleep(0.1)  # Simulate slow processing
        if '_current_item' in data:
            data['_current_item'] = data['_current_item'] * 2
        return data

# Create pipeline with progress tracking
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers"
)
foreach_step.add_sub_step(SlowProcessingStep())

pipeline = Pipeline(name="SlowProcessingPipeline")
pipeline.add_step(foreach_step)

# Process a list of numbers with progress tracking
input_data = {"numbers": list(range(10))}
print("Processing numbers with progress tracking:")
result = pipeline.process(input_data)

processed_numbers = [item['_current_item'] for item in result['processed_numbers']]
print(f"\nOriginal: {input_data['numbers']}")
print(f"Doubled:  {processed_numbers}")

Processing numbers with progress tracking:


Processing ForEachStep: 100%|██████████| 10/10 [00:01<00:00,  9.91item/s]


Original: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Doubled:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]





## Advanced Example: Data Validation Pipeline

Let's create a more complex example that validates and processes user data:

In [7]:
class ValidateEmailStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate email format."""
        if 'email' in data:
            email = data['email']
            # Simple email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            data['email_valid'] = bool(re.match(email_pattern, email))
        return data

class ValidateAgeStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate age is reasonable."""
        if 'age' in data:
            age = data['age']
            data['age_valid'] = isinstance(age, int) and 0 <= age <= 150
        return data

class SummarizeValidationStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize validation results."""
        email_valid = data.get('email_valid', False)
        age_valid = data.get('age_valid', False)
        
        data['is_valid'] = email_valid and age_valid
        data['validation_errors'] = []
        
        if not email_valid:
            data['validation_errors'].append('Invalid email format')
        if not age_valid:
            data['validation_errors'].append('Invalid age')
            
        return data

# Create validation pipeline
validation_pipeline = Pipeline(name="UserValidationPipeline")
validation_pipeline.add_step(ValidateEmailStep())
validation_pipeline.add_step(ValidateAgeStep())
validation_pipeline.add_step(SummarizeValidationStep())

# Test with various user data
test_users = [
    {"name": "John Doe", "email": "john@example.com", "age": 25},
    {"name": "Jane Smith", "email": "invalid-email", "age": 30},
    {"name": "Bob Johnson", "email": "bob@test.com", "age": -5},
    {"name": "Alice Brown", "email": "alice@company.org", "age": 200}
]

print("User Validation Results:")
print("=" * 50)

for user in test_users:
    result = validation_pipeline.process(user.copy())
    
    print(f"Name: {result['name']}")
    print(f"Email: {result['email']} ({'✓' if result['email_valid'] else '✗'})")
    print(f"Age: {result['age']} ({'✓' if result['age_valid'] else '✗'})")
    print(f"Valid: {'✓' if result['is_valid'] else '✗'}")
    
    if result['validation_errors']:
        print(f"Errors: {', '.join(result['validation_errors'])}")
    
    print("-" * 50)

User Validation Results:
Name: John Doe
Email: john@example.com (✓)
Age: 25 (✓)
Valid: ✓
--------------------------------------------------
Name: Jane Smith
Email: invalid-email (✗)
Age: 30 (✓)
Valid: ✗
Errors: Invalid email format
--------------------------------------------------
Name: Bob Johnson
Email: bob@test.com (✓)
Age: -5 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------
Name: Alice Brown
Email: alice@company.org (✓)
Age: 200 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------


## Next Steps

Now that you've learned the basics of AI-Graph, here are some next steps to explore:

### Documentation
- **Concepts**: Learn about the core concepts in detail
- **API Reference**: Full API documentation for all classes and methods

### Best Practices

1. **Keep steps small and focused** - Each step should do one thing well
2. **Use meaningful names** - Name your steps clearly to improve readability
3. **Handle errors gracefully** - Always consider what might go wrong
4. **Test your pipelines** - Write unit tests for your custom steps
5. **Use type hints** - AI-Graph supports full type checking with mypy

### Advanced Features

- **Custom ForEach implementations** - Create specialized iteration patterns
- **Pipeline composition** - Combine multiple pipelines
- **Performance optimization** - Tips for handling large datasets
- **Integration patterns** - How to integrate with other frameworks

## Summary

In this quick start guide, you learned:

✅ How to create custom pipeline steps by extending `BasePipelineStep`  
✅ How to build pipelines and chain multiple steps together  
✅ How to use built-in steps like `AddKeyStep` and `DelKeyStep`  
✅ How to process collections with `ForEachStep`  
✅ Error handling patterns and best practices  
✅ Real-world examples for text processing and data validation  

The AI-Graph framework provides a powerful and flexible way to build processing pipelines that are easy to understand, test, and maintain. Happy coding! 🚀