## Lecture 21: Ray Actors

Actors are stateful workers in Ray. The Ray documentation refers to these as "service centers".
We will use actors to reimplement the Game of Life program that we wrote in Dask. This will be a BSP-like program with two implicit barriers per iteration. They are implicit because there is no class or object that is the barrier. The barrier is achieved by waiting for all functions on all Ray actors to complete before submitting the next round of functions. The pseudocode is something like:
  1. Startup
        * create a 2x2 array of remote actors and initialize local grids (8x8) plus overlapping ghost cells on each actor.
            * place a glider in the (0,0) actor
        * having created the actors, initialize the neighbors (ray oids) in each actor.
  2. Launch remote asynchronous functions on each actor to update the local grid
  3. Await computation to complete (barrier)
  4. Update each actor's ghost cells from its neighbors (corners and sides)
  5. Await updates to complete (barrier)
  6. Repeat from step 2.
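
The neighbor wiring in step 1 is modular arithmetic on the partition indices. The 2x2 array of actors is treated as a torus, so the indices wrap around. The sketch below is plain Python and does not use Ray. `neighbors` is a hypothetical helper that is not part of the lecture code. It returns the eight neighbor positions in the same T B L R TL TR BL BR order that the driver passes to `set_neighbors`.

```python
def neighbors(ix, iy, dim):
    """Hypothetical helper: (row, col) of the 8 neighbours of partition (ix, iy)
    on a dim x dim torus, ordered top, bottom, left, right, tl, tr, bl, br."""
    up, down = (ix - 1) % dim, (ix + 1) % dim
    left, right = (iy - 1) % dim, (iy + 1) % dim
    return [
        (up, iy), (down, iy), (ix, left), (ix, right),          # sides
        (up, left), (up, right), (down, left), (down, right),  # corners
    ]


print(neighbors(0, 0, 2))
# [(1, 0), (1, 0), (0, 1), (0, 1), (1, 1), (1, 1), (1, 1), (1, 1)]
```

With `dim = 2`, an actor's eight neighbor slots hold only three distinct actors. As a result, the same neighbor is asked for two different cuts (for example, both the top row and the bottom row).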

### Actors: The Code

An actor is created from a class decorated with `@ray.remote`. When you instantiate it (with `RoLSubGrid.remote(...)` rather than `RoLSubGrid(...)`), you get back a runtime handle to the actor. That handle provides access to all of the class's methods. The actor's state is reached only through those methods.

All of the methods within the class are invoked with `.remote()`. You should think of this as:
  * the actor is assigned to a compute resource 
  * each function call packages its arguments (and other needed state) and ships them to that remote resource

When working with actors, keep your remote calls as small as possible and keep state inside the actor at all times. That is, try not to send a lot of state between actors.
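
A quick count shows why this matters. Compare shipping the whole padded partition every step with shipping only its ghost ring. The sketch is plain Python and assumes one byte per cell, as with the `uint8` grids below. `bytes_per_exchange` is a hypothetical helper.

```python
def bytes_per_exchange(n, itemsize=1):
    """Hypothetical helper: bytes moved per step for an n x n partition with
    one ring of ghost cells.

    Returns (whole, ghosts). `whole` is the cost of shipping the entire
    (n + 2) x (n + 2) local grid. `ghosts` is the cost of shipping only the
    ghost ring, which has (n + 2)**2 - n**2 = 4n + 4 cells.
    """
    whole = (n + 2) ** 2 * itemsize
    ghosts = ((n + 2) ** 2 - n ** 2) * itemsize
    return whole, ghosts


for n in (8, 1000):
    whole, ghosts = bytes_per_exchange(n)
    print(f"n={n}: whole grid {whole} bytes, ghost ring {ghosts} bytes")
```

For the 8x8 partitions in this lecture, the difference is small: 100 bytes versus 36 bytes. However, the ghost ring grows like 4n while the grid grows like n². At n = 1000 the ghost ring is about 4 KB, against roughly 1 MB for the whole grid.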
  
So here is the class. NB: I implemented this in PyCharm and moved it over. It's too complicated a piece of code for Jupyter; I needed a lot of debugging support.


#### Define the actor class

In [1]:
import ray
import numpy as np

@ray.remote
class RoLSubGrid(object):
    """
    class RoLSubGrid: Rules of life implemented in Ray.

        This class updates local partitions and then pulls data from
        adjacent partitions. It implements the ghost cell pattern with
        one cell of overlap.

        The class should be invoked with two barriers per iteration.
            * update all local partitions
            * barrier
            * swap data between partitions
            * barrier
    """
    # Create an actor each with its own data
    def __init__(self, dim, has_glider=False):
        """Create a (dim + 2) x (dim + 2) local grid: a dim x dim interior
            plus one ring of ghost cells. Optionally place a glider in the
            top-left corner of the interior.
        """
        self.local_grid = np.zeros(shape=[dim + 2, dim + 2], dtype=np.uint8)
        if has_glider:
            glider = np.array([[0, 1, 0], [0, 0, 1], [1, 1, 1]], dtype=np.uint8)
            self.local_grid[1:glider.shape[0] + 1, 1:glider.shape[1] + 1] = glider

    # neighbors (actor handles) for the 2-d GoL grid
    def set_neighbors(self, top, bottom, left, right, tl, tr, bl, br):
        self.tl = tl
        self.bl = bl
        self.tr = tr
        self.br = br
        self.left = left
        self.right = right
        self.top = top
        self.bottom = bottom

    def get_neighbors(self):
        # same order as the arguments of set_neighbors()
        return self.top, self.bottom, self.left, self.right, self.tl, self.tr, self.bl, self.br

    def get_grid(self):
        return self.local_grid

    def step(self):
        """Evaluate the rules of life on a 2-d subarray.
        The array has an overlap of 1 cell in all dimensions and on the
        corners. Only the interior is updated; the ghost cells of the new
        grid are zero until the next exchange().
        """
        # the local grid has 1 cell of padding on every side
        xdim, ydim = self.local_grid.shape

        # output array to keep updates
        outgrid = np.zeros(shape=self.local_grid.shape, dtype=self.local_grid.dtype)

        # update only the center (non-overlapping) region
        for x in range(1, xdim - 1):
            for y in range(1, ydim - 1):
                count = self.local_grid[x - 1, y - 1] + self.local_grid[x, y - 1] + self.local_grid[x + 1, y - 1] + \
                        self.local_grid[x - 1, y] + self.local_grid[x + 1, y] + \
                        self.local_grid[x - 1, y + 1] + self.local_grid[x, y + 1] + self.local_grid[x + 1, y + 1]
                # three neighbors: birth (or survival)
                if count == 3:
                    outgrid[x, y] = 1
                # two neighbors: no change
                elif count == 2:
                    outgrid[x, y] = self.local_grid[x, y]
                # <2 or >3: death
                else:
                    outgrid[x, y] = 0

        self.local_grid = outgrid

    def cut(self, xl, xh, yl, yh):
        """Return a slice of the local array (indices relative to the interior)"""
        return self.local_grid[xl+1:xh+1, yl+1:yh+1]

    def exchange(self):
        """Pull ghost cells (corners and sides) from the eight neighbors.

        This blocks inside the actor on the neighbors' cut() calls. An actor
        runs one method at a time, so two actors that are both inside
        exchange() and waiting on each other's cut() would deadlock. This is
        why exchange() should be called on one actor at a time, as the
        driver does.
        """
        n = self.local_grid.shape[0] - 2  # interior size (8 in this lecture)
        tloid = self.tl.cut.remote(n - 1, n, n - 1, n)  # tl cuts its bottom-right cell
        troid = self.tr.cut.remote(n - 1, n, 0, 1)      # tr cuts its bottom-left cell
        bloid = self.bl.cut.remote(0, 1, n - 1, n)      # bl cuts its top-right cell
        broid = self.br.cut.remote(0, 1, 0, 1)          # br cuts its top-left cell
        toid = self.top.cut.remote(n - 1, n, 0, n)      # top cuts its last row
        boid = self.bottom.cut.remote(0, 1, 0, n)       # bottom cuts its first row
        loid = self.left.cut.remote(0, n, n - 1, n)     # left cuts its last column
        roid = self.right.cut.remote(0, n, 0, 1)        # right cuts its first column

        # corners use 1x1 slices (not scalar indexing) so every assignment is array-to-array
        self.local_grid[0:1, 0:1] = ray.get(tloid)                  # tl corner
        self.local_grid[0:1, n + 1:n + 2] = ray.get(troid)          # tr corner
        self.local_grid[n + 1:n + 2, 0:1] = ray.get(bloid)          # bl corner
        self.local_grid[n + 1:n + 2, n + 1:n + 2] = ray.get(broid)  # br corner
        self.local_grid[0:1, 1:n + 1] = ray.get(toid)               # top
        self.local_grid[n + 1:n + 2, 1:n + 1] = ray.get(boid)       # bottom
        self.local_grid[1:n + 1, 0:1] = ray.get(loid)               # left
        self.local_grid[1:n + 1, n + 1:n + 2] = ray.get(roid)       # right
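
The double loop in `step()` visits every interior cell in Python, which becomes slow for large partitions. A vectorized alternative (a sketch, not the lecture's code) sums the eight neighbors with shifted slices of the padded array and applies the rules with boolean masks. `step_vectorized` and `step_loop` are hypothetical names. The test compares the vectorized version against a loop that mirrors `step()`.

```python
import numpy as np


def step_loop(grid):
    """Reference: same rules and loop structure as RoLSubGrid.step()."""
    xdim, ydim = grid.shape
    out = np.zeros_like(grid)
    for x in range(1, xdim - 1):
        for y in range(1, ydim - 1):
            count = int(grid[x - 1:x + 2, y - 1:y + 2].sum()) - int(grid[x, y])
            if count == 3:
                out[x, y] = 1
            elif count == 2:
                out[x, y] = grid[x, y]
    return out


def step_vectorized(grid):
    """Update the interior of a grid padded by one ghost cell on every side.

    The ghost cells of the result are zero, as in step(); exchange() refills
    them before the next update.
    """
    xdim, ydim = grid.shape
    # sum the eight shifted copies of the grid, each aligned with the interior
    count = sum(
        grid[1 + dx:xdim - 1 + dx, 1 + dy:ydim - 1 + dy].astype(np.int32)
        for dx in (-1, 0, 1)
        for dy in (-1, 0, 1)
        if (dx, dy) != (0, 0)
    )
    center = grid[1:-1, 1:-1]
    out = np.zeros_like(grid)
    # birth or survival on exactly 3 neighbours; a live cell survives on exactly 2
    out[1:-1, 1:-1] = (count == 3) | ((count == 2) & (center == 1))
    return out
```

Because the ghost ring is excluded from the update, this function could replace the body of `step()` with `self.local_grid = step_vectorized(self.local_grid)`.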

### Driver script 

Create and manipulate actors to run the simulation. Let's set up a cluster and define the actors.

In [2]:
# script to drive parallel program
ray.init(num_cpus=4, ignore_reinit_error=True)

# 2x2 array of partitions
dim = 2

# ray objects for actors
oids = np.empty([dim,dim], dtype=object)

# this list will be used to implement barriers, i.e. wait for the completion of a set of jobs
roids = []

# create grids
for ix in range(dim):
    for iy in range(dim):
        if ix == 0 and iy == 0:
            oids[ix, iy] = RoLSubGrid.remote(8, True)
        else:
            oids[ix, iy] = RoLSubGrid.remote(8, False)
        print(f"({ix},{iy}) {oids[ix,iy]}")

# set neighbors T B L R TL TR BL BR
for ix in range(dim):
    for iy in range(dim):
        roids.append(oids[ix, iy].set_neighbors.remote(
            oids[(ix - 1) % dim, iy], oids[(ix + 1) % dim, iy], oids[ix, (iy - 1) % dim], oids[ix, (iy + 1) % dim],
            oids[(ix - 1) % dim, (iy - 1) % dim], oids[(ix - 1) % dim, (iy + 1) % dim],
            oids[(ix + 1) % dim, (iy - 1) % dim], oids[(ix + 1) % dim, (iy + 1) % dim]))

# await the initialization of all neighbors
# this is a barrier: ray.get() on a list waits for every oid in it
ray.get(roids)

# let's look at the neighbor list
for ix in range(dim):
    for iy in range(dim):
        print(ray.get(oids[ix,iy].get_neighbors.remote()))

(0,0) Actor(RoLSubGrid, 341a226ca1ad800aa82b09e901000000)
(0,1) Actor(RoLSubGrid, 21418dcd805f6792e9e65efd01000000)
(1,0) Actor(RoLSubGrid, 3e9a355f8a404c699bf3fd4f01000000)
(1,1) Actor(RoLSubGrid, 7a42c1b35f9778c799116ca801000000)


RayActorError: The actor died because of an error raised in its creation task, ray::RoLSubGrid.__init__() (pid=33737, ip=127.0.0.1)
ray._private.memory_monitor.RayOutOfMemoryError: More than 95% of the memory on node randals-mbp.win.ad.jhu.edu is used (15.21 / 16.0 GB). The top 10 memory consumers are:

PID	MEM	COMMAND
18573	0.58GiB	/Applications/PyCharm CE.app/Contents/MacOS/pycharm
390	0.33GiB	/Applications/Google Chrome.app/Contents/MacOS/Google Chrome
33694	0.24GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
24444	0.22GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
33371	0.2GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
472	0.18GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
33714	0.15GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
30984	0.14GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
33719	0.12GiB	/opt/anaconda3/envs/parallel.python/bin/python -m ipykernel_launcher -f /Users/randal/Library/Jupyte
33726	0.11GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.

In addition, up to 0.0 GiB of shared memory is currently being used by the Ray object store.
---
--- Tip: Use the `ray memory` command to list active objects in the cluster.
--- To disable OOM exceptions, set RAY_DISABLE_MEMORY_MONITOR=1.
---

[2m[36m(pid=33736)[0m 2021-11-15 16:17:27,018	ERROR worker.py:425 -- Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::RoLSubGrid.__init__()[39m (pid=33736, ip=127.0.0.1)
[2m[36m(pid=33736)[0m ray._private.memory_monitor.RayOutOfMemoryError: More than 95% of the memory on node randals-mbp.win.ad.jhu.edu is used (15.21 / 16.0 GB). The top 10 memory consumers are:
[2m[36m(pid=33736)[0m 
[2m[36m(pid=33736)[0m PID	MEM	COMMAND
[2m[36m(pid=33736)[0m 18573	0.58GiB	/Applications/PyCharm CE.app/Contents/MacOS/pycharm
[2m[36m(pid=33736)[0m 390	0.33GiB	/Applications/Google Chrome.app/Contents/MacOS/Google Chrome
[2m[36m(pid=33736)[0m 33694	0.24GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
[2m[36m(pid=33736)[0m 24444	0.22GiB	/Applications/Google Chrome.app/Contents/Frameworks/Google Chrome Framework.framework/Versions/95.0.
[2m[36m(pid=33736)[0m 33371	0.

#### Comments on the program setup

Before execution we have:
* Ray actors: one for each 8x8 data partition
* Local grids (data partitions) in each actor, with a glider in the 0,0 partition
* Ghost cells in each partition and a topology that links ghost cells to other nodes
    
Object identifiers: everything is an `oid` in Ray:
* remote functions that are running asynchronously
* actors that have been created, but are idle waiting for remote invocation of functions.
* data items in the key value store

Actors are stateful service centers:
* Actors are created (remotely) and wait for functions to be invoked on them
* Functions can alter the state of the actor
* Subsequent functions see the updated state
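
These three properties can be mimicked with the standard library, which may help build intuition. The sketch below is an analogy, not Ray's implementation. `ToyActor` and `Counter` are hypothetical names. `ToyActor` wraps an object and a single-worker `ThreadPoolExecutor`. Each call returns a future immediately, the calls run one at a time in submission order, and each call sees the state left by the previous one.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyActor:
    """Stdlib analogy of an actor (not Ray's implementation).

    The wrapped object is the actor's state. call() plays the role of
    handle.method.remote(): it returns a future immediately, and the single
    worker thread runs queued calls one at a time, in submission order.
    """

    def __init__(self, obj):
        self._obj = obj
        self._worker = ThreadPoolExecutor(max_workers=1)

    def call(self, method, *args):
        return self._worker.submit(getattr(self._obj, method), *args)

    def shutdown(self):
        self._worker.shutdown(wait=True)


class Counter:
    def __init__(self):
        self.value = 0

    def add(self, k):
        self.value += k  # each call mutates the state the next call will see
        return self.value


actor = ToyActor(Counter())
futures = [actor.call("add", 1) for _ in range(5)]  # all launched, none awaited yet
results = [f.result() for f in futures]             # [1, 2, 3, 4, 5]
final = actor.call("add", 0).result()               # 5
actor.shutdown()
```

The list comprehension launches all five calls before any of them is awaited. The single worker still applies them strictly in order, which is the "subsequent functions see the updated state" property.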

Parallelism and completion:
* `.remote()` launches the computation and returns an oid
* `ray.wait()` waits until some of the given oids (by default, one) are ready and returns the lists of ready and not-ready oids
* `.get()` awaits completion and copies the return value 
    * recall that values are stored in the object store
    * gets are repeatable (we learned this last class)
    
Anti-pattern (things not to do):
* `ray.get(actor.function.remote())` makes a synchronous call (not parallel)
* I do this only

The Ray program (the driver script) is the coordinator of remote computation. The way to get parallelism is to call `remote()` on many actors first and only then `get()` the results.
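
The anti-pattern and the fix can be demonstrated with `concurrent.futures` from the standard library. Here `submit()` plays the role of `.remote()` and `Future.result()` plays the role of `ray.get()`. This is an analogy under that assumption, not Ray code. `Meter` is a hypothetical helper that records how many tasks run at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Meter:
    """Hypothetical helper: records the peak number of tasks running at once."""

    def __init__(self):
        self._lock = threading.Lock()
        self._running = 0
        self.peak = 0

    def task(self, x):
        with self._lock:
            self._running += 1
            self.peak = max(self.peak, self._running)
        time.sleep(0.05)  # stand-in for real work
        with self._lock:
            self._running -= 1
        return x * x


def get_inside_loop(pool, meter, xs):
    # analogue of [ray.get(f.remote(x)) for x in xs]: each task is awaited
    # before the next is launched, so at most one runs at a time
    return [pool.submit(meter.task, x).result() for x in xs]


def launch_then_get(pool, meter, xs):
    # analogue of ray.get([f.remote(x) for x in xs]): launch all, then wait
    futures = [pool.submit(meter.task, x) for x in xs]
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=4) as pool:
    serial, parallel = Meter(), Meter()
    r1 = get_inside_loop(pool, serial, range(4))
    r2 = launch_then_get(pool, parallel, range(4))

print(r1, r2, serial.peak, parallel.peak)
```

Both versions return the same results. The first peak is always 1. The second should be larger, because all four tasks are in flight before any `result()` is awaited.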

In [None]:
# 64 iterations will return the glider home. We print before stepping, so the
# state after 64 steps is printed at it == 64; that needs at least 65 iterations.
for it in range(68):
    
    # this is printing only. not part of the program 
    if (it % 4 == 0):
        # Glider only lives in top left and bottom right really
        print("Step")
        print(ray.get(oids[0, 0].get_grid.remote()))
        print(ray.get(oids[1, 1].get_grid.remote()))

    # asynchronously update the local grids
    roids = []
    for ix in range(dim):
        for iy in range(dim):
            roids.append(oids[ix, iy].step.remote())
    # first barrier awaiting local updates
    ray.get(roids)
    
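    # Exchange data among sides and corners. Each exchange() blocks inside its
    # actor on the neighbors' cut() calls, and an actor runs one method at a
    # time, so launching all exchanges at once could deadlock. Instead, each
    # exchange is awaited before the next starts; the loop as a whole is the
    # second barrier.
    for ix in range(dim):
        for iy in range(dim):
            ray.get(oids[ix, iy].exchange.remote())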
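
Because the actor version is hard to debug, it can help to check the partition-and-exchange logic serially first. The sketch below uses plain NumPy and no Ray. `life`, `exchange_all`, `assemble`, `torus_step` and `run` are hypothetical names. The sketch keeps four padded 8x8 grids in a dictionary and applies the same rules and the same `cut` slices as the actor class. After every generation it compares the result against a whole-grid update on the 16x16 torus. It also checks the claim in the driver's comment that the glider is back where it started after 64 steps. The sketch exchanges before each step, so the ghost cells are always current. The driver steps first and exchanges second, which gives the same result for this starting position.

```python
import numpy as np

N, P = 8, 2  # interior size of each partition, partitions per side (as in the lecture)


def life(grid):
    """Rules of life on the interior of a padded grid; ghost cells come back as 0."""
    xd, yd = grid.shape
    count = sum(grid[1 + dx:xd - 1 + dx, 1 + dy:yd - 1 + dy].astype(np.int32)
                for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0))
    out = np.zeros_like(grid)
    out[1:-1, 1:-1] = (count == 3) | ((count == 2) & (grid[1:-1, 1:-1] == 1))
    return out


def cut(grid, xl, xh, yl, yh):
    # same convention as RoLSubGrid.cut(): indices are relative to the interior
    return grid[xl + 1:xh + 1, yl + 1:yh + 1]


def exchange_all(parts):
    """Fill every partition's ghost ring from its eight neighbours on the torus.

    Only ghost cells are written and only interior cells are read, so the
    order in which partitions are visited does not matter.
    """
    for (ix, iy), g in parts.items():
        up, dn = (ix - 1) % P, (ix + 1) % P
        lf, rt = (iy - 1) % P, (iy + 1) % P
        g[0:1, 0:1] = cut(parts[up, lf], N - 1, N, N - 1, N)    # tl corner
        g[0:1, N + 1:] = cut(parts[up, rt], N - 1, N, 0, 1)     # tr corner
        g[N + 1:, 0:1] = cut(parts[dn, lf], 0, 1, N - 1, N)     # bl corner
        g[N + 1:, N + 1:] = cut(parts[dn, rt], 0, 1, 0, 1)      # br corner
        g[0:1, 1:N + 1] = cut(parts[up, iy], N - 1, N, 0, N)    # top row
        g[N + 1:, 1:N + 1] = cut(parts[dn, iy], 0, 1, 0, N)     # bottom row
        g[1:N + 1, 0:1] = cut(parts[ix, lf], 0, N, N - 1, N)    # left column
        g[1:N + 1, N + 1:] = cut(parts[ix, rt], 0, N, 0, 1)     # right column


def assemble(parts):
    """Stitch the interiors back into one (P*N) x (P*N) array."""
    return np.block([[parts[ix, iy][1:-1, 1:-1] for iy in range(P)] for ix in range(P)])


def torus_step(g):
    """Reference: one update of the whole grid with wrap-around boundaries."""
    count = sum(np.roll(g, (dx, dy), axis=(0, 1)).astype(np.int32)
                for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0))
    return ((count == 3) | ((count == 2) & (g == 1))).astype(g.dtype)


def run(steps):
    """Simulate `steps` generations; return (start, final, matched_reference)."""
    parts = {(ix, iy): np.zeros((N + 2, N + 2), dtype=np.uint8)
             for ix in range(P) for iy in range(P)}
    parts[0, 0][1:4, 1:4] = [[0, 1, 0], [0, 0, 1], [1, 1, 1]]  # glider, as in __init__
    start = assemble(parts)
    ref, matched = start.copy(), True
    for _ in range(steps):
        exchange_all(parts)                # communication: refresh ghost cells
        for key in list(parts):
            parts[key] = life(parts[key])  # computation: local update
        ref = torus_step(ref)
        matched = matched and np.array_equal(assemble(parts), ref)
    return start, assemble(parts), matched


start, final, matched = run(64)
print(matched, np.array_equal(final, start))
```

A glider moves one cell diagonally every 4 generations. On a 16x16 torus it therefore takes 16 × 4 = 64 generations to return to its starting position.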


#### Comments on the Driver

Barriers:
* `ray.get()` awaits completion
* calling get on all oids implements a barrier
* __CAUTION__ do not use `ray.wait()` as a barrier. It returns the lists of ready and not-ready jobs, by default as soon as one job is ready.
    * it is useful for control flow in complex programs

BSP:
* This is not strictly BSP, because we have a barrier before communication (a BSP superstep has a single barrier, after communication).
* This type of double barrier is typical of many algorithms; e.g., most implementations of parallel k-means do this. 
* This could be turned into a full BSP algorithm with queues or asynchronous messaging
    * finish computing
    * push updates into queues
    * get updates from queues
* In fact, the entire last barrier can be removed by making the program depend entirely on point-to-point data exchanges
    * That is how the MPI implementation of this program works.
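
The point-to-point idea can be sketched without Ray. This sketch swaps in Python threads and `queue.Queue` for actors and MPI-style messages. To keep it short, it also uses 1-D horizontal strips instead of 2x2 blocks. `run_strips`, `worker`, `life` and `torus_step` are hypothetical names. Each worker sends its edge rows directly to its two neighbors and then blocks only until those two rows arrive. There is no global barrier. Each (receiver, direction) pair has its own FIFO queue with a single sender, so a worker always receives the rows from the right generation.

```python
import queue
import threading

import numpy as np


def life(padded):
    """Rules of life on the interior of a grid padded by one cell on each side."""
    xd, yd = padded.shape
    count = sum(padded[1 + dx:xd - 1 + dx, 1 + dy:yd - 1 + dy].astype(np.int32)
                for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0))
    center = padded[1:-1, 1:-1]
    return ((count == 3) | ((count == 2) & (center == 1))).astype(padded.dtype)


def worker(rank, strip, inboxes, steps, results):
    """Own one strip: rows 1..h are interior, rows 0 and h+1 are ghost rows."""
    nparts = len(inboxes)
    up, down = (rank - 1) % nparts, (rank + 1) % nparts
    for _ in range(steps):
        # send: my first row is the ghost below my upper neighbour, and vice versa
        inboxes[up]["from_down"].put(strip[1].copy())
        inboxes[down]["from_up"].put(strip[-2].copy())
        # receive: wait only for my two neighbours, not for every worker
        strip[0] = inboxes[rank]["from_up"].get()
        strip[-1] = inboxes[rank]["from_down"].get()
        # columns wrap around within the strip, so pad them locally
        padded = np.pad(strip, ((0, 0), (1, 1)), mode="wrap")
        strip[1:-1] = life(padded)
    results[rank] = strip[1:-1].copy()


def run_strips(grid, nparts, steps):
    """Run `steps` generations of a torus split into `nparts` horizontal strips."""
    h = grid.shape[0] // nparts
    inboxes = [{"from_up": queue.Queue(), "from_down": queue.Queue()}
               for _ in range(nparts)]
    results = [None] * nparts
    threads = []
    for r in range(nparts):
        strip = np.zeros((h + 2, grid.shape[1]), dtype=grid.dtype)
        strip[1:-1] = grid[r * h:(r + 1) * h]
        t = threading.Thread(target=worker, args=(r, strip, inboxes, steps, results))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return np.vstack(results)


def torus_step(g):
    """Reference: one update of the whole grid with wrap-around boundaries."""
    count = sum(np.roll(g, (dx, dy), axis=(0, 1)).astype(np.int32)
                for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0))
    return ((count == 3) | ((count == 2) & (g == 1))).astype(g.dtype)
```

A fast worker can get at most about one generation ahead of its neighbors. It cannot finish generation g + 1 until they have sent their generation-g rows. That local coupling is what replaces the global barrier.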