## Essentials of Ray Remote Functions
When you call a remote function, it immediately returns an ObjectRef (a future),
which is a reference to a remote object. Ray creates and executes a task in the
background on a separate worker process and writes the result when finished into the
original reference. You can then call ray.get on the ObjectRef to obtain the value.
Note that ray.get is a blocking method waiting for task execution to complete before
returning the result.

## Remote Objects in Ray
A remote object is just an object, which may be on another node. ObjectRefs are like
pointers or IDs to objects that you can use to get the value from, or status of, the
remote function. In addition to being created from remote function calls, you can
also create ObjectRefs explicitly by using the ray.put function.


The example converts the
iterator to a list before passing it to ray.get. You need to do this because
ray.get takes either a list of futures or an individual future.
The function waits until it has all the objects so it can return the list in order.<br/><br/>

As with regular Ray remote functions, it’s important to think
about the amount of work done inside each remote invocation.
For example, using ray.remote to compute factorials recursively
will be slower than doing it locally since the work inside each
function is small even though the overall work can be large. The
exact amount of time depends on how busy your cluster is, but as a
general rule, anything executed in under a few seconds without any
special resources is not worth scheduling remotely.

## Remote Functions Lifecycle
The invoking Ray process (called the owner) of a remote function schedules the
execution of a submitted task and facilitates the resolution of the returned ObjectRef
to its underlying value if needed. <br/><br/>
On task submission, the owner waits for all dependencies (i.e., ObjectRef objects that
were passed as an argument to the task) to become available before scheduling. The
dependencies can be local or remote, and the owner considers the dependencies to be
ready as soon as they are available anywhere in the cluster. When the dependencies
are ready, the owner requests resources from the distributed scheduler to execute the
task. Once resources are available, the scheduler grants the request and responds with
the address of a worker that will execute the function.<br/><br/>
At this point, the owner sends the task specification over gRPC to the worker. After
executing the task, the worker stores the return values. If the return values are small
(less than 100 KiB by default), the worker returns the values inline directly to the
owner, which copies them to its in-process object store. If the return values are large,
the worker stores the objects in its local shared memory store and replies to the
owner, indicating that the objects are now in distributed memory. This allows the
owner to refer to the objects without having to fetch the objects to its local node.
When a task is submitted with an ObjectRef as its argument, the worker must resolve
its value before it can start executing the task.<br/><br/>
Tasks can end in an error. Ray distinguishes between two types of task errors:<br/><br/>
**Application-level**<br/>
In this scenario, the worker process is alive, but the task ends in an error (e.g., a
task that throws an IndexError in Python).<br/><br/>
**System-level**<br/>
In this scenario, the worker process dies unexpectedly (e.g., a process that
segfaults, or if the worker’s local Raylet dies).<br/><br/>
Tasks that fail because of application-level errors are not retried by default. The exception
is caught and stored as the return value of the task. Tasks that fail because of system-level errors may be automatically retried up to a specified number of attempts.

In [1]:
import ray
import time
import random
from typing import Tuple



things = list(range(20))
things.sort(reverse=True)

In [2]:
@ray.remote
def remote_task(x):
 time.sleep(x)
 return x

As you recall, the example remote function sleeps based on the input argument.
Since the range is in ascending order, calling the remote function on it will result
in futures that are completed in order. To ensure that the futures won’t complete
in order, you will need to modify the list. One way you can do this is by calling
things.sort(reverse=True) prior to mapping your remote function over things.<br/><br/>
To see the difference between using ray.get and ray.wait, you can write a function
that collects the values from your futures with some time delay on each object to
simulate business logic.<br/><br/>
The first option, not using ray.wait, is a bit simpler and cleaner to read, as shown in
Example 3-2, but is not recommended for production use.

In [3]:

""" waits for all futures to complete in the order in which they were submitted. 
Wasteful when the remote futures have different execution times:
you are stuck for at least as long as the longest-running future.
"""
def in_order():
 # Make the futures
 futures = list(map(lambda x: remote_task.remote(x), things))
 values = ray.get(futures)
 for v in values:
    print(f" Completed {v}")
    time.sleep(1) # Business logic goes here
   
in_order()

2023-10-16 06:47:44,120	INFO worker.py:1642 -- Started a local Ray instance.


 Completed 19
 Completed 18
 Completed 17
 Completed 16
 Completed 15
 Completed 14
 Completed 13
 Completed 12
 Completed 11
 Completed 10
 Completed 9
 Completed 8
 Completed 7
 Completed 6
 Completed 5
 Completed 4
 Completed 3
 Completed 2
 Completed 1
 Completed 0


The second option is a bit more complex, as shown in Example 3-3. This works by
calling ray.wait to find the next available future and iterating until all the futures
have been completed. ray.wait returns two lists, one of the object references for
completed tasks (of the size requested, which defaults to 1) and another list of the rest
of the object references.

In [4]:
"""performs lazy evaluation of futures in arbitrary order, 
which means it would wait until the completion of next future
 and pull results instead of waiting for all to complete, and
will iterate until all the futures have been resolved.
"""
def as_available():
 # Make the futures
 futures = list(map(lambda x: remote_task.remote(x), things))
 # While we still have pending futures
 while len(futures) > 0:
    ready_futures, rest_futures = ray.wait(futures)
    print(f"Ready {len(ready_futures)} rest {len(rest_futures)}")
    for id in ready_futures:
        print(f'completed value {id}, result {ray.get(id)}')
        time.sleep(1) # Business logic goes here
    # We just need to wait on the ones that are not yet available
    futures = rest_futures
as_available()

Ready 1 rest 19
completed value ObjectRef(a02c24b8b7fc0a31ffffffffffffffffffffffff0100000001000000), result 10
Ready 1 rest 18
completed value ObjectRef(347cc60e0bb3da74ffffffffffffffffffffffff0100000001000000), result 11
Ready 1 rest 17
completed value ObjectRef(88543757a8df6d2fffffffffffffffffffffffff0100000001000000), result 12
Ready 1 rest 16
completed value ObjectRef(bcb4fef46b376cafffffffffffffffffffffffff0100000001000000), result 13
Ready 1 rest 15
completed value ObjectRef(cae5e964086715a4ffffffffffffffffffffffff0100000001000000), result 14
Ready 1 rest 14
completed value ObjectRef(3d3e27c54ed1f5cfffffffffffffffffffffffff0100000001000000), result 15
Ready 1 rest 13
completed value ObjectRef(465c0fb8d6cb3cdcffffffffffffffffffffffff0100000001000000), result 16
Ready 1 rest 12
completed value ObjectRef(c76a79b2875a7251ffffffffffffffffffffffff0100000001000000), result 17
Ready 1 rest 11
completed value ObjectRef(dc746dc61b2c1923ffffffffffffffffffffffff0100000001000000), result 18

Running these functions side by side with timeit.timeit, you can see the difference
in performance. It’s important to note that this performance improvement depends
on how long the nonparallelized business logic (the logic in the loop) takes. If you’re
just summing the results, using ray.get directly could be OK, but if you’re doing
something more complex, you should use ray.wait. When we run this, we see that
ray.wait performs roughly twice as fast. You can try varying the sleep times and see
how it works out.<br/><br/>
You may wish to specify one of the few optional parameters to ray.wait:<br/><br/>
**num_returns**<br/><br/>
The number of ObjectRef objects for Ray to wait for completion before returning.
You should set num_returns to less than or equal to the length of the input
list of ObjectRef objects; otherwise, the function throws an exception. The
default value is 1.<br/><br/>
**timeout**<br/><br/>
The maximum amount of time in seconds to wait before returning. In current Ray
releases this defaults to None, which means wait indefinitely.<br/><br/>
**fetch_local**<br/><br/>
You can disable fetching of results by setting this to False if you are interested
only in ensuring that the futures are completed.

Ray’s get and wait functions handle timeouts slightly differently. Ray doesn’t raise an
exception on ray.wait when a timeout occurs; instead, it simply returns fewer ready
futures than num_returns. However, if ray.get encounters a timeout, Ray will raise a
GetTimeoutError. Note that the return of the wait/get function does not mean that
your remote function will be terminated; it will still run in the dedicated process. You
can explicitly terminate your future (see the following tip) if you want to release the
resources.

Since ray.wait can return results in any order, it’s essential to not
depend on the order of the results. If you need to do different
processing with different records (e.g., test a mix of group A and
group B), you should encode this in the result (often with types).

If you have a task that does not finish in a reasonable time (e.g., a straggler), you can
cancel the task by using ray.cancel with the same ObjectRef used to wait/get. You
can modify the previous ray.wait example to add a timeout and cancel any “bad”
tasks, resulting in something like Example 3-4.

In [5]:
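import threading
# threading.TIMEOUT_MAX stands in for a task that effectively never finishes.
# On some platforms time.sleep() rejects a value this large with an OSError,
# as the output below shows.
futures = list(map(lambda x: remote_task.remote(x), [1, threading.TIMEOUT_MAX]))
# While we still have pending futures
while len(futures) > 0:
 # In practice, 10 seconds is too short for most cases
 ready_futures, rest_futures = ray.wait(futures, timeout=10, num_returns=1)
 # If we get back anything less than num_returns, there was a timeout
 if len(ready_futures) < 1:
    print(f"Timed out on {rest_futures}")
    # Canceling is a good idea for long-running, unneeded tasks.
    # ray.cancel takes one ObjectRef at a time.
    for future in rest_futures:
        ray.cancel(future)
    # You should break since you exceeded your timeout
    break
 for id in ready_futures:
    print(f'completed value {id}, result {ray.get(id)}')
 # We just need to wait on the ones that are not yet available
 futures = rest_futures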


RayTaskError(OSError): ray::remote_task() (pid=19728, ip=127.0.0.1)
  File "/var/folders/qj/nfsd826s231_h8sdz8nbsqqm0000gn/T/ipykernel_19711/2681013661.py", line 3, in remote_task
OSError: [Errno 22] Invalid argument

Canceling a task should not be part of your normal program flow.
If you find yourself having to frequently cancel tasks, you should
investigate what’s going on. Any subsequent calls to wait or get
for a canceled task are unspecified and could raise an exception or
return incorrect results.


Another minor point that we skipped in the previous chapter is that while the
examples so far return only a single value, Ray remote functions can return multiple
values, as with regular Python functions.

Fault tolerance is an important consideration for those running in a distributed
environment. Say the worker executing the task dies unexpectedly (because either the
process crashed or the machine failed). Ray will rerun the task (after a delay) until
either the task succeeds or the maximum number of retries is exceeded. We cover
fault tolerance more in Chapter 5.

## Composition of Remote Ray Functions
You can make your remote functions even more powerful by composing them.
The two most common methods of composition with remote functions in Ray are
**pipelining** and **nested parallelism**. You can compose your functions with nested
parallelism to express recursive functions. Ray also allows you to express sequential
dependencies without having to block or collect the result in the driver, known as
pipelining.
You can build a pipelined function by using ObjectRef objects from an earlier
ray.remote as parameters for a new remote function call. Ray will automatically
fetch the ObjectRef objects and pass the underlying objects to your function. This
approach allows for easy coordination between the function invocations. Additionally,
such an approach minimizes data transfer; the result is sent directly to the
node where the second remote function executes. A simple example
of such a sequential calculation is presented in Example 3-5.

In [7]:
@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int :
 random.seed(s)
 time.sleep(sl)
 return random.randint(0, limit)

@ray.remote
def sum_values(v1: int, v2: int, v3: int) -> int :
 return v1+v2+v3

# Get result
print(ray.get(sum_values.remote(generate_number.remote(1, 10, .1),
 generate_number.remote(5, 20, .2), generate_number.remote(7, 15, .3))))

31


This code defines two remote functions and then starts three instances of the first
one. ObjectRef objects for all three instances are then used as input for the second
function. In this case, Ray will wait for all three instances to complete before starting to execute sum_values. You can use this approach not only for passing data but
also for expressing basic workflow style dependencies. There is no restriction on the
number of ObjectRef objects you can pass, and you can also pass “normal” Python
objects at the same time.


**You cannot pass Python structures (for example, lists, dictionaries, or classes) containing ObjectRef objects in place of the ObjectRef objects themselves. Ray waits for and resolves only
ObjectRef objects that are passed directly to a function. If you attempt to pass a
structure, you will have to do your own ray.wait and ray.get inside the function.
Example 3-6 is a variation of Example 3-5 that does not work.**

In [8]:
# Does not work -- Ray won't resolve any nested ObjectRefs
#tag::broken_ray_remote_seq[]
@ray.remote
def generate_number(s: int, limit: int, sl: float) -> int :
   random.seed(s)
   time.sleep(sl)
   return random.randint(0, limit)

@ray.remote
def sum_values(values: []) -> int :
   return sum(values)

# get result
print(ray.get(sum_values.remote([generate_number.remote(1, 10, .1),
       generate_number.remote(5, 20, .2), generate_number.remote(7, 15, .3)])))
#end::broken_ray_remote_seq[]

RayTaskError(TypeError): ray::sum_values() (pid=19731, ip=127.0.0.1)
  File "/var/folders/qj/nfsd826s231_h8sdz8nbsqqm0000gn/T/ipykernel_19711/2718449464.py", line 11, in sum_values
TypeError: unsupported operand type(s) for +: 'int' and 'ray._raylet.ObjectRef'

Example 3-6 has been modified from Example 3-5 to take a list of ObjectRef
objects as a parameter instead of the ObjectRef objects themselves. Ray does not “look
inside” any structure being passed in. Therefore, the function will be invoked immediately,
and since the types won’t match, the function will fail with the error `TypeError:
unsupported operand type(s) for +: 'int' and 'ray._raylet.ObjectRef'`.
You could fix this error by using ray.wait and ray.get, but this would still launch
the function too early, resulting in unnecessary blocking.

In another composition approach, nested parallelism, your remote function launches
additional remote functions. This can be useful in many cases, including implementing
recursive algorithms and combining hyperparameter tuning with parallel
model training.
Let’s take a look at two ways to implement nested parallelism.

In [9]:
"""
This code defines three remote functions:
`generate_numbers`
A simple function that generates random numbers
`remote_objrefs`
Invokes several remote functions and returns resulting ObjectRef objects
`remote_values`
Invokes several remote functions, waits for their completion, and returns the
resulting values

"""

#tag::nested_par[]
@ray.remote
def generate_number(s: int, limit: int) -> int :
   random.seed(s)
   time.sleep(.1)
   return random.randint(0, limit)

@ray.remote
def remote_objrefs():
   results = []
   for n in range(4):
       results.append(generate_number.remote(n, 4*n))
   return results

@ray.remote
def remote_values():
   results = []
   for n in range(4):
       results.append(generate_number.remote(n, 4*n))
   return ray.get(results)

print(ray.get(remote_values.remote()))
futures = ray.get(remote_objrefs.remote())
while len(futures) > 0:
    ready_futures, rest_futures = ray.wait(futures, timeout=600, num_returns=1)
    # If we get back anything less than num_returns there was a timeout
    if len(ready_futures) < 1:
        # ray.cancel takes one ObjectRef at a time
        for future in rest_futures:
            ray.cancel(future)
        break
    for id in ready_futures:
        print(f'completed result {ray.get(id)}')
    # Keep waiting only on the futures that are not yet available
    futures = rest_futures
#end::nested_par[]

[0, 1, 0, 3]
completed result 1
completed result 0
completed result 0
completed result 3


As you can see from this example, nested parallelism allows for two approaches.
In the first case (remote_objrefs), you return all the ObjectRef objects to the
invoker of the aggregating function. The invoking code is responsible for waiting
for all the remote functions’ completion and processing the results. In the second
case (remote_values), the aggregating function waits for all the remote functions’
executions to complete and returns the actual execution results.

**Returning all of the ObjectRef objects allows for more flexibility with nonsequential
consumption, as described earlier for ray.wait, but it is not suitable for many recursive
algorithms. With many recursive algorithms (e.g., quicksort, factorial, etc.) we
have many levels of a combination step that need to be performed, requiring that the
results be combined at each level of recursion.**

## Ray Remote Best Practices
When you are using remote functions, keep in mind that you don’t want to make
them too small. If the tasks are very small, using Ray can take longer than if you used
Python without Ray. The reason for this is that every task invocation has nontrivial
overhead: for example, scheduling, data passing, inter-process communication
(IPC), and updating the system state. To get a real advantage from parallel execution,
you need to make sure that this overhead is negligible compared to the execution
time of the function itself.

As described in this chapter, one of the most powerful features of Ray remote is
the ability to parallelize functions’ execution. Once you call the remote functions,
the handle to the remote object (future) is returned immediately, and the invoker
can continue execution either locally or with additional remote functions. If, at this
point, you call ray.get, your code will block, waiting for a remote function to
complete, and as a result, you will have no parallelism. **To ensure parallelization of
your code, you should invoke ray.get only at the point when you absolutely need
the data to continue the main thread of execution. Moreover, as we’ve described, it is
recommended to use ray.wait instead of ray.get directly. Additionally, if the result
of one remote function is required for the execution of other remote functions,
consider using pipelining (described previously) to leverage Ray’s task coordination.**

**When you submit your parameters to remote functions, Ray does not submit them
directly to the remote function, but rather copies the parameters into object storage
and then passes ObjectRef as a parameter. As a result, if you send the same parameter
to multiple remote functions, you are paying a (performance) penalty for storing
the same data to the object storage several times. The larger the size of the data, the
larger the penalty. To avoid this, if you need to pass the same data to multiple remote
functions, a better option is to first put the shared data in object storage and use the
resulting ObjectRef as a parameter to the function.**

Remote function invocation is done by the Raylet
component. If you invoke a lot of remote functions from a single client, all these
invocations are done by a single Raylet. Therefore, it takes a certain amount of time
for a given Raylet to process these requests, which can cause a delay in starting all
the functions. A better approach, as described in the “Ray Design Patterns” documentation,
is to use an invocation tree, that is, a nested function invocation as described in the
previous section. Basically, a client creates several remote functions, each of which, in
turn, creates more remote functions, and so on. In this approach, the invocations are
spread across multiple Raylets, allowing scheduling to happen faster. <br/><br/>
Every time you define a remote function by using the @ray.remote decorator, Ray
exports these definitions to all Ray workers, which takes time (especially if you have a
lot of nodes). To reduce the number of function exports, a good practice is to define
as many of the remote tasks as possible at the top level, outside the loops and local functions
that use them.

## Bringing It Together with an Example
ML models composed of other models (e.g., ensemble models) are well suited to
evaluation with Ray. Example 3-8 shows what it looks like to use Ray’s function
composition for a hypothetical spam model for web links.

In [11]:
@ray.remote
def fetch(url: str) -> Tuple[str, bytes]:
    import urllib.request
    with urllib.request.urlopen(url) as response:
       return (url, response.read())

@ray.remote
def has_spam(site_text: Tuple[str, bytes]) -> bool:
    # Open the list of spammers or download it
    spammers_url = (
        "https://raw.githubusercontent.com/matomo-org/referrer-spam-list/master/spammers.txt"
    )
    import urllib.request
    with urllib.request.urlopen(spammers_url) as response:
        # Each line is a bytes domain name ending in a newline; strip it before matching
        spammers = [line.strip() for line in response.readlines()]
    for spammer in spammers:
        # Skip blank lines, since b"" is contained in every page
        if spammer and spammer in site_text[1]:
            return True
    return False
            
    
@ray.remote
def fake_spam1(us: Tuple[str, bytes]) -> bool:
    # You should do something fancy here with TF or even just NLTK
    time.sleep(10)
    return random.randrange(10) == 1

@ray.remote
def fake_spam2(us: Tuple[str, bytes]) -> bool:
    # You should do something fancy here with TF or even just NLTK
    time.sleep(5)
    return random.randrange(10) > 4

@ray.remote
def combine_is_spam(us: Tuple[str, bytes], model1: bool, model2: bool, model3: bool) -> bool:
    # Questionable fake ensemble
    score = model1 * 0.2 + model2 * 0.4 + model3 * 0.4
    return score > 0.2




urls = ["https://www.espncricinfo.com/", "http://www.google.com", "http://www.holdenkarau.com"]
site_futures = map(lambda url: fetch.remote(url), urls)
spam_futures = map(lambda us: [us, has_spam.remote(us), fake_spam1.remote(us), fake_spam2.remote(us)],
                   site_futures)
info_futures = map(lambda i: combine_is_spam.remote(*i), spam_futures)
                   
                   
not_ready = list(info_futures)
while len(not_ready) > 0:
    ready, not_ready = ray.wait(not_ready, num_returns = 1)
    if len(ready) < 1:
        raise Exception("Error fetching futures")
    print(ray.get(ready))

[True]
[False]
[False]


By using Ray, instead of waiting for the sum of the times needed to evaluate all the models,
you need to wait for only the slowest model, and all other models that
finish faster are “free.” For example, if the models take equal lengths of time to run,
evaluating these models serially, without Ray, would take almost three times as long.