# Chapter 2: Getting Started with Ray Core

For a book on distributed Python, it's not without a certain irony that Python on its own is
largely ineffective for distributed computing.
Its interpreter is effectively single-threaded, which makes it difficult to, e.g., leverage multiple CPUs on
the same machine, let alone a whole cluster of machines, using plain Python.
That means you need extra tooling, and luckily the Python ecosystem has some options for you.
For instance, libraries like `multiprocessing` can help you distribute work on a single machine, but not beyond.


You can run this notebook directly in
[Colab](https://colab.research.google.com/github/maxpumperla/learning_ray/blob/main/notebooks/ch_02_ray_core.ipynb):
<a target="_blank" href="https://colab.research.google.com/github/maxpumperla/learning_ray/blob/main/notebooks/ch_02_ray_core.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

The book has been written for Ray 2.2.0, which at the time of writing has not
officially been released yet. If you are reading this and this version is already
available, you can install it using `pip install ray==2.2.0`. If not, for this chapter
you can simply use:

In [None]:
! pip install ray>=2.1.0

Should you not run this notebook in Colab and need another type of wheel, please
refer to Ray's [installation instructions for nightlies](https://docs.ray.io/en/latest/ray-overview/installation.html#install-nightlies).

In [None]:
import ray
# Initialize Ray for this notebook session; no arguments are passed, so Ray's defaults apply.
ray.init()

In [None]:
# tag::retrieve[]
import time

# In-memory stand-in for a database: eight strings addressed by list index.
database = [  # <1>
    "Learning",
    "Ray",
    "Flexible",
    "Distributed",
    "Python",
    "for",
    "Machine",
    "Learning",
]


def retrieve(item):
    """Look up one entry of ``database`` by index, with an artificial delay.

    The call sleeps for ``item / 10`` seconds before reading the entry, so
    larger indices take proportionally longer to return.

    Args:
        item: Index into ``database``.

    Returns:
        The tuple ``(item, database[item])``.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)  # <2>
    entry = database[item]
    return item, entry
# end::retrieve[]

In [None]:
# tag::duration[]
def print_runtime(input_data, start_time):
    """Print the time elapsed since ``start_time`` followed by the data items.

    Args:
        input_data: Iterable of items; each one is printed on its own line.
        start_time: Timestamp, as returned by ``time.time()``, to measure from.
    """
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    # One item per line; for empty input this still emits a single blank line.
    print("\n".join(str(entry) for entry in input_data))


# Sequential baseline: call retrieve for indices 0-7 one after another and time the run.
start = time.time()
data = list(map(retrieve, range(8)))  # <1>
print_runtime(data, start)  # <2>
# end::duration[]

In [None]:
# tag::remote[]
@ray.remote  # <1>
def retrieve_task(item):
    """Wrapper around ``retrieve`` that is decorated with ``ray.remote``.

    Args:
        item: Index forwarded unchanged to ``retrieve``.

    Returns:
        Whatever ``retrieve(item)`` returns.
    """
    retrieved = retrieve(item)  # <2>
    return retrieved
# end::remote[]

In [None]:
# tag::duration_remote[]
# Submit one retrieve_task call per index, then fetch all results at once and time the run.
start = time.time()
object_references = list(map(retrieve_task.remote, range(8)))  # <1>
data = ray.get(object_references)  # <2>
print_runtime(data, start)
# end::duration_remote[]

In [None]:
# tag::object_store[]
# Hand the database list to ray.put and keep the returned reference for later task calls.
db_object_ref = ray.put(database)  # <1>


@ray.remote
def retrieve_task(item, db):  # <2>
    """Return entry ``item`` of ``db`` after sleeping ``item / 10`` seconds.

    Args:
        item: Index of the entry to read.
        db: Indexable collection to read from; callers in this notebook pass
            the reference returned by ``ray.put(database)``.

    Returns:
        The tuple ``(item, db[item])``.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)
    entry = db[item]
    return item, entry
