# Why We Need Ray Actors

Using Ray tasks is great for distributing work around a cluster, but we've said nothing so far about managing distributed state, one of the big challenges in distributed computing. Ray tasks are excellent for stateless computation, but we need something for stateful computation. Python classes are a familiar mechanism for encapsulating state. Just as Ray tasks extend the familiar concept of Python functions, Ray addresses stateful computation by extending classes to become Ray actors.

## What We Mean by Distributed State

If you've worked with data processing libraries like Pandas or big data tools like Apache Spark, you know that they provide rich features for manipulating large, structured data sets, i.e., the analogs of tables in a database. Some tools even support partitioning these data sets across a cluster for scalability. This isn't the kind of distributed "state" Ray actors address. Instead, it's the more open-ended graph of objects found in general-purpose applications. For example, it could be the state of a game engine used in a reinforcement learning (RL) application, or the full set of parameters of a giant neural network, the largest of which now have hundreds of billions of parameters. In the following examples, we'll explore how Ray actors can be used to manage distributed state in various scenarios.

# Monte Carlo Estimation of π
This tutorial shows you how to estimate the value of π using a Monte Carlo method that works by randomly sampling points within a 2x2 square. We can use the proportion of the points that are contained within the unit circle centered at the origin to estimate the ratio of the area of the circle to the area of the square. Given that we know the true ratio to be π/4, we can multiply our estimated ratio by 4 to approximate the value of π. The more points that we sample to calculate this approximation, the closer the value should be to the true value of π.
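It helps to see the estimator on a single core before distributing anything. The following is a minimal plain-Python sketch with no Ray involved. `estimate_pi` is a hypothetical helper name, and the fixed seed only makes runs repeatable. It applies the ratio described above: the fraction of points inside the circle approximates π/4, so multiplying by 4 estimates π.

```python
import math
import random


def estimate_pi(num_samples: int, seed: int = 0) -> float:
    """Sequential Monte Carlo estimate of pi (hypothetical helper, no Ray)."""
    rng = random.Random(seed)
    num_inside = 0
    for _ in range(num_samples):
        # Uniform random point in the 2x2 square [-1, 1] x [-1, 1].
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    # Fraction inside ~ (area of unit circle) / (area of square) = pi / 4.
    return 4 * num_inside / num_samples


for n in (1_000, 100_000):
    est = estimate_pi(n)
    print(f"{n:>7} samples: pi ~ {est:.5f} (error {abs(est - math.pi):.5f})")
```

Because the samples are random, the error does not shrink on every single run. On average, though, it falls as the sample count grows. The distributed version in this tutorial splits exactly this per-sample loop across many Ray tasks.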

```python
import ray
import math
import time
import random

ray.init()
```

```text
2024-10-20 19:40:35,335	INFO worker.py:1786 -- Started a local Ray instance.
Python version: 3.10.15
Ray version: 2.37.0
```



## Defining the Progress Actor

Next, we define a Ray actor that the sampling tasks can call to report their progress. Ray actors are essentially stateful services: anyone holding an instance (a handle) of the actor can call its methods.


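```python
@ray.remote
class ProgressActor:
    def __init__(self, total_num_samples: int):
        self.total_num_samples = total_num_samples
        # Maps each task_id to the number of samples that task has completed.
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        # Overwrite rather than add: each report carries the task's cumulative count.
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        # Fraction of all samples completed across every task, from 0.0 to 1.0.
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )
```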

We define a Ray actor by decorating a normal Python class with `@ray.remote`. The progress actor has two methods. Sampling tasks call `report_progress()` to update their individual progress. `get_progress()` returns the overall progress as a fraction between 0 and 1.

## Defining the Sampling Task
With the actor defined, we now define a Ray task that draws `num_samples` random points and returns how many of them fall inside the circle. Ray tasks are stateless functions that execute asynchronously and can run in parallel.

```python
@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle) -> int:
    num_inside = 0
    for i in range(num_samples):
        # Draw a uniform random point in the 2x2 square [-1, 1] x [-1, 1].
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        # Report progress every 1 million samples.
        if (i + 1) % 1_000_000 == 0:
            # Non-blocking: .remote() returns immediately, so sampling continues.
            progress_actor.report_progress.remote(task_id, i + 1)

    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
    return num_inside
```

As we learned in `ray_tasks.ipynb`, we convert a normal Python function into a Ray task by decorating it with `@ray.remote`. The sampling task takes a progress actor handle as an input and reports progress to it, which shows how a task can call actor methods.

## Creating a Progress Actor
Once the actor is defined, we can create an instance of it.

```python
# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 10
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK

# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
```

To create an instance of the progress actor, call `ActorClass.remote()` with the constructor's arguments (here, `ProgressActor.remote(TOTAL_NUM_SAMPLES)`). This creates the actor and runs it in a remote worker process. The return value of `ActorClass.remote(...)` is an actor handle that can be used to call the actor's methods.

## Executing Sampling Tasks
Now that the task is defined, we can execute it asynchronously.

```python
# Create and execute all sampling tasks in parallel.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
    for i in range(NUM_SAMPLING_TASKS)
]
```

We execute the sampling task by calling its `remote()` method with the function's arguments. Each call immediately returns an `ObjectRef` (a future), and the function then executes asynchronously in a remote worker process.

## Calling the Progress Actor
While the sampling tasks are running, we can periodically query the overall progress by calling the actor's `get_progress()` method.

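```python
# Query progress periodically until every task has reported its final count.
while True:
    progress = ray.get(progress_actor.get_progress.remote())
    print(f"Progress: {int(progress * 100)}%")

    # Use >= rather than == to avoid relying on exact float equality.
    if progress >= 1:
        break

    time.sleep(1)
```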

```text
Progress: 0%
Progress: 2%
Progress: 4%
Progress: 4%
Progress: 6%
Progress: 8%
Progress: 10%
Progress: 10%
Progress: 12%
Progress: 14%
Progress: 14%
Progress: 16%
Progress: 18%
Progress: 19%
Progress: 20%
Progress: 22%
Progress: 24%
Progress: 25%
Progress: 26%
Progress: 28%
Progress: 28%
Progress: 30%
Progress: 32%
Progress: 33%
Progress: 35%
Progress: 36%
Progress: 38%
Progress: 39%
Progress: 40%
Progress: 42%
Progress: 44%
Progress: 45%
Progress: 46%
Progress: 48%
Progress: 49%
Progress: 51%
Progress: 52%
Progress: 54%
Progress: 55%
Progress: 56%
Progress: 57%
Progress: 59%
Progress: 61%
Progress: 63%
Progress: 64%
Progress: 65%
Progress: 67%
Progress: 68%
Progress: 70%
Progress: 71%
Progress: 73%
Progress: 74%
Progress: 76%
Progress: 77%
Progress: 79%
Progress: 80%
Progress: 81%
Progress: 83%
Progress: 85%
Progress: 86%
Progress: 87%
Progress: 89%
Progress: 91%
Progress: 92%
Progress: 93%
Progress: 95%
Progress: 96%
Progress: 97%
Progress: 99%
Progress: 100%
```


To call an actor method, use `actor_handle.method.remote()`. This invocation immediately returns an `ObjectRef` (a future), and the method then executes asynchronously in the remote actor process. To fetch the actual value behind an `ObjectRef`, we use `ray.get()`, which blocks until that value is available.

## Calculating π

Finally, we collect the number of samples inside the circle from each remote sampling task and calculate π.

```python
# Gather the results of all sampling tasks (blocks until every task finishes).
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")
```

```text
Estimated value of π is: 3.14141148
```

As the code above shows, `ray.get()` accepts a list of `ObjectRef`s as well as a single `ObjectRef`. Given a list, it returns a list of results.

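If you run this tutorial, you will see output similar to the following. The exact progress values and the final estimate vary from run to run, because the sampling is random and the timing depends on your machine:
```text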
Progress: 0%
Progress: 15%
Progress: 28%
Progress: 40%
Progress: 50%
Progress: 60%
Progress: 70%
Progress: 80%
Progress: 90%
Progress: 100%
Estimated value of π is: 3.1412202
```
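How close should the estimate be? One way to check is a standard textbook model, which the tutorial code itself does not compute. Treat each sample as an independent Bernoulli trial that lands inside the circle with probability p = π/4. The count N_inside is then binomial, which gives the estimator's mean and standard error:

```latex
p = \frac{\text{area of unit circle}}{\text{area of square}} = \frac{\pi \cdot 1^2}{2 \times 2} = \frac{\pi}{4} \approx 0.7854,
\qquad
\hat{\pi} = \frac{4\,N_{\text{inside}}}{N}

\mathbb{E}[\hat{\pi}] = 4p = \pi,
\qquad
\operatorname{SE}(\hat{\pi}) = 4\sqrt{\frac{p(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}}

N = 10 \times 10^{7} = 10^{8}: \quad \operatorname{SE}(\hat{\pi}) \approx 1.64 \times 10^{-4}
```

In the notebook run above, the estimate 3.14141148 differs from π ≈ 3.14159265 by about 1.8 × 10⁻⁴, roughly one standard error, which is in line with this model. Because the standard error shrinks like 1/√N, each additional correct decimal digit needs about 100 times more samples. That growth is the practical reason to spread the sampling across a cluster.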