# A Guided Tour of Ray Core: Remote Stateful Classes

[*Remote Classes*](https://docs.ray.io/en/latest/walkthrough.html#remote-classes-actors)
involve using a `@ray.remote` decorator on a class. 

This implements the [*actor*](https://patterns.eecs.berkeley.edu/?page_id=258) pattern, whose defining properties are *statefulness* and *message-passing semantics*.

Actors are extremely powerful. They allow you to take a Python class and instantiate it as a stateful microservice that can be queried from other actors and tasks and even other Python applications.

When you instantiate a remote actor, Ray starts a dedicated worker process on a worker node. That process becomes the actor process, and it exists to run the methods called on the actor. Other Ray tasks and actors can invoke its methods on that process, mutating its internal state. Actors can also be terminated manually if needed. The example code below shows all of these cases.

<img src="../images/ray_worker_actor_1.png" height="30%" width="60%">
<img src="../images/ray_worker_actor_2.png" height="30%" width="60%">
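To make *stateful* and *message-passing* concrete, here is a single-process analogy written with only the standard library. It is not how Ray implements actors, and the `MailboxCounter` class and its `send` method are hypothetical names invented for this sketch. One thread owns the state and handles one message at a time, so concurrent senders never touch the state directly.

```python
import queue
import threading


class MailboxCounter:
    """Toy 'actor': a single thread owns the state and processes messages in order."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._count = 0  # private state, only modified by the actor thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Handle exactly one message at a time, in arrival order
        while True:
            method, reply = self._inbox.get()
            if method == "stop":
                reply.put(None)
                break
            if method == "increment":
                self._count += 1
            # Every other message (for example "get") just reads the state
            reply.put(self._count)

    def send(self, method):
        """Enqueue a message and return a queue that will receive the reply."""
        reply = queue.Queue(maxsize=1)
        self._inbox.put((method, reply))
        return reply


mailbox_counter = MailboxCounter()

# Ten concurrent senders; each waits for its own reply
senders = [
    threading.Thread(target=lambda: mailbox_counter.send("increment").get())
    for _ in range(10)
]
for t in senders:
    t.start()
for t in senders:
    t.join()

final_count = mailbox_counter.send("get").get()
mailbox_counter.send("stop").get()
print(final_count)  # 10
```

The reply queue plays roughly the role that an `ObjectRef` plays in Ray: the sender gets something back immediately and blocks only when it asks for the result.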

---

First, let's start Ray…

In [1]:
import logging
import time
import ray
import random
from random import randint
import numpy as np

In [2]:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
ray.init()

{'node_ip_address': '127.0.0.1',
 'raylet_ip_address': '127.0.0.1',
 'redis_address': None,
 'object_store_address': '/tmp/ray/session_2022-03-16_15-50-05_688620_57443/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-03-16_15-50-05_688620_57443/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2022-03-16_15-50-05_688620_57443',
 'metrics_export_port': 65431,
 'gcs_address': '127.0.0.1:60006',
 'address': '127.0.0.1:60006',
 'node_id': '99a61e2e13f4abf096d4bbee6af4971171e6c5375ade0a13d448e4bc'}

## 3. Remote Class as a Stateful Actor Pattern

To start, we'll define a class and apply the `@ray.remote` decorator.

Let's take a Python class and convert it into a remote actor class that serves as a Parameter Server.
This is a common example in machine learning, where a central parameter server applies gradient updates
sent by worker processes that each compute their own gradients.

<img src="https://terrytangyuan.github.io/img/inblog/mpi-operator-1.png" width="40%" height="20%">

In [3]:
@ray.remote
class ParameterSever:
    def __init__(self):
        # Initialize the parameters to zero
        self.params = np.zeros(10)

    def get_params(self):
        # Return the current parameters
        return self.params

    def update_params(self, grad):
        # Apply a gradient step: subtract the gradient from the parameters
        self.params -= grad

Define the work as a function that runs as a remote worker task. This could be a machine learning task that
computes gradients and sends them to the parameter server.

In [4]:
@ray.remote
def worker(ps):
    # Iterate over a number of epochs
    for i in range(100):
        time.sleep(1.5)  # this could be your task computing gradients
        grad = np.ones(10)
        # update the gradients in the parameter server
        ps.update_params.remote(grad)

Start our Parameter Server actor. It will be scheduled as a process on a remote Ray worker.

In [5]:
param_server = ParameterSever.remote()
param_server

Actor(ParameterSever, a1d3d52739b22c3eecc5588101000000)

Let's get the initial values from the parameter server.

In [6]:
print(f"Initial params: {ray.get(param_server.get_params.remote())}")

Initial params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


### Create Worker Tasks Computing Gradients
Let's create three separate workers as our machine learning tasks that compute gradients.
These will be scheduled as tasks on a Ray cluster.

You can use a list comprehension. Quite Pythonic!

If we need more workers to scale, we can always increase the count.

In [7]:
[worker.remote(param_server) for _ in range(3)]

[ObjectRef(c2668a65bda616c1ffffffffffffffffffffffff0100000001000000),
 ObjectRef(32d950ec0ccf9d2affffffffffffffffffffffff0100000001000000),
 ObjectRef(e0dc174c83599034ffffffffffffffffffffffff0100000001000000)]

Now, let's loop and query the Parameter Server
while the workers run independently and update the parameters.

In [8]:
for _i in range(20):
    print(f"Updated params: {ray.get(param_server.get_params.remote())}")
    time.sleep(1)

Updated params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Updated params: [-3. -3. -3. -3. -3. -3. -3. -3. -3. -3.]
Updated params: [-6. -6. -6. -6. -6. -6. -6. -6. -6. -6.]
Updated params: [-6. -6. -6. -6. -6. -6. -6. -6. -6. -6.]
Updated params: [-9. -9. -9. -9. -9. -9. -9. -9. -9. -9.]
Updated params: [-12. -12. -12. -12. -12. -12. -12. -12. -12. -12.]
Updated params: [-12. -12. -12. -12. -12. -12. -12. -12. -12. -12.]
Updated params: [-15. -15. -15. -15. -15. -15. -15. -15. -15. -15.]
Updated params: [-18. -18. -18. -18. -18. -18. -18. -18. -18. -18.]
Updated params: [-18. -18. -18. -18. -18. -18. -18. -18. -18. -18.]
Updated params: [-21. -21. -21. -21. -21. -21. -21. -21. -21. -21.]
Updated params: [-24. -24. -24. -24. -24. -24. -24. -24. -24. -24.]
Updated params: [-24. -24. -24. -24. -24. -24. -24. -24. -24. -24.]
Updated params: [-27. -27. -27. -27. -27. -27. -27. -27. -27. -27.]
Updated params: [-30. -30. -30. -30. -30. -30. -30. -30. -30. -30.]
Updated params: [-30. -30. -30. -30. -30
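The repeated values in the output come from the two sleep intervals: each worker in `In [4]` updates every 1.5 seconds, while the loop in `In [8]` polls every second. The sketch below is an idealized model of that timing, not a measurement. It assumes all three workers and the polling loop start at the same instant and ignores scheduling latency. `expected_param` is a hypothetical helper.

```python
NUM_WORKERS = 3  # number of worker tasks launched in In [7]


def expected_param(poll_second: int, num_workers: int = NUM_WORKERS) -> int:
    """Value of each parameter entry at `poll_second` under the idealized timing."""
    # floor(poll_second / 1.5), written in integer arithmetic
    updates_per_worker = (2 * poll_second) // 3
    # Every update subtracts a gradient of 1 from each entry
    return -num_workers * updates_per_worker


timeline = [expected_param(t) for t in range(9)]
print(timeline)  # [0, 0, -3, -6, -6, -9, -12, -12, -15]
```

The model reproduces the pattern of two changes followed by one repeat. The actual run is shifted by one poll, which is consistent with the workers having started before the polling cell was executed.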

# Tree of Actors Pattern

This pattern is common in Ray libraries such as [Ray Tune](https://docs.ray.io/en/latest/tune/index.html), [Ray Train](https://docs.ray.io/en/latest/train/train.html), and [RLlib](https://docs.ray.io/en/latest/rllib/index.html), which use it to train models in parallel or to conduct distributed hyperparameter optimization (HPO).

In the tree-of-actors pattern, a collection of worker actors is managed by a supervisor actor. For example, you may want to train multiple models at the same time while still being able to checkpoint or inspect their state.

<img src="https://docs.ray.io/en/latest/_images/tree-of-actors.svg" width="40%" height="20%">

Let's implement a simple example to illustrate this pattern.

In [9]:
STATES = ["RUNNING", "DONE"]

class Model:

    def __init__(self, m:str):
        self._model = m

    def train(self):
        # do some training work here
        time.sleep(1)

# Factory function to return an instance of a model type
def model_factory(m: str):
    return Model(m)

### Create a Worker Actor

In [10]:
@ray.remote
class Worker(object):
    def __init__(self, m:str):
        # type of a model: lr, cl, or nn
        self._model = m                  
        
    def state(self) -> str:
        # Simulated status: picks RUNNING or DONE at random
        return random.choice(STATES)

    # Do the work for this model
    def work(self) -> None:
        model_factory(self._model).train()

### Create Supervisor Actor 

In [11]:
@ray.remote
class Supervisor:
    def __init__(self):
        # Create three Actor Workers, each by its unique model type
        self.workers = [Worker.remote(name) for name in ["lr", "cl", "nn"]]
                        
    def work(self):
        # do the work 
        [w.work.remote() for w in self.workers]
        
    def terminate(self):
        [ray.kill(w) for w in self.workers]
        
    def state(self):
        return ray.get([w.state.remote() for w in self.workers])

Create an actor instance for the supervisor and launch its workers.

In [12]:
sup = Supervisor.remote()

# Launch remote actors as workers
sup.work.remote()

ObjectRef(bcb4fef46b376cafe9fcb45911f1848cba1000850100000001000000)

### Look at the Ray Dashboard
http://127.0.0.1:8265

In [13]:
# check their status
while True:
    # Fetch the states of all its workers
    states = ray.get(sup.state.remote())
    print(states)
    # check if all are DONE
    result = all('DONE' == e for e in states)
    if result:
        # Note: Actor processes will be terminated automatically when the initial actor handle goes out of scope in Python. 
        # If we create an actor with actor_handle = ActorClass.remote(), then when actor_handle goes out of scope and is destructed, 
        # the actor process will be terminated. Note that this only applies to the original actor handle created for the actor 
        # and not to subsequent actor handles created by passing the actor handle to other tasks.
        
        # kill all of the supervisor's workers manually, only for illustration and demo;
        # ray.get waits for terminate() to finish before the supervisor itself is killed
        ray.get(sup.terminate.remote())

        # kill the supervisor manually, only for illustration and demo
        ray.kill(sup)
        break

['RUNNING', 'DONE', 'RUNNING']
['DONE', 'RUNNING', 'RUNNING']
['RUNNING', 'RUNNING', 'RUNNING']
['DONE', 'DONE', 'RUNNING']
['RUNNING', 'DONE', 'RUNNING']
['RUNNING', 'RUNNING', 'DONE']
['DONE', 'RUNNING', 'DONE']
['RUNNING', 'RUNNING', 'RUNNING']
['DONE', 'DONE', 'DONE']


### Passing Actor handles to Ray Tasks

You can pass actor handles to remote Ray tasks, which can then change the
actor's state. The `MessageActor` keeps or clears messages, depending on which
of its methods is invoked.

In [14]:
@ray.remote
class MessageActor(object):
    def __init__(self):
        # Keep the state of the messages
        self.messages = []
    
    def add_message(self, message):
        self.messages.append(message)
    
    # return the stored messages and reset the list
    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages
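The get-and-clear logic can be tried locally, without Ray, on an undecorated copy of the class. `LocalMessageBuffer` below is a hypothetical stand-in that mirrors the methods of `MessageActor`. It shows that each call returns only the messages added since the previous call.

```python
class LocalMessageBuffer:
    """Undecorated copy of the MessageActor logic, for local experimentation."""

    def __init__(self):
        self.messages = []

    def add_message(self, message):
        self.messages.append(message)

    def get_and_clear_messages(self):
        # Hand back the accumulated list and start a fresh one
        messages = self.messages
        self.messages = []
        return messages


buffer = LocalMessageBuffer()
buffer.add_message("a")
buffer.add_message("b")
first_batch = buffer.get_and_clear_messages()
second_batch = buffer.get_and_clear_messages()
buffer.add_message("c")
third_batch = buffer.get_and_clear_messages()
print(first_batch, second_batch, third_batch)  # ['a', 'b'] [] ['c']
```

Rebinding `self.messages` to a new list, rather than emptying the existing one in place, means the list already handed to the caller is not affected by later calls.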

Define a remote function that loops and pushes messages to the actor. It receives a `MessageActor` handle as an argument and uses that handle to call the actor's methods.

In [15]:
@ray.remote
def worker(message_actor, j):
    for i in range(10):
        time.sleep(1)
        message_actor.add_message.remote(
            f"Message {i} from worker {j}.")


Create a message actor.

In [16]:
message_actor = MessageActor.remote()

Start 3 tasks that push messages to the actor.

In [17]:
[worker.remote(message_actor, j) for j in range(3)]

[ObjectRef(c7528efcb2fd36edffffffffffffffffffffffff0100000001000000),
 ObjectRef(6efb86ef2d286c40ffffffffffffffffffffffff0100000001000000),
 ObjectRef(89af82725933373effffffffffffffffffffffff0100000001000000)]

Periodically get the messages and print them.

In [18]:
for _ in range(10):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages\n:", new_messages)
    time.sleep(1)

New messages
: ['Message 0 from worker 2.', 'Message 0 from worker 0.', 'Message 0 from worker 1.']
New messages
: ['Message 1 from worker 2.', 'Message 1 from worker 0.', 'Message 1 from worker 1.']
New messages
: ['Message 2 from worker 2.', 'Message 2 from worker 0.', 'Message 2 from worker 1.']
New messages
: ['Message 3 from worker 2.', 'Message 3 from worker 0.', 'Message 3 from worker 1.']
New messages
: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
New messages
: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
New messages
: ['Message 6 from worker 2.', 'Message 6 from worker 1.', 'Message 6 from worker 0.']
New messages
: ['Message 7 from worker 2.', 'Message 7 from worker 1.', 'Message 7 from worker 0.']
New messages
: ['Message 8 from worker 1.', 'Message 8 from worker 2.', 'Message 8 from worker 0.']
New messages
: ['Message 9 from worker 0.', 'Message 9 from worker 1.', 'Message 9 from worker 2.']


Finally, shut down Ray.

In [19]:
ray.shutdown()

### Exercises

1. Add a remote class, such as a logging actor, that keeps state by logging info (possibly only in memory).
2. Implement methods that alter the state.
3. Instantiate it and call its methods.

### Solution hints

This solution is just a structural hint. A few bits are missing:
 * instantiation of `LoggingActor`
 * a call to `ray.get()` to fetch the values from the object store

In [None]:
from collections import defaultdict
@ray.remote
class LoggingActor(object):
    def __init__(self):
        self.logs = defaultdict(list)
    
    def log(self, index, message):
        self.logs[index].append(message)
    
    def get_logs(self):
        return dict(self.logs)
    
@ray.remote
def run_experiment(experiment_index, logging_actor):
    for i in range(60):
        time.sleep(1)
        # Push a logging message to the actor.
        logging_actor.log.remote(experiment_index, 'On iteration {}'.format(i))    

In [None]:
# logging_actor = # TODO Instantiate Actor here
experiment_ids = []
for i in range(3):
    experiment_ids.append(run_experiment.remote(i, logging_actor))

In [None]:
logs = logging_actor.get_logs.remote()
# TODO use ray.get() to fetch the logs


shutdown_ray_cluster()

---
## References

 * [Writing your First Distributed Python Application with Ray](https://www.anyscale.com/blog/writing-your-first-distributed-python-application-with-ray)
 * [Using and Programming with Actors](https://docs.ray.io/en/latest/actors.html)
 * [Advanced Patterns and Anti-Patterns in Ray](https://docs.ray.io/en/latest/ray-design-patterns/index.html)