# Ray Core
In this notebook we are going to explore the following topics:  

- Introduction to Ray system architecture;
- Connecting to a Ray Cluster;
- Ray tasks and actors;
- The object storage;
- Specifying resource usage.



## Ray Architecture
A Ray cluster is made up of the following components:
- A single **head node**.
- Zero or more **worker nodes**.

<div style="text-align: center;">
    <img src="Images/ray-cluster.svg" alt="Structure of a Ray Cluster" style="width:50%;">
</div>

The **head node is a special cluster node** which, in addition to worker processes, hosts the processes responsible for cluster management (mainly the GCS and the Ray Autoscaler). The Ray driver process, which is the program responsible for submitting jobs, usually runs on the head node, but it can also run on another computer and connect to a remote cluster.  
The **GCS** (Global Control Service) is a service used to monitor resource availability and to dispatch changes to other workers. The GCS is also used to save remote function definitions.  

The **worker nodes** are regular nodes which do not run any cluster management process. These nodes simply contribute to the cluster by sharing their resources (CPUs, RAM, and GPUs) that will be used to execute the jobs. It is important to note that each node has a **scheduler** and an **object store**.  
The node schedulers are used to schedule tasks and to evaluate whether a worker has enough resources to execute a task. Similarly, each node's object store is used to save intermediate results (i.e. variables and objects) for later use. If a node owns the data needed to execute a task but has no computing resources available, the data is serialized and sent to the worker node that was selected to execute the task.  

For a detailed and rich description of Ray architecture, consider reading [this](https://docs.daocloud.io/en/blogs/2023/230612-ray).

# Connecting to a Ray cluster

In [1]:
import time
from datetime import datetime
import os
from glob import glob

import ray
from ray.util.actor_pool import ActorPool
from sentence_transformers import SentenceTransformer

import pandas as pd

2026-01-29 08:02:12,109	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


We have already started our Ray cluster on Leonardo by running the jobscript `start_ray_interactive.sh`. Since we have exported the environment variable `RAY_ADDRESS`, we can now connect to the cluster with `ray.init()`.

In [2]:
ray.init(log_to_driver = False, ignore_reinit_error = True)

#os.environ["HF_HOME"]="/leonardo_scratch/fast/tra25_bbs/rmioli00/models/hub"
#os.environ["HF_HUB_CACHE"]="/leonardo_scratch/fast/tra25_bbs/rmioli00/models/hub"
os.environ["HF_HUB_OFFLINE"]="1"  # offline-mode variable read by huggingface_hub

2026-01-29 08:04:01,966	INFO worker.py:1520 -- Using address 10.11.1.34:23707 set in the environment variable RAY_ADDRESS
2026-01-29 08:04:01,966	INFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.11.1.34:23707...
2026-01-29 08:04:01,975	INFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at http://10.11.1.34:8265




In a Ray cluster we have various types of resources:
- CPUs;
- GPUs;
- Memory ("execution memory" and object store memory).

The difference between memory and object store memory is that the former is used by the workers while executing remote functions, processing datasets, etc. In contrast, the object store memory is reserved for sharing objects between workers and for saving "intermediate results".

In [3]:
resources = ray.cluster_resources()
print(f"Cluster has {resources['CPU']} CPUs, {resources['GPU'] if 'GPU' in resources else 0} GPUs, execution memory {resources['memory'] * 1e-9} GBs, object storage memory {resources['object_store_memory'] * 1e-9} GBs")

Cluster has 8.0 CPUs, 1.0 GPUs, execution memory 361.684707328 GBs, object storage memory 155.007731712 GBs


# Ray Core - Tasks, Actors and Objects
A program using Ray is called a job. A job usually performs various operations (e.g. reading datasets, manipulating columns, training classifiers, etc.), so it is composed of "subtasks". To accomplish this work, Ray offers two basic components: **tasks and actors**.

## A serial workflow
Suppose we have built a website or some other platform that needs to retrieve user data and then, once all the data is loaded, greets the user. We can simulate five users logging in to this website using the following function.  
The users are served sequentially, which is a bottleneck: each request has to wait for all the previous ones to complete, as in a FIFO queue.

In [4]:
%%time

def load_people_data(name:str):
    time.sleep(2) # We simulate a very long operation on a database
    return f"Welcome back, {name}! Your data was successfully loaded."

names = ["Domitilla", "Eleonora", "Laura", "Michele", "Riccardo"]
for name in names:
    print(load_people_data(name))

Welcome back, Domitilla! Your data was successfully loaded.
Welcome back, Eleonora! Your data was successfully loaded.
Welcome back, Laura! Your data was successfully loaded.
Welcome back, Michele! Your data was successfully loaded.
Welcome back, Riccardo! Your data was successfully loaded.
CPU times: user 13.3 ms, sys: 3.51 ms, total: 16.8 ms
Wall time: 10 s


## Ray Tasks
We now define the same function, but we parallelize its execution with Ray tasks. Ray can execute a user-defined function in parallel by sending a copy of it to each worker process. The remote function is then executed with the names of the users as input.  

Functions decorated with `@ray.remote` are called Ray remote functions, and each execution of such a function is called a Ray task. It's important to note that **tasks are stateless**: once a task has been executed, the only thing that is saved is the output of the function, stored in the object storage of the worker that executed the task.

In [5]:
%%time

@ray.remote
def load_people_data(name:str):
    time.sleep(2) # We simulate a very long operation on a database
    return f"Welcome back, {name}! Your data was successfully loaded."

for name in names:
    print(load_people_data.remote(name))

ObjectRef(67a2e8cfa5a06db3ffffffffffffffffffffffff0200000001000000)
ObjectRef(e082c90ab8422b00ffffffffffffffffffffffff0200000001000000)
ObjectRef(e5cbd90b7f1fb776ffffffffffffffffffffffff0200000001000000)
ObjectRef(39088be3736e590affffffffffffffffffffffff0200000001000000)
ObjectRef(ce868e48e2fa9a94ffffffffffffffffffffffff0200000001000000)
CPU times: user 26.1 ms, sys: 8.56 ms, total: 34.7 ms
Wall time: 52.5 ms


Here the execution time is on the order of milliseconds, far less than the initial 10 seconds. That cannot be the real execution time: even with all five calls running in parallel, we could not have retrieved the data in less than 2 seconds because of the `time.sleep(2)`!  
What you have observed is called **asynchronous execution**. When you call a remote function, its execution is scheduled on some worker node and a "promise" (the Ray `ObjectRef`), an ID of the operation's result, is immediately returned to the caller.  
To get the actual results we can call `ray.get()` on the IDs of the results. Note that `ray.get()` blocks until the task has finished executing. If you want to schedule more tasks while waiting for some of them to finish, or to monitor intermediate results of different computations (for example to make a plot), you can use `ray.wait()` instead. `ray.wait()` returns two lists: the completed `ObjectRef`s and the `ObjectRef`s that are not ready yet.
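
The same submit-then-wait pattern exists in Python's standard library, which may help if you already know `concurrent.futures`. The sketch below is an analogy, not Ray code: a `Future` plays the role of an `ObjectRef`, `wait(..., return_when=FIRST_COMPLETED)` plays the role of `ray.wait()`, and `Future.result()` plays the role of `ray.get()`. The shortened sleep and the helper name `greet_as_completed` are choices made only for this illustration.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def load_people_data(name: str) -> str:
    time.sleep(0.05)  # shortened stand-in for the slow database call
    return f"Welcome back, {name}! Your data was successfully loaded."


def greet_as_completed(names):
    """Submit one call per name and collect results as soon as they are ready."""
    greetings = []
    with ThreadPoolExecutor(max_workers=max(1, len(names))) as pool:
        # submit() returns immediately with a Future (compare: ObjectRef)
        pending = {pool.submit(load_people_data, name) for name in names}
        while pending:
            # Blocks until at least one future is done (compare: ray.wait)
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            # result() returns the value of a finished future (compare: ray.get)
            greetings.extend(future.result() for future in done)
    return greetings


greetings = greet_as_completed(["Domitilla", "Eleonora", "Laura"])
print(len(greetings))
```

As with `ray.wait()`, the order in which results arrive depends on which call finishes first, not on the submission order.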

In [6]:
%%time 

refs = [load_people_data.remote(name) for name in names]

while len(refs) > 0:
    finished, refs = ray.wait(refs)
    print(ray.get(finished))

['Welcome back, Eleonora! Your data was successfully loaded.']
['Welcome back, Domitilla! Your data was successfully loaded.']
['Welcome back, Laura! Your data was successfully loaded.']
['Welcome back, Michele! Your data was successfully loaded.']
['Welcome back, Riccardo! Your data was successfully loaded.']
CPU times: user 12.6 ms, sys: 6.8 ms, total: 19.4 ms
Wall time: 5.01 s


## Ray Actors

Let's now suppose we want to keep track of the number of times a user has logged into the website.

Tasks are stateless, so they cannot do this on their own. To overcome this limitation, Ray provides actors. Actors are Python classes decorated with `@ray.remote`: when we call `ClassName.remote()`, the actor is scheduled on some worker and its `__init__` method is executed to instantiate it. From then on, **we can maintain state using the actor's instance variables**.  
This is also useful if the class has a long initialization step that must be performed only once, for example loading LLM weights.  

In [7]:
@ray.remote
class FakeDB():
    def __init__(self, db_backend:str="MySQL", server_id:int=0):
        self.access_no = {}
        self.db_backend = db_backend
        self.server_id = server_id
        
    def load_people_data(self, name:str):
        time.sleep(2) # We simulate a very long operation on a database
        if name in self.access_no:
            self.access_no[name] +=1
        else:
            self.access_no[name] = 1
        return f"Welcome back, {name}! Your data was successfully loaded from {self.db_backend}_{self.server_id}. Today you logged in {self.access_no[name]} times."
    
db_actor = FakeDB.remote(db_backend = "mongodb")

We can launch the task on the actor.

In [8]:
%%time

refs = [db_actor.load_people_data.remote(name) for name in names]
print(refs)

while len(refs) > 0:
    finished, refs = ray.wait(refs)
    print(ray.get(finished))

[ObjectRef(4e42fef2e4fc2164e161e609afab3c6efb57ed330200000001000000), ObjectRef(a634bb3b3ca530f0e161e609afab3c6efb57ed330200000001000000), ObjectRef(d10410bd3222d6a4e161e609afab3c6efb57ed330200000001000000), ObjectRef(ca701c49794a1d14e161e609afab3c6efb57ed330200000001000000), ObjectRef(dd09e9796bb10db3e161e609afab3c6efb57ed330200000001000000)]
['Welcome back, Domitilla! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
['Welcome back, Eleonora! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
['Welcome back, Laura! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
['Welcome back, Michele! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
['Welcome back, Riccardo! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
CPU times: user 15 ms, sys: 4.89 ms, total: 19.9 ms
Wall time: 10.7 s


As you can see, the object refs were immediately returned (i.e. the tasks were scheduled), but the actual execution required much more time.
In this case, having a single actor constitutes a bottleneck: while the actor is executing the code for one user it is "busy" and cannot serve another user. This is the intended behaviour, because by default actor methods are single-threaded and execute one call at a time.
To overcome this issue we can create several actors; methods called on different actors execute in parallel.
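
A back-of-the-envelope estimate makes the effect of the number of actors concrete. The helper below is a simplified model written for this notebook (the name `estimated_wall_time` is hypothetical): it assumes that every call takes the same time, that each actor processes one call at a time, and that calls are spread evenly over the actors. Scheduling and start-up overhead are ignored, so measured times will be somewhat higher.

```python
import math


def estimated_wall_time(n_calls: int, n_actors: int, seconds_per_call: float) -> float:
    """Lower bound on wall time when each actor serves one call at a time."""
    if n_actors < 1:
        raise ValueError("at least one actor is required")
    # Each "round" lets every actor serve one call
    rounds = math.ceil(n_calls / n_actors)
    return rounds * seconds_per_call


print(estimated_wall_time(5, 1, 2.0))  # 10.0: one actor serves the five users in sequence
print(estimated_wall_time(5, 5, 2.0))  # 2.0: one actor per user
```

The single-actor estimate of 10 seconds is close to the 10.7 seconds measured above; the difference is overhead that the model does not account for.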

In [9]:
%%time
actor_pool = ActorPool([FakeDB.remote(db_backend = "mongodb", server_id = i) for i in range(5)])

# Each name is dispatched to an idle actor of the pool
mapped_jobs = actor_pool.map(lambda actor, value: actor.load_people_data.remote(value), names)

print(list(mapped_jobs))

['Welcome back, Domitilla! Your data was successfully loaded from mongodb_4. Today you logged in 1 times.', 'Welcome back, Eleonora! Your data was successfully loaded from mongodb_3. Today you logged in 1 times.', 'Welcome back, Laura! Your data was successfully loaded from mongodb_2. Today you logged in 1 times.', 'Welcome back, Michele! Your data was successfully loaded from mongodb_1. Today you logged in 1 times.', 'Welcome back, Riccardo! Your data was successfully loaded from mongodb_0. Today you logged in 1 times.']
CPU times: user 8.82 ms, sys: 7.19 ms, total: 16 ms
Wall time: 2.91 s


# Object storage
The distributed object storage is the sum of the portions of the workers' memory dedicated to saving intermediate results produced by tasks and actors. By default, **30% of the memory of a worker** is used as object storage. When the object storage runs out of space, its data is spilled to disk.  
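
The 30% figure can be checked against the resources printed earlier in this notebook. The sketch below assumes that the `memory` value reported by `ray.cluster_resources()` does not include the object store share, which is how the two printed numbers appear to add up; the helper name `object_store_share` is hypothetical.

```python
# Values printed earlier by ray.cluster_resources(), in bytes
memory_bytes = 361_684_707_328
object_store_bytes = 155_007_731_712


def object_store_share(memory: int, object_store: int) -> float:
    """Fraction of the total (execution + object store) memory given to the object store."""
    return object_store / (memory + object_store)


share = object_store_share(memory_bytes, object_store_bytes)
print(f"{share:.1%}")  # 30.0%
```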

The user can interact with the object storage with two commands:  
- `ray.put`: save data in some worker's memory;
- `ray.get`: retrieve the data associated with an `ObjectRef` (Ray automatically works out which worker holds the data).

In [10]:
api_key = "EXAMPLE_0af8791jd91j651hgjalmnc"

key_ref = ray.put(api_key)
print(key_ref)

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0200000001e1f505)


In [11]:
print(ray.get(key_ref))

EXAMPLE_0af8791jd91j651hgjalmnc


In [12]:
@ray.remote
class APIHelper():
    def __init__(self, api_key):
        self.api_key = api_key
        
    def call_endpoint(self):
        import random
        
        time.sleep(2)
        return f"Authentication succeded using {self.api_key}. Server returned {random.randint(0, 1)}"


When an `ObjectRef` is passed as an argument to a task or an actor, Ray automatically resolves it to the underlying value, as if `ray.get` had been called on it.

In [13]:
api_actor = APIHelper.remote(key_ref)
ray.get(api_actor.call_endpoint.remote())

'Authentication succeded using EXAMPLE_0af8791jd91j651hgjalmnc. Server returned 1'

# Specifying resources for Tasks and Actors

When defining tasks and actors in Ray, **you can specify the resources** required for their execution. The Ray scheduler uses this information to determine which worker node is suitable for running the task.
This feature is particularly useful as it provides a high-level yet powerful **mechanism to control resource allocation**. It helps prevent issues such as crashes due to out-of-memory (OOM) errors by ensuring tasks are only assigned to nodes with sufficient resources.
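
The idea of matching a resource request against what each node still has available can be sketched in a few lines. This is a toy model for illustration only, not Ray's scheduler: the node names, the dictionaries and the functions `fits` and `feasible_nodes` are all hypothetical.

```python
def fits(available: dict, demand: dict) -> bool:
    """True if every requested resource is available in sufficient quantity."""
    return all(available.get(name, 0.0) >= amount for name, amount in demand.items())


def feasible_nodes(nodes: dict, demand: dict) -> list:
    """Names of the nodes that could host a task with the given demand."""
    return [node_id for node_id, available in nodes.items() if fits(available, demand)]


# Hypothetical cluster state: resources still free on each node
nodes = {
    "node-a": {"CPU": 8, "GPU": 1, "memory": 32 * 1024**3},
    "node-b": {"CPU": 4, "memory": 16 * 1024**3},
}

# A task asking for two CPU cores and half a GPU can only go to the GPU node
print(feasible_nodes(nodes, {"CPU": 2, "GPU": 0.5}))  # ['node-a']
```

A request that no node can satisfy produces an empty list in this model; in a real cluster such a task would stay pending rather than run.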

Here is an example: suppose you have a cluster whose GPUs have 16 GB of memory each, and you want to run several LLMs (each using 8 GB of GPU memory). You can instantiate the actors by specifying that each of them must use 0.5 GPUs. This ensures that at most two LLM actors are placed on each GPU.
Similarly, if you have functions that require a certain amount of memory and CPU cores to execute, you can specify them in the same way. These tasks can be scheduled on the same nodes that execute LLM actors, because LLM actors only use GPU resources and CPU cores and memory are available.

In [14]:
# Two cpu cores, and 500MiB of RAM
@ray.remote(num_cpus = 2, memory=500 * 1024 * 1024)
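def example_task():
    # `resources` is the cluster-wide dictionary returned by ray.cluster_resources()
    # earlier in this notebook; it is captured from the driver, not measured by the task
    return f"Even if num_cpus is 2, the task sees {resources['CPU']} CPUs. Resources are logical!"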

ray.get(example_task.remote())

'Even if num_cpus is 2, the task sees 8.0 CPUs. Resources are logical!'

An important note is that **Ray resources are "logical"**, meaning that the number of CPU cores or GPUs you specify is used only for scheduling and is not enforced by Ray.  
Going back to the previous example, if the GPU memory actually occupied by each LLM is greater than 8 GB, specifying that each actor must use 0.5 GPUs will not help: two actors will still be scheduled on the same GPU, resulting in an OOM error.  
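
The arithmetic behind this caveat can be written down explicitly. The sketch below is simplified bookkeeping, not something Ray computes for you: the functions `actors_per_gpu` and `physically_fits` are hypothetical helpers, and the per-actor memory is a number you have to measure yourself.

```python
from fractions import Fraction


def actors_per_gpu(num_gpus_per_actor: float) -> int:
    """How many actors fit on one GPU according to the logical request (at most 1 GPU each)."""
    if num_gpus_per_actor <= 0:
        raise ValueError("the GPU fraction must be positive")
    # Fraction avoids floating-point surprises with values such as 0.05
    return int(Fraction(1) // Fraction(str(num_gpus_per_actor)))


def physically_fits(gpu_memory_gb: float, memory_per_actor_gb: float,
                    num_gpus_per_actor: float) -> bool:
    """True if all the actors allowed on one GPU also fit in its real memory."""
    return actors_per_gpu(num_gpus_per_actor) * memory_per_actor_gb <= gpu_memory_gb


print(actors_per_gpu(0.5))           # 2
print(physically_fits(16, 8, 0.5))   # True: 2 x 8 GB fits in 16 GB
print(physically_fits(16, 10, 0.5))  # False: 2 x 10 GB exceeds 16 GB
```

When the second check fails, a larger GPU fraction per actor (so that fewer actors share a GPU) brings the logical accounting back in line with the physical memory.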

# Exercises
- What happens if you set `num_gpus = 3` for the previous `example_task` function? Can you explain why?
- What happens if you set `num_cpus = 5` for the task `load_people_data`? Can you explain why?
- Create a calculator actor which supports addition and subtraction operations between two operands. The calculator needs to support a history of the results and a way to retrieve the last three calculations.  

# Embeddings workflow

In [15]:
@ray.remote(num_cpus=0, num_gpus=0.05)
class Embedder():
    from sentence_transformers import SentenceTransformer
    
    def __init__(self, model_name:str = "/leonardo_scratch/fast/tra26_castiel2/models/all-mpnet-base-v2"):
        self.embedder = SentenceTransformer(model_name)
        
    def __call__(self, input_path:str):
        # Read the txt file associated to the canto
        with open(input_path, mode="r") as f:
            data = f.read()
            
        # Extract the number of the canto    
        canto = int(input_path.split("/")[-1].split("_")[-1].split(".")[0])
        
        # Calculate the embeddings
        embeddings = self.embedder.encode([data])[0].tolist()
        # Return a dict with the number of the canto and one entry per embedding dimension
        return_dict = {"canto": canto }
        for i in range(len(embeddings)):
            return_dict[f"{i}"] = embeddings[i]
        return return_dict
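
The two bookkeeping steps of `__call__`, extracting the canto number from the file name and flattening the embedding into a row, can be tried in isolation without loading the model. This is a minimal sketch assuming file names of the form `canto_<number>.txt`; the helper names `canto_number` and `embedding_row`, the directory in the example path and the two-element vector are made up for illustration.

```python
from pathlib import Path


def canto_number(input_path: str) -> int:
    """Extract the canto number from a path such as '.../canto_81.txt'."""
    # Path.stem drops the directory and the extension: 'canto_81'
    return int(Path(input_path).stem.split("_")[-1])


def embedding_row(canto: int, embedding: list) -> dict:
    """Build one row: the canto number plus one column per embedding dimension."""
    row = {"canto": canto}
    row.update({str(i): value for i, value in enumerate(embedding)})
    return row


print(canto_number("/some/dir/canto_81.txt"))  # 81
print(embedding_row(81, [0.1, 0.2]))           # {'canto': 81, '0': 0.1, '1': 0.2}
```

A list of such rows is what `pd.DataFrame` turns into a table with one line per canto.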

Let's load a set of files that together contain the full text of Dante's Divine Comedy, one canto per file.

In [16]:
files = glob("/leonardo_scratch/fast/tra26_castiel2/data/ray_core/*")
files[:5]

['/leonardo_scratch/fast/tra26_castiel2/mviscia1/data/ray_core/canto_81.txt',
 '/leonardo_scratch/fast/tra26_castiel2/mviscia1/data/ray_core/canto_71.txt',
 '/leonardo_scratch/fast/tra26_castiel2/mviscia1/data/ray_core/canto_86.txt',
 '/leonardo_scratch/fast/tra26_castiel2/mviscia1/data/ray_core/canto_60.txt',
 '/leonardo_scratch/fast/tra26_castiel2/mviscia1/data/ray_core/canto_56.txt']

Here we create an `ActorPool` with 20 embedders. We cannot create more than 20, because each actor requests `num_gpus=0.05` and we have only one GPU in this job.  

In [17]:
from ray.util.actor_pool import ActorPool

# We instantiate 20 actors - always check that the actors you schedule fit in the available GPU memory
actor_pool = ActorPool([Embedder.remote() for _ in range(20)])

In [18]:
embeddings = actor_pool.map(lambda actor, path: actor.__call__.remote(path), files)

In [19]:
%%time
embeddings = list(embeddings)

CPU times: user 109 ms, sys: 32.7 ms, total: 142 ms
Wall time: 33.1 s


In [20]:
pd.DataFrame(embeddings)

Unnamed: 0,canto,0,1,2,3,4,5,6,7,8,...,758,759,760,761,762,763,764,765,766,767
0,81,-0.000924,-0.013673,-0.042178,0.020039,-0.012347,0.008459,-0.047387,0.033142,-0.020576,...,-0.064089,-0.014925,0.020101,-0.022937,-0.045240,0.069715,-0.009456,0.010452,0.071358,-0.037445
1,71,0.016341,0.020545,-0.035634,0.000869,-0.014652,0.030680,-0.046138,0.067697,-0.007474,...,-0.020258,0.030111,-0.001424,-0.022194,0.000098,0.003161,-0.009550,0.030902,0.055947,-0.047270
2,86,0.022221,0.018050,-0.022917,0.034534,-0.004871,0.025595,-0.065609,0.037637,-0.023134,...,-0.059692,-0.006871,-0.001749,-0.017815,-0.019615,0.031071,-0.034607,0.025230,0.038682,-0.024533
3,60,0.014424,-0.051991,-0.022669,0.003660,-0.003718,0.031651,0.004777,0.032315,-0.020957,...,-0.055458,-0.006090,-0.020694,-0.003623,-0.041768,-0.014294,-0.023965,0.041703,0.002475,-0.056352
4,56,0.018417,-0.032619,-0.012547,0.008214,0.014980,0.042901,-0.014164,0.053587,-0.012052,...,-0.039766,0.021823,0.030155,-0.004375,-0.032736,0.008290,-0.005280,0.035342,0.051874,-0.025414
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,8,0.005598,-0.029875,-0.030824,-0.024022,-0.003487,0.011112,0.008071,0.033106,-0.024023,...,-0.074173,0.004086,-0.013067,0.013031,-0.024527,0.022993,-0.015042,0.023245,0.020135,-0.034723
96,92,0.018613,-0.013663,-0.023569,0.030094,-0.014003,0.024391,-0.020590,0.022258,-0.045003,...,-0.044722,-0.006877,0.042726,-0.033205,-0.042823,0.045139,0.002348,0.025191,0.020137,-0.027508
97,2,-0.007506,0.034004,-0.024442,-0.032003,0.000467,0.027168,-0.022948,0.056362,-0.019632,...,-0.024906,0.007345,0.000835,-0.006021,-0.021256,0.007798,-0.045879,0.020427,0.028602,-0.029445
98,35,0.018430,0.016690,-0.034403,0.001225,0.006255,0.014304,-0.037441,0.014420,-0.071528,...,-0.055584,-0.014020,-0.013413,-0.006020,-0.033642,0.015155,-0.012674,0.021981,0.020901,-0.039151


# Release resources

In [21]:
ray.shutdown()