# A Quick Tour of Ray Core

In [1]:
import ray

## Ray Core is about...
* distributing computation across many cores, nodes, or devices (e.g., accelerators)
* scheduling *arbitrary task graphs*
    * any code you can write, you can distribute, scale, and accelerate with Ray Core
* managing the overhead
    * at scale, distributed computation introduces growing "frictions" -- data movement, scheduling costs, etc. -- which make the problem harder
    * Ray Core addresses these issues as first-order concerns in its design (e.g., via a distributed scheduler)
 
(And, of course, for common technical use cases, libraries and other components built on top of Ray Core provide a simpler developer experience.)

## `@ray.remote` and `ray.get`

Define a Python function and decorate it so that Ray can schedule it

In [4]:
@ray.remote
def square(a):
    print(a)
    return a*a

Tell Ray to schedule the function

In [6]:
#square.remote(3)
square.remote(3)

2024-09-30 16:25:11,566	INFO worker.py:1601 -- Connecting to existing Ray cluster at address: 10.0.138.179:6379...
2024-09-30 16:25:11,572	INFO worker.py:1777 -- Connected to Ray cluster. View the dashboard at https://session-bimi5q6im9gb2znrbq9siuvzkr.i.anyscaleuserdata.com
2024-09-30 16:25:11,580	INFO packaging.py:359 -- Pushing file package 'gcs://_ray_pkg_e3d4896360926ce9868c5ec97811968d4f804d61.zip' (0.10MiB) to Ray cluster...
2024-09-30 16:25:11,581	INFO packaging.py:372 -- Successfully pushed file package 'gcs://_ray_pkg_e3d4896360926ce9868c5ec97811968d4f804d61.zip'.


ObjectRef(ca80f3a8e8ba2e50ffffffffffffffffffffffff0400000001000000)

