Problem Statement:

Design a generic Python decorator @route_call that can be applied to instance methods, class methods, static methods, or standalone functions. This decorator should dynamically route the method call to one of two executors:

- A LibraryExecutor (for local execution)
- A ServiceExecutor (for remote/service execution)
- The routing decision is based on attributes of the calling context (e.g., self.use_service).

📌 Functional Requirements:
- Dynamic Routing: Determine at runtime whether the function should be executed locally or via a simulated remote call.
- Context Inspection: Automatically extract context (self, cls, etc.) for the decorator, regardless of whether it's applied to instance methods, class methods, static methods, or standalone functions.
- LibraryExecutor Logic: 
  - Acts as a local call interceptor.
  - If context.remote_user is available, modifies the function’s arguments (e.g., injects an audited_by value).
  - Can restrict execution based on user (e.g., prevent "guest" from calling restricted functions).
- ServiceExecutor Logic:
  - Simulates a remote call to the function.
  - If one of the arguments is a large list, splits it into batches.
  - Uses a ThreadPoolExecutor to process batches concurrently.
  - Merges and returns results from all batches.
- Batching Policy:
  - Configurable batch size.
  - Automatically detects list arguments that are too large and need splitting.

🧪 Sample Use Cases:
- MyService.process_items(large_list) — routed to ServiceExecutor and batched.
- MyService.update_data(record_id) — injected with audited_by = remote_user.
- MyService.delete_data() — blocked if user is "guest".
- @route_call works seamlessly on class/static methods and functions.

### Bonus Points:
- Clean, extensible design using strategy patterns (Executor interface).
- Uses Python introspection to avoid requiring explicit metadata.
- Modular and testable.

In [1]:
from functools import wraps
import inspect
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import concurrent.futures
from abc import ABC, abstractmethod

# Abstract executor interface
class Executor(ABC):
    @abstractmethod
    def execute(self, func: Callable, args: Tuple, kwargs: Dict[str, Any], context: Any = None) -> Any:
        pass

# Local execution implementation
class LibraryExecutor(Executor):
    def execute(self, func: Callable, args: Tuple, kwargs: Dict[str, Any], context: Any = None) -> Any:
        # Add auditing if remote_user is available
        if context and hasattr(context, 'remote_user'):
            kwargs['audited_by'] = context.remote_user
            
            # Restrict guest users from certain operations
            if context.remote_user == "guest" and func.__name__.startswith(("delete", "remove", "drop")):
                raise PermissionError(f"User '{context.remote_user}' is not allowed to perform {func.__name__}")
        
        return func(*args, **kwargs)

