---

### `concurrent.futures`

The `concurrent.futures` module provides interfaces for running tasks using pools of thread or process workers. The APIs are the same, so applications can switch between threads and processes with minimal changes.

The module provides two types of classes for interacting with the pools. *Executors* are used for managing pools of workers, and *futures* are used for managing results computed by the workers. To use a pool of workers, an application creates an instance of the appropriate executor class and then submits tasks for it to run. When a task is submitted, a `Future` instance is returned. When the result of the task is needed, an application can use the `Future` to block until the result is available. Various APIs are provided to make it convenient to wait for tasks to complete, so that the `Future` objects do not need to be managed directly.

[Python Doc](https://docs.python.org/3/library/concurrent.futures.html)
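As a rough illustration of the workflow described above, the sketch below creates an executor, submits tasks, and collects results through the returned `Future` objects. The helper `run_squares()` and the function `square()` are made-up names for this example; the executor class is passed in as a parameter to show that the calling code does not depend on whether threads or processes do the work.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


def run_squares(executor_cls, values):
    """Run square() over values using whichever executor class is given."""
    with executor_cls(max_workers=2) as executor:
        # submit() returns a Future immediately; result() blocks until done.
        pending = [executor.submit(square, v) for v in values]
        return [f.result() for f in pending]


squares = run_squares(ThreadPoolExecutor, [1, 2, 3])
print(squares)
# Passing ProcessPoolExecutor instead should work the same way, provided the
# call is made under an `if __name__ == '__main__':` guard.
```

Only the thread-based variant is run here, since process pools place extra requirements on how the script is started.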

#### API Structure


This module provides two main classes, `Executor` and `Future`. `Executor` is an abstract base class with two concrete subclasses, `ThreadPoolExecutor` and `ProcessPoolExecutor`.

                                     concurrent.futures
                                            |
                                            |
                       -------------------------------------------------------------------------------
                       |                                                 |                            |
                       |                                                 |                            |
                    Executor class                                     Future class                module functions
                       |                                                 |                            |
                       |--subclasses--ThreadPoolExecutor                 |                             --wait()
                       |            |                                    |                             --as_completed()
                       |            --ProcessPoolExecutor                |
                       |                                                 |
                       |--Methods--submit()-> returns Future instances-->|--methods
                                 --map()                                       |
                                 --shutdown()                                 ---cancel()
                                                                              ---cancelled()
                                                                              ---running()    
                                                                              ---done()
                                                                              ---result()
                                                                              ---exception()
                                                                              ---add_done_callback()

In [29]:
import concurrent.futures as conf

with conf.ThreadPoolExecutor(max_workers=1) as executor:
    results = executor.map(pow, [2, 3], [2, 2])  # an iterator of results, not a Future
    for value in results:
        print(value)

4
9


The `ThreadPoolExecutor` manages a set of worker threads, passing tasks to them as they become available for more work. The `conf1.py` example below uses `map()` to concurrently produce a set of results from an input iterable. Its task uses `time.sleep()` to pause for a different amount of time on each call, demonstrating that `map()` always returns the values in input order, regardless of the order in which the concurrent tasks finish.

**`map(func, *iterables, timeout=None, chunksize=1)`**

Returns an iterator over the results. It is similar to the built-in `map(func, *iterables)` except:
 - the iterables are collected immediately rather than lazily;
 - `func` is executed asynchronously and several calls to func may be made concurrently.
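One consequence of this design is that an exception raised inside `func` does not appear when `map()` is called, but when the failing value is retrieved from the iterator. The following is a minimal sketch of that behavior; `reciprocal()` is a made-up helper for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(n):
    return 1 / n


collected = []
error = None

with ThreadPoolExecutor(max_workers=2) as executor:
    # All four calls are scheduled here; nothing is raised yet.
    results = executor.map(reciprocal, [1, 2, 0, 4])
    try:
        for value in results:
            collected.append(value)
    except ZeroDivisionError as exc:
        # Raised when iteration reaches the result of reciprocal(0).
        error = exc
        print('stopped after {}: {}'.format(collected, exc))
```

The results that precede the failing call are still delivered in input order; iteration ends at the first call that raised.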

**`class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())`**

`initializer` is an optional callable that is called at the start of each worker thread; `initargs` is a tuple of arguments passed to the `initializer`. Should `initializer` raise an exception, all currently pending jobs will raise a `BrokenThreadPool`, as well as any attempt to submit more jobs to the pool.
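A small sketch of how `initializer` and `initargs` might be used to prepare per-thread state follows. The names `init_worker`, `local`, and the `'demo'` prefix are assumptions made for this example and are not part of the library.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Thread-local storage: each worker thread sees its own `label` attribute.
local = threading.local()


def init_worker(prefix):
    # Runs once in each worker thread before it handles any task.
    local.label = '{}-{}'.format(prefix, threading.current_thread().name)


def task(n):
    return (n, local.label)


with ThreadPoolExecutor(max_workers=2, thread_name_prefix='w',
                        initializer=init_worker, initargs=('demo',)) as ex:
    labelled = list(ex.map(task, range(4)))

for n, label in labelled:
    print(n, label)
```

Which worker handles which task is not deterministic, so the labels may differ between runs.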

In [2]:
%%file conf1.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(threading.current_thread().name,n))
    time.sleep(n / 10)
    print('{}: done with {}'.format(threading.current_thread().name,n))
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

Overwriting conf1.py


Output:

```
main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object Executor.map.<locals>.result_iterator at 0x0000007700DD7890>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
```


The return value from `map()` is actually a special type of iterator that knows to wait for each response as the main program iterates over it.

In addition to using `map()`, it is possible to schedule an individual task with an executor using `submit()`, and use the `Future` instance returned to wait for that task’s results.

**`submit(fn, /, *args, **kwargs)`**

Schedules the callable `fn` to be executed as `fn(*args, **kwargs)` and returns a `Future` object representing the execution of the callable.

In [3]:
%%file conf2.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(threading.current_thread().name,n))
    time.sleep(n / 10)
    print('{}: done with {}'.format(threading.current_thread().name,n))
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

Writing conf2.py


Output:

```
main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0xba34fbe640 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0xba34fbe640 state=finished returned float>
```

The status of the future changes after the task is completed and the result is made available.

#### Waiting for Tasks in Any Order

Invoking the `result()` method of a `Future` blocks until the task completes (either by returning a value or raising an exception), or is canceled. The results of multiple tasks can be accessed in the order the tasks were scheduled using `map()`. If the order in which the results are processed does not matter, use `as_completed()` to process them as each task finishes.

**`concurrent.futures.as_completed(fs, timeout=None)`**

Note that, unlike the `Executor` methods `submit()` and `map()`, `as_completed()` is a module-level function.

Returns an iterator over the `Future` instances (possibly created by different `Executor` instances) given by `fs` that yields futures as they complete (finished or cancelled futures). Any futures given by `fs` that are duplicated will be returned once.

In [5]:
%%file conf3.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i) for i in range(5, 0, -1)]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))
    

Overwriting conf3.py


Output:

```
main: starting
main: result: (2, 0.2)
main: result: (3, 0.3)
main: result: (4, 0.4)
main: result: (1, 0.1)
main: result: (5, 0.5)
```

The module provides a second module-level function, `wait()`.

**`concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)`**

Wait for the `Future` instances (possibly created by different `Executor` instances) given by `fs` to complete. Duplicate futures given to `fs` are removed and will be returned only once. Returns a named 2-tuple of sets. The first set, named `done`, contains the futures that completed (finished or cancelled futures) before the wait completed. The second set, named `not_done`, contains the futures that did not complete (pending or running futures).

At the time of writing, I could not immediately find an example of this function, so I modified the program above:


In [6]:
%%file conf4.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i) for i in range(5, 0, -1)]

print(futures.wait(wait_for))



Writing conf4.py


Output:

```
main: starting
DoneAndNotDoneFutures(done={<Future at 0x702e8bca90 state=finished returned tuple>, <Future at 0x702e8bc700 state=finished returned tuple>, <Future at 0x702e89ef40 state=finished returned tuple>, <Future at 0x702e8bc370 state=finished returned tuple>, <Future at 0x702e8b1fa0 state=finished returned tuple>}, not_done=set())
```
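The `return_when` argument also accepts `FIRST_COMPLETED` and `FIRST_EXCEPTION`. The sketch below is a variation of the same idea, with fixed delays chosen for illustration so that one task clearly finishes first; it shows `wait()` returning while other futures are still outstanding.

```python
from concurrent import futures
import time


def task(delay):
    time.sleep(delay)
    return delay


with futures.ThreadPoolExecutor(max_workers=3) as ex:
    pending = [ex.submit(task, d) for d in (0.05, 0.5, 0.5)]
    # Return as soon as any one future finishes or is cancelled.
    done, not_done = futures.wait(pending, return_when=futures.FIRST_COMPLETED)
    first = [f.result() for f in done]
    print('finished first: {}'.format(first))
    print('still pending or running: {}'.format(len(not_done)))
```

The futures in `not_done` are not cancelled; they keep running, and leaving the `with` block waits for them.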


**Future Objects**

As mentioned earlier, the `Executor.submit()` method returns instances of the `Future` class. These instances provide the following methods:

 - `cancel()` - Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return `False`, otherwise the call will be cancelled and the method will return `True`.
 
 
 - `cancelled()` - Return `True` if the call was successfully cancelled.
 
 
 - `running()` - Return `True` if the call is currently being executed and cannot be cancelled.
 
 
 - `done()` - Return `True` if the call was successfully cancelled or finished running.
 
 
 - `result(timeout=None)` - Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to `timeout` seconds. If the call hasn’t completed in `timeout` seconds, then a `TimeoutError` will be raised. `timeout` can be an `int` or `float`. If `timeout` is not specified or `None`, there is no limit to the wait time. If the future is cancelled before completing then `CancelledError` will be raised. If the call raised an exception, this method will raise the same exception.


 - `exception(timeout=None)` - Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to `timeout` seconds. If the call hasn’t completed in `timeout` seconds, then a `TimeoutError` will be raised. `timeout` can be an `int` or `float`. If `timeout` is not specified or `None`, there is no limit to the wait time.

    If the future is cancelled before completing then `CancelledError` will be raised.

    If the call completed without raising, `None` is returned.


 - `add_done_callback(fn)` - Attaches the callable `fn` to the future. `fn` will be called, with the future as its only argument, when the future is cancelled or finishes running.

    Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an `Exception` subclass, it will be logged and ignored. If the callable raises a `BaseException` subclass, the behavior is undefined.

    If the future has already completed or been cancelled, `fn` will be called immediately.
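To make the `timeout` behavior of `result()` concrete, here is a minimal sketch. The `slow()` function and the specific delays are assumptions chosen for the example.

```python
from concurrent import futures
import time


def slow():
    time.sleep(0.5)
    return 'finished'


timed_out = False

with futures.ThreadPoolExecutor(max_workers=1) as ex:
    f = ex.submit(slow)
    try:
        f.result(timeout=0.05)
    except futures.TimeoutError:
        # The call itself keeps running; only the wait was abandoned.
        timed_out = True
        print('not ready yet, running={}'.format(f.running()))
    value = f.result()  # no timeout: block until the call completes
    print(value)
```

A timeout does not cancel the task, so calling `result()` again later still returns the value once the call has finished.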


**Future Callbacks**

To take some action when a task completes, without explicitly waiting for the result, use `add_done_callback()` to specify a function to call when the `Future` is done. The callback should be a callable taking a single argument, the `Future` instance.


In [7]:
%%file conf5.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(fn.arg, result))
            
if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()            

Writing conf5.py


Output:

```
main: starting
5: sleeping
5: done
5: value returned: 0.5
```

The callback is invoked regardless of the reason the Future is considered “done,” so it is necessary to check the status of the object passed in to the callback before using it in any way.
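Because the callback receives only the `Future`, any extra context has to be supplied some other way. The example above attaches an `arg` attribute to the future; an alternative, sketched below under the assumption that the context is known at submission time, is to bind it with `functools.partial`. The `report()` function and the labels are hypothetical names.

```python
from concurrent import futures
from functools import partial

messages = []


def report(label, future):
    # `label` is bound by partial(); the executor supplies only `future`.
    if future.cancelled():
        messages.append('{}: canceled'.format(label))
    elif future.exception() is not None:
        messages.append('{}: error: {}'.format(label, future.exception()))
    else:
        messages.append('{}: value: {}'.format(label, future.result()))


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    ok = ex.submit(pow, 2, 5)
    ok.add_done_callback(partial(report, 'ok'))
    bad = ex.submit(int, 'not a number')
    bad.add_done_callback(partial(report, 'bad'))

# Leaving the with block waits for the workers, so both callbacks have run.
for line in sorted(messages):
    print(line)
```

The callback checks `cancelled()` first because calling `exception()` or `result()` on a cancelled future raises `CancelledError`.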

**Canceling Tasks**

A `Future` can be canceled, if it has been submitted but not started, by calling its `cancel()` method.



In [8]:
%%file conf6.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

Writing conf6.py


Output:

```
main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
```

**Exceptions in Tasks**

If a task raises an unhandled exception, it is saved to the `Future` for the task and made available through the `result()` or `exception()` methods.

If `result()` is called after an unhandled exception is raised within a task function, the same exception is re-raised in the current context.

In [10]:
%%file conf7.py

from concurrent import futures


def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

Writing conf7.py


Output:

```
main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result
```

#### Context Manager

Executors work as context managers, running tasks concurrently and waiting for them all to complete. When the context manager exits, the `shutdown()` method of the executor is called.

In [11]:
%%file conf8.py

from concurrent import futures


def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

Writing conf8.py


Output:

```
main: starting
1
2
3
4
main: done
```
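For comparison, the following sketch spells out roughly what the `with` statement does, using an explicit `shutdown(wait=True)` in a `finally` clause, and shows that an executor rejects new work once it has been shut down. The variable names are illustrative only.

```python
from concurrent import futures

ex = futures.ThreadPoolExecutor(max_workers=2)
try:
    doubled = list(ex.map(lambda n: n * 2, [1, 2, 3]))
finally:
    # This is what leaving a `with` block does for you.
    ex.shutdown(wait=True)

print(doubled)

try:
    ex.submit(print, 'too late')
except RuntimeError as exc:
    # submit() is refused after shutdown().
    rejected = str(exc)
    print('rejected: {}'.format(rejected))
```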

**Deadlocks**

Deadlocks can occur when the callable associated with a `Future` waits on the results of another `Future`. For example:

In [3]:
%%file deadlock1.py

import concurrent.futures as conf
import time

def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = conf.ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
print(a.running())
print(b.running())

Overwriting deadlock1.py


In [4]:
%%file deadlock2.py

import concurrent.futures as conf

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = conf.ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

Writing deadlock2.py
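One way to keep a situation like `deadlock2.py` from hanging forever is to pass a `timeout` to `result()` inside the worker. The sketch below is only a diagnostic aid under that assumption, not a fix for the underlying design; the function name `outer()` and the 0.2 second timeout are arbitrary choices for illustration.

```python
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=1)


def outer():
    inner = executor.submit(pow, 5, 2)
    try:
        # The only worker is busy running outer(), so inner cannot start.
        return inner.result(timeout=0.2)
    except futures.TimeoutError:
        return 'inner task did not start in time'


outcome = executor.submit(outer).result()
print(outcome)
# Once outer() has returned, the worker is free to run the queued inner task.
executor.shutdown()
```

A more robust approach is usually to avoid waiting on a future from inside a task that runs on the same pool, for example by calling the function directly or by using a separate executor.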


**Process Pools**

The `ProcessPoolExecutor` works in the same way as `ThreadPoolExecutor`, but uses processes instead of threads. This allows CPU-intensive operations to use a separate CPU and not be blocked by the CPython interpreter’s global interpreter lock.



In [6]:
%%file conf9.py

from concurrent import futures
import os


def task(n):
    return (n, os.getpid())

def main():
    ex = futures.ProcessPoolExecutor(max_workers=2)
    results = ex.map(task, range(5, 0, -1))

    for n, pid in results:
        print('ran task {} in process {}'.format(n, pid))
        
if __name__ == '__main__':
    main()        

Overwriting conf9.py


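My first attempt at the example above, written without the `if __name__ == '__main__'` guard, raised a `BrokenProcessPool` exception, and I was not sure what had gone wrong.

Edit: This [SO Post](https://stackoverflow.com/questions/15900366/all-example-concurrent-futures-code-is-failing-with-brokenprocesspool?rq=1) describes a similar issue. Following its advice, I used the `if __name__ == '__main__'` idiom and reran the code. It worked.

I later learned that the [official Python docs](https://docs.python.org/3.7/library/multiprocessing.html#the-process-class) explicitly ask for the `if __name__ == '__main__'` idiom to protect the entry point in `multiprocessing` code. When worker processes are started with the `spawn` method (the default on Windows), each worker imports the main module, so any unguarded top-level code, including the code that creates the pool, runs again in the worker.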

Output:

```
ran task 5 in process 4152
ran task 4 in process 1592
ran task 3 in process 4152
ran task 2 in process 1592
ran task 1 in process 4152
```


As with the thread pool, individual worker processes are reused for multiple tasks.

If something happens to one of the worker processes to cause it to exit unexpectedly, the `ProcessPoolExecutor` is considered “broken” and will no longer schedule tasks.

In [14]:
%%file conf10.py

from concurrent import futures
import os
import signal


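def main():
    with futures.ProcessPoolExecutor(max_workers=2) as ex:
        print('getting the pid for one worker')
        f1 = ex.submit(os.getpid)
        pid1 = f1.result()

        print('killing process {}'.format(pid1))
        # signal.SIGHUP is not defined on Windows, so the original call fails
        # there with AttributeError; SIGTERM exists on both Unix and Windows.
        os.kill(pid1, signal.SIGTERM)

        print('submitting another task')
        try:
            # Keep submit() inside the try block: it can raise too once the
            # executor has noticed that the worker died.
            f2 = ex.submit(os.getpid)
            pid2 = f2.result()
        except futures.process.BrokenProcessPool as e:
            print('could not start new tasks: {}'.format(e))


# Guard the entry point, as required for process pools (see conf9.py).
if __name__ == '__main__':
    main()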

Writing conf10.py


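In this example, the `BrokenProcessPool` exception typically surfaces when the result is retrieved with `result()`, rather than when the new task is submitted. Once the executor has detected that the pool is broken, however, `submit()` itself also raises `BrokenProcessPool`.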

In [16]:
%%file conf11.py

#ProcessPoolExecutor example
#this example is from Python docs

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Writing conf11.py


Output:

```
112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False
```

In [17]:
%%file conf12.py

#ThreadPoolExecutor example
#this example is from Python docs


import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Writing conf12.py


Output:

```
'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.bbc.co.uk/' page is 457219 bytes
'http://www.foxnews.com/' page is 282703 bytes
'http://www.cnn.com/' page is 1145339 bytes
'http://some-made-up-domain.com/' generated an exception: HTTP Error 403: Forbidden
```