# redun's scheduler walkthrough

In this notebook, we'll walk through how redun's scheduler executes workflows. Briefly, redun represents workflows as computational expressions that a scheduler evaluates iteratively using a technique called [graph reduction](https://en.wikipedia.org/wiki/Graph_reduction). This approach lets users specify large computational workflows in a natural and expressive style.

If you haven't already, please see the first two examples for an introduction to redun's basic features. Those examples executed workflows using the `redun` CLI (command-line interface), which is the most common way to execute a workflow. In this notebook, we will run workflows programmatically using the `Scheduler` class in order to inspect the execution process more closely. The redun CLI is just a thin wrapper around the `Scheduler`.

In [1]:
# Imports.
import logging, time

from redun import Scheduler, task

In [2]:
# Create a minimal Scheduler that uses in-memory cache.

scheduler = Scheduler()
scheduler.logger.setLevel(logging.INFO)

[redun] Upgrading db from version -1.0 to 3.1...


In [3]:
# Create a simple redun task.

# Tasks will automatically use the global variable `redun_namespace` as their namespace.
redun_namespace = "redun_notebook"

@task()
def add(a, b):
    return a + b

In [4]:
# The `@task` decorator turns functions into Task objects.
add

Task(fullname=redun_notebook.add, hash=496bc317)

In [5]:
# Tasks have several properties, such as a name, a hash, their source code, and the original wrapped function.
print("name", add.name)
print("hash", add.hash)
print("func", add.func)
print(add.source)

name add
hash 496bc31711bb263aa03d3064f2c6939d8cccec42
func <function add at 0x7fa0a9a5b9d0>
def add(a, b):
    return a + b



In [6]:
# The key trick is that task calls are intercepted and return Expressions.
add(1, 2)

TaskExpression('redun_notebook.add', (1, 2), {})

In [96]:
# The most common Expression is a TaskExpression. The following Expression,
#   TaskExpression('redun_notebook.add', (1, 2), {})
# represents calling the task 'redun_notebook.add' with positional arguments (1, 2) and keyword arguments {}.

In [7]:
# To evaluate an expression, use scheduler.run():
scheduler.run(add(1, 2))

[redun] Start Execution 0f9c60b3-89b6-4fcd-b9ad-3c271a74e3f6:  redun 'TaskExpression('"'"'redun_notebook.add'"'"', (1, 2), {})'
[redun] Run    Job 437dae71:  redun_notebook.add(a=1, b=2) on default
[redun] 
[redun] | JOB STATUS 2022/04/27 16:36:47
[redun] | TASK               PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] | 
[redun] | ALL                      0       0       0       0       1       1
[redun] | redun_notebook.add       0       0       0       0       1       1
[redun] 
[redun] Execution duration: 0.04 seconds


3

In [8]:
# Expressions can be combined into an expression tree/graph.
add(add(1, 2), 3)

TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (1, 2), {}), 3), {})

In [11]:
# We can evaluate a whole expression tree using the scheduler:
scheduler.run(add(add(1, 2), 3))

[redun] Start Execution 90a49a83-f685-4928-9096-d161078ec71d:  redun 'TaskExpression('"'"'redun.root_task'"'"', (<redun.expression.QuotedExpression object at 0x7fa090286e20>,), {})'
[redun] Cached Job 87c72e97:  redun.root_task(expr=<redun.expression.QuotedExpression object at 0x7fa090286e20>) (eval_hash=beb19b56)
[redun] Cached Job 956e4a66:  redun_notebook.add(a=1, b=2) (eval_hash=a162a83f)
[redun] Cached Job 1f7f2900:  redun_notebook.add(a=3, b=3) (eval_hash=27dfaec4)
[redun] 
[redun] | JOB STATUS 2022/04/27 16:37:40
[redun] | TASK               PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] | 
[redun] | ALL                      0       0       0       4       6      10
[redun] | redun.root_task          0       0       0       1       2       3
[redun] | redun_notebook.add       0       0       0       3       4       7
[redun] 
[redun] Execution duration: 0.03 seconds


6

In [12]:
# We have defined a small workflow/pipeline using what others call "invocation style".

In [13]:
# Let's redefine the add() task to be artificially slower, so we can more easily see tasks running in parallel.

@task(version='1')
def add(a, b):
    print(f"start add({a}, {b})")
    time.sleep(1)
    print(f"stop  add({a}, {b})")
    return a + b

In [14]:
# Task source and hash should now be updated.

print(add.source)

add.hash

def add(a, b):
    print(f"start add({a}, {b})")
    time.sleep(1)
    print(f"stop  add({a}, {b})")
    return a + b



'037d0e72cc62a8360d7cb321a68135eb790f17cd'

In [15]:
# Expressions that do not depend on each other (e.g. `add(1, 2)` and `add(3, 4)`) will 
# automatically run in parallel. Notice how we see two starts and then two stops.
scheduler = Scheduler()
scheduler.run(add(add(1, 2), add(3, 4)))

[redun] Upgrading db from version -1.0 to 3.1...
[redun] Start Execution df0c5f2d-3bd5-4a38-b2a4-e10eba5c03ac:  redun 'TaskExpression('"'"'redun.root_task'"'"', (<redun.expression.QuotedExpression object at 0x7fa0c068f940>,), {})'
[redun] Run    Job 857d825b:  redun.root_task(expr=<redun.expression.QuotedExpression object at 0x7fa0c068f940>) on default
[redun] Run    Job 80d4aa09:  redun_notebook.add(a=1, b=2) on default


start add(1, 2)

[redun] Run    Job 4a4de46b:  redun_notebook.add(a=3, b=4) on default



start add(3, 4)
stop  add(1, 2)
stop  add(3, 4)


[redun] Run    Job 72131e56:  redun_notebook.add(a=3, b=7) on default


start add(3, 7)
stop  add(3, 7)


[redun] 
[redun] | JOB STATUS 2022/04/27 16:38:15
[redun] | TASK               PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] | 
[redun] | ALL                      0       0       0       0       4       4
[redun] | redun.root_task          0       0       0       0       1       1
[redun] | redun_notebook.add       0       0       0       0       3       3
[redun] 
[redun] Execution duration: 2.08 seconds


10
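
The start/stop pattern above can be imitated in plain Python with a thread pool. This is only an analogy, not how redun's executors work internally: `slow_add`, `DELAY`, and `events` are names made up for this sketch, and the delay is shortened from the notebook's one second.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.1  # Seconds; shortened from the notebook's 1 second sleep.
events = []  # Records start/stop messages instead of printing immediately.


def slow_add(a, b):
    events.append(f"start add({a}, {b})")
    time.sleep(DELAY)
    events.append(f"stop  add({a}, {b})")
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    # The two inner additions do not depend on each other, so submit both at once.
    left = pool.submit(slow_add, 1, 2)
    right = pool.submit(slow_add, 3, 4)
    # The outer addition needs both results, so it has to wait for them.
    total = slow_add(left.result(), right.result())

print("\n".join(events))
print(total)
```

The outer addition always starts last because it blocks on both inner results, which is the same dependency ordering visible in the scheduler log.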

## Executing workflows with graph reduction

The redun scheduler treats the expression `add(add(1, 2), add(3, 4))` as a tree (or DAG more generally). In the figure below, concrete values (anything that's not a subclass of `Expression`) are drawn in grey boxes, and `TaskExpressions` are drawn in blue boxes.

<img width="50%" src="images/expression-tree.png">

When `scheduler.run()` is applied, it begins to identify subtrees that can be *reduced*, namely tasks with concrete arguments (see red boxes below). The scheduler will perform these reductions by calling tasks (e.g. `add`) with the child nodes as the arguments. These task executions can occur in parallel, which is safe to do because tasks are required to be written as pure functions with no side-effects. The scheduler can also consult the cache before performing each reduction to see if it can skip executing a task and instead just replay the result.

<img width="50%" src="images/expression-tree2.png">

When a task completes its execution, its result replaces the task's node in the expression tree.

<img width="25%" src="images/expression-tree3.png">

As reductions complete, additional reductions become possible.

<img width="25%" src="images/expression-tree4.png">

Eventually the expression graph reduces to a single concrete value and the workflow execution concludes.

<img width="12%" src="images/expression-tree5.png">
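
The reduction steps pictured above can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not redun's implementation: `Call`, `is_reducible`, and `reduce_once` are hypothetical names, and each pass reduces subtrees sequentially rather than in parallel.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Call:
    """Toy stand-in for a TaskExpression: a function plus its arguments."""
    func: Callable[..., Any]
    args: Tuple[Any, ...]


def is_reducible(node: Any) -> bool:
    """A node can be reduced when it is a Call whose arguments are all concrete."""
    return isinstance(node, Call) and not any(isinstance(a, Call) for a in node.args)


def reduce_once(node: Any) -> Any:
    """Perform one pass: replace every currently reducible subtree with its result."""
    if not isinstance(node, Call):
        return node  # Concrete values are left alone.
    if is_reducible(node):
        return node.func(*node.args)
    return Call(node.func, tuple(reduce_once(a) for a in node.args))


def add(a, b):
    return a + b


# The same tree as add(add(1, 2), add(3, 4)).
tree = Call(add, (Call(add, (1, 2)), Call(add, (3, 4))))
steps = [tree]
while isinstance(steps[-1], Call):
    steps.append(reduce_once(steps[-1]))

for step in steps:
    print(step)
```

The loop records three states: the original tree, the intermediate `add(3, 7)`, and the final value `10`, matching the sequence of figures.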

In [16]:
# Let's say we add four numbers a lot. Let's abstract it into its own task.

@task()
def add4(a, b, c, d):
    return add(add(a, b), add(c, d))

In [17]:
scheduler.run(add4(5, 6, 7, 8))

[redun] Start Execution d12b0029-c71e-43c1-8647-b6628d20ab21:  redun 'TaskExpression('"'"'redun_notebook.add4'"'"', (5, 6, 7, 8), {})'
[redun] Run    Job 46f3fdca:  redun_notebook.add4(a=5, b=6, c=7, d=8) on default
[redun] Run    Job 253320ed:  redun_notebook.add(a=5, b=6) on default


start add(5, 6)


[redun] Run    Job db923a81:  redun_notebook.add(a=7, b=8) on default


start add(7, 8)
stop  add(5, 6)
stop  add(7, 8)


[redun] Run    Job c897a0c1:  redun_notebook.add(a=11, b=15) on default


start add(11, 15)
stop  add(11, 15)


[redun] 
[redun] | JOB STATUS 2022/04/27 16:39:56
[redun] | TASK                PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] | 
[redun] | ALL                       0       0       0       0       8       8
[redun] | redun.root_task           0       0       0       0       1       1
[redun] | redun_notebook.add        0       0       0       0       6       6
[redun] | redun_notebook.add4       0       0       0       0       1       1
[redun] 
[redun] Execution duration: 2.09 seconds


26

In [106]:
# This returns the expected result. If a redun task returns an expression,
# the scheduler will perform a follow-up evaluation.

## Graph reductions that return more expressions

The expression `add4(5, 6, 7, 8)` would look like the following expression tree.

<img width="50%" src="images/expression-tree6.png">

As we saw above, instead of returning a concrete value, task `add4` returned more expressions. This is a completely valid thing to do, and when it happens, the redun scheduler simply places those expressions into the expression graph.

<img width="50%" src="images/expression-tree7.png">

The scheduler will begin to recurse into the new expressions looking for more reductions. 

In this way, the expression graph can actually expand and contract. Being able to dynamically grow the expression graph is what gives redun its ability to express very dynamic workflows.

This process mirrors how classic functional programming language compilers and interpreters are implemented.
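
To make the expand-and-contract behavior concrete, here is a minimal sketch of an evaluator that keeps evaluating whatever a function returns. It is a toy under stated assumptions: `Call`, `lazy`, and `evaluate` are hypothetical names, and nothing here reflects redun's internals, caching, or parallelism.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Call:
    """Toy stand-in for a TaskExpression."""
    func: Callable[..., Any]
    args: Tuple[Any, ...]


def lazy(func):
    """Hypothetical decorator: calling the function builds a Call instead of running it."""
    def wrapper(*args):
        return Call(func, args)
    return wrapper


def evaluate(node):
    """Evaluate arguments, call the function, then evaluate whatever it returned."""
    while isinstance(node, Call):
        args = [evaluate(a) for a in node.args]
        node = node.func(*args)  # May be a concrete value or a brand-new Call.
    return node


@lazy
def add(a, b):
    return a + b


@lazy
def add4(a, b, c, d):
    # Returns a new expression tree rather than a number.
    return add(add(a, b), add(c, d))


expr = add4(5, 6, 7, 8)
result = evaluate(expr)
print(result)  # 26
```

The `while` loop is what allows the graph to grow: when `add4` hands back a fresh tree of `add` calls, the evaluator simply continues reducing it.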

In [18]:
# Let's generalize 'add' to arbitrary number of values.

@task()
def add_many(values, init=0):
    if len(values) == 0:
        return init
    elif len(values) == 1:
        return values[0]
    else:
        k = len(values) // 2
        sum1 = add_many(values[:k])
        sum2 = add_many(values[k:])
        return add(sum1, sum2)

In [19]:
# As this expression is evaluated, imagine how the expression tree expands and contracts.
scheduler.run(add_many(list(range(100))))

[redun] Start Execution cfc2b087-c8ce-47e7-aeec-33fbde829fb5:  redun 'TaskExpression('"'"'redun_notebook.add_many'"'"', ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,...'
[redun] Run    Job addae7e9:  redun_notebook.add_many(values=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51..., init=0) on default
[redun] Run    Job 8d650987:  redun_notebook.add_many(values=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], init=0) on default
[redun] Run    Job b9afed88:  redun_notebook.add_many(values=[50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 

[redun] Run    Job 31bc3ab3:  redun_notebook.add_many(values=[10, 11], init=0) on default
[redun] Run    Job 95534ee7:  redun_notebook.add_many(values=[12], init=0) on default
[redun] Run    Job 1ca0c27e:  redun_notebook.add_many(values=[13, 14], init=0) on default
[redun] Run    Job 9f5943bc:  redun_notebook.add_many(values=[15], init=0) on default
[redun] Run    Job 43b2d353:  redun_notebook.add_many(values=[16, 17], init=0) on default
[redun] Run    Job 279e9c93:  redun_notebook.add_many(values=[18], init=0) on default
[redun] Run    Job 392b2d3e:  redun_notebook.add_many(values=[19, 20], init=0) on default
[redun] Run    Job b76afed9:  redun_notebook.add_many(values=[21, 22], init=0) on default
[redun] Run    Job fa03aa9f:  redun_notebook.add_many(values=[23, 24], init=0) on default
[redun] Run    Job 1bc1d233:  redun_notebook.add_many(values=[25], init=0) on default
[redun] Run    Job 1855523a:  redun_notebook.add_many(values=[26, 27], init=0) on default
[redun] Run    Job cd47226

[redun] Run    Job 66baf4ec:  redun_notebook.add_many(values=[52], init=0) on default
[redun] Run    Job fe488a45:  redun_notebook.add_many(values=[54], init=0) on default
[redun] Run    Job b3e71845:  redun_notebook.add_many(values=[55], init=0) on default
[redun] Run    Job 2d2044f1:  redun_notebook.add_many(values=[57], init=0) on default
[redun] Run    Job ce8779de:  redun_notebook.add_many(values=[58], init=0) on default
[redun] Run    Job bb0cf754:  redun_notebook.add_many(values=[60], init=0) on default
[redun] Run    Job 1e4a65b6:  redun_notebook.add_many(values=[61], init=0) on default
[redun] Run    Job d6f1c24b:  redun_notebook.add_many(values=[63], init=0) on default
[redun] Run    Job 099f9d26:  redun_notebook.add_many(values=[64], init=0) on default
[redun] Run    Job 4e6f5e92:  redun_notebook.add_many(values=[66], init=0) on default
[redun] Run    Job 265f5986:  redun_notebook.add_many(values=[67], init=0) on default
[redun] Run    Job 863f754a:  redun_notebook.add_many(

start add(4, 5)

[redun] Cached Job afa10d75:  redun_notebook.add(a=7, b=8) (eval_hash=b6ce28ae)





[redun] Run    Job cb98d01e:  redun_notebook.add(a=10, b=11) on default


start add(10, 11)


[redun] Run    Job b36e7bcc:  redun_notebook.add(a=13, b=14) on default


start add(13, 14)


[redun] Run    Job 6212b8f4:  redun_notebook.add(a=16, b=17) on default


start add(16, 17)


[redun] Run    Job aef724ff:  redun_notebook.add(a=19, b=20) on default


start add(19, 20)


[redun] Run    Job 26325d9c:  redun_notebook.add(a=21, b=22) on default


start add(21, 22)


[redun] Run    Job 8eea82fc:  redun_notebook.add(a=23, b=24) on default


start add(23, 24)


[redun] Run    Job 631469f9:  redun_notebook.add(a=26, b=27) on default


start add(26, 27)


[redun] Run    Job 5fe64643:  redun_notebook.add(a=29, b=30) on default


start add(29, 30)

[redun] Run    Job a87b165e:  redun_notebook.add(a=32, b=33) on default



start add(32, 33)


[redun] Run    Job 0b208b17:  redun_notebook.add(a=35, b=36) on default


start add(35, 36)


[redun] Run    Job d3e0a985:  redun_notebook.add(a=38, b=39) on default


start add(38, 39)


[redun] Run    Job 540b0879:  redun_notebook.add(a=41, b=42) on default


start add(41, 42)


[redun] Run    Job 21973c42:  redun_notebook.add(a=44, b=45) on default


start add(44, 45)


[redun] Run    Job 3a73a96a:  redun_notebook.add(a=46, b=47) on default


start add(46, 47)


[redun] Run    Job 4bc932a3:  redun_notebook.add(a=48, b=49) on default


start add(48, 49)


[redun] Run    Job ae3011e3:  redun_notebook.add(a=51, b=52) on default


start add(51, 52)


[redun] Run    Job 4cf0a46a:  redun_notebook.add(a=54, b=55) on default


start add(54, 55)


[redun] Run    Job 477c7531:  redun_notebook.add(a=57, b=58) on default


start add(57, 58)


[redun] Run    Job b6d73688:  redun_notebook.add(a=60, b=61) on default


start add(60, 61)


[redun] Run    Job 1a9464f9:  redun_notebook.add(a=63, b=64) on default
[redun] Run    Job 900e0f22:  redun_notebook.add(a=66, b=67) on default
[redun] Run    Job f2910646:  redun_notebook.add(a=69, b=70) on default
[redun] Run    Job 5e7d9f3a:  redun_notebook.add(a=71, b=72) on default
[redun] Run    Job 180e6958:  redun_notebook.add(a=73, b=74) on default
[redun] Run    Job 59540c8b:  redun_notebook.add(a=76, b=77) on default
[redun] Run    Job d20001bc:  redun_notebook.add(a=79, b=80) on default
[redun] Run    Job 625819e4:  redun_notebook.add(a=82, b=83) on default
[redun] Run    Job fab37b81:  redun_notebook.add(a=85, b=86) on default
[redun] Run    Job 8ce0aaa4:  redun_notebook.add(a=88, b=89) on default
[redun] Run    Job f27e678c:  redun_notebook.add(a=91, b=92) on default
[redun] Run    Job cc9cf98a:  redun_notebook.add(a=94, b=95) on default
[redun] Run    Job d6f3d5ad:  redun_notebook.add(a=96, b=97) on default
[redun] Run    Job 0dbf27eb:  redun_notebook.add(a=98, b=99) on 

stop  add(4, 5)
start add(63, 64)stop  add(10, 11)

stop  add(13, 14)start add(66, 67)

start add(69, 70)
stop  add(16, 17)
start add(71, 72)
stop  add(19, 20)
start add(73, 74)stop  add(21, 22)

start add(76, 77)
stop  add(23, 24)
start add(79, 80)
stop  add(26, 27)
start add(82, 83)
stop  add(29, 30)
start add(85, 86)stop  add(32, 33)

start add(88, 89)
stop  add(35, 36)
start add(91, 92)
stop  add(38, 39)stop  add(41, 42)

start add(94, 95)start add(96, 97)



[redun] Run    Job 5e8e5937:  redun_notebook.add(a=3, b=9) on default


stop  add(44, 45)
start add(98, 99)
stop  add(46, 47)
start add(0, 3)
stop  add(48, 49)
stop  add(51, 52)start add(6, 15)

start add(3, 9)
stop  add(54, 55)
stop  add(57, 58)
stop  add(60, 61)


[redun] Run    Job 594ac217:  redun_notebook.add(a=9, b=21) on default


start add(9, 21)


[redun] Run    Job ce18dd96:  redun_notebook.add(a=12, b=27) on default


start add(12, 27)


[redun] Run    Job d896994f:  redun_notebook.add(a=15, b=33) on default


start add(15, 33)


[redun] Run    Job 50d08b3e:  redun_notebook.add(a=18, b=39) on default
[redun] Run    Job 6c2eec70:  redun_notebook.add(a=43, b=47) on default
[redun] Run    Job d39a679b:  redun_notebook.add(a=25, b=53) on default
[redun] Run    Job 8e7de8f4:  redun_notebook.add(a=28, b=59) on default
[redun] Run    Job 37732602:  redun_notebook.add(a=31, b=65) on default
[redun] Run    Job e7abfddb:  redun_notebook.add(a=34, b=71) on default
[redun] Run    Job e74ec112:  redun_notebook.add(a=40, b=83) on default
[redun] Run    Job 19f88f56:  redun_notebook.add(a=37, b=77) on default
[redun] Run    Job 5fe39a33:  redun_notebook.add(a=43, b=89) on default
[redun] Run    Job 8fcc9395:  redun_notebook.add(a=93, b=97) on default
[redun] Run    Job f1283005:  redun_notebook.add(a=50, b=103) on default
[redun] Run    Job 32418b5d:  redun_notebook.add(a=53, b=109) on default
[redun] Run    Job 3f32adc6:  redun_notebook.add(a=56, b=115) on default
[redun] Run    Job 48f7d0e5:  redun_notebook.add(a=59, b=121)

stop  add(63, 64)
start add(18, 39)stop  add(66, 67)stop  add(69, 70)


start add(43, 47)
start add(25, 53)stop  add(71, 72)

start add(28, 59)
stop  add(76, 77)stop  add(73, 74)

start add(31, 65)start add(34, 71)

stop  add(79, 80)
start add(40, 83)
stop  add(82, 83)
start add(37, 77)
stop  add(85, 86)
start add(43, 89)
stop  add(91, 92)stop  add(88, 89)

start add(93, 97)start add(50, 103)

stop  add(94, 95)stop  add(96, 97)
start add(53, 109)

start add(56, 115)
stop  add(98, 99)
start add(59, 121)
stop  add(0, 3)
stop  add(6, 15)
stop  add(3, 9)


[redun] Run    Job dc48e686:  redun_notebook.add(a=62, b=127) on default


start add(62, 127)
stop  add(9, 21)


[redun] Run    Job 06e19316:  redun_notebook.add(a=65, b=133) on default


start add(65, 133)
stop  add(12, 27)
stop  add(15, 33)


[redun] Run    Job b050f450:  redun_notebook.add(a=68, b=139) on default


start add(68, 139)


[redun] Run    Job b1db9d22:  redun_notebook.add(a=75, b=153) on default


start add(75, 153)


[redun] Run    Job 62fe935a:  redun_notebook.add(a=143, b=147) on default


start add(143, 147)


[redun] Run    Job e954d8a2:  redun_notebook.add(a=78, b=159) on default


start add(78, 159)


[redun] Run    Job edbf8ed7:  redun_notebook.add(a=81, b=165) on default
[redun] Run    Job b884435b:  redun_notebook.add(a=84, b=171) on default
[redun] Run    Job cf7c9351:  redun_notebook.add(a=90, b=183) on default
[redun] Run    Job 07ff42ab:  redun_notebook.add(a=87, b=177) on default
[redun] Run    Job 9f256a50:  redun_notebook.add(a=93, b=189) on default
[redun] Run    Job 30f4bca4:  redun_notebook.add(a=193, b=197) on default
[redun] Run    Job 7474c186:  redun_notebook.add(a=3, b=12) on default
[redun] Run    Job 8f68648f:  redun_notebook.add(a=21, b=30) on default
[redun] Run    Job e3088685:  redun_notebook.add(a=39, b=48) on default


stop  add(18, 39)
stop  add(43, 47)start add(81, 165)
start add(84, 171)
stop  add(28, 59)stop  add(25, 53)


start add(90, 183)start add(87, 177)

stop  add(31, 65)
start add(93, 189)stop  add(34, 71)

start add(193, 197)
stop  add(40, 83)
start add(3, 12)
stop  add(37, 77)
start add(21, 30)
stop  add(43, 89)
start add(39, 48)
stop  add(50, 103)stop  add(93, 97)

stop  add(53, 109)
stop  add(56, 115)
stop  add(59, 121)
stop  add(62, 127)
stop  add(65, 133)


[redun] Run    Job c8553b58:  redun_notebook.add(a=57, b=90) on default


start add(57, 90)


[redun] Run    Job bb07dea6:  redun_notebook.add(a=78, b=87) on default


start add(78, 87)


[redun] Run    Job 9a01e00f:  redun_notebook.add(a=96, b=105) on default


start add(96, 105)
stop  add(68, 139)


[redun] Run    Job 0489de2c:  redun_notebook.add(a=114, b=123) on default


start add(114, 123)


[redun] Run    Job 06cf4362:  redun_notebook.add(a=132, b=190) on default


start add(132, 190)
stop  add(75, 153)


[redun] Run    Job 0fbf0069:  redun_notebook.add(a=153, b=162) on default


start add(153, 162)
stop  add(143, 147)


[redun] Run    Job 78a0c74a:  redun_notebook.add(a=171, b=180) on default


start add(171, 180)
stop  add(78, 159)


[redun] Run    Job 8b52f21e:  redun_notebook.add(a=189, b=198) on default


start add(189, 198)


[redun] Run    Job 90c83e1c:  redun_notebook.add(a=207, b=290) on default


start add(207, 290)


[redun] Run    Job 78fe19ce:  redun_notebook.add(a=228, b=237) on default


start add(228, 237)
stop  add(81, 165)stop  add(84, 171)

stop  add(90, 183)stop  add(87, 177)

stop  add(93, 189)
stop  add(193, 197)
stop  add(3, 12)
stop  add(21, 30)
stop  add(39, 48)


[redun] Run    Job cccd9ec6:  redun_notebook.add(a=246, b=255) on default


start add(246, 255)


[redun] Run    Job 90bb4990:  redun_notebook.add(a=264, b=273) on default


start add(264, 273)


[redun] Run    Job 601bcdb3:  redun_notebook.add(a=282, b=390) on default


start add(282, 390)


[redun] Run    Job 45bfb75c:  redun_notebook.add(a=15, b=51) on default


start add(15, 51)
stop  add(57, 90)


[redun] Run    Job 453f52f0:  redun_notebook.add(a=87, b=147) on default


start add(87, 147)
stop  add(78, 87)
stop  add(96, 105)


[redun] Run    Job 439d4333:  redun_notebook.add(a=165, b=201) on default


start add(165, 201)
stop  add(114, 123)
stop  add(132, 190)
stop  add(153, 162)
stop  add(171, 180)


[redun] Run    Job 695a5d41:  redun_notebook.add(a=237, b=322) on default


start add(237, 322)
stop  add(189, 198)


[redun] Run    Job 3b86d22b:  redun_notebook.add(a=315, b=351) on default


start add(315, 351)
stop  add(207, 290)
stop  add(228, 237)


[redun] Run    Job dc96ab5a:  redun_notebook.add(a=387, b=497) on default


start add(387, 497)
stop  add(246, 255)
stop  add(264, 273)


[redun] Run    Job b53adb1d:  redun_notebook.add(a=465, b=501) on default


start add(465, 501)
stop  add(282, 390)
stop  add(15, 51)
stop  add(87, 147)


[redun] Run    Job 892d07f3:  redun_notebook.add(a=537, b=672) on default


start add(537, 672)


[redun] Run    Job 191bc338:  redun_notebook.add(a=66, b=234) on default


start add(66, 234)
stop  add(165, 201)
stop  add(237, 322)


[redun] Run    Job 1a58d2e8:  redun_notebook.add(a=366, b=559) on default


start add(366, 559)
stop  add(315, 351)
stop  add(387, 497)


[redun] Run    Job 71db9539:  redun_notebook.add(a=666, b=884) on default


start add(666, 884)
stop  add(465, 501)
stop  add(537, 672)
stop  add(66, 234)


[redun] Run    Job 9dd7fc1b:  redun_notebook.add(a=966, b=1209) on default


start add(966, 1209)
stop  add(366, 559)


[redun] Run    Job 6fd8b5fd:  redun_notebook.add(a=300, b=925) on default


start add(300, 925)
stop  add(666, 884)
stop  add(966, 1209)


[redun] Run    Job af636817:  redun_notebook.add(a=1550, b=2175) on default


start add(1550, 2175)
stop  add(300, 925)
stop  add(1550, 2175)


[redun] Run    Job ab2ed3bf:  redun_notebook.add(a=1225, b=3725) on default


start add(1225, 3725)
stop  add(1225, 3725)


[redun] 
[redun] | JOB STATUS 2022/04/27 16:42:19
[redun] | TASK                    PENDING RUNNING  FAILED  CACHED    DONE   TOTAL
[redun] | 
[redun] | ALL                           0       0       0       2     304     306
[redun] | redun.root_task               0       0       0       0       1       1
[redun] | redun_notebook.add            0       0       0       2     103     105
[redun] | redun_notebook.add4           0       0       0       0       1       1
[redun] | redun_notebook.add_many       0       0       0       0     199     199
[redun] 
[redun] Execution duration: 11.28 seconds


4950

In [20]:
# Let's suppress the scheduler's logging output.

import logging
scheduler.logger.setLevel(logging.ERROR)

In [21]:
# Let's try a scatter-gather pipeline.

# Scatter (aka map).
init_values = list(range(10))
values = [add(i, 1) for i in init_values]

# Gather (aka reduce).
total = add_many(values)
scheduler.run(total)

start add(0, 1)
start add(1, 1)
start add(2, 1)
start add(3, 1)
start add(4, 1)
start add(5, 1)
start add(6, 1)
start add(7, 1)
start add(8, 1)
start add(9, 1)
stop  add(0, 1)
stop  add(2, 1)stop  add(1, 1)

stop  add(3, 1)
stop  add(4, 1)
stop  add(5, 1)
stop  add(6, 1)
stop  add(7, 1)
stop  add(8, 1)
stop  add(9, 1)
start add(6, 7)
start add(9, 10)
stop  add(6, 7)
stop  add(9, 10)
start add(8, 19)
stop  add(8, 19)
start add(13, 27)
stop  add(13, 27)
start add(15, 40)
stop  add(15, 40)


55

In [22]:
# Notice `values` was actually a list of Expressions.
values

[TaskExpression('redun_notebook.add', (0, 1), {}),
 TaskExpression('redun_notebook.add', (1, 1), {}),
 TaskExpression('redun_notebook.add', (2, 1), {}),
 TaskExpression('redun_notebook.add', (3, 1), {}),
 TaskExpression('redun_notebook.add', (4, 1), {}),
 TaskExpression('redun_notebook.add', (5, 1), {}),
 TaskExpression('redun_notebook.add', (6, 1), {}),
 TaskExpression('redun_notebook.add', (7, 1), {}),
 TaskExpression('redun_notebook.add', (8, 1), {}),
 TaskExpression('redun_notebook.add', (9, 1), {})]

redun's primary evaluation rule is:

```py
eval(TaskExpression(func, arg1, arg2, ...)) => eval(func(eval(arg1), eval(arg2), ...))
```

In addition, the redun scheduler recurses into "nested values", looking for expressions to evaluate.
That is achieved using additional evaluation rules:

```py
# Evaluate list
eval([x for x in xs]) => [eval(x) for x in xs]

# Evaluate dict
eval({k: v for k, v in dct.items()}) => {eval(k): eval(v) for k, v in dct.items()}

# Evaluate set
eval({x for x in xs}) => {eval(x) for x in xs}

# Evaluate tuple
eval(tuple(x for x in xs)) => tuple(eval(x) for x in xs)

# Evaluate namedtuple
eval(MyNamedTuple(*xs)) => MyNamedTuple(*[eval(x) for x in xs])
```
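
The rules above can be read as a single recursive function. The following is a minimal sketch, assuming a toy `Call` class in place of `TaskExpression`; the names `Call`, `evaluate`, and `Point` are hypothetical, and redun's real scheduler handles these cases asynchronously.

```python
from collections import namedtuple
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Call:
    """Toy stand-in for a TaskExpression."""
    func: Callable[..., Any]
    args: Tuple[Any, ...]


def evaluate(value):
    """Recurse into nested containers, evaluating any Call found along the way."""
    if isinstance(value, Call):
        return evaluate(value.func(*[evaluate(a) for a in value.args]))
    if isinstance(value, list):
        return [evaluate(x) for x in value]
    if isinstance(value, dict):
        return {evaluate(k): evaluate(v) for k, v in value.items()}
    if isinstance(value, set):
        return {evaluate(x) for x in value}
    if isinstance(value, tuple):
        items = [evaluate(x) for x in value]
        # namedtuples take positional fields; plain tuples take one iterable.
        return type(value)(*items) if hasattr(value, "_fields") else tuple(items)
    return value  # Anything else is already concrete.


def add(a, b):
    return a + b


Point = namedtuple("Point", ["x", "y"])
nested = [
    {"id": 1, "price": Call(add, (1, 2))},
    (Call(add, (3, 4)), "unchanged"),
    Point(x=Call(add, (5, 6)), y=0),
]
result = evaluate(nested)
print(result)
```

Each container keeps its type and shape; only the embedded expressions are swapped for their values.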

In [23]:
# Let's try out some of these evaluation rules.

In [24]:
expr = {"first": add(1, 2), "second": add(3, 4)}
print(expr)
scheduler.run(expr)

{'first': TaskExpression('redun_notebook.add', (1, 2), {}), 'second': TaskExpression('redun_notebook.add', (3, 4), {})}


{'first': 3, 'second': 7}

In [25]:
expr = [
    {
        "id": 1,
        "price": add(1, 2),
    },
    {
        "id": 2,
        "price": add(3, 4),
    }
]
print(expr)
scheduler.run(expr)

[{'id': 1, 'price': TaskExpression('redun_notebook.add', (1, 2), {})}, {'id': 2, 'price': TaskExpression('redun_notebook.add', (3, 4), {})}]


[{'id': 1, 'price': 3}, {'id': 2, 'price': 7}]

In [26]:
# Elements of a list do not need to be independent.
# For example, here we are doing a cumulative summation where each entry depends on the previous.

values = list(range(10))
cumsum = [0]
for value in values:
    cumsum.append(add(cumsum[-1], value))

print(cumsum)
scheduler.run(cumsum)

[0, TaskExpression('redun_notebook.add', (0, 0), {}), TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (0, 0), {}), 1), {}), TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (0, 0), {}), 1), {}), 2), {}), TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (0, 0), {}), 1), {}), 2), {}), 3), {}), TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (0, 0), {}), 1), {}), 2), {}), 3), {}), 4), {}), TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (TaskExpression('redun_notebook.add', (0, 0), {}), 1), {}), 2), {}), 3), {}), 4), {}), 

[0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
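
As a sanity check, the evaluated list should equal an ordinary eager cumulative sum. The snippet below uses only the standard library (`itertools.accumulate` with the `initial` keyword, available in Python 3.8+) and does not involve redun at all.

```python
from itertools import accumulate

values = list(range(10))
# initial=0 mirrors the leading 0 that seeds the cumsum list in the cell above.
expected = list(accumulate(values, initial=0))
print(expected)  # [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
```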

In [27]:
# Let's implement a task with two outputs. Just use a container type, such as dict, list, namedtuple, etc.
# redun has no need for special syntax for multiple outputs.

@task()
def intdiv(numerator, denominator):
    quotient, remainder = divmod(numerator, denominator)
    return {
        "quotient": quotient,
        "remainder": remainder,
    }

In [28]:
# Key and attribute access can be done lazily.
# This allows us to route just some of the output to another task.
intdiv(12, 5)["quotient"]

SimpleExpression('getitem', (TaskExpression('redun_notebook.intdiv', (12, 5), {}), 'quotient'), {})

In [29]:
# Here we route the two outputs to different downstream computations.
result = intdiv(12, 5)
x = add(result["quotient"], 1)
y = add(result["remainder"], 2)
scheduler.run(add(x, y))

start add(2, 2)
stop  add(2, 2)


7

In [33]:
class User:
    def __init__(self, name, friend=None):
        self.name = name
        self.friend = friend
    
    def get_friend(self):
        return self.friend

@task()
def returns_alot():
    return User("Alice", friend=User("Bob"))

# Many operations can be chained lazily. For example, attribute access and function calls.
expr = returns_alot().get_friend().name
expr

SimpleExpression('getattr', (SimpleExpression('call', (SimpleExpression('getattr', (TaskExpression('redun_notebook.returns_alot', (), {}), 'get_friend'), {}), (), {}), {}), 'name'), {})

In [34]:
scheduler.run(expr)

'Bob'

In [35]:
# Most operators can be used lazily.
expr = (add(1, 2) + 1) * add(1, 1)
print(expr)
scheduler.run(expr)

SimpleExpression('mul', (SimpleExpression('add', (TaskExpression('redun_notebook.add', (1, 2), {}), 1), {}), TaskExpression('redun_notebook.add', (1, 1), {})), {})


8
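
Lazy key access and lazy operators like those in the last few cells can be mimicked with Python's operator overloading. The sketch below is a toy: `Lazy` and `force` are hypothetical names, only three operations are covered, and redun's `SimpleExpression` may be built quite differently.

```python
import operator


class Lazy:
    """Toy deferred value: records an operation instead of performing it."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __add__(self, other):
        return Lazy(operator.add, self, other)

    def __mul__(self, other):
        return Lazy(operator.mul, self, other)

    def __getitem__(self, key):
        return Lazy(operator.getitem, self, key)

    def __repr__(self):
        return f"Lazy({self.func.__name__}, {', '.join(map(repr, self.args))})"


def force(value):
    """Evaluate a Lazy tree bottom-up; anything else is already concrete."""
    if isinstance(value, Lazy):
        return force(value.func(*[force(a) for a in value.args]))
    return value


def intdiv(numerator, denominator):
    quotient, remainder = divmod(numerator, denominator)
    return {"quotient": quotient, "remainder": remainder}


# Nothing runs here; the expression only records what should happen later.
expr = (Lazy(intdiv, 12, 5)["quotient"] + 1) * Lazy(operator.add, 1, 1)
print(expr)
print(force(expr))  # 6
```

Building the expression and evaluating it are separate steps, which is what lets a scheduler decide when, where, and whether each piece actually runs.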

In [36]:
# Tasks are first-class values and can be used as arguments and return values.
# Let's create a higher-order task that performs divide and conquer generically.

@task()
def divide_and_conquer(a_task, values, init=0):
    if len(values) == 0:
        return init
    elif len(values) == 1:
        return values[0]
    else:
        k = len(values) // 2
        sum1 = divide_and_conquer(a_task, values[:k])
        sum2 = divide_and_conquer(a_task, values[k:])
        return a_task(sum1, sum2)

In [37]:
scheduler.run(divide_and_conquer(add, list(range(10))))

start add(8, 9)
start add(2, 7)
stop  add(8, 9)
stop  add(2, 7)
start add(7, 17)
start add(1, 9)
stop  add(7, 17)
stop  add(1, 9)
start add(11, 24)
stop  add(11, 24)
start add(10, 35)
stop  add(10, 35)


45

In [124]:
# Now we can apply our divide and conquer to other "reducer" tasks.

In [38]:
@task()
def mult(a, b):
    return a * b

In [39]:
scheduler.run(divide_and_conquer(mult, list(range(1, 10))))

362880

In [40]:
@task()
def pair(a, b):
    return (a, b)

In [43]:
scheduler.run(divide_and_conquer(pair, list(range(10))))

(((0, 1), (2, (3, 4))), ((5, 6), (7, (8, 9))))
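
To see why the outputs take these shapes, the same recursion can be run eagerly with plain functions and no scheduler. `divide_and_conquer_eager` is a hypothetical name for this sketch; it copies the task's logic but evaluates sequentially.

```python
from operator import add, mul


def divide_and_conquer_eager(func, values, init=0):
    """Same recursion as the task above, but run eagerly with plain functions."""
    if len(values) == 0:
        return init
    if len(values) == 1:
        return values[0]
    k = len(values) // 2
    left = divide_and_conquer_eager(func, values[:k])
    right = divide_and_conquer_eager(func, values[k:])
    return func(left, right)


print(divide_and_conquer_eager(add, list(range(10))))     # 45
print(divide_and_conquer_eager(mul, list(range(1, 10))))  # 362880
pairs = divide_and_conquer_eager(lambda a, b: (a, b), list(range(10)))
print(pairs)  # (((0, 1), (2, (3, 4))), ((5, 6), (7, (8, 9))))
```

The nested tuples from the pairing function expose the recursion tree directly: each split point `k` becomes one level of parentheses.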

In [44]:
# Tasks also support partial application.
inc = add.partial(1)
print(inc)
print(inc(10))
scheduler.run(inc(10))

PartialTask(fullname=redun_notebook.add, hash=6c7d07ac, args=(1,), kwargs={})
TaskExpression('redun_notebook.add', (1, 10), {})
start add(1, 10)
stop  add(1, 10)


11
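
For comparison, the standard library's `functools.partial` binds leading positional arguments in a similar way for ordinary functions. This is only an analogy with a plain `add` function; it says nothing about how redun's `PartialTask` is implemented.

```python
from functools import partial


def add(a, b):
    return a + b


inc = partial(add, 1)  # Binds a=1, leaving b open.
print(inc.args)  # (1,)
print(inc(10))   # 11
```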

In [49]:
# We can use task options to modify how tasks are run.
# For example, we can turn off caching on a per-task basis.
@task(cache=False)
def double(x):
    print("Doubling", x)
    return x * 2

In [50]:
# Notice that the "Doubling" print statement occurs twice, which confirms that caching is disabled.
print(scheduler.run(double(1)))
print(scheduler.run(double(1)))

Doubling 1
2
Doubling 1
2


In [51]:
# We'll see in later examples that when running tasks on a remote cluster, we can
# specify options such as `memory`, `cpus`, and `gpus`.
# We can also override task options at call time.
# With caching, we won't see the "Doubling" print statements any more.
print(scheduler.run(double.options(cache=True)(1)))
print(scheduler.run(double.options(cache=True)(1)))

2
2
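
The effect of the `cache` option can be illustrated with a toy runner. Everything here is an assumption made for illustration: `run`, `cache`, and `log` are hypothetical names, the cache key is just the function name plus arguments, and results are always recorded (with `use_cache` controlling only the lookup) because that choice reproduces the output above. redun's actual cache keys and storage are more involved.

```python
cache = {}
log = []  # Collects messages so the behavior can be inspected afterwards.


def double(x):
    log.append(f"Doubling {x}")
    return x * 2


def run(func, *args, use_cache=True):
    """Toy runner: replay a stored result when caching is on and the key is known."""
    key = (func.__qualname__, args)
    if use_cache and key in cache:
        return cache[key]
    result = func(*args)
    cache[key] = result  # Always record, even when the lookup was skipped.
    return result


uncached = [run(double, 1, use_cache=False), run(double, 1, use_cache=False)]
uncached_log = list(log)
print(uncached_log)  # Two entries: the function body ran both times.

log.clear()
cached = [run(double, 1), run(double, 1)]
cached_log = list(log)
print(cached_log)  # Empty: both calls were served from the recorded result.
```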


In [133]:
# Try your own expressions!