### Using Pebble for time and memory limits
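* Pebble runs a function in a separate process (well suited to CPU-bound work) and terminates it if it exceeds its timeout. Pebble does not cap memory by itself; in the example below the memory limit is applied inside the worker process with `resource.setrlimit`.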

In [2]:
# pip install pebble

* ProcessPool is a pool of processes that can be used to execute tasks concurrently.

In [3]:
from pebble import ProcessPool
import time
import os

# A sample function to demonstrate
def heavy_function(x, delay=5):
    """Simulate a slow task and return ``x`` squared.

    Args:
        x: Number to square.
        delay: Seconds to sleep before returning, standing in for the heavy
            computation. Defaults to 5, the previously hard-coded duration,
            so existing single-argument calls behave as before.

    Returns:
        ``x ** 2``.
    """
    # Printing the PID makes it visible which process executed the task.
    print(f"Process ID: {os.getpid()} - Starting calculation for {x}")
    time.sleep(delay)  # Simulate a heavy computation
    return x ** 2

# Run heavy_function in a worker process with a timeout
def run_with_limits():
    """Run ``heavy_function(10)`` in a Pebble worker process with a 3-second timeout.

    Prints the result on success, a timeout message if the task exceeds the
    timeout, or a generic error message for any other exception.
    """
    # Pebble reports timeouts with concurrent.futures.TimeoutError. That class is
    # only an alias of the built-in TimeoutError from Python 3.11 onwards, so both
    # are caught to keep the timeout branch working on older interpreters.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    with ProcessPool(max_workers=2) as pool:  # Limit to 2 concurrent processes
        future = pool.schedule(heavy_function, args=(10,), timeout=3)  # Timeout in 3 seconds

        try:
            result = future.result()  # Blocks until the result is ready or the timeout occurs
            print(f"Result: {result}")
        except (FutureTimeoutError, TimeoutError):
            print("Function execution timed out ‚è∞")
        except Exception as e:
            print(f"An error occurred: {e}")

# Execute the example: heavy_function sleeps for 5 s but is scheduled with a 3 s
# timeout, so this call is expected to take the timeout branch.
run_with_limits()

Process ID: 3504 - Starting calculation for 10
Function execution timed out ‚è∞


In [4]:
import resource

def memory_limited_function(x, limit_bytes=100 * 1024 * 1024, n_items=10**7):
    """Cap this process's address space, then run a memory-hungry computation.

    Args:
        x: Multiplier applied to every integer in ``range(n_items)``.
        limit_bytes: Address-space cap (``RLIMIT_AS``) in bytes. Defaults to
            100 MB, the previously hard-coded value.
        n_items: Number of products to materialise in a list. Defaults to
            ``10**7``, the previously hard-coded size.

    Returns:
        The sum of ``i * x`` for ``i`` in ``range(n_items)``.

    Raises:
        MemoryError: Expected when building the list needs more memory than
            the cap allows.

    Note:
        The limit is set on the calling process and remains in effect after
        this function returns, until something changes it again.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)

    # Only ever tighten the soft limit. Overwriting the hard limit cannot be
    # undone by an unprivileged process, and asking for more than the existing
    # hard limit makes setrlimit raise ValueError.
    new_soft = limit_bytes
    for current in (soft, hard):
        if current != resource.RLIM_INFINITY:
            new_soft = min(new_soft, current)
    resource.setrlimit(resource.RLIMIT_AS, (new_soft, hard))

    # The list is materialised on purpose: it is what consumes the memory
    # (this will fail if the memory limit is exceeded).
    result = [i * x for i in range(n_items)]
    return sum(result)

def run_with_memory_limit():
    """Run ``memory_limited_function(10)`` in a Pebble worker with a 5-second timeout.

    Prints the result on success, or a message describing whether the task
    timed out, ran out of memory, or failed with some other exception.
    """
    # Pebble reports timeouts with concurrent.futures.TimeoutError. That class is
    # only an alias of the built-in TimeoutError from Python 3.11 onwards, so both
    # are caught to keep the timeout branch working on older interpreters.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    with ProcessPool(max_workers=1) as pool:
        future = pool.schedule(memory_limited_function, args=(10,), timeout=5)  # 5s timeout

        try:
            result = future.result()
            print(f"Result: {result}")
        except (FutureTimeoutError, TimeoutError):
            print("Function execution timed out ‚è∞")
        except MemoryError:
            # Raised in the worker and re-raised here by future.result().
            print("Memory limit exceeded üí•")
        except Exception as e:
            print(f"An error occurred: {e}")

# Execute: the worker caps its address space at 100 MB before building a
# 10**7-element list, so the memory-limit branch is the expected outcome.
run_with_memory_limit()

Memory limit exceeded üí•


* A thread pool is a pool of threads that can execute tasks concurrently.

In Pebble, the `timeout` parameter is available in the ProcessPool's `schedule` method but not in the ThreadPool's `schedule` method. This is because threads share the process's memory space and Python offers no safe way to forcibly terminate a running thread, so a timeout cannot be enforced from the outside; a worker process, by contrast, can simply be killed. With threads, stopping has to be cooperative, as shown below.

In [8]:
import time
import threading
import concurrent.futures

def cooperative_worker(stop_event, work_time=10):
    """Do up to ``work_time`` one-second slices of work, stopping early on request.

    Args:
        stop_event: ``threading.Event`` the caller sets to request a stop.
        work_time: Maximum number of one-second slices to run. Defaults to 10.

    Returns:
        None. Prints whether the worker stopped on request or ran to completion.
    """
    elapsed = 0
    while elapsed < work_time:
        # Event.wait serves as the one-second "work" pause and returns True as
        # soon as the event is set, so a stop request is honoured immediately
        # instead of after the remainder of an uninterruptible time.sleep(1).
        if stop_event.wait(timeout=1):
            print("Worker stopping cooperatively.")
            return
        elapsed += 1
    print("Worker done without stop event.")

def run_with_threadpool():
    """Run ``cooperative_worker`` in a thread pool and stop it after 2 seconds.

    Submits the worker with a 10-slice budget, signals it to stop via a shared
    ``threading.Event`` after 2 seconds, then waits up to 2 more seconds for it
    to finish. Prints "Task timed out" if it has not finished by then.
    """
    stop_event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit the task
        future = executor.submit(cooperative_worker, stop_event, 10)

        try:
            # Let the work run for 2s, then signal stop
            time.sleep(2)
        finally:
            # Always signal, even if the sleep is interrupted: leaving the
            # executor's with-block waits for the worker, which would otherwise
            # keep running for its full work_time.
            stop_event.set()

        # Wait for the task to complete; the return value (None) is not needed.
        try:
            future.result(timeout=2)  # Timeout as a safety net
        except concurrent.futures.TimeoutError:
            print("Task timed out")

# Run the thread-pool demo only when this code is executed as the main program.
if __name__ == "__main__":
    run_with_threadpool()

Worker stopping cooperatively.


In [3]:
### Approach 2 - old way
import time
import threading
import concurrent.futures

# Approach 2: cooperative stop event with a plain threading.Thread
def cooperative_worker(stop_event, work_time=10):
    """Do up to ``work_time`` one-second slices of work, stopping early on request.

    Args:
        stop_event: ``threading.Event`` the caller sets to request a stop.
        work_time: Maximum number of one-second slices to run. Defaults to 10.

    Returns:
        None. Prints whether the worker stopped on request or ran to completion.
    """
    elapsed = 0
    while elapsed < work_time:
        # Event.wait serves as the one-second "work" pause and returns True as
        # soon as the event is set, so a stop request is honoured immediately
        # instead of after the remainder of an uninterruptible time.sleep(1).
        if stop_event.wait(timeout=1):
            print("Worker stopping cooperatively.")
            return
        elapsed += 1
    print("Worker done without stop event.")

def run_cooperative():
    """Run ``cooperative_worker`` in a plain thread and signal stop after 11 seconds.

    The wait (11 s) is longer than the worker's budget of 10 one-second slices,
    so the worker is expected to finish on its own before the stop event is set.
    """
    stop_event = threading.Event()
    t = threading.Thread(target=cooperative_worker, args=(stop_event, 10))
    t.start()
    try:
        # Wait 11s (longer than the worker's 10 slices) before signalling stop
        time.sleep(11)
    finally:
        # Signal and join even if the wait is interrupted, so the worker thread
        # is not left running in the background.
        stop_event.set()
        t.join()

# Run the plain-thread demo only when this code is executed as the main program.
if __name__ == "__main__":
    run_cooperative()

Worker done without stop event.


* The problem is that we cannot limit the memory used by an individual thread; memory limits can only be applied per process.