In [None]:
from dataclasses import dataclass
from typing import List, TypeVar, Callable, Any, Generic, Iterator
import asyncio
from uuid import uuid4
import random

# Generic types for input, intermediate, and output values
T = TypeVar('T')  # Input type
U = TypeVar('U')  # Mapped type
R = TypeVar('R')  # Reduced type

@dataclass
class DocumentChunk:
    """Plain record describing one chunk of a document.

    The class only stores the values it is given; it performs no validation
    and derives nothing from ``content``. Field meanings noted below are
    inferred from the field names -- TODO confirm against real producers.
    """
    id: str  # identifier for the chunk (presumably unique -- TODO confirm)
    sequence: int  # presumably the chunk's position within its document
    content: str  # the chunk's text
    word_count: int  # supplied by the caller; not computed from `content` here
    timestamp: float  # NOTE(review): units/epoch are not established in this file

class MapReduceProcessor(Generic[T, U, R]):
    """Map a function over items with bounded concurrency, then fold the results.

    Items are split into at most ``num_workers`` contiguous chunks. Each chunk
    is mapped sequentially by one worker coroutine and the workers run
    concurrently, so no more than ``num_workers`` map calls are in flight at
    any moment. Mapped values are reduced left-to-right in input order.
    """

    def __init__(
        self,
        map_fn: Callable[[T], U],
        reduce_fn: Callable[[U, U], R],
        num_workers: int
    ):
        """Store the callables and the concurrency limit.

        Args:
            map_fn: Called once per input item.
            reduce_fn: Combines the running result with the next mapped value.
                The fold is seeded with the first mapped value and feeds its
                own output back in as the first argument, so ``reduce_fn``
                must accept both kinds of value there.
            num_workers: Maximum number of items being mapped at the same time.

        Raises:
            ValueError: If ``num_workers`` is less than 1.
        """
        # A non-positive worker count would make chunking divide by zero and
        # leave process() with nothing to schedule, so reject it up front.
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.map_fn = map_fn
        self.reduce_fn = reduce_fn
        self.num_workers = num_workers

    async def process_item(self, item: T) -> U:
        """Process a single item using the map function.

        Sleeps for a random 0.1-0.5 s first to simulate processing time.
        """
        await asyncio.sleep(random.uniform(0.1, 0.5))  # Simulate processing time
        return self.map_fn(item)

    def chunk_data(self, items: List[T]) -> Iterator[List[T]]:
        """Split items into contiguous, near-equal chunks, one per worker.

        Yields ``min(num_workers, len(items))`` non-empty chunks; when the
        split is uneven the earliest chunks each receive one extra item.
        Yields nothing when ``items`` is empty.
        """
        if not items:
            return  # nothing to split; also avoids dividing by zero below

        effective_workers = min(self.num_workers, len(items))
        chunk_size, remainder = divmod(len(items), effective_workers)

        start_idx = 0
        for i in range(effective_workers):
            end_idx = start_idx + chunk_size + (1 if i < remainder else 0)
            yield items[start_idx:end_idx]
            start_idx = end_idx

    async def _map_chunk(self, chunk: List[T]) -> List[U]:
        """Map one worker's chunk sequentially, keeping the items' order."""
        return [await self.process_item(item) for item in chunk]

    async def process(self, items: List[T]) -> R:
        """Execute the map-reduce operation on items.

        Args:
            items: Non-empty list of inputs.

        Returns:
            The left fold of ``reduce_fn`` over the mapped values, taken in
            input order. For a single item this is its mapped value as-is.

        Raises:
            ValueError: If ``items`` is empty.
        """
        if not items:
            raise ValueError("Cannot process empty item list")

        # One coroutine per chunk: this is what actually enforces num_workers.
        worker_results = await asyncio.gather(
            *(self._map_chunk(chunk) for chunk in self.chunk_data(items))
        )

        # Chunks are contiguous slices and gather() returns results in argument
        # order, so flattening restores the original item order. That keeps
        # non-commutative reduce functions correct.
        mapped_results = [
            value for chunk_result in worker_results for value in chunk_result
        ]

        # Reduce results
        final_result = mapped_results[0]
        for result in mapped_results[1:]:
            final_result = self.reduce_fn(final_result, result)

        return final_result

# Example usage
async def main():
    """Run three demonstration map-reduce jobs and print each result."""

    def add(left, right):
        return left + right

    def square(value):
        return value * value

    def count_words(document):
        return document.word_count

    # Example 1: total the word counts of ten generated document chunks
    documents = []
    for position in range(10):
        documents.append(
            DocumentChunk(
                id=str(uuid4()),
                sequence=position,
                content=f"Sample content {position}",
                word_count=random.randint(50, 200),
                timestamp=random.random(),
            )
        )
    word_total_job = MapReduceProcessor[DocumentChunk, int, int](
        map_fn=count_words, reduce_fn=add, num_workers=4
    )
    print(f"Total word count: {await word_total_job.process(documents)}")

    # Example 2: sum of the squares of 1..100
    one_to_hundred = list(range(1, 101))
    squares_job = MapReduceProcessor[int, int, int](
        map_fn=square, reduce_fn=add, num_workers=4
    )
    squares_total = await squares_job.process(one_to_hundred)
    print(f"Sum of squares: {squares_total}")

    # Example 3: length of the longest string
    words = ["hello", "world", "python", "asyncio"]
    longest_job = MapReduceProcessor[str, int, int](
        map_fn=len, reduce_fn=max, num_workers=2
    )
    longest = await longest_job.process(words)
    print(f"Maximum string length: {longest}")

if __name__ == "__main__":
    # asyncio.run() raises RuntimeError when an event loop is already running
    # in this thread (as in a Jupyter/IPython cell), so probe for one first.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is None:
        # Ordinary script execution: start a fresh loop for the demo.
        asyncio.run(main())
    else:
        # Already inside a loop: schedule the demo on it. The module-level
        # reference keeps the task from being garbage-collected mid-run.
        _demo_task = running_loop.create_task(main())