This output indicates that your Ray Cluster is up and running. As the first line of the output shows, Ray comes with its own prepackaged dashboard.3 You can open it at http://127.0.0.1:8265, unless your output shows a different port (in the run below it is 8266). Take your time exploring the dashboard if you like. For instance, you should see all your CPU cores listed, along with the total utilization of your (trivial) Ray application. To inspect the resources of your Ray Cluster from Python, call ray.cluster_resources(). It reports the total resources the cluster has registered, while ray.available_resources() reports what is currently unused.

In [4]:
import ray
ray.init()

2023-10-05 21:29:56,797	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8266


Python version: 3.9.16
Ray version: 2.2.0
Dashboard: http://127.0.0.1:8266


In [5]:
ray.cluster_resources()

{'CPU': 16.0,
 'object_store_memory': 1683072614.0,
 'memory': 3366145230.0,
 'node:100.100.137.165': 1.0}

In [6]:
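import time

# A toy "database" of eight words, indexed 0..7.
database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)  # simulate a slow lookup: item i takes i / 10 seconds
    return item, database[item]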

In [7]:
def print_runtime(input_data, start_time):
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    print(*input_data, sep="\n")


start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)

Runtime: 2.80 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
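The sequential run takes 2.80 seconds because the eight sleeps add up (0.0 + 0.1 + ... + 0.7). For comparison, here is a minimal sketch that uses only Python's standard library (concurrent.futures, not Ray) to run the same eight lookups in parallel threads. Because each call mostly sleeps, the wall-clock time should be close to the slowest single lookup (about 0.7 seconds) rather than the sum. The names parallel_start, parallel_data, and parallel_runtime are illustrative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    # Simulated lookup: item i takes i / 10 seconds to "fetch".
    time.sleep(item / 10.)
    return item, database[item]


parallel_start = time.time()
# One thread per item, so all eight sleeps overlap.
with ThreadPoolExecutor(max_workers=8) as pool:
    parallel_data = list(pool.map(retrieve, range(8)))  # map() preserves input order
parallel_runtime = time.time() - parallel_start

print(f"Runtime: {parallel_runtime:.2f} seconds")
print(*parallel_data, sep="\n")
```

Threads help here only because time.sleep releases the GIL. CPU-bound work would not speed up this way, which is one reason Ray runs tasks in separate worker processes.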


In [8]:
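@ray.remote
def retrieve_task(item):
    # Wraps the local retrieve(), which reads the global `database`;
    # the next cell passes the data through Ray's object store instead.
    return retrieve(item)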

In [9]:
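# Store the database once in Ray's object store and pass the reference to each task.
db_object_ref = ray.put(database)


@ray.remote
def retrieve_task(item, db):
    # Ray resolves db_object_ref to the actual list before this task runs.
    time.sleep(item / 10.)
    return item, db[item]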

In [10]:
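start = time.time()
object_references = [
    retrieve_task.remote(item, db_object_ref) for item in range(8)  # launch all 8 tasks at once
]
all_data = []

while len(object_references) > 0:  # loop until every task has been collected
    finished, object_references = ray.wait(  # block until 2 results are ready (or 7 s pass)
        object_references, num_returns=2, timeout=7.0
    )
    data = ray.get(finished)
    print_runtime(data, start)  # print each batch as soon as it arrives
    all_data.extend(data)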

Runtime: 0.28 seconds, data:
(0, 'Learning')
(1, 'Ray')
Runtime: 0.48 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
Runtime: 0.69 seconds, data:
(4, 'Python')
(5, 'for')
Runtime: 0.89 seconds, data:
(6, 'Machine')
(7, 'Learning')
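To make the ray.wait() pattern concrete without a cluster, here is a rough standard-library analogue built on concurrent.futures.wait with FIRST_COMPLETED. The helper wait_for is hypothetical: it is not part of Ray or the standard library. It only mimics the shape of ray.wait(), returning a (ready, not_ready) pair once num_returns futures are done or the timeout expires.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)  # item i takes i / 10 seconds
    return item, database[item]


def wait_for(futures, num_returns, timeout=None):
    """Hypothetical helper shaped like ray.wait(): returns (ready, not_ready)
    once num_returns futures are done or the timeout (seconds) expires."""
    num_returns = min(num_returns, len(futures))
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        done = [f for f in futures if f.done()]
        if len(done) >= num_returns:
            break
        remaining = None if deadline is None else deadline - time.monotonic()
        if remaining is not None and remaining <= 0:
            break  # timed out: return whatever is ready so far
        not_done = [f for f in futures if not f.done()]
        wait(not_done, timeout=remaining, return_when=FIRST_COMPLETED)
    ready = done[:num_returns]
    not_ready = [f for f in futures if f not in ready]
    return ready, not_ready


start = time.time()
all_data = []
with ThreadPoolExecutor(max_workers=8) as pool:
    pending = [pool.submit(retrieve, item) for item in range(8)]
    while pending:
        finished, pending = wait_for(pending, num_returns=2, timeout=7.0)
        batch = [f.result() for f in finished]
        print(f"Runtime: {time.time() - start:.2f} seconds, data: {batch}")
        all_data.extend(batch)
```

As with ray.wait(), the loop processes results in batches as they finish instead of blocking until the slowest lookup completes.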


In [11]:
@ray.remote
def follow_up_task(retrieve_result):  # receives the resolved result of a retrieve_task
    original_item, _ = retrieve_result
    follow_up_result = retrieve(original_item + 1)  # fetch the next item
    return retrieve_result, follow_up_result


retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]  # pass refs, not values

for data in ray.get(follow_up_refs):
    print(data)

((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))
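The key point above is that follow_up_task receives an object reference but sees the already-computed value, because Ray waits for the upstream task before running the dependent one. The sketch below is a standard-library contrast, not Ray's mechanism. With plain futures, the follow-up must call .result() itself, which ties up a worker thread while it waits. The function name follow_up and the pool size are illustrative assumptions.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]


def follow_up(upstream: Future):
    # A Ray task would get the resolved value directly; here the thread
    # blocks until the upstream future has finished.
    retrieve_result = upstream.result()
    original_item, _ = retrieve_result
    return retrieve_result, retrieve(original_item + 1)


# 8 workers: enough for the 4 lookups plus the 4 (possibly blocked) follow-ups.
with ThreadPoolExecutor(max_workers=8) as pool:
    retrieve_futs = [pool.submit(retrieve, item) for item in [0, 2, 4, 6]]
    follow_up_futs = [pool.submit(follow_up, fut) for fut in retrieve_futs]
    chained = [fut.result() for fut in follow_up_futs]

for pair in chained:
    print(pair)
```

If the pool were smaller than the number of blocked follow-ups plus pending lookups, this version could stall. Ray avoids that by scheduling a dependent task only once its inputs are ready.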


Ray also has the concept of actors. Actors let you run stateful computations on your cluster, and they can communicate with each other.10 Just as Ray tasks are simply decorated functions, Ray actors are decorated Python classes.

In [12]:
@ray.remote
class DataTracker:
    def __init__(self):
        self._counts = 0

    def increment(self):
        self._counts += 1

    def counts(self):
        return self._counts

In [13]:
@ray.remote
def retrieve_tracker_task(item, tracker, db):
    time.sleep(item / 10.)
    tracker.increment.remote()
    return item, db[item]


tracker = DataTracker.remote()

object_references = [
    retrieve_tracker_task.remote(item, tracker, db_object_ref) 
    for item in range(8)
]
data = ray.get(object_references)

print(data) 
print(ray.get(tracker.counts.remote()))

[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8
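To illustrate what "stateful" means here, below is a hypothetical toy actor written with threading and queue. It is not how Ray implements actors. A single worker thread owns the counter and handles one message at a time from a mailbox, so concurrent increment calls cannot race. The class and method names are assumptions chosen to mirror DataTracker.

```python
import queue
import threading


class ToyCounterActor:
    """Hypothetical actor-style counter: one thread owns the state and
    processes mailbox messages strictly in arrival order."""

    def __init__(self):
        self._counts = 0
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            method, reply = self._mailbox.get()
            if method == "increment":
                self._counts += 1
            elif method == "counts":
                reply.put(self._counts)
            elif method == "stop":
                break

    def increment(self):
        # Fire-and-forget, like calling tracker.increment.remote() without ray.get().
        self._mailbox.put(("increment", None))

    def counts(self):
        # Request/response: wait for the actor thread to answer.
        reply = queue.Queue(maxsize=1)
        self._mailbox.put(("counts", reply))
        return reply.get()

    def stop(self):
        self._mailbox.put(("stop", None))
        self._thread.join()


tracker = ToyCounterActor()
workers = [threading.Thread(target=tracker.increment) for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
final_count = tracker.counts()
tracker.stop()
print(final_count)
```

All eight increments enter the FIFO mailbox before the counts request, so the toy actor reports 8. This matches the DataTracker result above.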


## An Overview of the Ray Core API

If you look back at the previous examples, you’ll notice that we used a total of just six API methods.11 We used ray.init() to start the cluster and @ray.remote to turn functions and classes into tasks and actors. Then we used ray.put() to pass values into Ray’s object store and ray.get() to retrieve objects from the cluster. Finally, we used .remote() on tasks and actor methods to run code on our cluster, and ray.wait() to avoid blocking calls.

While six API methods might not seem like much, they are the only ones you’re likely to care about when using the Ray API.

Table 2-1. The six major API methods of Ray Core

| API call | Description |
|---|---|
| ray.init() | Initializes your Ray Cluster. Pass in an address to connect to an existing cluster. |
| @ray.remote | Turns functions into tasks and classes into actors. |
| ray.put() | Puts values into Ray’s object store. |
| ray.get() | Gets values from the object store. Returns the values you’ve put there or that were computed by a task or actor. |
| .remote() | Runs actor methods or tasks on your Ray Cluster and is used to instantiate actors. |
| ray.wait() | Returns two lists of object references: one for objects that are ready and one for objects that are not yet ready. |

### Ray MapReduce

In [14]:
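import subprocess
import sys

# Use the current interpreter; a bare "python" may not be on PATH.
zen_of_python = subprocess.check_output([sys.executable, "-c", "import this"])
corpus = zen_of_python.split()

num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions - 1)
]
partitions.append(corpus[(num_partitions - 1) * chunk:])  # last partition also takes any remainder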

In [15]:
def map_function(document):
    # Here each "document" is a single bytes word from the corpus.
    for word in document.lower().split():
        yield word, 1

In [16]:
import ray

@ray.remote
def apply_map(corpus, num_partitions=3):
    map_results = [list() for _ in range(num_partitions)]  # one output list per reducer
    for document in corpus:
        for result in map_function(document):
            first_letter = result[0].decode("utf-8")[0]
            word_index = ord(first_letter) % num_partitions  # route each word by its first character
            map_results[word_index].append(result)  # the same word always lands in the same list
    return map_results
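The routing rule in apply_map is what makes the shuffle correct: every occurrence of a word is sent to the same output list, and therefore to the same reducer. The minimal sketch below pulls out that rule as a standalone function. The name partition_of is hypothetical. Because words are routed by first character, the partitions are not guaranteed to be balanced.

```python
def partition_of(word: bytes, num_partitions: int = 3) -> int:
    # Same rule as in apply_map: route by the first character's code point.
    first_letter = word.decode("utf-8")[0]
    return ord(first_letter) % num_partitions


routes = {word: partition_of(word) for word in [b"of", b"is", b"python,", b"the", b"zen"]}
for word, index in routes.items():
    print(word, "->", index)
```

For example, ord("o") is 111, and 111 % 3 == 0, so b"of" goes to partition 0. This matches "Mapper 0, return value 0" in the output below.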

In [17]:
map_results = [
    # num_returns splits each mapper's returned list into num_partitions separate object refs
    apply_map.options(num_returns=num_partitions)
    .remote(data, num_partitions)
    for data in partitions
]

for i in range(num_partitions):
    mapper_results = ray.get(map_results[i])
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")

Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]


In [18]:
@ray.remote
def apply_reduce(*results):  # one list of (word, 1) pairs from each mapper
    reduce_results = dict()
    for res in results:
        for key, value in res:
            if key not in reduce_results:
                reduce_results[key] = 0
            reduce_results[key] += value  # sum the counts per word

    return reduce_results
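Outside of Ray, the body of apply_reduce is a plain counting loop. The sketch below expresses the same reduce step with collections.Counter and runs it on two small, made-up mapper outputs. The function name reduce_with_counter and the sample data are illustrative assumptions.

```python
from collections import Counter


def reduce_with_counter(*results):
    # Same logic as apply_reduce, without Ray: sum the 1s emitted per word.
    totals = Counter()
    for res in results:
        for key, value in res:
            totals[key] += value
    return dict(totals)


mapper_a = [(b"is", 1), (b"of", 1), (b"is", 1)]
mapper_b = [(b"is", 1), (b"in", 1)]
print(reduce_with_counter(mapper_a, mapper_b))
```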

In [19]:
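outputs = []
for i in range(num_partitions):
    outputs.append(  # reducer i collects the i-th output of every mapper
        apply_reduce.remote(*[partition[i] for partition in map_results])
    )

# Each word was routed to exactly one reducer, so the dicts can be merged without summing.
counts = {k: v for output in ray.get(outputs) for k, v in output.items()}

sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)  # most frequent first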
for count in sorted_counts:
    print(f"{count[0].decode('utf-8')}: {count[1]}")

is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
if: 2
implementation: 2
idea.: 2
special: 2
should: 2
do: 2
may: 2
a: 2
never: 2
way: 2
explain,: 2
ugly.: 1
implicit.: 1
complex.: 1
complex: 1
complicated.: 1
flat: 1
readability: 1
counts.: 1
cases: 1
rules.: 1
in: 1
face: 1
refuse: 1
one--: 1
only: 1
--obvious: 1
it.: 1
obvious: 1
first: 1
often: 1
*right*: 1
it's: 1
it: 1
idea: 1
--: 1
let's: 1
python,: 1
peters: 1
simple: 1
sparse: 1
dense.: 1
aren't: 1
practicality: 1
purity.: 1
pass: 1
silently.: 1
silenced.: 1
ambiguity,: 1
guess.: 1
and: 1
preferably: 1
at: 1
you're: 1
dutch.: 1
good: 1
are: 1
great: 1
more: 1
zen: 1
by: 1
tim: 1
beautiful: 1
explicit: 1
nested.: 1
enough: 1
break: 1
beats: 1
errors: 1
explicitly: 1
temptation: 1
there: 1
that: 1
not: 1
now: 1
never.: 1
now.: 1
hard: 1
bad: 1
easy: 1
namespaces: 1
honking: 1
those!: 1
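As a sanity check, a single-process count should agree with the distributed result. The sketch below rereads the Zen of Python the same way the Ray version does and tallies lowercased words with collections.Counter. It uses no Ray at all, and local_counts is an illustrative name.

```python
import subprocess
import sys
from collections import Counter

# Same input as the Ray version: the Zen of Python, split on whitespace.
zen = subprocess.check_output([sys.executable, "-c", "import this"])
local_counts = Counter(word.lower() for word in zen.split())

for word, count in local_counts.most_common(5):
    print(f"{word.decode('utf-8')}: {count}")
```

If the two counts ever disagree, suspect the partitioning step first. For example, splitting the corpus with integer division alone silently drops any trailing words.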