# end::object_store[]

In [None]:
# tag::duration_object_store[]
# Submit all eight lookups, then collect the results in batches as they finish.
start = time.time()
object_references = [
    retrieve_task.remote(item, db_object_ref) for item in range(8)  # <1>
]
all_data = []

while object_references:  # <2>
    finished, object_references = ray.wait(  # <3>
        object_references,
        # Never ask for more results than references remain: a timeout can
        # leave an odd number pending, and a fixed 2 would then be too many.
        num_returns=min(2, len(object_references)),
        timeout=7.0,
    )
    data = ray.get(finished)
    print_runtime(data, start)  # <4>
    all_data.extend(data)  # <5>

# end::duration_object_store[]
print_runtime(all_data, start)

In [None]:
# tag::task_dependency[]
@ray.remote
def follow_up_task(retrieve_result):  # <1>
    """Retrieve the entry that follows the one in ``retrieve_result``.

    Args:
        retrieve_result: Two-element result whose first element is the index
            that was retrieved originally.

    Returns:
        The tuple ``(retrieve_result, follow_up_result)``, where
        ``follow_up_result`` is ``retrieve`` applied to the next index.
    """
    item, _unused_entry = retrieve_result
    next_item = item + 1
    follow_up_result = retrieve(next_item)  # <2>
    return retrieve_result, follow_up_result  # <3>


# Chain the tasks: each follow-up task receives the reference produced by a retrieve task.
retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]  # <4>

# Printing is a side effect, so use a plain loop instead of building a list of None values.
for follow_up in ray.get(follow_up_refs):
    print(follow_up)
# end::task_dependency[]

In [None]:
# tag::actors[]
@ray.remote  # <1>
class DataTracker:
    """Simple counter, decorated with ``ray.remote``, that records ``increment`` calls."""

    def __init__(self):
        # Number of increment() calls seen so far.
        self._counts = 0

    def increment(self):
        """Add one to the running count."""
        self._counts = self._counts + 1

    def counts(self):
        """Return the current count."""
        return self._counts
# end::actors[]

