https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8

In [1]:
import ray
import time

2024-04-21 06:56:44,921	INFO util.py:90 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [2]:
# Start Ray. ignore_reinit_error lets this cell be re-run in a live kernel
# without raising because Ray is already initialised.
ray.init(ignore_reinit_error=True)

2024-04-21 06:57:38,859	INFO worker.py:1642 -- Started a local Ray instance.


0,1
Python version:,3.7.7
Ray version:,2.7.2


In [10]:
@ray.remote
def f(x):
    """Sleep for 20 seconds, then hand the argument back unchanged."""
    delay_seconds = 20
    time.sleep(delay_seconds)
    return x

In [11]:
# Launch four f tasks and keep the references that .remote() returns.
result_ids = [f.remote(i) for i in range(4)]

In [12]:
# Wait for the tasks to complete and retrieve the results.
# f sleeps for 20 seconds, so with at least 4 cores (all four tasks running
# at the same time) this takes about 20 seconds.
results = ray.get(result_ids)  # [0, 1, 2, 3]
results

[0, 1, 2, 3]

In [47]:
# numpy is only imported much further down the notebook, so import it here to
# keep this cell runnable on a fresh kernel (Restart & Run All).
import numpy as np


@ray.remote
def create_matrix(size):
    """Return an array of normally distributed samples with shape `size`."""
    return np.random.normal(size=size)


@ray.remote
def multiply_matrices(x, y):
    """Return np.dot(x, y)."""
    return np.dot(x, y)


start = time.time()
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
# The references returned by create_matrix are passed directly as arguments
# to each of the 100 multiply tasks.
z_ids = [multiply_matrices.remote(x_id, y_id) for _ in range(100)]

# Get the results.
z = ray.get(z_ids)

print('Time taken: ' + str(time.time()-start))

Time taken: 1.3873200416564941


In [49]:
@ray.remote
def add(x, y):
    """Pause for one second, then return x + y."""
    time.sleep(1)
    total = x + y
    return total




In [None]:
# Unfinished draft cell: a bare "def add(x, y):" header with no body is a
# SyntaxError, so it is kept only as this note. The remote add() defined
# earlier is the one the following cells call via add.remote(...).

In [None]:
# The integers 1 through 8.
values = list(range(1, 9))

In [50]:
x = add.remote(1, 1)

In [51]:
type(x)

ray._raylet.ObjectRef

In [55]:
ray.get(x)

2

In [56]:
# Plain Python list used to show list concatenation below.
x = [1, 2]

In [57]:
# Second plain list for the concatenation example.
y = [5, 6, 7]

In [58]:
x + y

[1, 2, 5, 6, 7]

In [64]:
# Fast approach.
# Each new add result is appended to the END of the list, so the next add
# consumes the two items at the front rather than the result just submitted.
values = list(range(20))
while len(values) > 1:
    first, second, *remaining = values
    values = remaining + [add.remote(first, second)]
result = ray.get(values[0])

In [65]:
# Slow approach.
# Each new add result is placed at the FRONT of the list, so every following
# add takes the previous add's result as its first argument (a chain).
values = list(range(20))
while len(values) > 1:
    first, second, *remaining = values
    values = [add.remote(first, second)] + remaining
result = ray.get(values[0])

In [66]:
@ray.remote
class Counter:
    """Actor that keeps a single integer count, starting at zero."""

    def __init__(self):
        # The count is stored on the instance as `x`.
        self.x = 0

    def inc(self):
        """Increase the stored count by one."""
        self.x = self.x + 1

    def get_value(self):
        """Return the current count."""
        return self.x

In [67]:
# Create an actor process.
c = Counter.remote()

In [68]:
type(c)

ray.actor.ActorHandle

In [70]:
# Check the actor's counter value.
print(ray.get(c.get_value.remote()))  # 0

0


In [71]:
# Increment the counter twice; the value is checked again in a later cell.
c.inc.remote()
c.inc.remote()

ObjectRef(40936f29ad73f41c46e5a9d3328a581c3ac2ed120100000001000000)

In [82]:
print(ray.get(c.get_value.remote()))  # 2

2


In [85]:
c.get_value.remote()

ObjectRef(40ea89cf4249848546e5a9d3328a581c3ac2ed120100000001000000)

In [87]:
ray.get(c.get_value.remote())

2

In [88]:
# `c` is an ActorHandle, not a Counter instance, so the attribute `x` cannot
# be read through it. Catch and show the error so the notebook still runs
# top to bottom.
try:
    c.x
except AttributeError as err:
    print("AttributeError:", err)

AttributeError: 'ActorHandle' object has no attribute 'x'

The following example creates an actor that stores messages. Several worker tasks repeatedly push messages to the actor, and the main Python script reads the messages periodically.

In [90]:
import time

@ray.remote
class MessageActor:
    """Actor that buffers messages until a caller collects them."""

    def __init__(self):
        # Messages received since the last collection.
        self.messages = []

    def add_message(self, message):
        """Append one message to the buffer."""
        self.messages.append(message)

    def get_and_clear_messages(self):
        """Return the buffered messages and start a new, empty buffer."""
        pending, self.messages = self.messages, []
        return pending
    
    
# Remote task that loops 100 times, sleeping one second and then pushing
# one message to the actor on each pass.
@ray.remote
def worker(message_actor, j):
    """Send 100 numbered messages, tagged with worker id `j`, to `message_actor`."""
    template = "Message {} from worker {}."
    for step in range(100):
        time.sleep(1)
        message_actor.add_message.remote(template.format(step, j))
        
        

