## Future Monad

We can use Python's native `Thread` class to run operations in a different "execution environment" (a thread). But this raises a problem: how do we operate on the results produced by threads? How can we compose different future values?
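To make the problem concrete, here is a minimal sketch using only the standard library (`slow_square` is a hypothetical worker). `Thread.join()` always returns `None`, so whatever the target function returns is simply discarded:

```python
import time
from threading import Thread


def slow_square(x: int) -> int:
    # Hypothetical worker that simulates a slow computation
    time.sleep(0.1)
    return x * x


thread = Thread(target=slow_square, args=(4,))
thread.start()
# join() blocks until the thread exits, but it always returns None
joined = thread.join()

print(joined)  # None: the value 16 computed on the thread is discarded
```

Getting the value back requires storing it somewhere the main thread can read it.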

The `Future` monad should allow us to manage the composition of:
* Map transformations: `A => B`
* FlatMap transformations: `A => Future[B]`

With these two operations, we can operate on and compose future values.

Let's start with the standard imports:
* `uuid`: create random IDs.
* `time`: simulate slow operations with `time.sleep`.
* `threading`: use Python's native `Thread` class.
* `typing`: provide type annotations.

In [1]:
import uuid
import time
from threading import Thread
from typing import Generic, TypeVar, Callable

A = TypeVar('A')
B = TypeVar('B')


Create a new function that facilitates the composition of two functions:
* `this: X => Y`
* `and_then: Y => Z`
* `compose: X => Z`

In [2]:
def compose(this: Callable[..., A], and_then: Callable[[A], B]) -> Callable[..., B]:
    return lambda *x: and_then(this(*x))
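A quick usage sketch of `compose` (the block repeats the definition so it runs on its own). Note that the returned lambda only forwards positional arguments (`*x`), so the composed function cannot be called with keyword arguments:

```python
from typing import Callable, TypeVar

A = TypeVar('A')
B = TypeVar('B')


def compose(this: Callable[..., A], and_then: Callable[[A], B]) -> Callable[..., B]:
    return lambda *x: and_then(this(*x))


# X => Y followed by Y => Z gives X => Z
increment_then_double = compose(this=lambda x: x + 1, and_then=lambda y: 2 * y)
print(increment_then_double(3))  # 8

# `this` may take several positional arguments
add_then_str = compose(this=lambda a, b: a + b, and_then=str)
print(add_then_str(2, 3))  # 5 (as a string)
```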

An empty dictionary, `results_registry`, will store the results produced by the threads:

In [3]:
results_registry = {}

The `FutureValueStatus` class helps us track the status of a thread's result:
* `pending`: waiting for the result.
* `done`: the worker finished and its value is available.

```python
# Initialize as pending
val = FutureValueStatus.pending()

# Update to 'done' status with value=5
val.done(value=5)
```

In [4]:
class FutureValueStatus:
    PENDING = "pending"
    DONE = "done"

    def __init__(self, value, status):
        self.value = value
        self.status = status

    def __repr__(self):
        return f"FutureValueStatus({self.value}, {self.status})"

    @classmethod
    def pending(cls):
        return FutureValueStatus(value=None, status=cls.PENDING)

    def done(self, value):
        self.value = value
        self.status = self.DONE

Create a wrapper function to pass to the thread:
* Call the `worker` function
* Save the results on the `results_registry`

In [5]:

def worker_wrapper(worker: Callable, key: str, *args, **kwargs) -> Callable[[], None]:
    def wrapper():
        # Runs on the worker thread; mutating the shared dict needs no `global` statement
        results_registry[key].done(value=worker(*args, **kwargs))
    return wrapper
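Stripped of `FutureValueStatus`, the wrapper pattern boils down to the sketch below: the thread runs a zero-argument closure that writes the worker's return value into a shared dictionary under a unique key, and the main thread reads it back after `join()`. The plain `registry` dict and the `wrap` function are simplified, hypothetical stand-ins for `results_registry` and `worker_wrapper`, not part of the original code:

```python
import uuid
from threading import Thread
from typing import Callable, Dict

# Simplified stand-in for results_registry: key -> plain result value
registry: Dict[str, int] = {}


def wrap(worker: Callable[..., int], key: str, *args, **kwargs) -> Callable[[], None]:
    # Hypothetical simplified worker_wrapper: returns a zero-argument closure for Thread
    def wrapper() -> None:
        # Runs on the worker thread and publishes the result under `key`
        registry[key] = worker(*args, **kwargs)
    return wrapper


key = str(uuid.uuid4())
thread = Thread(target=wrap(pow, key, 2, 10))
thread.start()
thread.join()  # after join() the worker has written its result

print(registry[key])  # 1024
```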


Finally, create the `Future` class:

In [6]:
class Future(Generic[A]):

    def __init__(self, worker: Callable[..., A], *args, **kwargs):
        self.worker = worker
        self.key = str(uuid.uuid4())
        self.wrapper = worker_wrapper(worker, self.key, *args, **kwargs)
        self.thread = Thread(target=self.wrapper)
        self._result = None
        results_registry[self.key] = FutureValueStatus.pending()
        self.thread.start()

    def __str__(self):
        self.cleanup()
        if self._result is None:
            return f"Future({self.key})"
        return f"Future({self._result})"

    def __repr__(self):
        return str(self)
    
    def is_resolved(self):
        status = results_registry.get(self.key)
        return status is None or status.status == FutureValueStatus.DONE

    def cleanup(self):
        # Move a finished result out of the shared registry and cache it on the instance
        status = results_registry.get(self.key)
        if status is not None and status.status == FutureValueStatus.DONE:
            self._result = status.value
            results_registry.pop(self.key, None)  # default avoids a KeyError if another thread popped first

    def wait(self, cleanup: bool = False) -> A:
        status = results_registry.get(self.key)
        if status is None:
            # Already cleaned up: the result is cached on the instance
            return self._result
        if status.status == FutureValueStatus.PENDING:
            self.thread.join()
        self._result = status.value
        if cleanup:
            self.cleanup()
        return self._result

    def flat_map(self, f: Callable[[A], 'Future[B]']) -> 'Future[B]':
        return Future(worker=lambda: f(self.wait(cleanup=True)).wait(cleanup=True))

    @staticmethod
    def pure(x: A) -> 'Future[A]':
        return Future(worker=lambda: x)

    def map(self, f: Callable[[A], B]) -> 'Future[B]':
        function = compose(this=f, and_then=self.pure)
        return self.flat_map(function)
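Because `map` is defined through `flat_map` and `pure`, `Future` should satisfy the usual monad laws, at least when we compare resolved values (the only thing we can compare). The sketch below checks them on a hypothetical, simplified `MiniFuture` that keeps the same `pure`/`flat_map`/`map` structure but stores its result on the instance instead of in `results_registry`; it is an illustration, not a drop-in replacement:

```python
from threading import Thread
from typing import Callable, Generic, TypeVar

A = TypeVar('A')
B = TypeVar('B')


class MiniFuture(Generic[A]):
    # Hypothetical simplified Future: same pure/flat_map/map shape as `Future`,
    # but the result lives on the instance instead of in a global registry.

    def __init__(self, worker: Callable[..., A], *args, **kwargs):
        self._result = None

        def run() -> None:
            self._result = worker(*args, **kwargs)

        self._thread = Thread(target=run)
        self._thread.start()

    def wait(self) -> A:
        self._thread.join()
        return self._result

    @staticmethod
    def pure(x: A) -> 'MiniFuture[A]':
        return MiniFuture(lambda: x)

    def flat_map(self, f: Callable[[A], 'MiniFuture[B]']) -> 'MiniFuture[B]':
        return MiniFuture(lambda: f(self.wait()).wait())

    def map(self, f: Callable[[A], B]) -> 'MiniFuture[B]':
        return self.flat_map(lambda a: MiniFuture.pure(f(a)))


def f(x: int) -> 'MiniFuture[int]':
    return MiniFuture.pure(x + 1)


def g(x: int) -> 'MiniFuture[int]':
    return MiniFuture.pure(x * 2)


m = MiniFuture.pure(3)

# Left identity: pure(x).flat_map(f) == f(x)
left_identity = MiniFuture.pure(3).flat_map(f).wait() == f(3).wait()
# Right identity: m.flat_map(pure) == m
right_identity = m.flat_map(MiniFuture.pure).wait() == m.wait()
# Associativity: m.flat_map(f).flat_map(g) == m.flat_map(lambda x: f(x).flat_map(g))
associativity = m.flat_map(f).flat_map(g).wait() == m.flat_map(lambda x: f(x).flat_map(g)).wait()
# Functor composition: chaining two maps equals mapping the composed function
composition = m.map(lambda x: x + 1).map(lambda y: y * 2).wait() == m.map(lambda x: (x + 1) * 2).wait()

print(left_identity, right_identity, associativity, composition)  # True True True True
```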

## Example: Single future

In [7]:
# Worker function

def add(a: int, b: int) -> int:
    time.sleep(10)
    return a + b

Executing the `add` function takes 10 seconds every time!

In [8]:
%%timeit -n 1 -r 1

add(5, 2)

10 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


With the `Future` class, the call runs on a different thread, so the main program continues without any delay.

In [9]:
%%timeit -n 1 -r 1

future_a = Future(worker=add, a=5, b=2)
future_a

1.18 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


The future object will eventually contain the future result (in this case 5 + 2 = 7) without blocking the execution of the main program.


In [10]:
future_a = Future(worker=add, a=5, b=2)
future_a

Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)

In [11]:
seconds = 0
while not future_a.is_resolved():
    time.sleep(1)
    seconds += 1
    print(f"t={seconds}", future_a)

t=1 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=2 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=3 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=4 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=5 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=6 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=7 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=8 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=9 Future(c1d94210-0b43-46fe-86a4-45a9152ce3ea)
t=10 Future(7)


You can also block until the result is ready by calling `wait`:

In [12]:
%%timeit -n 1 -r 1 

# When using wait, the execution time is back to 10s
Future(worker=add, a=5, b=2).wait()

10 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [13]:
# The return of 'wait' is the actual value we were waiting for
res = Future(worker=add, a=5, b=2).wait()
res

7

In essence, the `Future` class lets you work in the present with values from the future **without waiting**. The `wait` method turns the future into a present value, blocking until it is available.

## Example: Future operations

You can operate on a future value even before the value itself exists. `map` and `flat_map` let you "transform" the future.

In [14]:
future_a = Future(add, a=5, b=2)
future_a

Future(c5063f53-12fc-440d-990a-8c968e3eb3b2)

`future_a` will contain the value 7 about 10 seconds from now.

We can multiply this value by 2 by applying `map`:

In [15]:
future_b = future_a.map(lambda a: 2 * a)
future_b

Future(1476fc9b-5a3a-4a3f-8e3e-6d1a10fb43a3)

`future_b` will contain the value 14 once `future_a` is resolved.

We can call `add` again on the future value of `future_b` to add 5:

In [16]:
future_c = future_b.map(lambda b: add(a=5, b=b))
future_c

Future(efa5904d-0ed3-40dc-b811-a8a9788a8bff)

Now `future_c` will be resolved about 10 seconds after `future_a`, because the mapped `add` call sleeps for another 10 seconds. Let's wait and see:

In [17]:
time.sleep(10)
print("Future A:", future_a)
print("Future C:", future_c)

Future A: Future(7)
Future C: Future(efa5904d-0ed3-40dc-b811-a8a9788a8bff)


Since we waited 10 seconds, `future_a` is already resolved, but `future_c` is still pending. We can either wait another 10 seconds or call the `wait` method:

In [18]:
future_c.wait()
print("Future C:", future_c)

Future C: Future(19)


Note that you don't have to wait for a future to be resolved, since you can use `map` or `flat_map` to compose new futures!

## Example: Future composition

Let's assume that we have two different functions:
* `add` takes 10 seconds to execute.
* `mult` takes 15 seconds to execute.

In [19]:
def add(a, b):
    time.sleep(10)
    return a + b

def mult(a, b):
    time.sleep(15)
    return a * b

Let's create a tuple with the following values:
* Element 1: multiply 5 * 7.
* Element 2: add 3 + 1, then add 7 to the result.

In [20]:
%%timeit -n 1 -r 1

res = (mult(a=5, b=7), add(a=add(a=3, b=1), b=7))

35 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Plain function composition in Python takes 35 seconds because each function call runs sequentially (15 + 10 + 10).

Can we do any better with futures?

In [21]:
%%timeit -n 1 -r 1

future_1 = Future(worker=mult, a=5, b=7)  # 15 s
future_2 = Future(worker=add, a=3, b=1).map(lambda x: add(a=x, b=7))  # 10 s + 10 s = 20 s

res = (future_1.wait(), future_2.wait())

20 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


We reduced the execution time from 35 s to 20 s because:
* We split the problem into 2 independent futures.
* The futures run concurrently.

Why 20 s?

Because `future_2` calls `add` twice in sequence (10 s + 10 s = 20 s), while `future_1` calls `mult` once (15 s) at the same time. The program only has to wait for the slowest future, so the total execution time is 20 s.
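The timings follow from a simple rule: sequential code pays for the sum of every call, while independent futures pay only for their slowest chain (the critical path). With the durations above (`add` = 10 s, `mult` = 15 s), the arithmetic is:

```python
ADD_SECONDS = 10
MULT_SECONDS = 15

# Sequential: mult, then add, then add again; each call waits for the previous one
sequential = MULT_SECONDS + ADD_SECONDS + ADD_SECONDS

# Futures: chain 1 (one mult) and chain 2 (two dependent adds) run side by side,
# so the program waits only for the longer chain
chain_1_seconds = MULT_SECONDS
chain_2_seconds = ADD_SECONDS + ADD_SECONDS
parallel = max(chain_1_seconds, chain_2_seconds)

print(sequential, parallel)  # 35 20
```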

What if we want to add up (a simple sum) the results of `future_1` and `future_2`?

In [22]:
%%timeit -n 1 -r 1

future_1 = Future(worker=mult, a=5, b=7)
future_2 = Future(worker=add, a=3, b=1).map(lambda x: add(a=x, b=7)) 

future_3 = future_1.flat_map(lambda a: future_2.map(lambda b: a + b))
res = future_3.wait()

print(res)

46
20 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


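We can create a new `future_3` holding the sum of the first two futures and wait for it. This still takes 20 s, because `future_2` was already started before `flat_map` was called, so it runs in parallel with `future_1`.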

What if we try to solve the problem in a single expression?

In [23]:
%%timeit -n 1 -r 1

res = Future(worker=mult, a=5, b=7).flat_map(
    lambda a: Future(worker=add, a=3, b=1).map(
        lambda x: add(a=x, b=7)).map(
        lambda b: a + b)
).wait()

print(res)

46
35 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


To solve the problem in a single expression, we had to nest futures. The limitation of this approach is that the inner `Future(worker=add, ...)` is only created, and therefore only started, once the outer `mult` future finishes. We are back to sequential execution (15 + 10 + 10 = 35 s).

Avoid nesting futures unless you actually need a sequential dependency.
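The same effect can be reproduced with the standard library's `concurrent.futures` (a swapped-in tool here, not the `Future` class above), with sleeps scaled down to fractions of a second. The only difference between the two runs is *when* the second job is submitted: up front, like `future_2` in `In [22]`, or only after the first result exists, like the inner `Future` in `In [23]`. The `slow` worker is hypothetical:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow(value: int, seconds: float) -> int:
    # Hypothetical worker: return `value` after sleeping `seconds`
    time.sleep(seconds)
    return value


with ThreadPoolExecutor(max_workers=2) as pool:
    # Independent: both jobs are submitted before any result is needed, so they overlap
    start = time.perf_counter()
    first = pool.submit(slow, 35, 0.3)
    second = pool.submit(slow, 11, 0.2)
    independent_total = first.result() + second.result()
    independent_elapsed = time.perf_counter() - start  # roughly max(0.3, 0.2)

    # Nested: the second job is only submitted once the first result exists
    start = time.perf_counter()
    outer = pool.submit(slow, 35, 0.3).result()
    nested_total = outer + pool.submit(slow, 11, 0.2).result()
    nested_elapsed = time.perf_counter() - start  # roughly 0.3 + 0.2

print(independent_total, nested_total)  # 46 46
print(f"independent: {independent_elapsed:.2f}s, nested: {nested_elapsed:.2f}s")
```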

Consider the following scenario:
* Apply the `add` function over each element of the argument list independently.
* Use the regular `sum` function over the result set to get the total.

In [24]:
arguments = [
    (50, 71),
    (10, 42),
    (22, 65),
    (57, 38),
    (40, 92),
    (10, 15)
]

Doing it sequentially takes about 60 seconds (6 calls of 10 s each):

In [25]:
from typing import List

start = time.time()

# Apply add function over the arguments
results: List[int] = [add(*args) for args in arguments]

# Get the total
total = sum(results)

print(f"Took {round(time.time() - start, 2)}s: ", total)

Took 60.06s:  512


With futures, it takes only about 10 seconds, because all six `add` calls run at the same time:

In [26]:
start = time.time()

# Apply add function over the arguments
results: List[Future] = [Future(add, *args) for args in arguments]

# Get the total
total = sum(res.wait() for res in results)

print(f"Took {round(time.time() - start, 2)}s: ", total)

Took 10.01s:  512
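Under the hood, this is a fan-out/fan-in pattern: start every thread first, then join them all. A minimal sketch with plain `Thread` objects and sleeps scaled down to 0.1 s; `slow_add` and the `results` slot list are illustrative stand-ins for `add` and the futures:

```python
import time
from threading import Thread
from typing import List

arguments = [(50, 71), (10, 42), (22, 65), (57, 38), (40, 92), (10, 15)]


def slow_add(a: int, b: int) -> int:
    time.sleep(0.1)  # scaled-down stand-in for the 10 s sleep in `add`
    return a + b


results: List[int] = [0] * len(arguments)


def run(index: int, a: int, b: int) -> None:
    # Each thread writes only to its own slot, so threads never share an index
    results[index] = slow_add(a, b)


start = time.perf_counter()
threads = [Thread(target=run, args=(i, a, b)) for i, (a, b) in enumerate(arguments)]
for t in threads:  # fan out: all six additions sleep at the same time
    t.start()
for t in threads:  # fan in: wait for every thread before summing
    t.join()
total = sum(results)
elapsed = time.perf_counter() - start

print(total)  # 512
```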


## Example: exceptions

Consider the following `div` function, which fails after 5 s when `b=0`:

In [27]:
def div(a, b):
    time.sleep(5)
    return a / b

An exception raised on a separate thread does not affect the execution of the main program:

In [28]:
seconds = 0

future_a = Future(worker=div, a=1, b=0)
future_b = Future(worker=div, a=1, b=2)

while seconds < 10:
    print(f"t={seconds}", future_a, future_b)
    time.sleep(1)
    seconds += 1

t=0 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(2b3064e7-4c26-44a7-b9cd-9404b1c2f102)
t=1 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(2b3064e7-4c26-44a7-b9cd-9404b1c2f102)
t=2 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(2b3064e7-4c26-44a7-b9cd-9404b1c2f102)
t=3 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(2b3064e7-4c26-44a7-b9cd-9404b1c2f102)
t=4 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(2b3064e7-4c26-44a7-b9cd-9404b1c2f102)


Exception in thread Thread-37:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-5-eab3929bef55>", line 4, in wrapper
  File "<ipython-input-27-9232cd332f44>", line 3, in div
ZeroDivisionError: division by zero


t=5 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(0.5)
t=6 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(0.5)
t=7 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(0.5)
t=8 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(0.5)
t=9 Future(3f87821d-e675-4629-8607-dfbbadc93c74) Future(0.5)


As you can see, the worker failed between `t=4` and `t=5`, but the program kept running and `future_a` stayed unresolved: the exception escaped the wrapper before `done` was called, so its status remains `pending` forever.
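The traceback above is printed by `threading.excepthook` (Python 3.8+): an exception raised in a thread's target never propagates to the thread that calls `join()`, and the default hook only prints it. A minimal sketch that temporarily swaps in a recording hook makes this visible (`record` and `captured` are illustrative names):

```python
import threading
from typing import List, Type

captured: List[Type[BaseException]] = []
original_hook = threading.excepthook


def record(args) -> None:
    # `args` carries exc_type, exc_value, exc_traceback and thread
    captured.append(args.exc_type)


threading.excepthook = record
worker = threading.Thread(target=lambda: 1 / 0)
worker.start()
worker.join()  # returns normally: the ZeroDivisionError is NOT re-raised here
threading.excepthook = original_hook  # restore the default traceback printer

print(captured)  # [<class 'ZeroDivisionError'>]
```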

We could solve this problem by incorporating the `Option` monad!