# Service/Remote execution implementation
class ServiceExecutor(Executor):
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
    
    def execute(self, func: Callable, args: Tuple, kwargs: Dict[str, Any], context: Any = None) -> Any:
        # Identify large list arguments that need batching
        batch_args = {}
        for i, arg in enumerate(args):
            if isinstance(arg, list) and len(arg) > self.batch_size:
                batch_args[i] = arg
        
        for key, value in kwargs.items():
            if isinstance(value, list) and len(value) > self.batch_size:
                batch_args[key] = value
        
        # If no large lists, just execute normally
        if not batch_args:
            print(f"Remote execution of {func.__name__} (no batching needed)")
            return func(*args, **kwargs)
        
        # Handle batched execution
        print(f"Remote execution of {func.__name__} with batching")
        return self._execute_batched(func, args, kwargs, batch_args)
    
    def _execute_batched(self, func: Callable, args: Tuple, kwargs: Dict[str, Any], batch_args: Dict) -> Any:
        # Prepare batches
        batch_jobs = []
        
        # Handle positional arguments
        pos_batch_indices = [i for i in batch_args.keys() if isinstance(i, int)]
        if pos_batch_indices:
            # Get the largest list for determining batch count
            largest_list_index = max(pos_batch_indices, key=lambda i: len(batch_args[i]))
            largest_list = batch_args[largest_list_index]
            batch_count = (len(largest_list) + self.batch_size - 1) // self.batch_size
            
            for batch_idx in range(batch_count):
                start_idx = batch_idx * self.batch_size
                end_idx = min(start_idx + self.batch_size, len(largest_list))
                
                # Create new args tuple for this batch
                batch_args_tuple = list(args)
                for i in pos_batch_indices:
                    if start_idx < len(batch_args[i]):
                        end_for_this_list = min(end_idx, len(batch_args[i]))
                        batch_args_tuple[i] = batch_args[i][start_idx:end_for_this_list]
                    else:
                        batch_args_tuple[i] = []
                
                # Create a copy of kwargs
                batch_kwargs = kwargs.copy()
                
                # Add job
                batch_jobs.append((func, tuple(batch_args_tuple), batch_kwargs))
        
        # Handle keyword arguments (if no positional batching or additional keyword batching)
        kw_batch_keys = [k for k in batch_args.keys() if isinstance(k, str)]
        if kw_batch_keys and not pos_batch_indices:
            # Get the largest list for determining batch count
            largest_list_key = max(kw_batch_keys, key=lambda k: len(batch_args[k]))
            largest_list = batch_args[largest_list_key]
            batch_count = (len(largest_list) + self.batch_size - 1) // self.batch_size
            
            for batch_idx in range(batch_count):
                start_idx = batch_idx * self.batch_size
                end_idx = min(start_idx + self.batch_size, len(largest_list))
                
                # Create a copy of kwargs for this batch
                batch_kwargs = kwargs.copy()
                for key in kw_batch_keys:
                    if start_idx < len(batch_args[key]):
                        end_for_this_list = min(end_idx, len(batch_args[key]))
                        batch_kwargs[key] = batch_args[key][start_idx:end_for_this_list]
                    else:
                        batch_kwargs[key] = []
                
                # Add job
                batch_jobs.append((func, args, batch_kwargs))
        
        # Execute all batches in parallel
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(job[0], *job[1], **job[2]) for job in batch_jobs]
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result is not None:
                    if isinstance(result, list):
                        results.extend(result)
                    else:
                        results.append(result)
        
        return results

# Decorator factory
def route_call(_func=None, *, batch_size=100):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Determine if this is an instance method, class method, or static method
            context = None
            is_instance_method = False
            is_class_method = False
            
            if args:
                # Check if this is an instance method call
                if hasattr(args[0].__class__, func.__name__):
                    method = getattr(args[0].__class__, func.__name__)
                    # Check if the method on the class is the same function or a descriptor
                    if (
                        method is func 
                        or (hasattr(method, '__func__') and method.__func__ is func)
                        or (inspect.ismethod(method) and method.__func__ is func)
                    ):
                        context = args[0]
                        is_instance_method = True
                
                # Check if this is a class method call
                if not is_instance_method and inspect.isclass(args[0]):
                    method = getattr(args[0], func.__name__, None)
                    if method and hasattr(method, '__func__') and method.__func__ is func:
                        context = args[0]  # class itself becomes the context
                        is_class_method = True
            
            # Determine which executor to use
            use_service = False
            if is_instance_method and hasattr(context, 'use_service'):
                use_service = context.use_service
            elif is_class_method and hasattr(context, 'use_service'):
                use_service = context.use_service
            
            # Select executor based on context
            executor = ServiceExecutor(batch_size=batch_size) if use_service else LibraryExecutor()
            
            # Execute with appropriate context and arguments
            if is_instance_method:
                # Skip the 'self' argument as it's the context
                return executor.execute(func, args[1:], kwargs, context)
            elif is_class_method:
                # Skip the 'cls' argument as it's the context
                return executor.execute(func, args[1:], kwargs, context)
            else:
                # For static methods and standalone functions, no special context handling
                return executor.execute(func, args, kwargs)
        
        return wrapper
    
    # Support both @route_call and @route_call(batch_size=...)
    if _func is None:
        return decorator
    else:
        return decorator(_func)

