# Ray walkthrough notes (based on https://docs.ray.io/en/latest/walkthrough.html)

In [1]:
import ray
import time

In [2]:
# Decorating an ordinary Python function with `@ray.remote` turns it into
# a Ray remote function.
@ray.remote
def my_function():
    """Return the constant 1 (executed as a Ray task)."""
    return 1

# Calling `.remote()` does not run the function inline: it hands back an
# object ref (a future) right away and creates a task that a worker
# process will execute.
obj_ref = my_function.remote()

# `ray.get` resolves the object ref to the task's return value.
fetched = ray.get(obj_ref)
assert fetched == 1

## Slow loop

### Using pure Python

In [9]:
def slow_function():
    """A function that takes many seconds to do some job (sleeps for 5 s)."""
    time.sleep(5)
    return 1

# Baseline: call the function sequentially in plain Python.
# time.perf_counter is the clock intended for measuring elapsed time (it
# includes time spent sleeping and is not affected by system clock changes,
# unlike time.time).
start = time.perf_counter()
all_ans = []
for i in range(4):
    # This DOES block: each call sleeps 5 s before the next one can start.
    ans = slow_function()
    all_ans.append(ans)
print(all_ans)
end = time.perf_counter()
time_taken = end - start
print(f"Time Taken: {time_taken}s")

# Time taken = 5 seconds * 4 = 20 seconds

[1, 1, 1, 1]
Time Taken: 20.03339982032776s


### Using Ray parallelization

In [10]:
## Method 1: Specify without decorator
# Wrap the plain-Python `slow_function` defined above as a Ray remote function.
slow_function_ray = ray.remote(slow_function)

## Method 2: Specify with decorator
# @ray.remote
# def slow_function():
#     """A function that takes many seconds to do some job"""
#     time.sleep(5)
#     return 1

# Invocations of Ray remote functions happen in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
start = time.perf_counter()
all_ans = []
for i in range(4):
    # This doesn't block: `.remote()` returns an object ref immediately.
    ans = slow_function_ray.remote()
    all_ans.append(ans)

# A single ray.get on the whole list waits for all four tasks together.
print(ray.get(all_ans))
end = time.perf_counter()
time_taken = end - start
print(f"Time Taken: {time_taken}s")

# Since all 4 tasks can run in parallel (given at least 4 CPUs available to Ray),
# time taken = 5 seconds * 1 = 5 seconds

[1, 1, 1, 1]
Time Taken: 5.048443555831909s


**Delay `ray.get` as much as possible (anti-pattern: calling `ray.get` in a loop): https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html**

In [11]:
# Anti-pattern: ray.get blocks the caller until the requested result is ready.
# Calling it inside the loop means the next task is not even submitted until
# the previous one has finished, so the tasks run one after another instead
# of in parallel.
start = time.perf_counter()
all_ans = []
for i in range(4):
    ans = slow_function_ray.remote()
    # This WILL block.
    print(ray.get(ans))
    all_ans.append(ans)

print(ray.get(all_ans))
end = time.perf_counter()
time_taken = end - start
print(f"Time Taken: {time_taken}s")

# Time taken = 5 seconds * 4 = 20 seconds -- no faster than plain Python.

1
1
1
1
[1, 1, 1, 1]
Time Taken: 20.05394744873047s


## Ray remote with arguments

In [12]:
@ray.remote
def my_function():
    """Remote task producing the constant 1."""
    return 1

@ray.remote
def function_with_an_argument(value):
    """Remote task that returns `value` incremented by one."""
    return value + 1

# An object ref can be handed straight to another Ray remote function; the
# assertion below shows the callee receives the resolved value (1), not the ref.
obj_ref1 = my_function.remote()
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2

## Multiple Returns

In [13]:
@ray.remote(num_returns=3)
def return_multiple():
    """Return three values; `num_returns=3` yields one object ref per value."""
    return 1, 2, 3

refs = return_multiple.remote()
a, b, c = refs
print(ray.get([a, b, c]))

[1, 2, 3]


## Specifying resources

In [15]:
# Specify required resources.
# NOTE: `num_cpus` is a logical reservation used by Ray's scheduler, not a hard
# binding to physical cores: each call of this function reserves 4 CPUs, but
# Ray does not make the task use them. If the function only uses 1 core, the
# other 3 reserved CPUs sit idle (wasted).
# Also, if the machine has 8 cores, then only 8 / 4 = 2 such tasks can run in parallel.
# Hence, if the function only runs on a single core, it is better to leave the
# default num_cpus=1; then 8 such tasks can run in parallel.


#### Running on 8 core machine
# NOTE(review): this rebinds `slow_function` to a Ray remote function, shadowing
# the plain-Python version that `ray.remote(slow_function)` wrapped earlier;
# re-running that earlier cell after this one will not wrap the plain function.
@ray.remote(num_cpus=4)
def slow_function():
    """A function that takes many seconds to do some job"""
    time.sleep(5)
    return 1

start = time.perf_counter()
all_ans = []
for i in range(4):
    # This doesn't block.
    ans = slow_function.remote()
    all_ans.append(ans)

print(ray.get(all_ans))
end = time.perf_counter()
time_taken = end - start
print(f"Time Taken (4 core - 3 wasted): {time_taken}s")

# On an 8-core machine, only 2 loop iterations can run in parallel.
# Since there are 4 iterations, it takes 5*2 = 10 seconds

[1, 1, 1, 1]
Time Taken (4 core - 3 wasted): 10.057098388671875s


In [16]:
#### Running on 8 core machine
# num_cpus=1 is the default noted above; it is spelled out here for comparison.
@ray.remote(num_cpus=1)
def slow_function():
    """A function that takes many seconds to do some job"""
    time.sleep(5)
    return 1

start = time.perf_counter()
all_ans = []
for i in range(4):
    # This doesn't block.
    ans = slow_function.remote()
    all_ans.append(ans)

print(ray.get(all_ans))
end = time.perf_counter()
time_taken = end - start
print(f"Time Taken (1 core - 0 wasted): {time_taken}s")

# On an 8-core machine, all 4 loop iterations can run in parallel
# (each reserves 1 of the 8 CPUs). Hence, it takes 5*1 = 5 seconds

[1, 1, 1, 1]
Time Taken (1 core - 0 wasted): 5.03303599357605s


In [None]:
# Report whether Ray is running, shut it down, then report again.
was_running = ray.is_initialized()
print(was_running)
ray.shutdown()
print(ray.is_initialized())