# Getting started with Ray

[Ray.io](https://www.ray.io/) is a distributed execution framework that makes it easy to scale single-machine applications with little or no change to the code, and to leverage state-of-the-art machine learning libraries.

Ray provides a set of low-level core primitives, as well as a family of pre-packaged libraries built on these primitives for solving demanding machine learning problems.

The following libraries come packaged with Ray:

* [Tune](https://docs.ray.io/en/master/tune/index.html): Scalable Hyperparameter Tuning

* [RaySGD](https://docs.ray.io/en/releases-1.11.0/raysgd/raysgd.html): Distributed Training Wrappers

* [RLlib](https://docs.ray.io/en/latest/rllib/index.html): Industry-Grade Reinforcement Learning

* [Ray Serve](https://docs.ray.io/en/master/serve/index.html#rayserve): Scalable and Programmable Serving

Additionally, Ray has been adopted as a foundational framework by a large number of open-source ML frameworks, which now have community-maintained Ray integrations.

Domino can dynamically provision and orchestrate a Ray cluster directly on the infrastructure backing the Domino instance. This gives Domino users quick access to Ray without having to rely on their IT team. When you start a Domino workspace for interactive work or a Domino job for batch processing, Domino creates and manages a containerized Ray cluster for you and makes it available to your execution.

Let's start by importing all the libraries needed for this notebook. Note that Ray is already included in our Compute Environment, so no additional installation is needed.

```python
import ray
import ray.util
import os
import time
import random
```

When provisioning your on-demand Ray cluster, Domino sets up environment variables that hold the information needed to connect to it: *RAY_HEAD_SERVICE_HOST* and *RAY_HEAD_SERVICE_PORT* hold the hostname and port of the Ray head node. We combine them into a `ray://host:port` address and pass it to *ray.init()* to establish a connection with the cluster.

```python
# Connect only once; re-running this cell is then a no-op
if not ray.is_initialized():
    service_host = os.environ["RAY_HEAD_SERVICE_HOST"]
    service_port = os.environ["RAY_HEAD_SERVICE_PORT"]
    ray.init(f"ray://{service_host}:{service_port}")
```

Let's confirm that we are now connected to Ray.

```python
ray.is_initialized()
```

```
True
```

Now let's check the health of the nodes and the CPU and GPU resources available on each one. The output lists every node, including the head node. It's always a good idea to check this and plan your resource and memory usage with Ray accordingly.

```python
ray.nodes()
```

```
[{'NodeID': '86ebdbdb38a064dd4409abaf5792806fed6bcf6d2a2bc7362287909e',
  'Alive': True,
  'NodeManagerAddress': '10.0.36.183',
  'NodeManagerHostname': 'ray-645a8c06935490051989b8bc-ray-worker-0',
  'NodeManagerPort': 2385,
  'ObjectManagerPort': 2384,
  'ObjectStoreSocketName': '/tmp/ray/session_2023-05-09_11-10-07_435787_1/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2023-05-09_11-10-07_435787_1/sockets/raylet',
  'MetricsExportPort': 41329,
  'alive': True,
  'Resources': {'memory': 2957327565.0,
   'CPU': 1.0,
   'node:10.0.36.183': 1.0,
   'object_store_memory': 1267426099.0}},
 {'NodeID': 'e2be0b1ca427d42cb82601c67ec47b0b0e89b91f5caf97ae65f2c0f0',
  'Alive': True,
  'NodeManagerAddress': '10.0.95.160',
  'NodeManagerHostname': 'ray-645a8c06935490051989b8bc-ray-head-0',
  'NodeManagerPort': 2385,
  'ObjectManagerPort': 2384,
  'ObjectStoreSocketName': '/tmp/ray/session_2023-05-09_11-10-07_435787_1/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_20
```

Domino also provides access to the Ray dashboard (Web UI), which shows cluster resources such as CPU, disk, and memory consumption.

Click on the "Ray Web UI" tab in the top right corner to access the Ray dashboard where you can see the cluster nodes, workloads, and available resources.

![Ray dashboard](images/ray_dashboard.png "Ray dashboard")

## A simple sort exercise

Let's define a simple Python bubble sort function. We chose this algorithm on purpose: it repeatedly steps through the input list element by element, so it is quite slow (its running time grows quadratically with the length of the list).

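```python
def bubble_sort(to_sort):
    """Sort to_sort in place by repeatedly swapping adjacent out-of-order elements."""
    n = len(to_sort)
    for i in range(n):
        for j in range(n - 1):
            if to_sort[j] > to_sort[j+1]:
                to_sort[j], to_sort[j+1] = to_sort[j+1], to_sort[j]
    return to_sort
```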
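To see why this is slow, note that the inner loop always runs n - 1 times for each of the n outer iterations. The function therefore performs exactly n(n - 1) comparisons whatever the input order. That is 3,000 × 2,999 = 8,997,000 comparisons per list, or about 180 million for 20 such lists. The sketch below checks the formula on a smaller list using *bubble_sort_counting*, a hypothetical variant of *bubble_sort* that also counts comparisons:

```python
import random

def bubble_sort_counting(to_sort):
    """Hypothetical variant of bubble_sort: same loops, plus a comparison counter."""
    comparisons = 0
    n = len(to_sort)
    for i in range(n):
        for j in range(n - 1):
            comparisons += 1
            if to_sort[j] > to_sort[j + 1]:
                to_sort[j], to_sort[j + 1] = to_sort[j + 1], to_sort[j]
    return comparisons

data = random.sample(range(1, 1000000), 300)
expected = sorted(data)
count = bubble_sort_counting(data)   # sorts data in place
print(count)             # 89700, i.e. 300 * 299
print(data == expected)  # True
```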

Now let's run this function on a list of 3,000 randomly selected numbers and repeat the process 20 times. We'll time the execution and use it as a baseline.

```python
start_time = time.time()
# Sort 20 independent lists of 3,000 random numbers, one after another
[bubble_sort(random.sample(range(1, 1000000), 3000)) for _ in range(20)]
print("--- %s seconds ---" % (time.time() - start_time))
```

```
--- 21.410213947296143 seconds ---
```
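The timings in this notebook use *time.time()*, which follows the system clock and can jump if the clock is adjusted. Python's *time.perf_counter()* is monotonic and has a higher resolution, which makes it better suited to measuring elapsed time. Below is a minimal sketch of the same baseline using it. It uses a smaller list size (300) so that it finishes quickly:

```python
import random
import time

def bubble_sort(to_sort):
    n = len(to_sort)
    for i in range(n):
        for j in range(n - 1):
            if to_sort[j] > to_sort[j + 1]:
                to_sort[j], to_sort[j + 1] = to_sort[j + 1], to_sort[j]
    return to_sort

# perf_counter() is monotonic, so adjustments to the system clock
# during the run cannot distort the measured duration
start = time.perf_counter()
results = [bubble_sort(random.sample(range(1, 1000000), 300)) for _ in range(20)]
elapsed = time.perf_counter() - start
print(f"--- {elapsed:.2f} seconds ---")
```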


Let's see if Ray can improve on this run time.

## Task 1 - Convert *bubble_sort* into a Ray task

Ray lets us execute standard Python functions asynchronously by turning them into Ray tasks (also called remote functions).

To do this, decorate your function with *@ray.remote* to declare that you want to run it remotely. Write a function called *bubble_sort_remote* that is identical to *bubble_sort* but is set up as a Ray task (i.e., decorated with *@ray.remote*).

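```python
@ray.remote
def bubble_sort_remote(to_sort):
    # Runs on a worker process, which receives its own copy of to_sort
    n = len(to_sort)
    for i in range(n):
        for j in range(n - 1):
            if to_sort[j] > to_sort[j+1]:
                to_sort[j], to_sort[j+1] = to_sort[j+1], to_sort[j]
    # Return the list so the sorted result can be fetched with ray.get()
    return to_sort
```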

Now call *bubble_sort_remote*, repeating the 20 sorts of 3,000 elements. Remember that a Ray task is invoked with *.remote()* rather than called directly. For example, if you have a standard Python function that you call as

```
f1(arg1, arg2)
```

then its Ray task version should be called as

```
f1.remote(arg1, arg2)
```

Don't forget to take a look at the Ray dashboard while *bubble_sort_remote* is executing.

```python
start_time = time.time()
# Each .remote() call returns immediately; ray.get() waits for all 20 tasks
ray.get([bubble_sort_remote.remote(random.sample(range(1, 1000000), 3000)) for _ in range(20)])
print("--- %s seconds ---" % (time.time() - start_time))
```

```
--- 13.810055494308472 seconds ---
```


Note that each *remote()* call above immediately returns an object reference (obj_ref) and schedules a Ray task that is executed on a worker process. The **result** of the execution is then retrieved by calling *ray.get(obj_ref)*, which blocks until the task has finished.

## Task dependencies

Despite being asynchronous in nature, Ray tasks can still be dependent on other tasks. We could, for example, modify the call to *bubble_sort_remote* in the following way:

```python
@ray.remote
def random_list(n=3000):
    return random.sample(range(1, 1000000), n)

start_time = time.time()
# The output of random_list is passed straight into bubble_sort_remote
ray.get([bubble_sort_remote.remote(random_list.remote()) for _ in range(20)])
print("--- %s seconds ---" % (time.time() - start_time))
```

```
--- 13.937318563461304 seconds ---
```


In this case, the random list creation is refactored into a separate Ray task, whose result is passed as the argument of the *bubble_sort_remote* call. Ray handles such dependencies transparently by building an internal dependency graph, so there is nothing special we need to take care of. Just keep in mind that the sort will not start until the *random_list* task has finished executing. In general, a task does not run until all the tasks it depends on have completed.

In addition, the observant participant may ask, "Wait, I thought calling a Ray task returns an object reference, not the actual object. Don't I need to call *ray.get()* and pass the result to *bubble_sort_remote.remote()*?" The answer is no. When an object reference is passed directly as an argument to a task, Ray resolves it to the underlying value before the task runs.

## Actors

So far we have looked at how to transform simple Python functions into Ray tasks. Actors extend the API to Python classes. Similar to the transformation of functions, decorating a Python class with *@ray.remote* transforms it into a stateful actor. Every instance of a class decorated with *@ray.remote* results in a new process (actor) that Ray starts somewhere on the cluster. Every call to an instance method is executed as a Ray task, which can mutate the state of the actor.

Let's look at an example. Here is a simple class that implements our bubble sort (also known as sinking sort) algorithm:

```python
@ray.remote
class Bubble_Remote(object):

    def __init__(self):
        # Start with a freshly shuffled list
        self.shuffle()

    def shuffle(self):
        # Replace the actor's list with 3,000 new random numbers
        self.to_sort = random.sample(range(1, 1000000), 3000)

    def sort(self):
        # Bubble sort the actor's list in place
        n = len(self.to_sort)
        for i in range(n):
            for j in range(n - 1):
                if self.to_sort[j] > self.to_sort[j+1]:
                    self.to_sort[j], self.to_sort[j+1] = self.to_sort[j+1], self.to_sort[j]

    def get_value(self):
        return self.to_sort
```

As you can see above, besides the decorator, there is nothing special about the class. It has three methods:

* *sort* holds our bubble sort logic.
* *shuffle* randomly initialises the *to_sort* attribute.
* *get_value* is a getter for retrieving the list.

The getter is needed because we can't read an actor's fields directly; its state is only accessible through method calls.

Using the code above is pretty straightforward, but pay attention to how the class is being instantiated:

```python
# Instantiate the actor with .remote(); this returns an actor handle
bubble_remote = Bubble_Remote.remote()
print("Unsorted:", ray.get(bubble_remote.get_value.remote())[:10])
start_time = time.time()
bubble_remote.sort.remote()
# get_value runs only after sort has finished, because methods on
# the same actor execute one at a time in the order they are called
print("Sorted:", ray.get(bubble_remote.get_value.remote())[:10])
print("--- %s seconds ---" % (time.time() - start_time))
```

```
Unsorted: [723075, 941639, 754831, 791515, 191244, 290100, 489127, 769594, 604909, 970304]
Sorted: [368, 418, 440, 886, 919, 1068, 1355, 1388, 1399, 1589]
--- 1.852614164352417 seconds ---
```


What about parallelisation? Let's do another 20 runs of shuffling and sorting, and check the wall clock.

```python
start_time = time.time()
for _ in range(20):
    bubble_remote.shuffle.remote()
    ray.get(bubble_remote.sort.remote())
print("--- %s seconds ---" % (time.time() - start_time))
```

```
--- 24.511674404144287 seconds ---
```


OK, this turns out to be no faster than the normal single-threaded Python execution; in fact, it is slightly slower.

Let's terminate the *bubble_remote* actor, as we'll need the resources it currently uses for what follows. You can optionally check the Logical View of the Ray Web UI tab before and after executing the next cell and see how the Alive value for Bubble_Remote changes.

```python
ray.kill(bubble_remote)
```

The execution above is slow because methods called on the same actor are executed serially, in the order in which they are called. Remember that actors are stateful, so Ray can't let multiple method calls change the actor's state out of order. This behaviour may look disappointing at first, but keep the following in mind:

* methods on different actors are executed in parallel
* actor handles can be passed to remote functions and other actors, and they can call each other

The above properties enable us to design highly complex execution graphs with a substantial degree of parallelism. Here is an example from the official [Ray documentation](https://docs.ray.io/) that illustrates building a tree of actors:

```python
@ray.remote(num_cpus=0.5)
class Worker:
    def work(self):
        return "done"

@ray.remote(num_cpus=0.5)
class Supervisor:
    def __init__(self):
        # The supervisor creates and owns three worker actors
        self.workers = [Worker.remote() for _ in range(3)]
    def work(self):
        # Fan out to all workers, then wait for every result
        return ray.get([w.work.remote() for w in self.workers])

sup = Supervisor.remote()
print(ray.get(sup.work.remote()))  # outputs ['done', 'done', 'done']
```

```
['done', 'done', 'done']
```
