# 5. Parallelism with Semantic Kernel

The parallelism pattern involves processing multiple similar tasks concurrently to improve efficiency. In this example, we'll generate random cities and then process each one in parallel to get interesting facts about them.

In [None]:
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion
from semantic_kernel.contents import ChatHistory
from semantic_kernel.agents import ChatCompletionAgent
import asyncio
import re
from typing import List

Set up the kernel and service

In [None]:
# Identifier the chat service is registered under; reused later when the
# agents are constructed.
service_id = "azure_openai"

# Kernel that will own the registered AI service
kernel = Kernel()

# Register the Azure OpenAI chat completion service with the kernel.
# NOTE(review): no endpoint or credentials are passed here — presumably they
# are picked up from the environment; confirm before running.
kernel.add_service(
    AzureChatCompletion(
        deployment_name="gpt-4.1-mini",
        service_id=service_id,
    )
)

Create specialized agents

In [None]:
# City Generator Agent
# The instructions deliberately do not fix the number of cities: the caller's
# user message states how many names it wants, and a hard-coded "exactly 3"
# here would contradict requests for any other count.
city_generator = ChatCompletionAgent(
    service_id=service_id,
    kernel=kernel,
    name="CityGenerator",
    instructions="Generate random city names. When asked, provide exactly the requested number of city names from the specified country, separated by commas. Only return the city names, no other text.",
    execution_settings={
        service_id: kernel.get_prompt_execution_settings_from_service_id(service_id)
    }
)

# City Facts Agent
# Identity and prompt first, then the kernel/service wiring. The instructions
# pin the reply format: a "# CityName" heading followed by a numbered list of
# three facts.
city_facts_agent = ChatCompletionAgent(
    name="CityFactsAgent",
    instructions="""Provide exactly 3 interesting facts about the given city.
    
Format your response as:
# CityName
1. First interesting fact
2. Second interesting fact  
3. Third interesting fact

Only return the facts, no other text. Use a numbered list.""",
    kernel=kernel,
    service_id=service_id,
    execution_settings={
        service_id: kernel.get_prompt_execution_settings_from_service_id(service_id)
    },
)

Create helper functions for parallel processing

In [None]:
async def generate_random_cities(country: str = "India", count: int = 3) -> List[str]:
    """
    Ask the CityGenerator agent for city names in the given country.

    Args:
        country: Country the cities should come from.
        count: Number of city names to request from the agent.

    Returns:
        The city names parsed from the first agent response, with surrounding
        whitespace stripped and blank entries removed. An empty list is
        returned when the agent yields no response or only blank text.
    """
    print(f"→ Generating {count} random cities from {country}...")

    chat_history = ChatHistory()
    chat_history.add_user_message(f"Return {count} random city names in {country}, comma separated.")

    # Only the first response yielded by the agent is used.
    async for response in city_generator.invoke(chat_history):
        cities_text = str(response.content)
        # Parse the comma-separated cities, dropping blank entries:
        # "".split(",") yields [""], which is truthy and would slip past a
        # caller's "no cities" check and trigger a lookup for an empty name.
        stripped_names = (part.strip() for part in cities_text.split(","))
        cities = [name for name in stripped_names if name]
        print(f"Generated cities: {cities}")
        return cities

    return []

async def get_city_facts(city: str) -> str:
    """
    Ask the CityFactsAgent for three facts about one city.

    Args:
        city: Name of the city to look up.

    Returns:
        The text of the first agent response, or a "No facts available"
        message when the agent yields nothing.
    """
    print(f"→ Getting facts for {city}...")

    conversation = ChatHistory()
    prompt = f"Tell me 3 interesting facts about {city}. Only return the facts, no other text. Use a numbered list. Start with the city name. Ex. # CityName"
    conversation.add_user_message(prompt)

    async for reply in city_facts_agent.invoke(conversation):
        # Take the first reply only and stop consuming the stream.
        city_summary = str(reply.content)
        break
    else:
        # The agent produced no reply at all.
        return f"No facts available for {city}"

    print(f"✓ Completed facts for {city}")
    return city_summary

async def process_cities_parallel(cities: List[str]) -> List[str]:
    """
    Process multiple cities in parallel to get facts about each
    """
    print(f"\n→ Processing {len(cities)} cities in parallel...")
    
    # Create tasks for parallel execution
    tasks = [get_city_facts(city) for city in cities]
    
    # Execute all tasks concurrently
    results = await asyncio.gather(*tasks)
    
    print(f"✓ Completed processing all {len(cities)} cities\n")
    return results

Create the main parallel workflow

In [None]:
async def parallel_city_facts_workflow(country: str = "India", city_count: int = 3) -> dict:
    """
    Run the full demo: generate cities, then gather facts for them in parallel.

    Args:
        country: Country to generate cities for.
        city_count: Number of cities to request.

    Returns:
        A dict with two keys: "cities" (the generated names) and "facts"
        (the facts collected for them). Both are empty lists when no cities
        were generated.
    """
    print("=== Parallel City Facts Workflow ===")
    print(f"Country: {country}, Cities to process: {city_count}\n")

    # Step 1: ask the generator agent for city names
    generated = await generate_random_cities(country, city_count)

    if generated:
        # Step 2: fan out one facts request per city
        city_facts = await process_cities_parallel(generated)
        # Step 3: hand both lists back to the caller
        return {"cities": generated, "facts": city_facts}

    print("❌ No cities generated")
    return {"cities": [], "facts": []}

