In [1]:
import ray
import os
import time
import numpy as np
import socket

In [2]:
# Connect to the cluster through Ray Client. Set RAY_ADDRESS to point the
# notebook at a different cluster instead of editing this hard-coded head-node IP.
ray.init(address=os.environ.get("RAY_ADDRESS", "ray://172.31.3.39:10001"))

2025-03-06 19:39:46,560	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.10.12
Ray version:,2.43.0
Dashboard:,http://127.0.0.1:8265


# Ray tasks

In [3]:
# Toy in-memory "database": position i holds the i-th word of the phrase.
database = "learning Ray for distributed data processing".split()

In [4]:
def retrieve(idx):
    """Return `(idx, database[idx])` after a simulated lookup latency.

    The delay grows linearly with the index (idx / 10 seconds), so fetching
    indices 0..5 one after another sleeps about 1.5 s in total.
    """
    delay_s = idx / 10.
    time.sleep(delay_s)
    item = database[idx]
    return idx, item

In [11]:
# Baseline: run every lookup one after another on the driver.
# perf_counter is monotonic and high-resolution, unlike time.time(), so it is
# the right clock for measuring short elapsed intervals.
t0 = time.perf_counter()
results = [retrieve(idx) for idx in range(len(database))]
t1 = time.perf_counter()
print(results)
print(t1 - t0, "seconds")

[(0, 'learning'), (1, 'Ray'), (2, 'for'), (3, 'distributed'), (4, 'data'), (5, 'processing')]
1.5028445720672607 seconds


In [7]:
@ray.remote
def retrieve_task(idx):
    """Ray task wrapper around `retrieve`: performs one lookup on a cluster worker.

    Returns the same `(idx, item)` tuple as `retrieve`.
    """
    record = retrieve(idx)
    return record

In [12]:
# Same lookups submitted as Ray tasks, so they can run concurrently on the cluster.
t0 = time.perf_counter()
res_refs = [retrieve_task.remote(idx) for idx in range(len(database))]
results = ray.get(res_refs)  # blocks until every task finishes, so it serves as a barrier
t1 = time.perf_counter()

print(results)
print(t1 - t0, "seconds")

[(0, 'learning'), (1, 'Ray'), (2, 'for'), (3, 'distributed'), (4, 'data'), (5, 'processing')]
0.6391372680664062 seconds


In [13]:
# Store the list in Ray's object store. `db_refs` is a single ObjectRef handle
# (despite the plural name) that can be passed to tasks in place of the list.
db_refs = ray.put(database)

In [14]:
# Displays the ObjectRef handle itself, not the stored list.
db_refs

ClientObjectRef(00ffffffffffffffffffffffffffffffffffffff0200000002e1f505)

In [15]:
@ray.remote
def retrieve_task_by_ref(idx, db_refs):
    time.sleep(idx / 10.)
    return idx, db_refs[idx], socket.gethostbyname(socket.gethostname())

In [16]:
# Lookups that read the list from the object store via its ObjectRef; each
# result also records which worker IP executed it.
t0 = time.perf_counter()
res_refs = [retrieve_task_by_ref.remote(idx, db_refs) for idx in range(len(database))]
results = ray.get(res_refs)  # blocks until every task finishes, so it serves as a barrier
t1 = time.perf_counter()

print(results)
print(t1 - t0, "seconds")

[(0, 'learning', '172.31.3.39'), (1, 'Ray', '172.31.3.39'), (2, 'for', '172.31.66.239'), (3, 'distributed', '172.31.66.239'), (4, 'data', '172.31.3.39'), (5, 'processing', '172.31.3.39')]
0.6572628021240234 seconds


In [17]:
# Seeded generator so the array (and every sum computed from it) is
# reproducible on Restart & Run All. Values are integers in [0, 10).
SEED = 0
data = np.random.default_rng(SEED).integers(10, size=10_000)
data

array([9, 9, 6, ..., 4, 4, 4], shape=(10000,))

In [18]:
num_partitions = 2
# Nominal partition size; with array_split each partition holds this many
# elements or one more.
partition_sz = len(data) // num_partitions
# np.array_split keeps the leftover elements when len(data) is not divisible
# by num_partitions. Slicing by partition_sz alone would silently drop them.
input_buckets = np.array_split(data, num_partitions)