In [None]:
# tag::actors_remote[]
@ray.remote
def retrieve_tracker_task(item, tracker, db):  # <1>
    """Return entry ``item`` of ``db`` and record the access on ``tracker``.

    Args:
        item: Index of the entry to read; also sets the sleep time (``item / 10`` s).
        tracker: Object whose ``increment.remote()`` is called once per lookup.
        db: Indexable collection to read from.

    Returns:
        The tuple ``(item, db[item])``.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)
    tracker.increment.remote()  # <2>
    entry = db[item]
    return item, entry


tracker = DataTracker.remote()  # <3>

# Every task receives the same tracker, so all lookups are counted in one place.
object_references = [  # <4>
    retrieve_tracker_task.remote(index, tracker, db_object_ref)
    for index in range(8)
]
data = ray.get(object_references)
print(data)

total_count = ray.get(tracker.counts.remote())  # <5>
print(total_count)
# end::actors_remote[]

In [None]:
# tag::ownership[]
@ray.remote
def task_owned():
    """Do nothing and return ``None``; it exists only to be launched from ``task``."""
    return None


@ray.remote
def task(dependency):
    """Launch ``task_owned`` from inside this task and return ``None``.

    Args:
        dependency: Accepted but not read in the body.
    """
    # The returned reference is stored but deliberately never fetched here.
    res_owned = task_owned.remote()
    return None


# Store a string via ray.put, then pass the returned reference to task as its dependency argument.
val = ray.put("value")
res = task.remote(dependency=val)
# end::ownership[]

# ![Task dependency](https://raw.githubusercontent.com/maxpumperla/learning_ray/main/notebooks/images/chapter_02/task_dependency.png)

# ![Worker Node](https://raw.githubusercontent.com/maxpumperla/learning_ray/main/notebooks/images/chapter_02/worker_node.png)

# ![Ray architecture](https://raw.githubusercontent.com/maxpumperla/learning_ray/main/notebooks/images/chapter_02/architecture.png)

In [None]:
# tag::load_data[]
import subprocess
import sys

# Run "import this" with the interpreter that executes this notebook, not
# whatever "python" resolves to on PATH (it may be missing or a different version).
zen_of_python = subprocess.check_output([sys.executable, "-c", "import this"])
corpus = zen_of_python.split()  # <1>

num_partitions = 3
# Ceiling division: the slices below then cover every word even when the
# corpus length is not a multiple of num_partitions (floor division would
# silently drop the trailing words).
chunk = -(-len(corpus) // num_partitions)
partitions = [  # <2>
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]
# end::load_data[]

In [None]:
# tag::map_fct[]
def map_function(document):
    """Yield a ``(word, 1)`` pair for each whitespace-separated word of ``document``.

    The document is lower-cased before it is split, so the yielded words are
    all lower case.
    """
    lowered = document.lower()
    yield from ((word, 1) for word in lowered.split())
# end::map_fct[]

In [None]:
# tag::map[]
import ray

@ray.remote
def apply_map(corpus, num_partitions=3):
    """Map every document in ``corpus`` and bucket the resulting pairs.

    Args:
        corpus: Iterable of documents; each one is passed to ``map_function``.
            The words produced must be ``bytes``, since they are decoded as UTF-8.
        num_partitions: Number of buckets to distribute the pairs over.

    Returns:
        A list of ``num_partitions`` lists. Each ``(word, 1)`` pair is placed
        in the bucket selected by the code point of the word's first
        character modulo ``num_partitions``.
    """
    map_results = [[] for _ in range(num_partitions)]  # <1>
    pairs = (
        pair for document in corpus for pair in map_function(document)
    )
    for pair in pairs:
        word = pair[0]
        first_letter = word.decode("utf-8")[0]
        bucket = ord(first_letter) % num_partitions  # <2>
        map_results[bucket].append(pair)  # <3>
    return map_results
# end::map[]

In [None]:
# tag::apply_mr_1[]
# Launch one mapper per partition; each mapper call yields num_partitions return references.
map_results = [
    apply_map.options(num_returns=num_partitions)  # <1>
    .remote(chunk_of_words, num_partitions)  # <2>
    for chunk_of_words in partitions  # <3>
]

# Preview the first two pairs of every bucket produced by every mapper.
for i, mapper_refs in enumerate(map_results):
    mapper_results = ray.get(mapper_refs)  # <4>
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")
# end::apply_mr_1[]

In [None]:
# tag::reduce[]
@ray.remote
def apply_reduce(*results):  # <1>
    """Sum the values per key across all given lists of ``(key, value)`` pairs.

    Args:
        *results: Any number of iterables of ``(key, value)`` pairs.

    Returns:
        A dict mapping each key to the sum of its values over all inputs.
    """
    reduce_results = {}
    for res in results:
        for key, value in res:
            # Keys not seen before start from zero.
            reduce_results[key] = reduce_results.get(key, 0) + value  # <2>

    return reduce_results
# end::reduce[]

In [None]:
# tag::apply_mr_2[]
outputs = []
for i in range(num_partitions):
    # Reducer i receives the i-th return reference of every mapper.
    reducer_inputs = [partition[i] for partition in map_results]
    outputs.append(apply_reduce.remote(*reducer_inputs))  # <1>

# Merge the per-reducer dictionaries into a single word-count mapping.
counts = {}
for output in ray.get(outputs):  # <2>
    counts.update(output)

# Order the words by descending count before printing.
sorted_counts = list(counts.items())
sorted_counts.sort(key=lambda item: item[1], reverse=True)  # <3>
for count in sorted_counts:
    print(f"{count[0].decode('utf-8')}: {count[1]}")
# end::apply_mr_2[]