
DF-Agent - Multi-Purpose ADK MCP Implementation

This project implements multiple Google Agent Development Kit (ADK) agents using the Model Context Protocol (MCP) for various data processing and pipeline management tasks.

Available Agents

1. Dataflow Coordinator Agent (Multi-Agent System)

A sophisticated multi-agent coordinator that orchestrates the complete Dataflow pipeline lifecycle using the ADK agent hierarchy pattern. This coordinator manages two specialized sub-agents:

  • BeamYAMLPipelineAgent: Handles pipeline generation and validation
  • DataflowStatusAgent: Manages job monitoring and troubleshooting

The coordinator provides intelligent task delegation, end-to-end workflow management, and coordinated responses across both pipeline development and job management domains.

2. Dataflow Job Management Agent

Monitors Google Cloud Dataflow jobs with capabilities to check job status, list jobs, and retrieve logs for failed jobs.

3. Beam YAML Pipeline Agent

Generates, validates, and manages Apache Beam YAML pipelines with comprehensive support for creating data processing pipelines using natural language descriptions.

4. Beam YAML Guide Agent

Provides step-by-step interactive guidance for creating Apache Beam YAML pipelines. This specialized agent systematically gathers all necessary information about pipeline structure, validates user inputs, and generates properly formatted YAML configurations through a structured 5-phase workflow.

5. YAML RAG Agent

An intelligent example search agent that uses Retrieval-Augmented Generation (RAG) with a local Chroma database to find similar YAML examples based on user requirements. This agent indexes YAML examples from the Apache Beam repository and provides similarity-based search capabilities to help users discover relevant pipeline patterns and configurations.

Architecture

The implementation follows multiple ADK patterns:

Multi-Agent System Architecture

The Dataflow Coordinator Agent implements the ADK multi-agent system pattern:

  1. Parent Agent (agents/dataflow_coordinator/agent.py): Coordinates and delegates tasks
  2. Sub-Agents: Specialized agents with distinct capabilities
    • BeamYAMLPipelineAgent: Pipeline generation and validation
    • DataflowStatusAgent: Job monitoring and management
  3. Agent Hierarchy: Parent-child relationships enable structured task delegation and context sharing
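
The delegation idea behind this hierarchy can be sketched in plain Python. This is an illustration of keyword-based routing only, not the actual ADK API (in ADK, delegation is driven by the LLM over the parent/sub-agent hierarchy); the class and agent names here are stand-ins:

```python
# Illustrative sketch: route a request to a specialized sub-agent based on
# simple keyword matching. Not the real ADK delegation mechanism.

class SubAgent:
    def __init__(self, name, keywords):
        self.name = name
        self.keywords = keywords

    def handles(self, request: str) -> bool:
        # A sub-agent claims a request if any of its keywords appear in it.
        return any(k in request.lower() for k in self.keywords)


class Coordinator:
    def __init__(self, sub_agents):
        self.sub_agents = sub_agents

    def delegate(self, request: str) -> str:
        # Return the name of the first sub-agent whose keywords match;
        # otherwise the coordinator handles the request itself.
        for agent in self.sub_agents:
            if agent.handles(request):
                return agent.name
        return "coordinator"


coordinator = Coordinator([
    SubAgent("BeamYAMLPipelineAgent", ["pipeline", "yaml", "generate"]),
    SubAgent("DataflowStatusAgent", ["status", "job", "logs"]),
])
print(coordinator.delegate("Generate a YAML pipeline"))  # BeamYAMLPipelineAgent
```

In the real system, requests that span both domains (generate a pipeline, then monitor it) are coordinated across both sub-agents rather than routed to just one.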

Traditional MCP Integration Pattern

Individual agents follow the ADK MCP integration pattern:

  1. MCP Servers: Wrap external tools and services
    • mcp_servers/dataflow_jobs.py: Google Cloud CLI commands for Dataflow operations
    • mcp_servers/beam_yaml.py: Beam YAML pipeline tools and validation
  2. ADK Agents: Use McpToolset to connect to MCP servers and provide intelligent capabilities

Features

Dataflow Coordinator Agent (Multi-Agent System)

  • Intelligent Task Delegation: Automatically routes requests to appropriate sub-agents based on context
  • End-to-End Workflow Management: Coordinates complete pipeline lifecycle from generation to monitoring
  • Sequential & Parallel Coordination: Manages complex workflows involving multiple agents
  • Cross-Domain Error Handling: Coordinates troubleshooting across pipeline and job management domains
  • Unified Interface: Single point of interaction for all Dataflow-related tasks
  • Agent Hierarchy Navigation: Leverages ADK parent-child relationships for structured task management
  • Context Sharing: Enables information flow between specialized sub-agents
  • Workflow Orchestration: Supports both sequential pipeline development and parallel monitoring tasks

Dataflow Job Management Agent

  • Job Status Checking: Get detailed status information for specific Dataflow jobs
  • Job Listing: List active, terminated, or failed jobs with filtering options
  • Log Retrieval: Fetch logs for failed jobs to identify error messages
  • Intelligent Analysis: AI-powered analysis of job failures and troubleshooting suggestions

Beam YAML Pipeline Agent

  • Pipeline Generation: Generate complete Beam YAML pipelines from natural language descriptions
  • Schema Management: Look up input/output schemas for IO connectors with detailed documentation
  • Validation & Quality Assurance: YAML syntax validation, pipeline structure validation, and error detection
  • Pipeline Submission: Submit Beam YAML pipelines directly to Google Cloud Dataflow with comprehensive validation
  • Transform Documentation: Comprehensive documentation for all Beam transforms with examples
  • Multi-Format Support: Support for BigQuery, PubSub, CSV, Text, Parquet, JSON, and database connectors
  • Dry Run Validation: Test pipeline configurations without actual submission to catch issues early

Beam YAML Guide Agent

  • Interactive Step-by-Step Guidance: Systematic 5-phase workflow for pipeline creation (Requirements → Source → Transforms → Sink → Generation)
  • Focused Question Flow: One-question-at-a-time approach with clear options and explanations
  • Comprehensive Input Validation: Validates user responses against Beam YAML requirements with specific error messages
  • Educational Approach: Explains concepts and decisions throughout the pipeline creation process
  • Production-Ready Output: Generates complete, properly formatted YAML pipelines with error handling
  • User-Friendly Design: Makes complex pipeline creation accessible to users of all skill levels
  • Systematic Information Gathering: Methodically collects all necessary configuration details for sources, transforms, and sinks
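
The 5-phase workflow above (Requirements → Source → Transforms → Sink → Generation) can be sketched as a small state machine. The phase names come from the feature list; the `GuideSession` class itself is a hypothetical illustration, not the agent's implementation:

```python
# Minimal sketch of the guide agent's phase progression (hypothetical).
PHASES = ["Requirements", "Source", "Transforms", "Sink", "Generation"]


class GuideSession:
    def __init__(self):
        self.phase_index = 0
        self.answers = {}

    @property
    def phase(self) -> str:
        return PHASES[self.phase_index]

    def record(self, answer: str) -> str:
        """Store the user's answer for the current phase and advance."""
        self.answers[self.phase] = answer
        if self.phase_index < len(PHASES) - 1:
            self.phase_index += 1
        return self.phase


session = GuideSession()
session.record("Summarize daily sales")  # Requirements -> Source
session.record("BigQuery table")         # Source -> Transforms
print(session.phase)                     # Transforms
```

Each phase gates the next: the agent asks one question at a time and only advances once the current answer validates.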

YAML RAG Agent

  • Intelligent Example Search: Uses Chroma vector database to find similar YAML examples based on semantic similarity
  • Apache Beam Repository Integration: Automatically indexes YAML examples from the official Apache Beam repository
  • Similarity-Based Retrieval: Performs vector similarity searches to find the most relevant examples for user queries
  • Database Management: Provides tools to index, search, clear, and get statistics about the example database
  • Educational Guidance: Helps users discover relevant pipeline patterns and learn from existing examples
  • Real-Time Indexing: Can dynamically update the database with new examples from the repository
  • Contextual Recommendations: Provides detailed analysis and recommendations based on retrieved examples
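
At its core, the RAG search embeds texts as vectors and ranks them by similarity. The real agent uses learned embeddings stored in ChromaDB; this dependency-free sketch mimics the ranking step with bag-of-words vectors and cosine similarity, purely to illustrate the idea:

```python
# Illustrative similarity search over YAML example descriptions.
# The real agent uses ChromaDB with learned embeddings, not bag-of-words.
import math
from collections import Counter


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0


def search(query: str, docs: dict, k: int = 2):
    # Rank documents by cosine similarity to the query and return top-k names.
    qv = Counter(query.lower().split())
    scored = [(cosine(qv, Counter(text.lower().split())), name)
              for name, text in docs.items()]
    return [name for score, name in sorted(scored, reverse=True)[:k]]


examples = {
    "bq_to_pubsub": "read from bigquery write to pubsub streaming",
    "csv_batch": "read csv file filter rows write text file",
}
print(search("bigquery to pubsub", examples, k=1))  # ['bq_to_pubsub']
```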

Prerequisites

  1. Google Cloud SDK: Install and configure gcloud CLI

    # Install Google Cloud SDK
    curl https://sdk.cloud.google.com | bash
    exec -l $SHELL
    
    # Authenticate and set project
    gcloud auth login
    gcloud config set project YOUR_PROJECT_ID
  2. Python Environment: Python 3.9 or higher

    python -m venv venv
    source venv/bin/activate  # On Windows: venv\Scripts\activate
    pip install -r requirements.txt
  3. ADK Setup: Follow the ADK quickstart guide

Configuration

  1. Environment Variables: Copy and configure the .env file:

    cp .env.example .env

    Update the following variables:

    GOOGLE_CLOUD_PROJECT=your-project-id
    GOOGLE_CLOUD_REGION=us-central1
    GOOGLE_AI_API_KEY=your-google-ai-api-key
  2. Google Cloud Authentication: Ensure you have proper authentication:

    # Option 1: Use gcloud default credentials
    gcloud auth application-default login
    
    # Option 2: Use service account key (set in .env)
    export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
  3. YAML RAG Setup (Optional - for YAML RAG Agent): Index Apache Beam YAML examples:

    # Index YAML examples from Apache Beam repository (one-time setup)
    python scripts/index_yaml_examples.py --verbose
    
    # This will:
    # - Download Apache Beam repository (~80MB)
    # - Extract and index YAML examples into ChromaDB
    # - Enable similarity search for pipeline patterns
    
    # Optional: Force re-indexing if examples are updated
    python scripts/index_yaml_examples.py --force
    
    # Check indexing options
    python scripts/index_yaml_examples.py --help
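
Once the .env file is in place, its values need to reach os.environ before the agents run. A minimal stdlib-only sketch of such a loader (illustrative; the project may rely on python-dotenv or ADK's own configuration handling instead):

```python
# Hypothetical .env loader: parses KEY=VALUE lines into os.environ.
import os


def load_env(path: str = ".env") -> dict:
    loaded = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            # Skip blank lines, comments, and anything without '='.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            loaded[key.strip()] = value.strip()
    os.environ.update(loaded)
    return loaded
```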

Usage

Running with ADK Web

  1. Start the ADK web interface from the agents directory:

    cd agents
    adk web
  2. Navigate to the agent in your browser and interact with it using natural language:

    Dataflow Coordinator Agent Examples:

    • "I need to create a Beam YAML pipeline that reads from BigQuery and writes to PubSub, then monitor its execution"
    • "Generate a pipeline for real-time data processing and check if any similar jobs are currently running"
    • "My pipeline job failed, can you help troubleshoot and suggest improvements?"
    • "Create a batch processing pipeline and show me the status of all recent jobs"

    Individual Agent Examples:

    • "Check the status of job 2024-01-15_12_00_00-1234567890123456789"
    • "List all failed Dataflow jobs"
    • "Show me the logs for the failed job xyz-123"

    Beam YAML Guide Agent Examples:

    • "I want to create a new Beam YAML pipeline" (starts interactive guidance)
    • "Help me build a pipeline step by step"
    • "Guide me through creating a data processing pipeline"

    YAML RAG Agent Examples:

    • "Find YAML examples similar to reading from BigQuery and writing to PubSub"
    • "Show me examples of streaming data processing pipelines"
    • "What are some common patterns for batch processing in Beam YAML?"
    • "Find examples that use SQL transforms"

Direct Python Usage

Dataflow Coordinator Agent (Recommended)

from agents.dataflow_coordinator.agent import create_dataflow_coordinator_agent

# Create the coordinator agent
coordinator = create_dataflow_coordinator_agent()

# The coordinator automatically delegates to appropriate sub-agents
# Pipeline generation requests → BeamYAMLPipelineAgent
# Job monitoring requests → DataflowStatusAgent
# Complex workflows → Coordinated execution

# Example: End-to-end workflow
response = coordinator.run(
    "Create a pipeline that processes streaming data from PubSub "
    "and then check if there are any similar jobs currently running"
)
print(response)

Dataflow Job Management Agent

from agents.dataflow_job_management.agent import create_dataflow_agent

# Create the agent
agent = create_dataflow_agent()

# Example interaction
response = agent.run("Check the status of job 2024-01-15_12_00_00-1234567890123456789")
print(response)

Beam YAML Pipeline Agent

from agents.beam_yaml_pipeline.agent import create_beam_yaml_agent

# Create the agent
agent = create_beam_yaml_agent()

# Example interactions
response = agent.run("Create a pipeline that reads from BigQuery, filters records where age > 18, and writes to PubSub")
print(response)

# Schema lookup
response = agent.run("Show me the schema for ReadFromBigQuery connector")
print(response)

# Pipeline validation
response = agent.run("Validate this YAML pipeline configuration: [YAML content]")
print(response)

# Pipeline submission to Dataflow
response = agent.run("Submit this pipeline to Dataflow with job name 'my-pipeline' in project 'my-gcp-project' using staging location 'gs://my-bucket/staging' and temp location 'gs://my-bucket/temp'")
print(response)

# Dry run validation before submission
response = agent.run("Do a dry run validation of this pipeline before submitting to Dataflow")
print(response)

Beam YAML Guide Agent

from agents.beam_yaml_guide.agent import create_beam_yaml_guide_agent

# Create the agent
guide = create_beam_yaml_guide_agent()

# Start interactive pipeline creation
response = guide.run("I want to create a new Beam YAML pipeline")
print(response)

# The agent will guide you through:
# 1. Requirements gathering
# 2. Data source configuration
# 3. Transformation design
# 4. Output destination setup
# 5. Pipeline generation and validation

# Example guided interaction:
response = guide.run("Help me build a pipeline that processes sales data")
print(response)
# Agent: "Welcome! I'll help you create a Beam YAML pipeline step by step.
#         First, can you describe what you want your pipeline to do?"

# Continue the conversation based on agent's questions
response = guide.run("I want to read sales data from BigQuery and write summaries to Cloud Storage")
print(response)
# Agent will ask specific questions about BigQuery table, transformations needed, etc.

YAML RAG Agent

from agents.yaml_rag.agent import create_yaml_rag_agent

# Create the agent
rag_agent = create_yaml_rag_agent()

# Index YAML examples from Apache Beam repository
response = rag_agent.run("Index YAML examples from the Apache Beam repository")
print(response)

# Search for similar examples
response = rag_agent.run("Find examples similar to reading from BigQuery and writing to PubSub")
print(response)

# Get database statistics
response = rag_agent.run("Show me statistics about the indexed examples")
print(response)

# Search for specific patterns
response = rag_agent.run("What are some common patterns for streaming data processing?")
print(response)

MCP Tools Available

Dataflow Job Management Tools

The Dataflow MCP server provides three main tools:

1. check_dataflow_job_status

  • Purpose: Get detailed status of a specific job
  • Parameters:
    • job_id (required): The Dataflow job ID
    • project_id (required): Google Cloud project ID
    • region (optional, default: us-central1): Google Cloud region

2. list_dataflow_jobs

  • Purpose: List Dataflow jobs with filtering
  • Parameters:
    • project_id (required): Google Cloud project ID
    • region (optional, default: us-central1): Google Cloud region
    • status (optional): Filter by status (active, terminated, failed, all)
    • limit (optional): Maximum number of jobs to return

3. get_dataflow_job_logs

  • Purpose: Retrieve logs for a specific job
  • Parameters:
    • job_id (required): The Dataflow job ID
    • project_id (required): Google Cloud project ID
    • region (optional, default: us-central1): Google Cloud region
    • severity (optional, default: INFO): Minimum log severity (DEBUG, INFO, WARNING, ERROR)
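
Under the hood, these tools shell out to the gcloud CLI. A hedged sketch of how check_dataflow_job_status might build and run its command (the actual mcp_servers/dataflow_jobs.py implementation may differ in details):

```python
# Sketch: wrap `gcloud dataflow jobs describe` for the job-status tool.
import subprocess


def build_describe_cmd(job_id: str, project_id: str, region: str = "us-central1"):
    return [
        "gcloud", "dataflow", "jobs", "describe", job_id,
        f"--project={project_id}",
        f"--region={region}",
        "--format=json",  # machine-readable output for the agent to parse
    ]


def check_job_status(job_id: str, project_id: str, region: str = "us-central1") -> str:
    # Requires an installed, authenticated gcloud CLI.
    result = subprocess.run(
        build_describe_cmd(job_id, project_id, region),
        capture_output=True, text=True, check=True,
    )
    return result.stdout
```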

Beam YAML Pipeline Tools

The Beam YAML MCP server provides six main tools:

1. get_beam_yaml_transforms

  • Purpose: List available Beam YAML transforms by category
  • Parameters:
    • category (optional): Filter transforms by category (all, io, transform, ml, sql)

2. get_transform_details

  • Purpose: Get detailed documentation for a specific transform
  • Parameters:
    • transform_name (required): Name of the transform to get details for

3. validate_beam_yaml

  • Purpose: Validate a Beam YAML pipeline configuration
  • Parameters:
    • yaml_content (required): The YAML pipeline content to validate

4. generate_beam_yaml_pipeline

  • Purpose: Generate a Beam YAML pipeline based on requirements
  • Parameters:
    • description (required): Natural language description of pipeline requirements
    • source_type (optional): Type of data source (BigQuery, PubSub, Text, CSV)
    • sink_type (optional): Type of data sink (BigQuery, PubSub, Text, CSV)
    • transformations (optional): List of transformations to apply

5. get_io_connector_schema

  • Purpose: Get input/output schema information for IO connectors
  • Parameters:
    • connector_name (required): Name of the IO connector (e.g., ReadFromBigQuery, WriteToText)

6. submit_dataflow_yaml_pipeline

  • Purpose: Submit a Beam YAML pipeline to Google Cloud Dataflow using gcloud CLI
  • Parameters:
    • yaml_content (required): The YAML pipeline content to submit
    • job_name (required): Name for the Dataflow job (must be unique)
    • project_id (required): Google Cloud project ID
    • region (optional, default: us-central1): Google Cloud region
    • staging_location (required): GCS bucket for staging files (gs://bucket-name/staging)
    • temp_location (required): GCS bucket for temporary files (gs://bucket-name/temp)
    • service_account_email (optional): Service account email for the job
    • max_workers (optional, default: 10): Maximum number of workers
    • machine_type (optional, default: n1-standard-1): Machine type for workers
    • network (optional): VPC network for the job
    • subnetwork (optional): VPC subnetwork for the job
    • additional_experiments (optional): Additional Dataflow experiments to enable
    • dry_run (optional, default: false): Validate the pipeline without actually submitting it
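
A sketch of how submit_dataflow_yaml_pipeline might assemble its gcloud invocation. The subcommand and flag names here are assumptions based on `gcloud dataflow yaml run`; verify them against `gcloud dataflow yaml run --help`, and note the actual MCP server may pass options differently:

```python
# Sketch: write the YAML to a temp file and build a `gcloud dataflow yaml run`
# command. Flag names are assumptions; check your gcloud version's help output.
import tempfile


def build_submit_cmd(yaml_content: str, job_name: str, project_id: str,
                     staging_location: str, temp_location: str,
                     region: str = "us-central1"):
    # gcloud reads the pipeline from a file, so persist the YAML first.
    yaml_file = tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False)
    yaml_file.write(yaml_content)
    yaml_file.close()
    return [
        "gcloud", "dataflow", "yaml", "run", job_name,
        f"--yaml-pipeline-file={yaml_file.name}",
        f"--project={project_id}",
        f"--region={region}",
        f"--pipeline-options=staging_location={staging_location},"
        f"temp_location={temp_location}",
    ]
```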

YAML RAG Tools

The YAML RAG MCP server provides four main tools:

1. index_yaml_examples

  • Purpose: Check if YAML examples are indexed and provide guidance for indexing
  • Parameters: None
  • Note: This tool now provides instructions to run the separate indexing script instead of downloading directly

To index examples, run the separate script:

# One-time setup to index Apache Beam YAML examples
python scripts/index_yaml_examples.py --verbose

# Force re-indexing if needed
python scripts/index_yaml_examples.py --force

# Custom database path
python scripts/index_yaml_examples.py --db-path /custom/path

2. search_similar_examples

  • Purpose: Search for similar YAML examples based on a query
  • Parameters:
    • query (required): Natural language description of what you're looking for
    • num_results (optional, default: 5): Number of similar examples to return

3. get_database_stats

  • Purpose: Get statistics about the indexed examples database
  • Parameters: None

4. clear_database

  • Purpose: Clear all indexed examples from the database
  • Parameters: None

Example Interactions

Dataflow Job Management Agent

Check Job Status

User: "What's the status of job 2024-01-15_12_00_00-1234567890123456789?"

Agent: I'll check the status of that Dataflow job for you.

[Agent uses check_dataflow_job_status tool]

Dataflow Job Status Report:

Job ID: 2024-01-15_12_00_00-1234567890123456789
Job Name: my-dataflow-pipeline
Current State: JOB_STATE_FAILED
Job Type: JOB_TYPE_BATCH
Location: us-central1
Created: 2024-01-15T12:00:00.000Z
Started: 2024-01-15T12:01:00.000Z
Ended: 2024-01-15T12:15:30.000Z

Error Information:
Stage 'Transform Data' failed

The job failed during the 'Transform Data' stage. Let me get the logs to provide more details about the failure.

List Failed Jobs

User: "Show me all failed Dataflow jobs from today"

Agent: I'll list all failed Dataflow jobs for you.

[Agent uses list_dataflow_jobs tool with status='failed']

Dataflow Jobs (showing failed jobs):

• Job ID: 2024-01-15_12_00_00-1234567890123456789
  Name: my-dataflow-pipeline
  State: JOB_STATE_FAILED
  Type: JOB_TYPE_BATCH
  Created: 2024-01-15T12:00:00.000Z

• Job ID: 2024-01-15_10_30_00-9876543210987654321
  Name: data-processing-job
  State: JOB_STATE_FAILED
  Type: JOB_TYPE_STREAMING
  Created: 2024-01-15T10:30:00.000Z

Beam YAML Pipeline Agent

Pipeline Generation Request

User: "I need a pipeline that reads sales data from BigQuery, calculates daily totals by region, and writes the results to another BigQuery table."

Agent: I'll generate a complete Beam YAML pipeline for your sales data processing requirements.

[Agent uses generate_beam_yaml_pipeline tool]

Generated Beam YAML Pipeline:

pipeline:
  transforms:
    - name: ReadSalesData
      type: ReadFromBigQuery
      config:
        table: "your-project:sales_dataset.daily_sales"
    - name: GroupByRegion
      type: Combine
      input: ReadSalesData
      config:
        group_by: ["region", "date"]
        combine:
          total_sales:
            sum: "sales_amount"
          transaction_count:
            count: "*"
    - name: WriteResults
      type: WriteToBigQuery
      input: GroupByRegion
      config:
        table: "your-project:analytics_dataset.regional_daily_totals"
        create_disposition: "CREATE_IF_NEEDED"
        write_disposition: "WRITE_APPEND"

📝 Next Steps:

  1. Replace placeholder values (your-project, sales_dataset, etc.) with actual values
  2. Customize transform configurations based on your specific requirements
  3. Validate the pipeline using the validate_beam_yaml tool
  4. Test the pipeline with a small dataset first

Schema Information Request

User: "What's the schema for ReadFromPubSub?"

Agent: I'll provide detailed schema information for the ReadFromPubSub connector.

[Agent uses get_io_connector_schema tool]

Schema Information for ReadFromPubSub

Output Schema

Row(data: bytes, attributes: map[string, string], timestamp: timestamp)

Example Output:

Row(data: b'message content', attributes: {'key': 'value'}, timestamp: 2023-01-01T00:00:00Z)

Configuration Parameters

  • topic: string (projects/project/topics/topic) - PubSub topic
  • subscription: string (projects/project/subscriptions/sub) - PubSub subscription
  • id_label: string (optional) - Attribute for deduplication
  • timestamp_attribute: string (optional) - Attribute containing event timestamp

Usage Tips

  • Ensure PubSub API is enabled in your Google Cloud project
  • Use subscriptions for ReadFromPubSub to ensure message delivery
  • Consider setting id_label for exactly-once processing

Pipeline Validation

User: "Please validate this YAML pipeline configuration: [YAML content]"

Agent: I'll validate your Beam YAML pipeline configuration.

[Agent uses validate_beam_yaml tool]

✅ YAML pipeline validation passed!

Warnings:

  • Transform 1 (ReadFromBigQuery) should specify either 'table' or 'query'
  • Consider adding error handling transforms for production use

Recommendations:

  • Add descriptive names to your transforms
  • Include proper error handling and monitoring
  • Test with a small dataset before full deployment

Transform Documentation Request

User: "Show me how to use the Filter transform"

Agent: I'll provide detailed documentation for the Filter transform.

[Agent uses get_transform_details tool]

Transform: Filter

Description: Filters elements based on a condition

Configuration:

  • condition: Expression or callable to filter elements
  • language: Language for the condition (python, javascript, etc.)

Example:

type: Filter
input: InputData
config:
  condition: "element.age > 18"
  language: python

Pipeline Submission to Dataflow

User: "Submit this pipeline to Dataflow with job name 'sales-analysis-pipeline' in my project 'my-gcp-project'"

Agent: I'll submit your Beam YAML pipeline to Google Cloud Dataflow.

[Agent uses submit_dataflow_yaml_pipeline tool]

✅ Pipeline submitted successfully!

Job Details:

  • Job Name: sales-analysis-pipeline
  • Project: my-gcp-project
  • Region: us-central1
  • Staging Location: gs://my-bucket/staging
  • Temp Location: gs://my-bucket/temp

Monitoring:

Your pipeline is now running on Dataflow. You can monitor its progress using the provided links.


Pipeline Dry Run Validation

User: "Can you do a dry run validation of this pipeline before submitting it?"

Agent: I'll perform a dry run validation of your pipeline to check for any issues before actual submission.

[Agent uses submit_dataflow_yaml_pipeline tool with dry_run=true]

✅ Pipeline validation successful!

Job Details:

  • Job Name: sales-analysis-pipeline-dryrun
  • Project: my-gcp-project
  • Region: us-central1
  • Staging Location: gs://my-bucket/staging
  • Temp Location: gs://my-bucket/temp

The pipeline passed all validation checks. You can now submit it for actual execution by removing the dry-run flag.

Validation Results:

  • YAML syntax: ✅ Valid
  • Pipeline structure: ✅ Valid
  • Resource requirements: ✅ Acceptable
  • Permissions: ✅ Sufficient
  • GCS locations: ✅ Accessible

YAML RAG Agent

Search for Similar Examples

User: "Find YAML examples similar to reading from BigQuery and writing to PubSub"

Agent: I'll search for YAML examples that match your requirements for BigQuery to PubSub pipelines.

[Agent uses search_similar_examples tool]

🔍 Found 3 Similar Examples:

Example 1: BigQuery to PubSub Streaming Pipeline (Similarity: 0.92)

pipeline:
  transforms:
    - name: ReadFromBigQuery
      type: ReadFromBigQuery
      config:
        query: "SELECT * FROM `project.dataset.table` WHERE timestamp > CURRENT_TIMESTAMP() - INTERVAL 1 HOUR"
    - name: FormatForPubSub
      type: Map
      input: ReadFromBigQuery
      config:
        language: python
        fn: |
          import json
          def format_message(element):
            return json.dumps(element._asdict())
    - name: WriteToPubSub
      type: WriteToPubSub
      input: FormatForPubSub
      config:
        topic: "projects/your-project/topics/processed-data"

Example 2: Batch BigQuery to PubSub with Windowing (Similarity: 0.87)

pipeline:
  transforms:
    - name: ReadBigQueryTable
      type: ReadFromBigQuery
      config:
        table: "project:dataset.source_table"
    - name: WindowData
      type: WindowInto
      input: ReadBigQueryTable
      config:
        windowing:
          type: fixed
          size: 300  # 5 minutes
    - name: PublishToPubSub
      type: WriteToPubSub
      input: WindowData
      config:
        topic: "projects/your-project/topics/windowed-data"
        id_label: "message_id"

Example 3: BigQuery Query to Multiple PubSub Topics (Similarity: 0.84)

pipeline:
  transforms:
    - name: QueryBigQuery
      type: ReadFromBigQuery
      config:
        query: |
          SELECT
            customer_id,
            order_total,
            region
          FROM `project.sales.orders`
          WHERE order_date = CURRENT_DATE()
    - name: RouteByRegion
      type: Partition
      input: QueryBigQuery
      config:
        by: "region"
        outputs:
          - name: us_orders
            condition: "region == 'US'"
          - name: eu_orders
            condition: "region == 'EU'"
    - name: PublishUSOrders
      type: WriteToPubSub
      input: RouteByRegion.us_orders
      config:
        topic: "projects/your-project/topics/us-orders"
    - name: PublishEUOrders
      type: WriteToPubSub
      input: RouteByRegion.eu_orders
      config:
        topic: "projects/your-project/topics/eu-orders"

💡 Key Patterns Identified:

  • Use ReadFromBigQuery with either table or query configuration
  • Consider data formatting with Map transforms before publishing
  • Use WriteToPubSub with proper topic configuration
  • For streaming scenarios, consider windowing strategies
  • For complex routing, use Partition transforms

📝 Recommendations:

  1. Choose between table-based or query-based reading based on your data access pattern
  2. Add proper error handling and dead letter queues for production use
  3. Consider message ordering requirements when configuring PubSub
  4. Test with small datasets before scaling to production volumes

Index Examples from Repository

User: "Index YAML examples from the Apache Beam repository"

Agent: I'll index all YAML examples from the Apache Beam repository into the local Chroma database.

[Agent uses index_yaml_examples tool]

📥 Indexing Complete!

Database Statistics:

  • Total examples: 52
  • Unique transforms: 28
  • IO connectors: 12
  • Transform types: 16

Sample Categories Indexed:

  • IO Examples: BigQuery, PubSub, Kafka, Text files, Parquet, JSON
  • Transform Examples: Map, Filter, GroupBy, Combine, SQL, ML transforms
  • Advanced Patterns: Windowing, Triggers, Side inputs, Error handling

The database is now ready for similarity searches! You can search for examples using natural language queries.


Get Database Statistics

User: "Show me statistics about the indexed examples"

Agent: I'll provide detailed statistics about the YAML examples database.

[Agent uses get_database_stats tool]

📊 Database Statistics:

Overall Metrics:

  • Total indexed examples: 52
  • Database size: 2.3 MB
  • Last updated: 2024-01-15 14:30:00 UTC

Content Breakdown:

  • IO Connectors: 18 examples
    • BigQuery: 6 examples
    • PubSub: 4 examples
    • Kafka: 3 examples
    • File-based: 5 examples
  • Transforms: 24 examples
    • Map/Filter: 8 examples
    • Aggregations: 6 examples
    • SQL: 4 examples
    • ML: 3 examples
    • Windowing: 3 examples
  • Complete Pipelines: 10 examples

Transform Types Found:

  • ReadFromBigQuery, WriteToBigQuery
  • ReadFromPubSub, WriteToPubSub
  • ReadFromKafka, WriteToKafka
  • Map, Filter, FlatMap
  • GroupBy, Combine
  • SqlTransform
  • MLTransform
  • WindowInto, Trigger

Complexity Distribution:

  • Simple (1-3 transforms): 28 examples
  • Medium (4-7 transforms): 18 examples
  • Complex (8+ transforms): 6 examples

The database contains a comprehensive collection of examples covering most common Beam YAML use cases.


Troubleshooting

Common Issues

  1. Authentication Errors:

    gcloud auth application-default login
    gcloud config set project YOUR_PROJECT_ID
  2. Permission Issues: Ensure your account has the following IAM roles:

    • roles/dataflow.viewer (for job monitoring)
    • roles/dataflow.developer (for pipeline submission)
    • roles/logging.viewer
    • roles/compute.viewer
    • roles/storage.objectAdmin (for GCS staging/temp locations)
  3. MCP Server Connection Issues:

    • Check that Python dependencies are installed
    • Verify the MCP server script path in agent.py
    • Check logs for detailed error messages
  4. Job Not Found:

    • Verify the job ID is correct
    • Ensure you're checking the right project and region
    • Check whether the job exists using gcloud dataflow jobs list
  5. Pipeline Submission Issues:

    • gcloud CLI not found: Install the Google Cloud SDK and ensure it is on your PATH
    • Authentication required: Run gcloud auth login to authenticate
    • Invalid GCS locations: Ensure staging and temp locations start with gs:// and the buckets exist
    • Job name conflicts: Use unique job names or include timestamps
    • Dataflow API not enabled: Enable the Dataflow API in your Google Cloud project
    • Invalid job name format: Job names must start with a lowercase letter and contain only lowercase letters, numbers, and hyphens
    • Insufficient permissions: Ensure the service account has the Dataflow Developer and Storage Object Admin roles
    • Network/VPC issues: Verify network and subnetwork configurations if specified
    • Resource quotas: Check that you have sufficient Dataflow job quota in the target region
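
The job-name rule above can be checked up front before submission. A small sketch that mirrors the stated rule (start with a lowercase letter; lowercase letters, digits, and hyphens only):

```python
# Validate a Dataflow job name against the naming rule described above.
import re

JOB_NAME_RE = re.compile(r"^[a-z][a-z0-9-]*$")


def is_valid_job_name(name: str) -> bool:
    return bool(JOB_NAME_RE.match(name))


print(is_valid_job_name("sales-analysis-pipeline"))  # True
print(is_valid_job_name("Sales_Pipeline"))           # False
```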

Debug Mode

Enable debug logging by setting in your .env file:

LOG_LEVEL=DEBUG

Development

Setup Development Environment

To set up the development environment with code formatting and linting:

# Run the setup script
./setup-dev.sh

# Or manually:
pip install -r requirements.txt
pre-commit install
pre-commit run --all-files

Code Formatting

This project uses pre-commit hooks to ensure consistent code formatting:

  • Black: Python code formatter
  • isort: Import sorting
  • flake8: Python linting
  • mypy: Static type checking

Manual formatting commands:

black .                    # Format Python code
isort .                    # Sort imports
flake8 .                   # Lint Python code
mypy .                     # Type checking
pre-commit run --all-files # Run all hooks

Project Structure

df-agent/
├── .env                                   # Environment variables and configuration
├── requirements.txt                       # Python package dependencies
├── README.md                              # Project documentation and setup guide
├── mcp_servers/
│   ├── dataflow_jobs.py                  # MCP server wrapping Google Cloud Dataflow CLI
│   ├── beam_yaml.py                      # MCP server for Beam YAML pipeline operations
│   └── yaml_rag.py                       # MCP server for YAML RAG with Chroma database
├── agents/
│   ├── dataflow_coordinator/
│   │   └── agent.py                       # ADK coordinator agent managing pipeline lifecycle
│   ├── dataflow_job_management/
│   │   └── agent.py                       # ADK agent with intelligent job monitoring
│   ├── beam_yaml_pipeline/
│   │   └── agent.py                       # ADK agent for Beam YAML pipeline generation
│   ├── beam_yaml_guide/
│   │   └── agent.py                       # ADK agent for step-by-step pipeline guidance
│   └── yaml_rag/
│       └── agent.py                       # ADK agent for intelligent YAML example search
└── tests/
    ├── test_dataflow_coordinator_agent.py # Test suite for coordinator agent functionality
    ├── test_job_agent.py                  # Test suite for Dataflow agent functionality
    ├── test_beam_yaml_agent.py            # Test suite for Beam YAML agent functionality
    ├── test_beam_yaml_guide_agent.py      # Test suite for Beam YAML guide agent functionality
    └── test_yaml_rag_agent.py             # Test suite for YAML RAG agent functionality

Testing

Run tests for all agents:

pytest tests/

Run tests for specific agents:

# Test Dataflow Coordinator Agent
pytest tests/test_dataflow_coordinator_agent.py -v

# Test Dataflow Job Management Agent
pytest tests/test_job_agent.py -v

# Test Beam YAML Pipeline Agent
pytest tests/test_beam_yaml_agent.py -v

# Test Beam YAML Guide Agent
pytest tests/test_beam_yaml_guide_agent.py -v

# Test YAML RAG Agent
pytest tests/test_yaml_rag_agent.py -v

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