def display_results(results: dict):
    """
    Print a workflow result dict in a readable layout.

    Args:
        results: Dict with a "cities" entry and a "facts" entry, as returned
            by the workflow. Each fact is printed followed by a separator.
    """
    separator = "-" * 50

    print("=== RESULTS ===")
    processed = results['cities']
    print(f"Cities processed: {processed}")
    collected = results['facts']
    print(f"Total facts collected: {len(collected)}\n")

    print("=== CITY FACTS ===")
    for entry in collected:
        # Fact text on its own line(s), then the divider line.
        print(entry, separator, sep="\n")

Test the basic parallel workflow

In [None]:
# Test with Indian cities
results = await parallel_city_facts_workflow("India", 3)
display_results(results)

Test with a different country

In [None]:
# Test with Japanese cities
results_japan = await parallel_city_facts_workflow("Japan", 4)
display_results(results_japan)

## Performance Comparison: Sequential vs Parallel

In [None]:
import time

async def sequential_processing(cities: List[str]) -> List[str]:
    """
    Fetch facts for each city one at a time, as a baseline for comparison.

    Args:
        cities: City names to look up.

    Returns:
        One facts string per city, in the same order as ``cities``.
    """
    print(f"→ Processing {len(cities)} cities sequentially...")

    # Each await finishes before the next request starts.
    return [await get_city_facts(name) for name in cities]

# Performance comparison
test_cities = ["Tokyo", "Osaka", "Kyoto", "Hiroshima"]

# Test parallel processing
print("Testing PARALLEL processing:")
start_time = time.time()
parallel_results = await process_cities_parallel(test_cities)
parallel_time = time.time() - start_time

print(f"\nTesting SEQUENTIAL processing:")
start_time = time.time()
sequential_results = await sequential_processing(test_cities)
sequential_time = time.time() - start_time

print(f"\n=== PERFORMANCE COMPARISON ===")
print(f"Parallel processing time: {parallel_time:.2f} seconds")
print(f"Sequential processing time: {sequential_time:.2f} seconds")
print(f"Speed improvement: {sequential_time/parallel_time:.2f}x faster")

## Advanced: Batch Processing with Concurrency Control

In [None]:
async def batch_process_cities(cities: List[str], batch_size: int = 3) -> List[str]:
    """
    Process cities in fixed-size batches to cap how many requests run at once.

    Each batch is processed concurrently via ``process_cities_parallel``; the
    next batch starts only after the previous one has finished.

    Args:
        cities: City names to look up.
        batch_size: Maximum number of cities processed concurrently. Must be
            at least 1.

    Returns:
        The facts strings from all batches, concatenated in batch order.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # Reject invalid sizes up front: a negative step makes range() empty, so
    # no city would be processed and [] returned silently; zero makes range()
    # raise an unrelated-looking error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    print(f"→ Processing {len(cities)} cities in batches of {batch_size}...")
    all_results = []

    batch_starts = range(0, len(cities), batch_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        batch = cities[start:start + batch_size]
        print(f"  Processing batch {batch_number}: {batch}")

        batch_results = await process_cities_parallel(batch)
        all_results.extend(batch_results)

    return all_results

# Test batch processing with a larger list
large_city_list = ["Mumbai", "Delhi", "Bangalore", "Chennai", "Kolkata", "Hyderabad", "Pune", "Ahmedabad"]
batch_results = await batch_process_cities(large_city_list, batch_size=3)

print(f"\n=== BATCH PROCESSING RESULTS ===")
print(f"Total cities processed: {len(large_city_list)}")
print(f"Results collected: {len(batch_results)}")

## Key Differences from LangChain Parallelism

1. **Asyncio-Based Concurrency**: Uses Python's native `asyncio.gather()` for parallel execution instead of complex graph structures with `Send` commands.

2. **Simplified Coordination**: No need for complex state management or command objects - just straightforward async functions.

3. **Agent-Based Architecture**: Each processing unit is a dedicated `ChatCompletionAgent` with specific instructions.

4. **Flexible Batching**: Easy to implement batch processing to control concurrency and avoid overwhelming the AI service.

5. **Better Error Handling**: Standard Python exception handling works naturally with async/await patterns.

6. **Performance Monitoring**: Easy to measure and compare performance between parallel and sequential approaches.

7. **Scalable Design**: Can easily adjust concurrency levels and batch sizes based on requirements.

### Benefits:

- **Simplicity**: Uses standard Python async patterns that developers are familiar with
- **Performance**: Achieves significant speed improvements for concurrent tasks
- **Flexibility**: Easy to adjust concurrency levels and processing strategies
- **Reliability**: Better error handling and recovery options
- **Maintainability**: Clear, readable code that's easy to debug and modify

### Use Cases:

- **Bulk Data Processing**: Processing multiple similar items concurrently
- **Content Generation**: Creating content for multiple topics simultaneously
- **Data Enrichment**: Enhancing datasets with AI-generated information
- **Multi-Source Analysis**: Analyzing multiple data sources in parallel
- **Batch Operations**: Processing large lists of items efficiently

This approach provides a clean, efficient way to implement parallelism without the complexity of graph-based workflows, making it easier to understand, maintain, and scale.