# Example usage classes and methods
class MyService:
    def __init__(self, use_service=False, remote_user=None):
        self.use_service = use_service
        self.remote_user = remote_user
    
    @route_call
    def process_items(self, items):
        print(f"Processing {len(items)} items")
        return [item * 2 for item in items]
    
    @route_call
    def update_data(self, record_id, data=None, **kwargs):
        print(f"Updating record {record_id} with {data}")
        print(f"Additional kwargs: {kwargs}")
        return record_id
    
    @route_call
    def delete_data(self, record_id):
        print(f"Deleting record {record_id}")
        return f"Deleted {record_id}"
    
    @classmethod
    @route_call
    def class_method_example(cls, value):
        return f"Class method processed {value}"
    
    @staticmethod
    @route_call
    def static_method_example(value):
        return f"Static method processed {value}"

@route_call
def standalone_function(value):
    return f"Standalone function processed {value}"

# Set a class attribute for class method routing
MyService.use_service = False

# Test the implementation
if __name__ == "__main__1":
    # Create instances with different configurations
    local_service = MyService(use_service=False, remote_user="admin")
    remote_service = MyService(use_service=True, remote_user="user1")
    guest_service = MyService(use_service=False, remote_user="guest")
    
    # Test instance methods
    print("\n-- Local service, instance method --")
    result1 = local_service.process_items([1, 2, 3])
    print(f"Result: {result1}")
    
    print("\n-- Remote service with batching, instance method --")
    large_list = list(range(250))
    result2 = remote_service.process_items(large_list)
    print(f"Result length: {len(result2)}")
    
    print("\n-- Local service with auditing --")
    local_service.update_data(42, data={"name": "Test"})
    
    print("\n-- Guest user attempting delete --")
    try:
        guest_service.delete_data(99)
    except PermissionError as e:
        print(f"Expected error: {e}")
    
    # Test class and static methods
    print("\n-- Class method --")
    result3 = MyService.class_method_example("test")
    print(f"Result: {result3}")
    
    print("\n-- Static method --")
    result4 = MyService.static_method_example("test")
    print(f"Result: {result4}")
    
    # Test standalone function
    print("\n-- Standalone function --")
    result5 = standalone_function("test")
    print(f"Result: {result5}")
    
    # Change class attribute and test class method routing
    print("\n-- Class method with service enabled --")
    MyService.use_service = True
    result6 = MyService.class_method_example("test")
    print(f"Result: {result6}")

### Now change scope 
- decorate instance-method alone
- use access policy : routing policy

In [2]:
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import concurrent.futures
from abc import ABC, abstractmethod

# Access Policy Manager - determines routing based on context and function
class AccessPolicyManager:
    @staticmethod
    def should_use_service(instance: Any, func_name: str, args: Tuple, kwargs: Dict[str, Any]) -> bool:
        """
        Determines if a method call should be routed to service based on access policy.
        
        This method evaluates various conditions:
        1. User roles and permissions
        2. Resource type being accessed
        3. Operation type (read, write, delete)
        4. Data size and complexity
        5. System load/capacity
        """
        # Example implementation - can be expanded with more complex logic
        
        # Check if user has specific roles that require service execution
        if hasattr(instance, 'remote_user'):
            # Admin users might have direct library access for certain operations
            if instance.remote_user == "admin" and not func_name.startswith("batch"):
                return False
                
            # Service users always go through service
            if instance.remote_user.startswith("service_"):
                return True
                
            # Guest users use service for non-sensitive operations
            if instance.remote_user == "guest" and not func_name.startswith(("delete", "update")):
                return True
        
        # Check data size - large data operations go to service
        for arg in args:
            if isinstance(arg, list) and len(arg) > 100:
                return True
        
        for value in kwargs.values():
            if isinstance(value, list) and len(value) > 100:
                return True
        
        # Default routing based on operation type
        if func_name.startswith(("get", "list", "search")):
            return False  # Read operations default to library
        elif func_name.startswith(("create", "update", "delete", "process")):
            return True   # Write/process operations default to service
            
        # Default to library execution
        return False

