In [1]:
from concurrent.futures import ThreadPoolExecutor
from random import random
from time import sleep

There are three points at which you may need to handle exceptions when using the `ThreadPoolExecutor`:

* Exception Handling During Thread Initialization
* Exception Handling During Task Execution
* Exception Handling During Task Completion Callbacks


## Exception Handling During Thread Initialization

We can specify a custom initialization function when configuring `ThreadPoolExecutor`.

The `initializer` argument takes the function itself, and `initargs` takes a tuple of arguments to pass to it.

Each worker thread started by the thread pool calls the initialization function once, before it executes any tasks.
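As an illustration of an initializer that succeeds, here is a minimal sketch. The names `local_state`, `init_worker`, and the `"worker"` prefix are hypothetical, chosen only for this example; it assumes we want each worker thread to carry its own label in thread-local storage.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Thread-local storage: each worker thread sees its own attributes
local_state = threading.local()


# Hypothetical initializer: runs once in each new worker thread
def init_worker(prefix):
    local_state.label = f"{prefix}-{threading.current_thread().name}"


# Hypothetical task: reads the value stored by the initializer
def task(identifier):
    return identifier, local_state.label


# initargs must be a tuple, even for a single argument
with ThreadPoolExecutor(
    max_workers=2, initializer=init_worker, initargs=("worker",)
) as executor:
    results = list(executor.map(task, range(4)))

for identifier, label in results:
    print(identifier, label)
```

Every task finds `local_state.label` already set, because the initializer has run in whichever worker thread picks the task up.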

If the initialization function raises an exception, it will break the thread pool.

Pending tasks will not run, and both retrieving their results and submitting further tasks will raise a `BrokenThreadPool` exception.

We can demonstrate this with an example of a contrived initializer function that raises an exception.

In [3]:
# Function for initializing the worker thread
def initializer_worker():
    raise Exception("Something bad happened!")


# A mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    return identifier


# Create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # Execute tasks
    for result in executor.map(task, range(10)):
        print(result)

Exception in initializer:
Traceback (most recent call last):
  File "/Users/kenwu/opt/anaconda3/envs/python_for_machine_learning/lib/python3.9/concurrent/futures/thread.py", line 72, in _worker
    initializer(*initargs)
  File "/var/folders/z_/pqnd1zt14yb6fdthpr75md480000gn/T/ipykernel_24974/3471916382.py", line 3, in initializer_worker
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Exception in initializer:
Traceback (most recent call last):
  File "/Users/kenwu/opt/anaconda3/envs/python_for_machine_learning/lib/python3.9/concurrent/futures/thread.py", line 72, in _worker
    initializer(*initargs)
  File "/var/folders/z_/pqnd1zt14yb6fdthpr75md480000gn/T/ipykernel_24974/3471916382.py", line 3, in initializer_worker
    raise Exception('Something bad happened!')
Exception: Something bad happened!


BrokenThreadPool: A thread initializer failed, the thread pool is not usable anymore

The thread pool is created as normal, but as soon as we try to execute tasks, new worker threads are created and each one calls the custom initialization function, which raises an exception.

Multiple threads attempted to start, and in turn, multiple threads failed with an `Exception`, each logged with its traceback. Finally, the thread pool raised a `BrokenThreadPool` exception reporting that the pool is broken and cannot be used any longer.
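If the program should survive a broken pool, the `BrokenThreadPool` exception can be caught by the caller. The following is a minimal sketch rather than a recommended recovery strategy; the names `failing_initializer`, `results`, and `pool_broken` are introduced here for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool


# Contrived initializer that always fails
def failing_initializer():
    raise Exception("Something bad happened!")


def task(identifier):
    return identifier


results = []
pool_broken = False
with ThreadPoolExecutor(max_workers=2, initializer=failing_initializer) as executor:
    try:
        # BrokenThreadPool may surface while submitting or while iterating
        for result in executor.map(task, range(10)):
            results.append(result)
    except BrokenThreadPool as exc:
        pool_broken = True
        print(f"Pool is unusable: {exc}")
```

The initializer tracebacks are still logged, but the program continues after the `with` block and can decide what to do next, for example creating a new pool with a corrected initializer.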

## Exception Handling During Task Execution

An exception raised while executing a task will stop that task, but will not break the thread pool. Instead, the exception is caught by the thread pool and made available through the `exception()` method of the `Future` object associated with the task.

Alternatively, the exception is re-raised if we call `result()` on the `Future` object. This applies to tasks issued with both `submit()` and `map()`.

We have two options for handling exceptions in tasks:

1. Handle exceptions within the task function.
2. Handle exceptions when getting results from tasks.

### Handle Exception Within the Task

Handling the exception within the task means catching it there with `try`/`except` and using some mechanism to let the recipient of the result know that something unexpected happened.

This could be via the return value from the function, e.g. `None`.


In [4]:
def work():
    sleep(1)
    try:
        raise Exception("Something bad happened!")
    except Exception:
        return "Unable to get the result"
    return "never gets here"


# Create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # Get the result from the task
    result = future.result()
    print(result)

Unable to get the result


Alternatively, we can let the exception propagate out of the task and have the recipient handle it when retrieving the result.

In [5]:
def work():
    sleep(1)
    raise Exception("Something bad happened!")
    return "never gets here"


# Create a thread pool
with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    # Get the result from the task
    try:
        result = future.result()
    except Exception:
        print("Unable to get the result")

Unable to get the result


We can also check for the exception directly via a call to the `exception()` method on the `Future` object. This method blocks until the task is done and accepts a timeout, just like a call to `result()`.

If the task completes successfully, `exception()` returns `None`. If the task was cancelled, `exception()` raises a `CancelledError` instead.



In [6]:
def work():
    sleep(1)
    raise Exception("Something bad happened!")
    return "never gets here"


with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    exception = future.exception()
    # Handle exceptional case
    if exception:
        print(exception)
    else:
        result = future.result()
        print(result)

Something bad happened!
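A cancelled task is a separate case: calling `exception()` on a cancelled `Future` raises `CancelledError` rather than returning a value. The sketch below forces that situation with a single worker; the names `release`, `blocker`, and `queued` are hypothetical, and the `Event` is only there to keep the first task busy so that the second one is still pending when it is cancelled.

```python
from concurrent.futures import CancelledError, ThreadPoolExecutor
from threading import Event

# Keeps the only worker busy until we are ready
release = Event()


def blocker():
    release.wait()
    return "blocker done"


def queued():
    return "queued done"


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocker)
    second = executor.submit(queued)
    # The second task is still waiting in the queue, so it can be cancelled
    was_cancelled = second.cancel()
    release.set()
    try:
        second.exception()
        outcome = "no CancelledError"
    except CancelledError:
        outcome = "CancelledError raised"
    # The first task finished normally, so there is no exception to report
    first_exception = first.exception()

print(was_cancelled, outcome, first_exception)
```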


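We cannot check the `exception()` method of the `Future` object for each task when using `map()`, since `map()` does not provide access to `Future` objects. Instead, an exception raised by a task is re-raised when its result is retrieved from the iterator returned by `map()`, and no further results can be retrieved after that.

If `map()` is used to submit tasks to the thread pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.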
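The two approaches with `map()` can be sketched as follows. The `safe_task` wrapper and the convention of returning a `(value, error)` pair are assumptions made for this example, not part of the `concurrent.futures` API.

```python
from concurrent.futures import ThreadPoolExecutor


# Contrived task that fails for one specific input
def task(identifier):
    if identifier == 2:
        raise ValueError(f"task {identifier} failed")
    return identifier * 10


# Hypothetical wrapper: turns an exception into part of the return value
def safe_task(identifier):
    try:
        return task(identifier), None
    except Exception as exc:
        return None, exc


with ThreadPoolExecutor(max_workers=2) as executor:
    # Option 1: catch the exception while iterating; results after the
    # failed task are lost because the iterator stops there
    collected = []
    try:
        for result in executor.map(task, range(5)):
            collected.append(result)
    except ValueError as exc:
        print(f"Stopped early: {exc}")

    # Option 2: the wrapped task reports success or failure per item
    outcomes = list(executor.map(safe_task, range(5)))

print(collected)
print([value for value, error in outcomes])
```

With the first option only the results before the failure are collected; with the second, every input produces an entry and the caller decides how to treat each error.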

## Exception Handling in Callbacks

When issuing tasks to the thread pool with a call to `submit()`, we receive a `Future` object in return. We can register callback functions on it with `add_done_callback()`, and they are called when the task completes.

This allows one or more callback functions to be registered that will be executed in the order in which they are registered.

These callbacks are always called, even if the task is cancelled or fails itself with an exception.
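Because a callback also runs for failed or cancelled tasks, it usually needs to check how the task ended before asking for a result. A minimal sketch, assuming a hypothetical `report` callback that records its findings in a `messages` list:

```python
from concurrent.futures import ThreadPoolExecutor

messages = []


# Hypothetical callback: inspects how the task ended before using its result
def report(future):
    if future.cancelled():
        messages.append("cancelled")
    elif future.exception() is not None:
        messages.append(f"failed: {future.exception()}")
    else:
        messages.append(f"ok: {future.result()}")


def good():
    return "done"


def bad():
    raise Exception("Something bad happened!")


# A single worker runs the tasks one after the other
with ThreadPoolExecutor(max_workers=1) as executor:
    for fn in (good, bad):
        executor.submit(fn).add_done_callback(report)

print(messages)
```

Checking `cancelled()` first matters, since calling `exception()` or `result()` on a cancelled `Future` raises `CancelledError`.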

A callback can fail with an exception and it **will not impact other callback functions** that have been registered or the task.

The exception is caught, logged at the error level together with its traceback, and processing moves on. In a sense, **callbacks are able to fail silently**: the failure appears only in the log and never reaches the code waiting on the task.

In [7]:
def custom_callback1(future):
    raise Exception("Something bad happened!")
    # Never gets here
    print("Callback 1 called.")


# Callback function to call when a task is completed
def custom_callback2(future):
    print("Callback 2 called.")


def work():
    sleep(1)
    return "Task is done"


with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    # Add the custom callbacks
    future.add_done_callback(custom_callback1)
    future.add_done_callback(custom_callback2)
    # Wait for the task to complete and get the result
    result = future.result()
    sleep(0.1)
    print(result)

exception calling callback for <Future at 0x10b12cc10 state=finished returned str>
Traceback (most recent call last):
  File "/Users/kenwu/opt/anaconda3/envs/python_for_machine_learning/lib/python3.9/concurrent/futures/_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "/var/folders/z_/pqnd1zt14yb6fdthpr75md480000gn/T/ipykernel_24974/1074310337.py", line 2, in custom_callback1
    raise Exception('Something bad happened!')
Exception: Something bad happened!


Callback 2 called.
Task is done


When the task completes, the first callback is called, which fails with an exception. The exception is logged and reported on the console (the default behavior for logging).

The thread pool is not broken and carries on.

The second callback is called successfully, and finally, the main thread gets the result of the task.
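Since a failing callback is only logged, one option is to catch errors inside the callback and record them where the program can act on them. The `guarded` decorator and the `callback_errors` list below are hypothetical helpers written for this sketch; they are not part of `concurrent.futures`.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

callback_errors = []


# Hypothetical decorator: records callback failures instead of only logging them
def guarded(callback):
    @wraps(callback)
    def wrapper(future):
        try:
            callback(future)
        except Exception as exc:
            callback_errors.append((callback.__name__, exc))

    return wrapper


@guarded
def fragile_callback(future):
    raise Exception("Something bad happened!")


def work():
    return "Task is done"


with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    future.add_done_callback(fragile_callback)
    result = future.result()

# Leaving the with block waits for the worker thread, so the callback has run
print(result)
print(callback_errors)
```

The main thread can then inspect `callback_errors` after the pool shuts down and decide whether a failed callback should be treated as an error.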