# Quick introduction to OTF

> On-the-fly distributed workflows in IPython


I've been building infrastructure for quants and data scientists since 2016. Over that time I have observed a big mismatch between how data scientists work day to day and the interface provided by most workflow management systems. Data scientists typically do *exploratory programming* in notebooks: they rely on the data staying in memory and use the notebook's cells as a form of checkpointing (if you make a mistake, you can edit the cell in which it was made and re-run the code from there).

The transition to big data often involves throwing away the tools they are used to and dealing with errors that come from layers buried deep beneath the API they use (e.g. serialisation problems). A lot of the pain comes from the fact that the systems they are transitioning to are designed for writing complex distributed computing workloads, not one-off computations or glue code between existing algorithms.


## What is *OTF*

*OTF* is a framework to write glue code. It provides the building blocks to edit functions and launch computations directly from notebooks.

### Under the hood

*OTF* is a framework that does "continuation-based" workflow orchestration. Other workflow management systems (*wms*) either:
+ Use a **static computation graph**: create a graph of computations and then run that graph (aka define-then-run)
+ Are **tied to a machine**: the *wms* relies on a node staying up for the whole duration of the workflow.

*OTF* is unique in that it can save snapshots of the orchestration code while it is waiting on computations. As a result, the orchestration code can be resumed on any machine once the result of an awaited computation is available. As we'll see, these snapshots are also a great tool for debugging runs: we can fix any issues found in the code and resume the edited workflow from the middle.
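To make the snapshot idea concrete, here is a minimal sketch in plain Python. It is not how *OTF* is implemented: the helpers `suspend` and `resume` are hypothetical, and the orchestration state is reduced to a small dictionary. The point is only that once the state at an await point is plain data, any process that receives it can carry on.

```python
import json


def suspend(pending, results):
    """Serialise the orchestration state at an await point (hypothetical helper)."""
    return json.dumps({"pending": pending, "results": results})


def resume(blob, awaited_value):
    """Continue from a snapshot once the awaited result is available."""
    state = json.loads(blob)
    item = state["pending"].pop()
    if awaited_value:
        state["results"].append(item)
    return state


# "Machine A" suspends while waiting for the check on the last item (7).
blob = suspend(pending=[4, 7], results=[])

# "Machine B" only needs the snapshot and the awaited result to carry on.
state = resume(blob, awaited_value=True)
print(state)
```

Because the snapshot is ordinary text, it can also be stored, inspected, or edited before being resumed.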

In [1]:
import math
import otf
import pickle
from otf import local_scheduler

All the functions we define live in an environment. They only have access to the values defined in that environment:

In [2]:
e = otf.Environment(
    math=otf.NamedReference(math),
    local_scheduler=otf.NamedReference(local_scheduler),
)
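One way to picture this isolation, as a rough sketch in plain Python rather than *OTF*'s actual mechanism, is to compile a function's source against a globals dictionary that holds only the environment's bindings. The helper `define_in` and the function `hypot` are hypothetical names used for illustration.

```python
import math


def define_in(env: dict, name: str, source: str):
    """Compile `source` with `env` as its only globals (builtins stay available)."""
    namespace = dict(env)
    exec(source, namespace)
    return namespace[name]


source = "def hypot(a, b):\n    return math.sqrt(a * a + b * b)\n"

with_math = define_in({"math": math}, "hypot", source)
without_math = define_in({}, "hypot", source)

print(with_math(3, 4))
try:
    without_math(3, 4)
except NameError as exc:
    # `math` was never placed in this environment, so the lookup fails.
    print("missing from the environment:", exc)
```

The second call fails even though `math` is imported in the surrounding module, because the function only sees what its environment provides.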

In [3]:
@e.function
def is_prime(n: int) -> bool:
    """check whether a number is prime

    Taken from https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example
    """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

`is_prime` is a function. It can be called:

In [4]:
is_prime(5), is_prime(435345235)

(True, False)

But it can also be serialised in a format that is easy to introspect and is stable:

In [5]:
print(otf.dumps(is_prime, indent=4))

otf.Closure(
    {
        'environment': {
            'math': otf.NamedReference('math'),
            'local_scheduler': otf.NamedReference(
                'otf.local_scheduler'
            ),
            'is_prime': otf.Function(
                {
                    'name': 'is_prime',
                    'signature': ['n'],
                    'body': (
                        '    """check whether a number is prime\n'
                        '\n'
                        '    Taken from https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example\n'
                        '    """\n'
                        '    if n < 2:\n'
                        '        return False\n'
                        '    if n == 2:\n'
                        '        return True\n'
                        '    if n % 2 == 0:\n'
                        '        return False\n'
                        '    sqrt_n = int(math.floor(math.sqrt(n)))\n'
                        '   
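Since the body is stored as source text, a record like the one above can be read, diffed, or edited by hand. The sketch below shows how a record with the same three fields (`name`, `signature`, `body`) could be turned back into a callable. The `rebuild` helper and the `double_sqrt` record are hypothetical, and *OTF*'s real loader may work differently.

```python
import math

# Hypothetical record mirroring the fields shown above: name, signature, body.
record = {
    "name": "double_sqrt",
    "signature": ["n"],
    "body": "    root = math.sqrt(n)\n    return 2 * root\n",
}


def rebuild(record: dict, env: dict):
    """Turn a source-level record back into a callable (illustrative only)."""
    header = f"def {record['name']}({', '.join(record['signature'])}):\n"
    namespace = dict(env)
    exec(header + record["body"], namespace)
    return namespace[record["name"]]


double_sqrt = rebuild(record, {"math": math})
print(double_sqrt(9))
```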

## Defining a workflow

In [6]:
@e.workflow
async def get_primes(candidates: list[int]) -> list[int]:
    primes = []
    futures = [local_scheduler.defer(is_prime, x) for x in candidates]
    while futures:
        candidate = candidates.pop()
        fut = futures.pop()
        ok = await fut
        if ok:
            primes.append(candidate)
    return primes

In [7]:
V = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]

In [8]:
with local_scheduler.Scheduler() as schd:
    trace = schd.run(get_primes, V)

The scheduler returns a trace: it saves a checkpoint every time it hits an `await`.

> *NOTE* This part uses ipywidgets. It works fine in Jupyter, but some notebook renderers do not support it well.

In [9]:
trace

Tab(children=(VBox(children=(HTML(value='<div class="otf-highlight"><pre><span></span>    <span class="n">prim…

The trace has a linked list of all the checkpoints. The checkpoints contain all the information required to restart the computation:

In [10]:
print(otf.dumps(trace.parent.suspension, indent=4))

otf.Suspension(
    {
        'environment': otf.Environment(
            {
                'math': otf.NamedReference('math'),
                'local_scheduler': otf.NamedReference(
                    'otf.local_scheduler'
                ),
                'is_prime': otf.Function(
                    {
                        'name': 'is_prime',
                        'signature': ['n'],
                        'body': (
                            '    """check whether a number is prime\n'
                            '\n'
                            '    Taken from https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example\n'
                            '    """\n'
                            '    if n < 2:\n'
                            '        return False\n'
                            '    if n == 2:\n'
                            '        return True\n'
                            '    if n % 2 == 0:\n'
                            '        return 
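The linked-list structure can be sketched with a small stand-in class. `Checkpoint` and `history` are hypothetical names, not part of *OTF*; the only detail borrowed from the cell above is that each node reaches the previous one through a `parent` link.

```python
class Checkpoint:
    """Hypothetical stand-in for a trace node: a label plus a link to the previous node."""

    def __init__(self, label, parent=None):
        self.label = label
        self.parent = parent


def history(node):
    """Walk the parent links and return the labels, oldest first."""
    labels = []
    while node is not None:
        labels.append(node.label)
        node = node.parent
    return labels[::-1]


start = Checkpoint("start")
first_await = Checkpoint("await #1", parent=start)
done = Checkpoint("done", parent=first_await)
print(history(done))
```

Walking back from the final node gives every intermediate state of the run, which is what makes it possible to pick an earlier checkpoint and restart from there.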

And of course, we can also access the result of our computation:

In [11]:
trace.value

[115797848077099,
 115280095190773,
 112272535095293,
 112582705942171,
 112272535095293]

## Further reading
*OTF* is still in pre-alpha. All of the code is a proof of concept and there's not much documentation yet. Here are a couple of follow-ups that might be worth reading:

+ We rewrite the AST of workflows to make the functions re-entrant: [rtd](https://otf.readthedocs.io/en/latest/deep_dives/workflow_compilation.html)
+ The documentation of the serialization library has some more details on how it works: [rtd](https://otf.readthedocs.io/en/latest/modules/pack.html)