# Abstract executor interface
class Executor(ABC):
    @abstractmethod
    def execute(self, func: Callable, instance: Any, args: Tuple, kwargs: Dict[str, Any]) -> Any:
        pass

# Local execution implementation
class LibraryExecutor(Executor):
    def execute(self, func: Callable, instance: Any, args: Tuple, kwargs: Dict[str, Any]) -> Any:
        print(f"Library execution: {func.__name__}")
        
        # Add auditing if remote_user is available
        if hasattr(instance, 'remote_user'):
            kwargs['audited_by'] = instance.remote_user
            
            # Restrict guest users from certain operations
            if instance.remote_user == "guest" and func.__name__.startswith(("delete", "remove", "drop")):
                raise PermissionError(f"User '{instance.remote_user}' is not allowed to perform {func.__name__}")
        
        return func(instance, *args, **kwargs)

# Service/Remote execution implementation
class ServiceExecutor(Executor):
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
    
    def execute(self, func: Callable, instance: Any, args: Tuple, kwargs: Dict[str, Any]) -> Any:
        print(f"Service execution: {func.__name__}")
        
        # Identify large list arguments that need batching
        batch_args = {}
        for i, arg in enumerate(args):
            if isinstance(arg, list) and len(arg) > self.batch_size:
                batch_args[i] = arg
        
        for key, value in kwargs.items():
            if isinstance(value, list) and len(value) > self.batch_size:
                batch_args[key] = value
        
        # If no large lists, just execute normally
        if not batch_args:
            print(f"No batching needed")
            return func(instance, *args, **kwargs)
        
        # Handle batched execution
        print(f"Using batching with size {self.batch_size}")
        return self._execute_batched(func, instance, args, kwargs, batch_args)
    
    def _execute_batched(self, func: Callable, instance: Any, args: Tuple, kwargs: Dict[str, Any], batch_args: Dict) -> Any:
        # Prepare batches
        batch_jobs = []
        
        # Handle positional arguments
        pos_batch_indices = [i for i in batch_args.keys() if isinstance(i, int)]
        if pos_batch_indices:
            # Get the largest list for determining batch count
            largest_list_index = max(pos_batch_indices, key=lambda i: len(batch_args[i]))
            largest_list = batch_args[largest_list_index]
            batch_count = (len(largest_list) + self.batch_size - 1) // self.batch_size
            
            for batch_idx in range(batch_count):
                start_idx = batch_idx * self.batch_size
                end_idx = min(start_idx + self.batch_size, len(largest_list))
                
                # Create new args tuple for this batch
                batch_args_tuple = list(args)
                for i in pos_batch_indices:
                    if start_idx < len(batch_args[i]):
                        end_for_this_list = min(end_idx, len(batch_args[i]))
                        batch_args_tuple[i] = batch_args[i][start_idx:end_for_this_list]
                    else:
                        batch_args_tuple[i] = []
                
                # Create a copy of kwargs
                batch_kwargs = kwargs.copy()
                
                # Add job
                batch_jobs.append((func, instance, tuple(batch_args_tuple), batch_kwargs))
        
        # Handle keyword arguments (if no positional batching or additional keyword batching)
        kw_batch_keys = [k for k in batch_args.keys() if isinstance(k, str)]
        if kw_batch_keys and not pos_batch_indices:
            # Get the largest list for determining batch count
            largest_list_key = max(kw_batch_keys, key=lambda k: len(batch_args[k]))
            largest_list = batch_args[largest_list_key]
            batch_count = (len(largest_list) + self.batch_size - 1) // self.batch_size
            
            for batch_idx in range(batch_count):
                start_idx = batch_idx * self.batch_size
                end_idx = min(start_idx + self.batch_size, len(largest_list))
                
                # Create a copy of kwargs for this batch
                batch_kwargs = kwargs.copy()
                for key in kw_batch_keys:
                    if start_idx < len(batch_args[key]):
                        end_for_this_list = min(end_idx, len(batch_args[key]))
                        batch_kwargs[key] = batch_args[key][start_idx:end_for_this_list]
                    else:
                        batch_kwargs[key] = []
                
                # Add job
                batch_jobs.append((func, instance, args, batch_kwargs))
        
        # Execute all batches in parallel
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Notice we're passing instance as the first arg to each function
            futures = [executor.submit(job[0], job[1], *job[2], **job[3]) for job in batch_jobs]
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result is not None:
                    if isinstance(result, list):
                        results.extend(result)
                    else:
                        results.append(result)
        
        return results