In [91]:
# Create a message actor.
message_actor = MessageActor.remote()

In [92]:
# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

[ObjectRef(fd5b387a1d03c2f3ffffffffffffffffffffffff0100000001000000),
 ObjectRef(a065fb8bd22ea359ffffffffffffffffffffffff0100000001000000),
 ObjectRef(5e87988ffd705f70ffffffffffffffffffffffff0100000001000000)]

In [93]:
# Poll the actor 100 times, one second apart, printing whatever messages
# have been buffered since the previous poll.
for _ in range(100):
    pending_ref = message_actor.get_and_clear_messages.remote()
    new_messages = ray.get(pending_ref)
    print("New messages:", new_messages)
    time.sleep(1)

New messages: ['Message 0 from worker 1.', 'Message 0 from worker 2.', 'Message 0 from worker 0.', 'Message 1 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 2 from worker 1.', 'Message 2 from worker 2.', 'Message 2 from worker 0.', 'Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.', 'Message 4 from worker 2.', 'Message 4 from worker 1.', 'Message 4 from worker 0.', 'Message 5 from worker 2.', 'Message 5 from worker 1.', 'Message 5 from worker 0.', 'Message 6 from worker 2.', 'Message 6 from worker 1.', 'Message 6 from worker 0.', 'Message 7 from worker 2.', 'Message 7 from worker 1.', 'Message 7 from worker 0.', 'Message 8 from worker 2.', 'Message 8 from worker 1.', 'Message 8 from worker 0.', 'Message 9 from worker 2.', 'Message 9 from worker 1.', 'Message 9 from worker 0.', 'Message 10 from worker 2.', 'Message 10 from worker 1.', 'Message 10 from worker 0.', 'Message 11 from worker 2.', 'Message 11 from worker 1.', 

New messages: ['Message 83 from worker 0.', 'Message 83 from worker 1.', 'Message 83 from worker 2.']
New messages: ['Message 84 from worker 0.', 'Message 84 from worker 1.', 'Message 84 from worker 2.']
New messages: ['Message 85 from worker 0.', 'Message 85 from worker 2.', 'Message 85 from worker 1.']
New messages: ['Message 86 from worker 0.', 'Message 86 from worker 2.', 'Message 86 from worker 1.']
New messages: ['Message 87 from worker 0.', 'Message 87 from worker 2.', 'Message 87 from worker 1.']
New messages: ['Message 88 from worker 0.', 'Message 88 from worker 2.', 'Message 88 from worker 1.']
New messages: ['Message 89 from worker 0.', 'Message 89 from worker 2.', 'Message 89 from worker 1.']
New messages: ['Message 90 from worker 0.', 'Message 90 from worker 1.', 'Message 90 from worker 2.']
New messages: ['Message 91 from worker 0.', 'Message 91 from worker 2.', 'Message 91 from worker 1.']
New messages: ['Message 92 from worker 0.', 'Message 92 from worker 2.', 'Message 

In [99]:
ray.shutdown()

In [100]:
import numpy as np
import psutil
import ray
import scipy.signal


In [101]:
# psutil.cpu_count(logical=False) returns None when the physical core count
# cannot be determined; fall back to the logical count, then to 1, so the
# later range(num_cpus) calls always receive an integer.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
print(num_cpus)

6


In [102]:
ray.init(num_cpus=num_cpus)

2024-04-21 14:51:59,011	INFO worker.py:1642 -- Started a local Ray instance.


0,1
Python version:,3.7.7
Ray version:,2.7.2


In [103]:
@ray.remote
def f(image, random_filter):
    """Convolve `image` with `random_filter` and keep every 5th row and column."""
    # Do some image processing.
    convolved = scipy.signal.convolve2d(image, random_filter)
    return convolved[::5, ::5]

In [104]:
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

In [105]:
# Ten rounds: put a fresh 3000x3000 zero image into the object store, run one
# f task per filter against that stored image, and wait for all of them.
for _ in range(10):
    image = np.zeros((3000, 3000))
    image_id = ray.put(image)
    pending = [f.remote(image_id, filters[i]) for i in range(num_cpus)]
    ray.get(pending)

In [106]:
ray.shutdown()

In [107]:
from multiprocessing import Pool
import numpy as np
import psutil
import scipy.signal

# psutil.cpu_count(logical=False) returns None when the physical core count
# cannot be determined; fall back to the logical count, then to 1, so that
# num_cpus * [image] and range(num_cpus) below always receive an integer.
num_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1

def f(args):
    """Convolve an image with a filter and keep every 5th row and column.

    `args` is a two-item sequence `(image, random_filter)`; it is a single
    parameter so the function can be used with `pool.map`.
    """
    img, kernel = args
    # Do some image processing.
    convolved = scipy.signal.convolve2d(img, kernel)
    return convolved[::5, ::5]

pool = Pool(num_cpus)

filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

# Time the code below.

try:
    for _ in range(10):
        image = np.zeros((3000, 3000))
        # Each worker receives its own (image, filter) pair.
        pool.map(f, zip(num_cpus * [image], filters))
finally:
    # Shut the worker processes down even if a map call fails, so they are
    # not left running behind the notebook kernel.
    pool.close()
    pool.join()