# A Guided Tour of Ray Core: Remote Stateful Classes

© 2019-2022, Anyscale. All Rights Reserved

📖 [Back to Table of Contents](./ex_00_tutorial_overview.ipynb)<br>
➡ [Next notebook](./ex_04_remote_classes_revisited.ipynb) <br>
⬅️ [Previous notebook](./ex_02_remote_objs.ipynb) <br>

### Overview

Actors extend the [Ray API](https://docs.ray.io/en/latest/ray-core/package-ref.html) from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created or an existing worker is used. The methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.

### Learning objectives
In this tutorial, we'll discuss Ray actors and learn about:
 * How Ray Actors work
 * How to write a stateful Ray Actor
 * How Ray Actors can be written as a stateful distributed service

[Remote classes](https://docs.ray.io/en/latest/walkthrough.html#remote-classes-actors), like remote tasks, are declared with the `@ray.remote` decorator, applied here to a Python class.

The Ray actor pattern is powerful. It lets you take a Python class and instantiate it as a stateful microservice that can be queried from other actors and tasks, and even from other Python applications. Actor handles can also be passed as arguments to other tasks and actors.

<img src="images/ray_worker_actor_1.png" height="40%" width="70%">

When you instantiate a remote actor, Ray assigns it a dedicated worker process on a worker node. That process becomes the actor process and exists solely to run the methods called on the actor. Other Ray tasks and actors can invoke its methods in that process, mutating the actor's internal state if desired. Actors can also be terminated manually if needed. The examples below show how to create actors, invoke their methods, and read back their state.

<img src="images/ray_worker_actor_2.png" height="40%" width="70%">

So let's look at some examples of Python classes converted into Ray Actors.

In [1]:
import logging
import time
import os
import math
import ray
import random
import tqdm
from typing import Dict, Tuple, List
from random import randint
import numpy as np

First, let's start Ray…

In [2]:
if ray.is_initialized():
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

Python version: 3.8.13
Ray version: 2.2.0
Dashboard: http://127.0.0.1:8265


## Remote class as a stateful actor

#### Example 1: Method tracking for Actors
**Problem**: We want to keep track of how many times particular methods are invoked in different actors. This could serve as telemetry: which actors are being used and which of their methods are invoked, or which methods of an actor service are accessed most frequently.

|<img src="images/cls_actor_calls.png" width="70%" height="30%"> |
|:--|
|Driver code calling different Actor methods.|


Let's build actors that track the invocations of their own methods: each Ray actor instance counts how many times its methods were invoked.

Define a base class `ActorCls` and two subclasses, `ActorClsOne` and `ActorClsTwo`.

Our base class `ActorCls`

In [3]:
class ActorCls:
    def __init__(self, name: str):
        self.name = name
        self.method_calls = {"method": 0}

    def method(self, **args) -> None:
        # Override this method in the subclass
        pass

    def get_all_method_calls(self) -> Tuple[str, Dict[str, int]]:
        return self.get_name(), self.method_calls
    
    def get_name(self) -> str:
        return self.name

Our first subclass, `ActorClsOne`

In [6]:
@ray.remote
class ActorClsOne(ActorCls):
    
    def __init__(self, name: str):
        super().__init__(name)
        
    def method(self, **args) -> None:
        # do something with kwargs here
        time.sleep(args["timeout"])
        
        # update the respective counter
        self.method_calls["method"] += 1

Our second subclass, `ActorClsTwo`

In [7]:
@ray.remote
class ActorClsTwo(ActorCls):
    
    def __init__(self, name: str):
        super().__init__(name)
        
    def method(self, **args) -> None:
        # do something with kwargs here
        time.sleep(args["timeout"])
        
        # update the respective counter
        self.method_calls["method"] += 1

### Make random calls to Actors 

This is the driver code that uses the two actors we defined. It creates one instance of each actor, keyed by name, so that we can randomly call each actor's method.

In [8]:
CALLERS_NAMES = ["ActorClsOne", "ActorClsTwo"]
CALLERS_CLS_DICT = {"ActorClsOne": ActorClsOne.remote("ActorClsOne"), 
                    "ActorClsTwo": ActorClsTwo.remote("ActorClsTwo")}

Run `len(CALLERS_NAMES)` rounds of 15 calls each, randomly picking a subclass actor for every call. Keep a local count of the calls so we can verify the actors' own counts later.

In [9]:
count_dict = {"ActorClsOne": 0, "ActorClsTwo": 0}
for _ in range(len(CALLERS_NAMES)): 
    for _ in range(15):
        name = random.choice(CALLERS_NAMES)
        count_dict[name] += 1 
        # Each call returns immediately; the actor executes it asynchronously.
        if name == "ActorClsOne":
            CALLERS_CLS_DICT[name].method.remote(timeout=1, store="mongo_db")
        else:
            CALLERS_CLS_DICT[name].method.remote(timeout=1.5, store="delta")
        
    print(f"State of counts in this execution: {count_dict}")
    time.sleep(0.5)

State of counts in this execution: {'ActorClsOne': 7, 'ActorClsTwo': 8}
State of counts in this execution: {'ActorClsOne': 14, 'ActorClsTwo': 16}


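Fetch the method-call counts that each actor has recorded so far. An actor executes calls from the same caller in the order they were submitted. These counts therefore include every `method` call made above, even though we never waited for those calls to finish.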

In [10]:
print(ray.get([CALLERS_CLS_DICT[name].get_all_method_calls.remote() for name in CALLERS_NAMES]))

[('ActorClsOne', {'method': 14}), ('ActorClsTwo', {'method': 16})]


**Note** that we did not have to reason about where and how the actors are scheduled.

We did not have to worry about socket connections or the IP addresses where these actors reside; Ray abstracts all of that away.

All we did was write Python code that uses Ray Core APIs to turn our classes into distributed, stateful services!

#### Look at the Ray Dashboard

You should see the actors running as processes on the worker nodes. Click on `Actors` to view more metrics and data on individual Ray actors.

### Recap
In the exercise above, we saw how actors can keep track of how many times their methods were invoked. This could be useful for telemetry if you want to monitor how actors deployed as services are used.

### Exercise
Add another class, `ActorClsThree`, and modify the code to keep track of its method. For simplicity, track only a single method.

#### Example 2: Use Actor to keep track of progress 

**Problem**: In our [first tutorial](ex_01_remote_funcs.ipynb), we explored how to approximate the value of π using only tasks. In this example, we extend it by defining a Ray actor that our Ray sampling tasks call to update their progress. Each sampling task sends a message (via a method call) to the actor to report how far it has gotten.

|<img src="https://docs.ray.io/en/latest/_images/monte_carlo_pi.png" width="60%" height="30%">|
|:--|
|Estimating the value of π by sampling random points that fall into the circle.|

#### Defining our progress Actor
Let's define a Ray actor that does the following:
 * keeps track of each task ID and the number of samples it has completed
 * can be called (or sent a message to) by sampling tasks to update progress

In [11]:
@ray.remote
class ProgressPIActor:
    def __init__(self, total_num_samples: int):
        # total number of all the samples for all the tasks
        self.total_num_samples = total_num_samples
        # Dict to keep track of each task id
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        # Update sample completed for a task id
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        # Ratio of samples completed so far (across all tasks) to the
        # total number of samples (num_of_tasks * num_samples)
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )

#### Defining the Sampling Task

As before in our task tutorial, we define a Ray task that draws `num_samples` random points and returns how many of them fall inside the circle. `frequency_report` sets how often, in samples, the task reports the progress of its `task_id` to the progress actor.

In [12]:
@ray.remote
def sampling_task(num_samples: int, task_id: int, 
                  progress_actor: ray.actor.ActorHandle,
                  frequency_report: int = 1_000_000) -> int:
    num_inside = 0
    for i in range(num_samples):
        # Random x, y in the square [-1, 1] x [-1, 1] that bounds the unit circle
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        # Report progress every frequency_report samples.
        if (i + 1) % frequency_report == 0:
            # Send a message or call the actor method.
            # This is asynchronous.
            progress_actor.report_progress.remote(task_id, i + 1)

    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
    
    # Return the total number of samples inside our circle
    return num_inside
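The reporting schedule is easy to check in plain Python. This sketch uses a hypothetical helper, `report_points`, which runs without Ray and lists the values `sampling_task` would send to `report_progress`, in order. When `num_samples` is a multiple of `frequency_report`, as with this notebook's defaults (10,000,000 and 1,000,000), the last value is reported twice. That is harmless because `report_progress` overwrites the task's entry rather than adding to it.

```python
def report_points(num_samples: int, frequency_report: int) -> list:
    """Return the progress values sampling_task would report, in order."""
    # Periodic reports: whenever (i + 1) is a multiple of frequency_report.
    reports = [i + 1 for i in range(num_samples) if (i + 1) % frequency_report == 0]
    # The unconditional final report made after the loop.
    reports.append(num_samples)
    return reports

print(report_points(10, 3))  # [3, 6, 9, 10]
print(report_points(10, 5))  # [5, 10, 10]
```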

#### Defining some tunable parameters 

These values can be changed for experimentation.
 * `NUM_SAMPLING_TASKS` - scale this to the number of CPUs on your cluster.
 * `NUM_SAMPLES_PER_TASK` - increase or decrease the number of samples per task to see how it affects the accuracy of π.
 * `SAMPLE_REPORT_FREQUENCY` - the number of samples a sampling task processes between progress reports.
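Here is a rough guide to how these parameters affect accuracy. Each sample lands inside the circle with probability p = π/4, so the estimate `4 * num_inside / total` has a standard error of about 4·√(p(1−p)/N), where N is the total number of samples. The plain-Python sketch below uses a hypothetical helper name. It shows that the error shrinks only like 1/√N, so 100× more samples buys about 10× more accuracy:

```python
import math

def pi_estimate_std_error(total_samples: int) -> float:
    """Approximate standard error of 4 * (num_inside / total_samples)."""
    p = math.pi / 4  # probability that a uniform point in the square falls in the circle
    # num_inside is Binomial(N, p), so Var(4 * num_inside / N) = 16 * p * (1 - p) / N.
    return 4 * math.sqrt(p * (1 - p) / total_samples)

for n in (10_000_000, 100_000_000, 1_000_000_000):
    print(f"{n:>13,} samples -> std. error ~ {pi_estimate_std_error(n):.1e}")
```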

In [13]:
# Change this for experimentation to match your cluster scale.
NUM_SAMPLING_TASKS = os.cpu_count()
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
SAMPLE_REPORT_FREQUENCY = 1_000_000

# Create the progress actor.
progress_actor = ProgressPIActor.remote(TOTAL_NUM_SAMPLES)

#### Executing Sampling Tasks in parallel

Using a list comprehension, we launch `NUM_SAMPLING_TASKS` Ray remote tasks, each sampling `NUM_SAMPLES_PER_TASK` data points.

**Note**: We pass our progress actor's handle as an argument to each Ray task.

In [14]:
# Create and execute all sampling tasks in parallel.
# This returns a list of ObjectRefs, one per task; each ObjectRef
# refers to that task's count of points inside the circle.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor, frequency_report=SAMPLE_REPORT_FREQUENCY)
    for i in range(NUM_SAMPLING_TASKS)
]

#### Calling the Progress Actor

While the tasks execute asynchronously, let's check how they are progressing using our Ray actor.

In [15]:
# Query progress periodically.
while True:
    progress = ray.get(progress_actor.get_progress.remote())
    print(f"Progress: {int(progress * 100)}%")

    if progress == 1:
        break

    time.sleep(1)

Progress: 7%
Progress: 27%
Progress: 47%
Progress: 67%
Progress: 87%
Progress: 100%


### Calculating π
As before, π is estimated as 4 times the fraction of samples that fall inside the circle: `total_num_inside * 4 / TOTAL_NUM_SAMPLES`.
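The factor of 4 comes from comparing areas. Points are drawn uniformly from the square $[-1, 1] \times [-1, 1]$, which has area 4. The fraction landing inside the unit circle, which has area $\pi$, approximates the ratio of the two areas:

$$
\frac{N_{\text{inside}}}{N_{\text{total}}} \approx \frac{\pi \cdot 1^2}{2^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4 \, N_{\text{inside}}}{N_{\text{total}}}
$$

Here $N_{\text{inside}}$ corresponds to `total_num_inside` and $N_{\text{total}}$ to `TOTAL_NUM_SAMPLES`.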

In [16]:
# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

Estimated value of π is: 3.14143452


### Recap
Ray actors are stateful, and their methods can be invoked to pass messages or to alter the actor's internal state. Each actor is scheduled on a dedicated worker process on a Ray node, so all of the actor's methods execute in that particular worker process.

In the first example, we used actors to count how many times their methods were invoked, a pattern that is useful for telemetry on actors deployed as services.

We also demonstrated how actors can track the progress of Ray tasks; in our case, the tasks approximating the value of π.

Finally, shut down Ray.

In [18]:
ray.shutdown()

### Next Step

We covered how to use Ray actors and write a distributed service. Next, let's explore
how actors can be used to build more complex distributed applications using the Ray actor tree pattern.

Let's move on to [Ray Actors Revisited](ex_04_remote_classes_revisited.ipynb).

## Homework

Read these references

 * [Writing your First Distributed Python Application with Ray](https://www.anyscale.com/blog/writing-your-first-distributed-python-application-with-ray)
 * [Using and Programming with Actors](https://docs.ray.io/en/latest/actors.html)
 * [Ray Asynchronous and Threaded Actors: A way to achieve concurrency]()