# Decorator factory with access policy-based routing
def route_call(_func=None, *, batch_size=100):
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Use the policy manager to determine routing
            use_service = AccessPolicyManager.should_use_service(
                instance=self, 
                func_name=func.__name__,
                args=args,
                kwargs=kwargs
            )
            
            # Select executor based on policy decision
            executor = ServiceExecutor(batch_size=batch_size) if use_service else LibraryExecutor()
            
            # Execute with instance as the context
            return executor.execute(func, self, args, kwargs)
        
        return wrapper
    
    # Support both @route_call and @route_call(batch_size=...)
    if _func is None:
        return decorator
    else:
        return decorator(_func)

# Example usage class with instance methods
class MyService:
    def __init__(self, remote_user=None):
        self.remote_user = remote_user
    
    @route_call
    def process_items(self, items):
        print(f"Processing {len(items)} items")
        return [item * 2 for item in items]
    
    @route_call
    def get_items(self, filter_value=None):
        print(f"Getting items with filter: {filter_value}")
        return [1, 2, 3] if filter_value else [1, 2, 3, 4, 5]
    
    @route_call
    def update_data(self, record_id, data=None, **kwargs):
        print(f"Updating record {record_id} with {data}")
        print(f"Additional kwargs: {kwargs}")
        return record_id
    
    @route_call
    def delete_data(self, record_id):
        print(f"Deleting record {record_id}")
        return f"Deleted {record_id}"
    
    @route_call
    def batch_process(self, items):
        print(f"Batch processing {len(items)} items")
        return [f"Processed {item}" for item in items]

# Test the implementation
if __name__ == "__main__1":
    # Create instances with different user types
    admin_service = MyService(remote_user="admin")
    service_user = MyService(remote_user="service_account1")
    guest_user = MyService(remote_user="guest")
    
    # Test with admin user
    print("\n-- Admin user tests --")
    result1 = admin_service.process_items([1, 2, 3])  # Should use library (small list)
    print(f"Result: {result1}")
    
    large_list = list(range(250))
    result2 = admin_service.process_items(large_list)  # Should use service (large list)
    print(f"Result length: {len(result2)}")
    
    result3 = admin_service.get_items()  # Should use library (read operation)
    print(f"Result: {result3}")
    
    result4 = admin_service.batch_process([1, 2, 3])  # Should use service (batch operation)
    print(f"Result: {result4}")
    
    # Test with service user
    print("\n-- Service user tests --")
    result5 = service_user.get_items("test")  # Should use service regardless of operation
    print(f"Result: {result5}")
    
    # Test with guest user
    print("\n-- Guest user tests --")
    result6 = guest_user.get_items()  # Should use service
    print(f"Result: {result6}")
    
    print("\n-- Guest attempting delete --")
    try:
        guest_user.delete_data(99)  # Should raise permission error
    except PermissionError as e:
        print(f"Expected error: {e}")