(square pid=4512, ip=10.0.162.180) 3
(square pid=5794, ip=10.0.162.180) 7
(square pid=5794, ip=10.0.162.180) 11
(autoscaler +10m51s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(square pid=5793, ip=10.0.162.180) 5
(square pid=5795, ip=10.0.162.180) 4
(square pid=5794, ip=10.0.162.180) 2
(square pid=5793, ip=10.0.162.180) 2
(square pid=5795, ip=10.0.162.180) 4
(square pid=5794, ip=10.0.162.180) 5
(square pid=5793, ip=10.0.162.180) 4
(square pid=5795, ip=10.0.162.180) 2
(square pid=5794, ip=10.0.162.180) 5
(square pid=5795, ip=10.0.162.180) 4
(square pid=5794, ip=10.0.162.180) 3
(square pid=5797, ip=10.0.162.180) 5
(square pid=5795, ip=10.0.162.180) 4
(square pid=5794, ip=10.0.162.180) 5
(square pid=5797, ip=10.0.162.180) 3
(square pid=5793, ip=10.0.

`ObjectRef` is a handle to a task result. We get an ObjectRef immediately because we don't know
* when the task will run
* whether it will succeed
* whether we really need or want the result locally
    * consider a very large result which we may need for other work but which we don't need to inspect

In [None]:
ref = square.remote(3)

If we want to wait (block) and retrieve the corresponding object, we can use `ray.get`

In [None]:
ray.get(ref)

<div class="alert alert-block alert-success">
    
__Activity: define and invoke a Ray task__

* Define a function that takes two params, takes the square root of the first, then adds the second and returns the result
* Invoke it with 2 different sets of parameters and collect the results

</div>

### Scheduling multiple tasks

In [8]:
@ray.remote
def spin():
    total = 0
    for i in range(1000):
        for j in range(1000):
            total += i*j
    return total

If we want to run this task many times, we want to
* invoke `.remote` for all invocations
* *if we wish to `get` a result, invoke get on all of the ObjectRefs*

In [16]:
%%time

out = ray.get([spin.remote() for _ in range(48)])

CPU times: user 60.7 ms, sys: 12 ms, total: 72.7 ms
Wall time: 207 ms


__Don't__ call `remote` to schedule each task and then block with a `get` on its result before scheduling the next task: Ray can't run your work in parallel if each task must finish before the next is submitted.

i.e., don't do this:

In [17]:
%%time

out = [ray.get(spin.remote()) for _ in range(48)]

CPU times: user 63.7 ms, sys: 6.02 ms, total: 69.7 ms
Wall time: 1.94 s


### Task graphs

The above example is a common scenario, but it is also the easiest (least complex) scheduling scenario. Each task is independent of the others -- this is called "embarrassingly parallel"

Many real-world algorithms are not embarrassingly parallel: some tasks depend on results from one or more other tasks. Scheduling these graphs is more challenging.

Ray Core is designed to make this straightforward

In [18]:
@ray.remote
def add(a, b):
    return a+b

In [19]:
arg1 = square.remote(7)

arg1

ObjectRef(4e8873f2fe8b26caffffffffffffffffffffffff0400000001000000)

In [20]:
arg2 = square.remote(11)

We want to schedule `add` which depends on two prior invocations of `square`

We can pass the resulting ObjectRefs -- this means 
* we don't have to wait for the dependencies to complete before we can set up `add` for scheduling
* we don't need to have the concrete parameters (Python objects) for the call to `add.remote`
* Ray will automatically resolve the ObjectRefs -- our `add` implementation will never know that we passed ObjectRefs, not, e.g., numbers

In [24]:
out = add.remote(arg1, arg2)

In [25]:
ray.get(out)

170

If we happen to have concrete Python objects to pass -- instead of ObjectRefs -- we can use those. We can use any combination of objects and refs.

In [26]:
out2 = add.remote(arg1, 15)

ray.get(out2)

64

We can create more complex graphs by
- writing our code in the usual way
- decorating our functions with `@ray.remote`
- using `.remote` when we need to call a function
- using the resulting ObjectRefs and/or concrete values as parameters

In [36]:
@ray.remote
def mult(a,b):
    return a*b

Here, we call
* `mult` on the result of
    * `square` of 2 and
    * the sum we get from calling `add` on
        * `square` of 4 and
        * `square` of 5

In [37]:
out3 = mult.remote(square.remote(2), add.remote(square.remote(4), square.remote(5)))

In [38]:
ray.get(out3)

164

<div class="alert alert-block alert-success">

__Activity: task graph refactor__

* Refactor the logic from your earlier Ray task (square-root and add) into two separate functions
* Invoke the square-root-and-add logic without ever locally retrieving the result of the square-root calculation

</div>

### Tasks can launch other tasks

In that example, we organized or arranged the flow of tasks from our original process -- the Python kernel behind this notebook.

Ray __does not__ require that all of your tasks and their dependencies be arranged from one "driver" process.

Consider:

In [41]:
@ray.remote
def sum_of_squares(arr):
    return 2,sum(ray.get([square.remote(val) for val in arr]))

In [42]:
ray.get(sum_of_squares.remote([3,4,5]))

(2, 50)

In that example, 
* our (local) process asked Ray to schedule one task -- a call to `sum_of_squares` -- which then started running somewhere in our cluster;
* within that task, additional code requested multiple additional tasks to be scheduled -- the call to `square` for each item in the list -- which were then scheduled in other locations;
* and when those latter tasks were complete, our original task computed the sum and completed.

This ability for tasks to schedule other tasks using uniform semantics makes Ray particularly powerful and flexible.

## Ray Actors

Actors are Python class instances which can run for a long time in the cluster, which can maintain state, and which can send messages to/from other code.

In these examples, we'll show the full power of Ray actors, where they can mutate state. It is worth noting, though, that a common use of actors is with state that is never mutated but is large enough that we want to create or load it only once and route calls to it over time, such as a large AI model.

In [50]:
x = 100 

In [51]:
@ray.remote
class Accounting:
    def __init__(self):
        self.total = 0
    
    def add(self, amount):
        self.total += amount + x
        
    def remove(self, amount):
        self.total -= amount
        
    def total(self):
        return self.total

Define an actor with the `@ray.remote` decorator and then use `<class_name>.remote()` to ask Ray to construct an instance of this actor somewhere in the cluster.

We get an actor handle which we can use to communicate with that actor, pass to other code, tasks, or actors, etc.

In [52]:
acc = Accounting.remote()

In [53]:
acc

Actor(Accounting, 0f2c6f499e7a7e25277c63b304000000)

We can send a message to an actor -- with RPC semantics -- by using `<handle>.<method_name>.remote()`

In [54]:
acc.total.remote()

ObjectRef(0ef82bf3334edf090f2c6f499e7a7e25277c63b30400000001000000)

Not surprisingly, we get an object ref back

In [55]:
ray.get(acc.total.remote())

0

We can mutate the state inside this actor instance

In [56]:
acc.add.remote(100)

ObjectRef(6586da9349a9e04c0f2c6f499e7a7e25277c63b30400000001000000)

In [62]:
[acc.add.remote(x) for x in range(1,100)]

[ObjectRef(c8dfef82d8e9b2150f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(05abfdb08c5429c30f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(66228ac107f2fbfb0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(a1098043f13f761c0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(507d62a3932f5eb00f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(6272834341f30a4e0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(4d276113c16b1baa0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(605962cb48074d6d0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(8ad83ac1cc7806030f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(696a6ded780fe9a70f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(b15013664b0e9e1b0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(b60699fbd93b1c530f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(1488bd5cb06effb90f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(e8986543e6f384bb0f2c6f499e7a7e25277c63b30400000001000000),
 ObjectRef(9a37ec068

In [60]:
acc.remove.remote(10)

ObjectRef(1b797561c6739b030f2c6f499e7a7e25277c63b30400000001000000)

In [63]:
ray.get(acc.total.remote())

29880

<div class="alert alert-block alert-success">

__Activity: linear model inference__

* Create an actor which applies a model to convert Celsius temperatures to Fahrenheit
* The constructor should take model weights (w1 and w0) and store them as instance state
* A convert method should take a scalar, multiply it by w1 then add w0 (weights retrieved from instance state) and then return the result

Bonus activity:
* Instead of passing weights as constructor params, pass a filepath to the constructor. In the constructor, retrieve the weights from the path.

</div>

And an actor can itself run remote tasks

In [65]:
@ray.remote
class EnhancedAccounting:
    def __init__(self):
        self.total = 0
    
    def add(self, amount):
        self.total += amount
        
    def remove(self, amount):
        self.total -= amount
        
    def total(self):
        return self.total
    
    def add_a_bunch(self, amount):
        bigger_amount = square.remote(amount)
        self.total += ray.get(bigger_amount)

In [66]:
acc = EnhancedAccounting.remote()
acc.add.remote(100)
acc.add_a_bunch.remote(5)

ObjectRef(7bd7587a5f7d2e3b92a888251e5befd9751436dd0400000001000000)

In [67]:
ray.get(acc.total.remote())

125

An actor can also instantiate and use other actors

In [68]:
@ray.remote
class TaxAccounting:
    def __init__(self):
        self.total = 0
        self.tax_account = Accounting.remote()
    
    def add(self, amount):
        self.total += amount/2
        self.tax_account.add.remote(amount/2)
        
    def remove(self, amount):
        self.total -= amount
        self.tax_account.remove.remote(amount/2)
        
    def total(self):
        tax_total = ray.get(self.tax_account.total.remote())
        return (self.total, tax_total)

In [69]:
acc = TaxAccounting.remote()
acc.add.remote(100)
acc.remove.remote(5)

ObjectRef(fee024a1e199be18cdd44759cfddb01c135f2fe50400000001000000)

In [71]:
ray.get(acc.total.remote())

(45.0, 147.5)

And this works regardless of which process creates the various actors.

That is, in the example above, the `TaxAccounting` actor created an `Accounting` actor as a helper.

## `ray.put`

As we've seen, the results of tasks are kept in the Ray object store, and the caller gets an object ref which can be used for many purposes. If the caller needs the actual object -- e.g., to implement conditional logic based on the value -- it can use `ray.get`.

In some cases, we may have a large object locally which we want to use in many Ray tasks.

The best practice for this is to put the object into the object store (once) to obtain an object ref which we can then use many times.

For example:

In [72]:
@ray.remote
def append(base, appendix):
    return base + " - " + appendix

In [73]:
ray.get(append.remote("foo", "bar"))

'foo - bar'

Now let's pretend that the `base` doc is some very large document

In [74]:
long_doc = """It was the best of times, it was the worst of times, 
it was the age of wisdom, it was the age of foolishness, it was the epoch of belief, it was the epoch of incredulity, 
it was the season of Light, it was the season of Darkness, it was the spring of hope, it was the winter of despair, 
we had everything before us, we had nothing before us, we were all going direct to Heaven, we were all going direct the other way
--in short, the period was so far like the present period that some of its noisiest authorities insisted on its being received, 
for good or for evil, in the superlative degree of comparison only."""

We call `ray.put` to obtain a ref that we can use multiple times

In [75]:
doc_ref = ray.put(long_doc)
doc_ref

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0400000001e1f505)

In [76]:
append.remote(doc_ref, " (Charles Dickens)")

ObjectRef(bee0390cc952dc9cffffffffffffffffffffffff0400000001000000)

In [77]:
append.remote(doc_ref, " (Dickens 1859)")

ObjectRef(5b0c3d6b379ea472ffffffffffffffffffffffff0400000001000000)

In [78]:
ray.get(append.remote(doc_ref, '(A Tale of Two Cities)'))

'It was the best of times, it was the worst of times, \nit was the age of wisdom, it was the age of foolishness, it was the epoch of belief, it was the epoch of incredulity, \nit was the season of Light, it was the season of Darkness, it was the spring of hope, it was the winter of despair, \nwe had everything before us, we had nothing before us, we were all going direct to Heaven, we were all going direct the other way\n--in short, the period was so far like the present period that some of its noisiest authorities insisted on its being received, \nfor good or for evil, in the superlative degree of comparison only. - (A Tale of Two Cities)'

__Note: if we passed the Python object handle -- or even implicitly used a handle that is in our current scope chain -- the code would succeed, but performance might suffer__

E.g., this will work, but usually should be avoided when the object is large and/or used many times:

In [79]:
append.remote(long_doc, " (Dickens)")

ObjectRef(e12b8cc4954b5b50ffffffffffffffffffffffff0400000001000000)

This will also work, but should also be avoided when the scope-chain object is large and/or used many times:

In [80]:
@ray.remote
def append_to_doc(appendix):
    return long_doc + " - " + appendix

In [81]:
append_to_doc.remote('foo')

ObjectRef(7b1f4cda30812bdaffffffffffffffffffffffff0400000001000000)

<div class="alert alert-block alert-success">

__Activity: object store and performance experiment__

* Create a Ray task which uses NumPy to multiply a (square 2-D) array by itself and returns the sum of the resulting array
* Starting with a small array (10x10), see how large the array must be before we can see a difference between
    * Using `ray.put` to place the array in the object store first, then supplying a reference to the Ray task
    * Passing a handle to the array itself as the parameter to the task

</div>

## Tracking the state of tasks

If we just want to inspect the state of a task that may or may not have successfully completed, we can call `.future()` on its ObjectRef to convert it into a future as defined in `concurrent.futures` (Python 3.6+)

In [82]:
s1 = square.remote(1)

f = s1.future()

f.done()

False

By now it should be done

In [83]:
f.done()

True

In [84]:
f.result()

1

In [85]:
type(f)

concurrent.futures._base.Future

### Access to tasks as they are completed

We may submit a number of tasks and want to access their results -- perhaps to start additional computations -- as they complete.

That is, we don't want to wait for all of our initial tasks to finish, but we may need to wait for one or more to be done.

`ray.wait` blocks until one or more of the submitted object refs are complete (one by default) and then returns a tuple of done and not-done refs

In [86]:
s2 = square.remote(2)
done, not_done = ray.wait([s1, s2])

done

[ObjectRef(6a5ca305caf3f443ffffffffffffffffffffffff0400000001000000)]

In [87]:
not_done

[ObjectRef(de3c08115f3e5a1affffffffffffffffffffffff0400000001000000)]

If we need to wait for more than one task to complete, we can specify that with the `num_returns` parameter

In [88]:
task_refs = [square.remote(i) for i in range(10)]

done, not_done = ray.wait(task_refs, num_returns=2)

done

[ObjectRef(ed2adb97ff2cc165ffffffffffffffffffffffff0400000001000000),
 ObjectRef(7d8e08d6d8a9b4edffffffffffffffffffffffff0400000001000000)]

In [89]:
len(not_done)

8