# No input blocking

This notebook details a slightly technical issue that is easy to run into when first learning Deltaflow. We give an example of the issue in action, explain its cause, and then show how to fix it.

The imports needed are given below.

In [1]:
import logging
import time

from deltalanguage.data_types import NoMessage
from deltalanguage.runtime import DeltaPySimulator, DeltaRuntimeExit
from deltalanguage.wiring import (DeltaBlock,
                                  DeltaGraph,
                                  DeltaMethodBlock,
                                  Interactive)

## A simple counter

Let's say that you want to have a counter node in your Deltaflow graph. This node will simply increment a number each time it is called.

We can implement this using a `DeltaMethodBlock` such as the one below.

In [2]:
class Counter:

    def __init__(self, lim):
        self.x = 0
        self.lim = lim

    @DeltaMethodBlock()
    def count(self) -> int:
        self.x += 1
        if self.x > self.lim:
            raise DeltaRuntimeExit
        return self.x

Note that there is a limit in this counter which, when reached, will raise a `DeltaRuntimeExit`. We shall see why this is needed in a minute. But for now, what is important is that we have a `DeltaMethodBlock` which, each time it is called, will increment an internal counter and send the current value of said counter on.

We are also going to implement a `DeltaBlock` which, upon receiving said message, prints it. This block will also have a limit which, when reached, will raise a `DeltaRuntimeExit`.

In [3]:
@DeltaBlock()
def print_to_lim(n: int, lim: int) -> NoMessage:
    print(n)
    if n >= lim:
        raise DeltaRuntimeExit

Now, we wire up our graph and run it:

In [4]:
cnt = Counter(lim=100)

with DeltaGraph() as graph:
    print_to_lim(cnt.count(), lim=100)

rt = DeltaPySimulator(graph)
rt.run()

1
2
3
4
5
6


There is a very good chance that the print node stopped well short of printing 100. However, if we look at the `Counter` object itself, we see that it did reach its limit:

In [5]:
print("Final count", cnt.x)

Final count 101


So why is there a difference here? Why did one node raise an exit signal so significantly before the other?

To answer this, we need to understand a little bit about how the Deltaflow Python runtime is implemented, and how multithreading in Python works.

## The need for blocking

Each node in a Deltaflow graph is implemented as a thread in Python. Threads are a form of concurrent programming: each thread is an independent sequence of instructions, and all the threads of a program run concurrently and share its memory. To stop multiple threads from using the same resource at the same time, a thread must acquire a lock, which guarantees that only that thread can use the resource until it releases the lock. For a broader introduction to multithreading in general, see the [Wikipedia article](https://en.wikipedia.org/wiki/Multithreading_(computer_architecture)).

In Python, and particularly in CPython, there is an extra caveat to multithreading: only one thread executes Python code at any given time. This is because of the Global Interpreter Lock (GIL), a single lock over the whole interpreter that prevents multiple threads from modifying the same object at the same time. For more information on the GIL, see [this article on the Python Wiki](https://wiki.python.org/moin/GlobalInterpreterLock), or [this blog post](https://hackernoon.com/concurrent-programming-in-python-is-not-what-you-think-it-is-b6439c3f3e6a).

Python needs to decide when a thread should give up the GIL. Broadly, a thread keeps running until it is blocked, meaning that it has to wait for something before it can take its next step. At this point the GIL is released, another thread acquires it, and that thread runs until it is blocked in turn. In Deltaflow, by default a node is blocked only while it is waiting for input.
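Blocking is not the only point at which CPython switches threads: the interpreter also asks the running thread to give up the GIL after a fixed switch interval, so a busy thread is interrupted occasionally even if it never blocks. The snippet below simply reads that interval with the standard `sys.getswitchinterval` function. The 5 ms figure in the comment is the documented CPython default and will differ if it has been changed with `sys.setswitchinterval`.

```python
import sys

# Time, in seconds, after which CPython asks the running thread to release
# the GIL so that another waiting thread gets a chance to run.
# The documented default is 0.005 (5 ms).
interval = sys.getswitchinterval()
print(f"GIL switch interval: {interval * 1000:.1f} ms")
```

A few milliseconds is a long time for a tight loop, which is why a node that never blocks can still get through a great many evaluations before any other node runs.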

And this is where our problem lies. In the above example, `Counter.count` has no input, so it is (almost) never blocked and Python therefore allows it to run indefinitely. This means that the vast majority of computation time is spent on that node rather than the `print_to_lim` node. This is why we need the counter node to raise a `DeltaRuntimeExit` itself: if it didn't, then there is a good probability that the program would have taken significantly longer, or possibly even never terminated, as so much of the computation time is spent on the counter node.
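The effect can be reproduced without Deltaflow. The sketch below is a plain-Python analogue, not the Deltaflow runtime itself: a producer thread that never waits for anything stands in for `Counter.count`, a consumer thread that blocks on a `queue.Queue` stands in for `print_to_lim`, and a `threading.Event` plays the role of `DeltaRuntimeExit`. The names `run_unblocked`, `producer` and `consumer` are made up for this illustration.

```python
import queue
import threading


def run_unblocked(lim):
    """Run a never-blocking producer against a consumer that waits for input."""
    q = queue.Queue()
    stop = threading.Event()  # stands in for DeltaRuntimeExit
    state = {"produced": 0}
    consumed = []

    def producer():
        # No input and no sleep: this loop never waits for anything.
        while not stop.is_set():
            state["produced"] += 1
            if state["produced"] > lim:
                stop.set()  # the producer ends the whole run
                return
            q.put(state["produced"])

    def consumer():
        # Waits on the queue, so it only makes progress when it gets the GIL.
        while not stop.is_set():
            try:
                n = q.get(timeout=0.01)
            except queue.Empty:
                continue
            consumed.append(n)

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["produced"], consumed


produced, consumed = run_unblocked(lim=100)
print(f"producer counted to {produced}, consumer saw {len(consumed)} value(s)")
```

On a typical run the producer reaches its limit long before the consumer has worked through the queue, so the consumer often sees only a handful of values, possibly none. The exact number varies from run to run.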

## Time for sleep

To resolve cases like this, we need to implement the node in a way such that it is blocked more often. Thankfully, there is an easy way to do this which is already implemented in the Python standard library: `time.sleep`. This is a function which blocks a thread for a given amount of time, specified in seconds. While this thread is blocked by `time.sleep` the GIL is released and another thread can acquire it to run.
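To see the effect of `time.sleep` in isolation, here is a hedged plain-Python sketch. It is an analogue rather than Deltaflow code, and `run_with_sleep` and its parameters are hypothetical names. The producer sleeps briefly after each value and has no limit of its own; the consumer decides when to stop, with a `threading.Event` standing in for `DeltaRuntimeExit`.

```python
import queue
import threading
import time


def run_with_sleep(lim, delay):
    """Producer sleeps after every value; the consumer stops at `lim`."""
    q = queue.Queue()
    stop = threading.Event()  # stands in for DeltaRuntimeExit
    consumed = []

    def producer():
        x = 0
        while not stop.is_set():
            x += 1
            q.put(x)
            time.sleep(delay)  # releases the GIL so the consumer can run

    def consumer():
        while True:
            n = q.get()  # blocks until the producer sends a value
            consumed.append(n)
            if n >= lim:
                stop.set()  # the consumer ends the whole run
                return

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


consumed = run_with_sleep(lim=20, delay=0.001)
print(consumed[-1])  # prints 20
```

Because the producer pauses after every value, the consumer receives every value in order and is the one that ends the run.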

Below we implement a new counter `DeltaMethodBlock` that uses `time.sleep` to block the node for 0.1 s (100 ms) on each evaluation. We no longer need a limit on the counter node, as the printer node now gets enough time to reach its own limit and terminate the graph.

In [6]:
class CounterSleep:

    def __init__(self):
        self.x = 0

    @DeltaMethodBlock()
    def count(self) -> int:
        self.x += 1
        time.sleep(0.1)  # block for 0.1 s so that other nodes can run
        return self.x

We are now going to wire up and run a graph with this new node the same way as before.

In [7]:
cnt = CounterSleep()

with DeltaGraph() as graph:
    print_to_lim(cnt.count(), lim=100)

rt = DeltaPySimulator(graph)
rt.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100


This time the graph runs to completion and every value up to 100 is printed.

It is worth noting that this can happen for other nodes as well. Take the interactive node below, which receives an input `lim` at the start of its computation but then goes into a `while True` loop in which no blocking input is received.

In [8]:
@Interactive(in_params={"lim": int}, out_type=int)
def counter_interactive(node):
    x = 0
    lim = node.receive("lim")
    while True:
        x += 1
        if x > lim:
            raise DeltaRuntimeExit
        node.send(x)

If we wire this node up as we did with the previous examples, we run into the same issue as in our first example.

In [9]:
with DeltaGraph() as graph:
    print_to_lim(counter_interactive.call(lim=100), lim=100)

rt = DeltaPySimulator(graph)
rt.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14


To resolve this, we need to use `time.sleep` again to allow the interactive node to block.

In [10]:
@Interactive(in_params={}, out_type=int)
def counter_interactive_sleep(node):
    x = 0
    while True:
        x += 1
        node.send(x)
        time.sleep(0.1)  # block for 0.1 s so that other nodes can run

with DeltaGraph() as graph:
    print_to_lim(counter_interactive_sleep.call(), lim=100)

rt = DeltaPySimulator(graph)
rt.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100


Finally, it is worth noting that even if a node receives inputs on _every_ evaluation, we might still need to use `time.sleep` to block a node if none of its inputs are blocking. For example, if every input to a node is constant, or every input is optional, then none of these inputs will be blocking and therefore we need to block the node ourselves.
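The same idea can be sketched in plain Python. In the hypothetical `poll_optional` worker below, `queue.Queue.get_nowait` stands in for an optional (non-blocking) input: the worker is evaluated whether or not a value is present, so without the `time.sleep` call it would spin continuously. This is an analogue for illustration only, not Deltaflow's own handling of optional inputs.

```python
import queue
import threading
import time


def poll_optional(inbox, received, stop, delay=0.001):
    """Evaluate repeatedly, reading an input only if one happens to be ready."""
    while not stop.is_set():
        try:
            received.append(inbox.get_nowait())  # never waits for a value
        except queue.Empty:
            pass  # no input this time; the evaluation still goes ahead
        time.sleep(delay)  # block explicitly so other threads get to run


inbox = queue.Queue()
received = []
stop = threading.Event()
worker = threading.Thread(target=poll_optional, args=(inbox, received, stop))
worker.start()

for value in (1, 2, 3):
    inbox.put(value)

# Wait until the worker has picked up everything, then shut it down.
while len(received) < 3:
    time.sleep(0.001)
stop.set()
worker.join()
print(received)  # prints [1, 2, 3]
```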