# Parallelism in Stride

In practice, derivatives of the optimisation problem are not calculated one data point at a time, but in batches, and the results are averaged to obtain an estimate of the gradient for that iteration.

Because these data points are, in most cases, fully independent, their derivatives can be calculated in parallel. For some simple problems, this can be done within a single workstation. However, in most practical problems, the compute and memory demands require that these computations are mapped across different interconnected sets of hardware, such as multi-GPU systems, CPU clusters, or the cloud.
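As a minimal sketch of this idea, the snippet below averages per-point gradients of a toy scalar least-squares loss, evaluating each point concurrently. It uses only the Python standard library; `point_gradient` and `batch_gradient` are hypothetical names chosen for illustration and are not part of Stride or Mosaic. Threads stand in for workers here, so the example shows the structure of the computation rather than a real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor


def point_gradient(w, x, y):
    """Gradient of the per-point loss 0.5 * (w * x - y) ** 2 with respect to w."""
    return (w * x - y) * x


def batch_gradient(w, batch, max_workers=4):
    """Average the per-point gradients of a batch, evaluating them concurrently."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each data point is independent, so the evaluations can overlap
        grads = list(pool.map(lambda point: point_gradient(w, *point), batch))
    return sum(grads) / len(grads)


batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # (x, y) pairs generated with y = 2 * x
gradient = batch_gradient(0.0, batch)
```

At the true parameter value, `batch_gradient(2.0, batch)` returns zero, since every per-point residual vanishes.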

To make this parallelism easy to express in Stride, we have also developed Mosaic.

## Tessera

Mosaic is an actor-based parallelisation library based on asynchronous, zero-copy message passing through ZeroMQ sockets. Actors in Mosaic are called tessera, and can be generated by decorating any Python class using ``@mosaic.tessera``.

Let's start by creating some tessera objects.

In [1]:
import time
import mosaic
from mosaic import tessera

@tessera
class Solver1:
    def __init__(self, data):
        self.data = data

    def solve(self, data):
        print('Solve 1')
        self.data = self.data + data

        time.sleep(10)
        print('Done 1')

        return self.data

    def solve_more(self):
        print('Solve More 1')
        time.sleep(5)
        print('Done More 1')


@tessera
class Solver2:
    def __init__(self):
        self.data = 0

    def solve(self, data):
        print('Solve 2')
        self.data = data*2

        time.sleep(10)
        print('Done 2')

        return self.data

    def solve_more(self):
        print('Solve More 2')
        time.sleep(5)
        print('Done More 2')

Before we start working with the tessera classes that we have just created, we need to start the mosaic runtime that will manage them. 

There are several ways to start the runtime but, because we are working with a Jupyter notebook, we want to start it in interactive mode.

In [2]:
# Start the runtime in interactive mode with two workers
await mosaic.interactive('on', num_workers=2)

Listening at <CommsManager object at 139975679367504, uid=monitor, address=CC2416, port=3017, state=connected>
HEAD            Listening at <CommsManager object at 139977950808656, uid=head, address=CC2416, port=3037, state=connected>
MONITOR         Listening at <NODE:0 | WORKER:0-2>
WORKER:0:0      Solve 1
WORKER:0:0      Done 1
WORKER:0:1      Solve 2
WORKER:0:1      Done 2
WORKER:0:0      Solve 1
WORKER:0:1      Solve 2
WORKER:0:0      Done 1
WORKER:0:1      Done 2
WORKER:0:0      Solve More 1
WORKER:0:0      Done More 1
WORKER:0:1      Solve More 2
WORKER:0:1      Done More 2


Now, we can start using mosaic's runtime to execute our parallel workload. To do that, let's instantiate some of our tessera by calling the `remote` method that is now available for each of our classes.

In [3]:
import numpy as np

array = np.ones((1024, 1024, 1), dtype=np.float32)

# These objects will be created remotely
solver_1 = Solver1.remote(array)
solver_2 = Solver2.remote()

When instantiating a class that has been decorated, Mosaic will start a remote instance of that class in one of the workers. At this point, remote method calls to that tessera can be executed and the attributes of that remote object can be accessed.

In [4]:
solver_1

<_TesseraProxy object at 139977283407440, uid=tess-solver1-e89a6b0e59cf45d792fec5f45bb45314, runtime=None, state=pending>

As you can see, the result of calling `remote` is not an instance of the class, but a proxy object.

The mosaic runtime will instantiate `tessera` classes within one of the available workers, and a proxy object will be given to us that points to the remote object. This proxy allows us to access the attributes of the remote object:

In [5]:
# Check the current value of the attribute
np.sum(await solver_1.data)

1048576.0

and to call methods of the remote object as if it were a local object:

In [6]:
# These will run in parallel
# The calls will return immediately by creating a remote task
task_1 = solver_1.solve(array)
task_2 = solver_2.solve(array)
task_1

<TaskArrayProxy object at 139975676373648, uid=task-solver1-solve-89b722d33b5440d1ac0e6584058e5038, runtime=worker:0:0, state=pending>

Unlike a local method call, calling a remote method will return immediately and will not wait until the work is done. Instead, it will generate a task that the mosaic runtime will pass to the worker that owns the `tessera`, which will queue it for execution.

On our side of the code, the call to the remote method will generate a task proxy that points to its remote counterpart.

Method calls to different `tessera` are executed in parallel, whereas method calls to a specific `tessera` instance are guaranteed to be executed in the order in which they were called.
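The ordering guarantee can be illustrated with a small `asyncio` sketch. This is not how Mosaic is implemented: `ToyActor` is a hypothetical stand-in in which each actor owns one queue and one consumer, so its calls run strictly in order, while separate actors make progress concurrently. Sleeps stand in for real work.

```python
import asyncio


class ToyActor:
    """Hypothetical stand-in for a tessera: one queue and one consumer per actor."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self._queue = asyncio.Queue()
        self._runner = asyncio.create_task(self._run())

    def call(self, label, duration):
        # Return immediately with a future, similar in spirit to a task proxy
        future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((label, duration, future))
        return future

    async def _run(self):
        # Process queued calls one at a time, in the order they were made
        while True:
            label, duration, future = await self._queue.get()
            self.log.append(f'{self.name} start {label}')
            await asyncio.sleep(duration)
            self.log.append(f'{self.name} done {label}')
            future.set_result(label)

    def stop(self):
        self._runner.cancel()


async def demo():
    log = []
    actor_a = ToyActor('a', log)
    actor_b = ToyActor('b', log)

    # Two calls on the same actor, plus one call on a different actor
    tasks = [
        actor_a.call('first', 0.02),
        actor_a.call('second', 0.01),
        actor_b.call('other', 0.05),
    ]
    await asyncio.gather(*tasks)

    actor_a.stop()
    actor_b.stop()
    return log


log = asyncio.run(demo())
```

In the resulting log, `a start second` only appears after `a done first`, whereas `b start other` appears before `a done first`. Inside a notebook, where an event loop is already running, `await demo()` would replace `asyncio.run(demo())`.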

We can wait for the remote calls to finish by awaiting the proxies:

In [7]:
# Wait until the remote tasks are finished
await task_1
await task_2

<TaskArrayProxy object at 139977550894736, uid=task-solver2-solve-736c1e497119430483609039258e096b, runtime=worker:0:1, state=done>

The return value of the method calls, if any, will not be transferred back to the user code unless we explicitly request it:

In [8]:
# The results of the tasks stay in the remote worker
# until we request them back
result_1 = await task_1.result()
result_2 = await task_2.result()

print(result_1.shape)
print(result_2.shape)

(1024, 1024, 1)
(1024, 1024, 1)


We can also check the new value of the remote attribute:

In [9]:
# Check the current value of the attribute
np.sum(await solver_1.data)

2097152.0

## Execution order

In Mosaic, successive method calls to the same remote object are guaranteed to be executed in order, but calls to different remote objects are not. However, if there are dependencies between two or more remote method calls, Mosaic will ensure that these are executed in the right order.

In [10]:
# These will wait for each other because
# their results depend on each other
task_1 = solver_1.solve(array)
task_2 = solver_2.solve(task_1)

In this case, we only need to wait for the second task to finish because an implicit dependency exists between the results of the two tasks:

In [11]:
# Wait until the remote tasks are finished
# Now we only need to wait for the second task
await task_2

<TaskArrayProxy object at 139975676461264, uid=task-solver2-solve-3744b801e9e94348af36ed14f5aeb7af, runtime=worker:0:1, state=done>

We can also create explicit dependencies between two tasks to ensure that they are executed in order:

In [12]:
# These will also wait for each other
task_1 = solver_1.solve_more()
task_2 = solver_2.solve_more(task_1.done)

Again, we only need to wait for the second task to finish.

In [13]:
# Wait until the remote tasks are finished
# Now we only need to wait for the second task
await task_2

<TaskArrayProxy object at 139975676303568, uid=task-solver2-solve_more-dbd073a424494c929df2be561c4db81f, runtime=worker:0:1, state=done>
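Both kinds of dependency reduce to the same rule: a task does not start until the task it depends on has finished. The sketch below shows that rule with plain `asyncio` tasks. It is an analogy under that assumption, not Mosaic's scheduler, and `solve` and its `upstream` parameter are hypothetical names.

```python
import asyncio


async def solve(log, name, upstream=None, duration=0.01):
    """Toy task that waits for an optional upstream task before doing its own work."""
    value = 0
    if upstream is not None:
        # Awaiting the upstream task is what enforces the ordering
        value = await upstream
    log.append(f'start {name}')
    await asyncio.sleep(duration)
    log.append(f'done {name}')
    return value + 1


async def demo():
    log = []
    # task_2 receives task_1 as an argument, so it cannot start before task_1 finishes
    task_1 = asyncio.create_task(solve(log, 'task_1', duration=0.03))
    task_2 = asyncio.create_task(solve(log, 'task_2', upstream=task_1))
    # Waiting on the last task in the chain is enough
    result = await task_2
    return log, result


log, result = asyncio.run(demo())
```

Awaiting only `task_2` is sufficient because it cannot complete without `task_1` having completed first, which mirrors the pattern used in the cells above.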

## Warehouse

The warehouse is a centralised storage that can be accessed from any point on the network.

To transfer an object to the warehouse, we can use the function `put`:

In [14]:
obj = dict(a=1, b=2)

runtime = mosaic.runtime()
ref = await runtime.put(obj)

ref

<warehouse object uid=ware-dict-6d181684f42648d6b2c0fb6516ddc8b1>

The returned `ref` is a reference to the object in the warehouse. It can be used to retrieve the object and can be sent to other workers.

To retrieve an object from the warehouse:

In [15]:
await ref.value()

# or

await runtime.get(ref)

{'a': 1, 'b': 2}

The warehouse stores objects in key-value pairs, with a unique ID assigned to the object when it is `put` into the warehouse. Importantly, calling `put` multiple times with the same object will create multiple entries in the warehouse, and not update an existing entry.
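The consequence of assigning a fresh ID on every `put` can be seen in a toy in-memory store. `ToyWarehouse` is a hypothetical class written for illustration; it does not reproduce Mosaic's warehouse, and its ID format only imitates the one printed above.

```python
import uuid


class ToyWarehouse:
    """Hypothetical in-memory key-value store that assigns a fresh ID on every put."""

    def __init__(self):
        self._store = {}

    def put(self, obj):
        # A new unique ID is generated on each call, even for the same object
        uid = f'ware-{type(obj).__name__}-{uuid.uuid4().hex}'
        self._store[uid] = obj
        return uid

    def get(self, uid):
        return self._store[uid]

    def drop(self, uid):
        del self._store[uid]

    def __len__(self):
        return len(self._store)


warehouse = ToyWarehouse()
obj = dict(a=1, b=2)

ref_1 = warehouse.put(obj)
ref_2 = warehouse.put(obj)  # same object, but a second entry is created
```

After these two calls the store holds two entries under different IDs, so each reference has to be dropped separately when the object is no longer needed.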

The internal buffer of the warehouse will keep objects in memory until a threshold is exceeded, at which point elements in storage will start to be spilled to disk. By default, this threshold is defined as 25% of the available system memory.

When we `get` an object from the warehouse, a local copy of the object is cached, and subsequent calls to `get` will access this cached copy instead of pulling it from the warehouse every time. We can ensure that an object is distributed to all workers in the network and stored in this cache by publishing it:

In [16]:
await runtime.put(obj, publish=True)

<warehouse object uid=ware-dict-e7e4c916bdb944f4bc34a16654edb4a3>

Objects can also be removed from storage:

In [17]:
await ref.drop()

# or

# await runtime.drop(ref)

Before leaving, we should ensure that we tear down the mosaic runtime.

In [18]:
await mosaic.interactive('off')

## Mosaic structure

The structure of the Mosaic runtime, which can be seen in the following figure, is composed of a series of processing units that could be located in a single local workstation or distributed across a remote network.

<img src="figures/mosaic_structure.png" width=350 />

The first of such units contains a `monitor` process, a `warehouse` process, and a `head` process. The `monitor` process collects information about the Mosaic network, including occupation rate, resource use and connection state. The `warehouse` process acts like a centralised storage location that is accessible from across the whole network. The `head` process is the place where the main user code is executed. 

In each of the remaining processing units, a `node monitor` and one or more `workers` are allocated. The `node monitor` keeps track of the runtime status of its local processing unit and oversees the life cycle of each of the `workers` in its unit. 

Finally, the `workers` act as containers for tessera actors, whose methods can be executed remotely. All processing units in the Mosaic network are directly connected to each other, creating a decentralised communication mesh.