In [2]:
import numpy as np
from PIL import Image
import multiprocessing as mp
from functools import partial
import cv2
from typing import Callable, List, Tuple

class ParallelImageProcessor:
    def __init__(self, num_processes: int = None):
        """
        Initialize the parallel image processor.
        
        Args:
            num_processes: Number of processes to use. Defaults to CPU count.
        """
        self.num_processes = num_processes or mp.cpu_count()
    
    def _split_image(self, image: np.ndarray) -> List[Tuple[int, np.ndarray]]:
        """
        Split the image into chunks for parallel processing.
        
        Args:
            image: Input image as numpy array
            
        Returns:
            List of (index, chunk) tuples
        """
        chunks = np.array_split(image, self.num_processes, axis=0)
        return list(enumerate(chunks))
    
    def _merge_chunks(self, chunks: List[Tuple[int, np.ndarray]]) -> np.ndarray:
        """
        Merge processed chunks back into a single image.
        
        Args:
            chunks: List of (index, processed_chunk) tuples
            
        Returns:
            Merged image as numpy array
        """
        # Sort chunks by index
        sorted_chunks = sorted(chunks, key=lambda x: x[0])
        return np.vstack([chunk for _, chunk in sorted_chunks])
    
    def process_image(self, image_path: str, operation: Callable, **kwargs) -> np.ndarray:
        """
        Process an image in parallel using the provided operation.
        
        Args:
            image_path: Path to input image
            operation: Image processing function to apply
            **kwargs: Additional arguments for the operation
            
        Returns:
            Processed image as numpy array
        """
        # Read image
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image at {image_path}")
            
        # Split image into chunks
        chunks = self._split_image(image)
        
        # Create partial function with kwargs
        operation_with_args = partial(operation, **kwargs)
        
        # Process chunks in parallel
        with mp.Pool(self.num_processes) as pool:
            processed_chunks = pool.starmap(
                lambda idx, chunk: (idx, operation_with_args(chunk)),
                chunks
            )
            
        # Merge chunks
        return self._merge_chunks(processed_chunks)

# Example operations
def rotate_image(chunk: np.ndarray, angle: float) -> np.ndarray:
    """Rotate image chunk by specified angle."""
    height, width = chunk.shape[:2]
    center = (width // 2, height // 2)
    rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
    return cv2.warpAffine(chunk, rotation_matrix, (width, height))

def adjust_brightness(chunk: np.ndarray, factor: float) -> np.ndarray:
    """Adjust brightness of image chunk."""
    return cv2.convertScaleAbs(chunk, alpha=factor, beta=0)

def sharpen_image(chunk: np.ndarray, kernel_size: int = 3) -> np.ndarray:
    """Sharpen image chunk using unsharp masking."""
    blurred = cv2.GaussianBlur(chunk, (kernel_size, kernel_size), 0)
    return cv2.addWeighted(chunk, 1.5, blurred, -0.5, 0)

In [4]:
# Initialize processor
processor = ParallelImageProcessor()

# Process image with different operations
rotated_image = processor.process_image('input.JPEG', rotate_image, angle=45)
brightened_image = processor.process_image('input.JPEG', adjust_brightness, factor=1.5)
sharpened_image = processor.process_image('input.JPEG', sharpen_image, kernel_size=3)

AttributeError: Can't get local object 'ParallelImageProcessor.process_image.<locals>.<lambda>'