In [19]:
@ray.remote
def upstream_task(input):
    return input, socket.gethostbyname(socket.gethostname())

In [20]:
@ray.remote
def downstream_task(input):
    intermediate_res, hostname = input
    return np.sum(intermediate_res), hostname

In [21]:
# Inspect the partitions that will be fed to the upstream tasks.
input_buckets

[array([9, 9, 6, ..., 6, 9, 6], shape=(5000,)),
 array([5, 5, 8, ..., 4, 4, 4], shape=(5000,))]

In [22]:
%%time

obj_refs = [upstream_task.remote(input) for input in input_buckets]
final_refs = [downstream_task.remote(obj_ref) for obj_ref in obj_refs]

print(ray.get(final_refs))

[(np.int64(22370), '172.31.3.39'), (np.int64(22554), '172.31.66.239')]
CPU times: user 10.5 ms, sys: 151 μs, total: 10.6 ms
Wall time: 52.7 ms


In [23]:
# ObjectRefs returned by the upstream tasks (handles, not the values).
print(obj_refs)

[ClientObjectRef(f9b454884f032cedffffffffffffffffffffffff0200000001000000), ClientObjectRef(c5287b29a01a0306ffffffffffffffffffffffff0200000001000000)]


In [24]:
# ObjectRefs for the downstream results; their values were fetched with ray.get(final_refs).
print(final_refs)

[ClientObjectRef(c1c9ddb7e87e0a85ffffffffffffffffffffffff0200000001000000), ClientObjectRef(5bed7b81f9990839ffffffffffffffffffffffff0200000001000000)]


# Ray actors

In [25]:
@ray.remote  # indicates this is a Ray actor class
class Actor:
    """Ray actor holding a single integer counter.

    Callers change and read the counter only through `actor.increment.remote()`
    and `actor.counts.remote()`.
    """

    def __init__(self):
        # Private name: an instance attribute called `counts` would shadow the
        # `counts` method on the instance and make `self.counts()` unusable.
        self._count = 0

    def increment(self):
        """Add one to the counter."""
        self._count += 1

    def counts(self):
        """Return the current counter value."""
        return self._count

In [26]:
@ray.remote
def downstream_task_actor(input, actor):
    intermediate_result, hostname = input
    actor.increment.remote()
    return np.sum(intermediate_result), hostname

In [27]:
actor = Actor.remote() # start one Actor instance; `actor` is the handle whose methods are called via `.remote()`

In [30]:
%%timeit

t0 = time.time()
upstream_task_result_refs = [upstream_task.remote(input) for input in input_buckets]
downstream_task_result_refs = [
    downstream_task_actor.remote(upstream_task_result_ref, actor) for upstream_task_result_ref in upstream_task_result_refs
]

final_results = ray.get(downstream_task_result_refs)
t1 = time.time()

t2 = time.time()
print(ray.get(actor.counts.remote()))
t3 = time.time()

print(upstream_task_result_refs)
print(downstream_task_result_refs)
print(final_results)
print(t1-t0, "seconds")
print(t3-t2, "seconds")

6
[ClientObjectRef(9103f9b6672865afffffffffffffffffffffffff0200000001000000), ClientObjectRef(0455bda690ff7b95ffffffffffffffffffffffff0200000001000000)]
[ClientObjectRef(744553850c97129bffffffffffffffffffffffff0200000001000000), ClientObjectRef(3b86534cae58b4adffffffffffffffffffffffff0200000001000000)]
[(np.int64(22370), '172.31.3.39'), (np.int64(22554), '172.31.66.239')]
0.05763721466064453 seconds
0.013470888137817383 seconds
8
[ClientObjectRef(d8cec09e077956f2ffffffffffffffffffffffff0200000001000000), ClientObjectRef(7fbb0cc2a15180f1ffffffffffffffffffffffff0200000001000000)]
[ClientObjectRef(b3634f2efcd82ae1ffffffffffffffffffffffff0200000001000000), ClientObjectRef(b85ef66c81df058dffffffffffffffffffffffff0200000001000000)]
[(np.int64(22370), '172.31.3.39'), (np.int64(22554), '172.31.3.39')]
0.03872203826904297 seconds
0.007459878921508789 seconds
10
[ClientObjectRef(0f2e512e68272472ffffffffffffffffffffffff0200000001000000), ClientObjectRef(4d72e7416d1d3cdeffffffffffffffffffffffff020