# AGH DS Laboratory 3 - Actor Model with Ray Framework

## Introduction

Ray is a general-purpose framework for cluster programming developed at UC Berkeley's RISELab. It lets developers parallelize existing Python applications or build new ones and run them at any scale, from a laptop to a large cluster, through a flexible yet minimalist and easy-to-use API.

#### Documentation Reference Links:

Ray official website: https://rise.cs.berkeley.edu/projects/ray/

Ray documentation website: http://ray.readthedocs.io/en/latest/

GitHub repository: https://github.com/ray-project/ray

### Installation
Please follow the instructions:

Installation: https://docs.ray.io/en/latest/ray-overview/installation.html


***
## Part 1 - Remote Functions

The script in this part is too slow, even though its computation is embarrassingly parallel. In this exercise, you will use Ray to execute the functions in parallel and speed it up.

The standard way to turn a Python function into a remote function is to add the `@ray.remote` decorator. Here is an example.

```python
# A regular Python function.
def regular_function(x):
    return x + 1

# A Ray remote function.
@ray.remote
def remote_function(x):
    return x + 1
```

The differences are the following:

1. **Invocation:** The regular version is called with `regular_function(1)`, whereas the remote version is called with `remote_function.remote(1)`.
2. **Return values:** `regular_function(0)` executes immediately and returns `1`, whereas `remote_function.remote(0)` immediately returns an object ID (a future) and creates a task that will be executed on a worker process. The result can be obtained with `ray.get`.
    ```python
    >>> regular_function(0)
    1
    
    >>> remote_function.remote(0)
    ObjectID(1c80d6937802cd7786ad25e50caf2f023c95e350)
    
    >>> ray.get(remote_function.remote(0))
    1
    ```
3. **Parallelism:** Invocations of `regular_function` happen **serially**, for example
    ```python
    # These happen serially.
    for _ in range(4):
        regular_function(0)
    ```
    whereas invocations of `remote_function` happen in **parallel**, for example
    ```python
    # These happen in parallel.
    for _ in range(4):
        remote_function.remote(0)
    ```
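
For readers who know the standard library, the submit-then-fetch pattern above resembles `concurrent.futures`. The sketch below is only an analogy that uses threads in a single process, not Ray itself; `slow_increment` is a hypothetical helper introduced for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_increment(x):
    # Hypothetical stand-in for an expensive function.
    time.sleep(0.2)
    return x + 1

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future immediately, much like .remote() returns an object ID.
    futures = [pool.submit(slow_increment, i) for i in range(4)]
    # result() blocks until the value is ready, much like ray.get.
    values = [f.result() for f in futures]

print(values)  # [1, 2, 3, 4]
```

The four calls overlap, so the block takes roughly the time of one call rather than four. Ray applies the same idea across worker processes and machines.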

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time
import numpy as np
from numpy import random
import os
import pickle



Start Ray. By default, Ray does not schedule more tasks concurrently than there are CPUs. This example requires four tasks to run concurrently, so on a machine with fewer than four cores you would tell Ray that there are four CPUs by passing `num_cpus=4` to `ray.init`. Usually this is not needed, and Ray computes the number of CPUs using `psutil.cpu_count()`, as the cell below does. The argument `ignore_reinit_error=True` just ignores errors if the cell is run multiple times.

The call to `ray.init` starts a number of processes.

In [2]:
if ray.is_initialized():
    ray.shutdown()
ray.init(ignore_reinit_error=True)

2024-04-21 15:49:27,548	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265


| | |
|---|---|
| Python version: | 3.11.8 |
| Ray version: | 2.10.0 |
| Dashboard: | http://127.0.0.1:8265 |


In [7]:
# This function is a proxy for a more interesting and computationally
# intensive function.

@ray.remote
def slow_function(i):
    time.sleep(1)
    return i

**EXERCISE:** The loop below takes too long. The four function calls could be executed in parallel. Instead of four seconds, it should only take one second. Once `slow_function` has been made a remote function, execute these four tasks in parallel by calling `slow_function.remote()`. Then obtain the results by calling `ray.get` on a list of the resulting object IDs.

In [9]:
# Sleep a little to improve the accuracy of the timing measurements below.
# We do this because workers may still be starting up in the background.
time.sleep(2.0)
start_time = time.time()

# results = [slow_function(i) for i in range(4)]
ids = [slow_function.remote(i) for i in range(4)]
results = ray.get(ids)

end_time = time.time()
duration = end_time - start_time

print('The results are {}. This took {} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(results, duration))

The results are [0, 1, 2, 3]. This took 1.0068161487579346 seconds. Run the next cell to see if the exercise was done correctly.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [10]:
assert results == [0, 1, 2, 3], 'Did you remember to call ray.get?'
assert duration < 1.1, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 1, ('The loop took {} seconds. This is too fast.'
                      .format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 1.0068161487579346 seconds.


**Example: Generating the Fibonacci series**

Let's define two functions: one runs locally (serially), the other runs on a Ray cluster (local or remote). Please implement them.

In [12]:
# Function for local fibonacci computation
def generate_fibonacci(sequence_size):
    # return the length of the table of generated Fibonacci numbers
    # do not use recursion, reuse allocated table
    # DO NOT RETURN ENTIRE TABLE
    fib_table = [0, 1]
    for i in range(2, sequence_size):
        fib_table.append(fib_table[i - 1] + fib_table[i - 2])
    return len(fib_table)

# Function for remote Ray task with just a wrapper
@ray.remote
def generate_fibonacci_distributed(sequence_size):
    return generate_fibonacci(sequence_size)
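
The comments in the cell above ask for a reused, pre-allocated table. One possible reading, sketched under that assumption (the name `generate_fibonacci_preallocated` is hypothetical and not part of the lab), allocates the table once and fills it in place instead of growing it with `append`:

```python
def generate_fibonacci_preallocated(sequence_size):
    # Allocate the whole table once instead of growing it with append().
    fib_table = [0] * sequence_size
    if sequence_size > 1:
        fib_table[1] = 1
    for i in range(2, sequence_size):
        fib_table[i] = fib_table[i - 1] + fib_table[i - 2]
    # Return only the length, not the (potentially huge) table.
    return len(fib_table)

print(generate_fibonacci_preallocated(10))  # 10
```

Unlike the version above, this variant also returns the requested length for sizes 0 and 1.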

In [13]:
# Get the number of cores for better parallelism
os.cpu_count()

8

In [14]:
# Normal Python in a single process
def run_local(sequence_size):
    results = [generate_fibonacci(sequence_size) for _ in range(os.cpu_count())]
    return results

SEQUENCE_SIZE = 100000
%time run_local(SEQUENCE_SIZE)


CPU times: user 2.57 s, sys: 3 s, total: 5.57 s
Wall time: 5.62 s


[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]

In [15]:
# Distributed on a Ray cluster
def run_remote(sequence_size):
    results = ray.get([generate_fibonacci_distributed.remote(sequence_size) for _ in range(os.cpu_count())])
    return results

%time run_remote(SEQUENCE_SIZE)

CPU times: user 33.3 ms, sys: 14.2 ms, total: 47.4 ms
Wall time: 2.13 s


[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]

***
## Part 2 - Parallel Data Processing with Task Dependencies

**GOAL:** The goal of this exercise is to show how to pass object IDs into remote functions to encode dependencies between tasks.

In this exercise, we construct sequences of tasks, each of which depends on the previous one, mimicking a data-parallel application. Within each sequence, tasks are executed serially, but multiple sequences can be executed in parallel.

In this exercise, you will use Ray to parallelize the computation below and speed it up.

### Concept for this Exercise - Task Dependencies

Suppose we have a remote function defined as follows.

```python
@ray.remote
def f(x):
    return x
```

Arguments can be passed into remote functions as usual.

```python
>>> x1_id = f.remote(1)
>>> ray.get(x1_id)
1

>>> x2_id = f.remote([1, 2, 3])
>>> ray.get(x2_id)
[1, 2, 3]
```

**Object IDs** can also be passed into remote functions. When the function actually gets executed, **the argument will be retrieved as a regular Python object**.

```python
>>> y1_id = f.remote(x1_id)
>>> ray.get(y1_id)
1

>>> y2_id = f.remote(x2_id)
>>> ray.get(y2_id)
[1, 2, 3]
```

So when implementing a remote function, the function should expect a regular Python object regardless of whether the caller passes in a regular Python object or an object ID.

**Task dependencies affect scheduling.** In the example above, the task that creates `y1_id` depends on the task that creates `x1_id`. This has the following implications.

- The second task will not be executed until the first task has finished executing.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `x1_id`) will be copied over the network to the machine where the second task is scheduled.
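
The rule that a dependent task receives a plain value, and does not start until its input is ready, can be imitated with the standard library. This is only a single-process analogy and not how Ray schedules tasks; `submit_dependent` and `double` are hypothetical names introduced for the sketch.

```python
from concurrent.futures import Future, ThreadPoolExecutor

def submit_dependent(pool, fn, arg):
    """Submit fn(arg); a Future argument is resolved before fn runs."""
    def run():
        # fn always sees a regular value, never the Future itself.
        value = arg.result() if isinstance(arg, Future) else arg
        return fn(value)
    return pool.submit(run)

def double(x):
    return 2 * x

with ThreadPoolExecutor(max_workers=4) as pool:
    first = submit_dependent(pool, double, 1)       # like x1_id = f.remote(1)
    second = submit_dependent(pool, double, first)  # like y1_id = f.remote(x1_id)
    chained = second.result()

print(chained)  # 4
```

Note that this sketch keeps a worker thread busy while it waits for its input, whereas Ray's scheduler holds the dependent task back until the input exists.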


These are some helper functions that mimic an example pattern of a data parallel application.

**EXERCISE:** You will need to turn all of these functions into remote functions. When you turn these functions into remote functions, you do not have to worry about whether the caller passes in an object ID or a regular object. In both cases, the arguments will be regular objects when the function executes. This means that even if you pass in an object ID, you **do not need to call `ray.get`** inside of these remote functions.

In [16]:
@ray.remote
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

assert hasattr(load_data, 'remote'), 'load_data must be a remote function'
assert hasattr(normalize_data, 'remote'), 'normalize_data must be a remote function'
assert hasattr(extract_features, 'remote'), 'extract_features must be a remote function'
assert hasattr(compute_loss, 'remote'), 'compute_loss must be a remote function'

**EXERCISE:** The loop below takes too long. Parallelize the four passes through the loop by turning `load_data`, `normalize_data`, `extract_features`, and `compute_loss` into remote functions and then retrieving the losses with `ray.get`.

**NOTE:** You should only use **ONE** call to `ray.get`. For example, the object ID returned by `load_data` should be passed directly into `normalize_data` without needing to be retrieved by the driver.

In [18]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    inner_start = time.time()

    data = load_data.remote(filename)
    normalized_data = normalize_data.remote(data)
    features = extract_features.remote(normalized_data)
    loss = compute_loss.remote(features)
    losses.append(loss)
    
    inner_end = time.time()
    
    if inner_end - inner_start >= 0.1:
        raise Exception('You may be calling ray.get inside of the for loop! '
                        'Doing this will prevent parallelism from being exposed. '
                        'Make sure to only call ray.get once outside of the for loop.')

print('The losses are {}.'.format(losses) + '\n')
loss = sum(ray.get(losses))

end_time = time.time()
duration = end_time - start_time

print('The loss is {}. This took {} seconds. Run the next cell to see '
      'if the exercise was done correctly.'.format(loss, duration))

The losses are [ObjectRef(1e360ffa862f8fe3ffffffffffffffffffffffff0100000001000000), ObjectRef(465c0fb8d6cb3cdcffffffffffffffffffffffff0100000001000000), ObjectRef(88543757a8df6d2fffffffffffffffffffffffff0100000001000000), ObjectRef(79cc316456d39201ffffffffffffffffffffffff0100000001000000)].

The loss is 4000.0. This took 0.46883296966552734 seconds. Run the next cell to see if the exercise was done correctly.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [19]:
assert loss == 4000
assert duration < 0.8, ('The loop took {} seconds. This is too slow.'
                        .format(duration))
assert duration > 0.4, ('The loop took {} seconds. This is too fast.'
                        .format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.46883296966552734 seconds.


***
## Part 3 - Introducing Actors

**Goal:** The goal of this exercise is to show how to create an actor and how to call actor methods.

See the documentation on actors at https://docs.ray.io/en/latest/ray-core/actors.html.

Sometimes you need a "worker" process to have "state". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.

Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.

This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.

### Concepts for this Exercise - Actors

To create an actor, decorate a Python class with the `@ray.remote` decorator.

```python
@ray.remote
class Example(object):
    def __init__(self, x):
        self.x = x
    
    def set(self, x):
        self.x = x
    
    def get(self):
        return self.x
```

Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.

Actor classes differ from regular Python classes in the following ways.
1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via
    ```python
    e = Example.remote(1)
    ```
    When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.
2. **Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.
    ```python
    >>> e.set.remote(2)
    ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)
    
    >>> e.get.remote()
    ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)
    ```
3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.
    ```python
    >>> ray.get(e.set.remote(2))
    None
    
    >>> ray.get(e.get.remote())
    2
    ```
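
To make the "one method at a time" behaviour concrete, here is a minimal sketch of the actor idea using only the standard library: a private state, a mailbox queue, and a single worker thread. It is an illustration of the model under these assumptions, not Ray's implementation; `MailboxActor` and `increment` are hypothetical names.

```python
import queue
import threading
from concurrent.futures import Future

class MailboxActor:
    """Hypothetical single-threaded actor: one mailbox, one worker thread."""

    def __init__(self, state):
        self._state = state
        self._mailbox = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        # Messages are handled strictly one at a time, in arrival order.
        while True:
            fn, future = self._mailbox.get()
            try:
                future.set_result(fn(self._state))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, fn):
        # Non-blocking: enqueue the message and hand back a future.
        future = Future()
        self._mailbox.put((fn, future))
        return future

def increment(state):
    state["counter"] += 1
    return state["counter"]

actor = MailboxActor({"counter": 0})
futures = [actor.call(increment) for _ in range(5)]
counts = [f.result() for f in futures]
print(counts)  # [1, 2, 3, 4, 5]
```

Because only the worker thread touches the state, no locking is needed. Creating several such actors gives parallelism between actors while each one stays sequential.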

**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator.

In [21]:
@ray.remote
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter

assert hasattr(Foo, 'remote'), 'You need to turn "Foo" into an actor with @ray.remote.'

**EXERCISE:** Change the instantiations below to create two actors by calling `Foo.remote()`.

In [23]:
# Create two Foo objects.
f1 = Foo.remote()
f2 = Foo.remote()

**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time).

In [24]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Reset the actor state so that we can run this cell multiple times without
# changing the results.
f1.reset.remote()
f2.reset.remote()

# We want to parallelize this code. However, it is not straightforward to
# make "increment" a remote function, because state is shared (the value of
# "self.counter") between subsequent calls to "increment". In this case, it
# makes sense to use actors.
results = []
for _ in range(5):
    results.append(f1.increment.remote())
    results.append(f2.increment.remote())
results = ray.get(results)

end_time = time.time()
duration = end_time - start_time

print('Success! The example took {} seconds.'.format(duration))

assert not any([isinstance(result, ray.ObjectRef) for result in results]), 'Looks like "results" is {}. You may have forgotten to call ray.get.'.format(results)

Success! The example took 2.897522449493408 seconds.


**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [25]:
assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]

assert duration < 3, ('The experiments ran in {} seconds. This is too '
                      'slow.'.format(duration))
assert duration > 2.5, ('The experiments ran in {} seconds. This is too '
                        'fast.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 2.897522449493408 seconds.


***
## Part 4 - Handling Slow Tasks

**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.

See the documentation for ray.wait at https://docs.ray.io/en/latest/ray-core/api/doc/ray.wait.html.

This script starts 6 tasks, each of which takes a random amount of time to complete. We'd like to process the results in two batches (each of size 3). Change the code so that instead of waiting for a fixed set of 3 tasks to finish, we make the first batch consist of the first 3 tasks that complete. The second batch should consist of the 3 remaining tasks. Do this exercise by using `ray.wait`.

### Concepts for this Exercise - ray.wait

After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.

```python
ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)
```

**Arguments:**
- `object_ids`: This is a list of object IDs.
- `num_returns`: This is the number of object IDs to wait for. The default value is `1`.
- `timeout`: This is the maximum amount of time in seconds to wait for. So `ray.wait` will block until either `num_returns` objects are ready or until `timeout` seconds have passed.

**Return values:**
- `ready_ids`: This is a list of object IDs that are available in the object store.
- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`.
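
The ready/remaining split can be imitated with `concurrent.futures.as_completed`. The sketch below is a standard-library analogy rather than Ray code; `wait_first_n` and `sleepy` are hypothetical helpers, and the delays are chosen so that tasks 1, 3 and 5 finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def wait_first_n(futures, n):
    """Hypothetical helper: split futures into the first n to finish and the rest."""
    ready = []
    for future in as_completed(futures):
        ready.append(future)
        if len(ready) == n:
            break
    remaining = [f for f in futures if f not in ready]
    return ready, remaining

def sleepy(i, delay):
    time.sleep(delay)
    return i

delays = [0.9, 0.0, 0.75, 0.15, 0.6, 0.3]
with ThreadPoolExecutor(max_workers=6) as pool:
    futures = [pool.submit(sleepy, i, d) for i, d in enumerate(delays)]
    ready, remaining = wait_first_n(futures, 3)
    first_batch = sorted(f.result() for f in ready)
    second_batch = sorted(f.result() for f in remaining)

print(first_batch, second_batch)
```

As with `ray.wait`, the two lists together contain every submitted future, and the first batch becomes available without waiting for the slowest task.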

Define a remote function that takes a variable amount of time to run.

In [29]:
@ray.remote
def f(i):
    np.random.seed(5 + i)
    x = np.random.uniform(0, 4)
    time.sleep(x)
    return i, time.time()

**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted.

In [30]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# This launches 6 tasks, each of which takes a random amount of time to
# complete.
result_ids = [f.remote(i) for i in range(6)]
# Get one batch of tasks. Instead of waiting for a fixed subset of tasks, we
# should instead use the first 3 tasks that finish.

initial_results, remaining_ids = ray.wait(result_ids, num_returns=3, timeout=None)
initial_results = ray.get(initial_results)

end_time = time.time()
duration = end_time - start_time

**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete.

In [33]:
# Wait for the remaining tasks to complete.
remaining_results = ray.get(remaining_ids)

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [34]:
assert len(initial_results) == 3
assert len(remaining_results) == 3

initial_indices = [result[0] for result in initial_results]
initial_times = [result[1] for result in initial_results]
remaining_indices = [result[0] for result in remaining_results]
remaining_times = [result[1] for result in remaining_results]

assert set(initial_indices + remaining_indices) == set(range(6))

assert duration < 1.5, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too slow.'.format(duration))

assert duration > 0.8, ('The initial batch of three tasks was retrieved in '
                        '{} seconds. This is too fast.'.format(duration))

# Make sure the initial results actually completed first.
assert max(initial_times) < min(remaining_times)

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.9011986255645752 seconds.


## Part 5 - Speed up Serialization

**GOAL:** The goal of this exercise is to illustrate how to speed up serialization by using `ray.put`.

### Concepts for this Exercise - ray.put

Object IDs can be created in multiple ways.
- They are returned by remote function calls.
- They are returned by actor method calls.
- They are returned by `ray.put`.

When an object is passed to `ray.put`, the object is serialized and copied into a shared-memory object store. Early Ray releases serialized objects using the Apache Arrow format (see https://arrow.apache.org/ for more information about Arrow), while recent releases, including the 2.10.0 used here, use pickle protocol 5 with out-of-band buffers. This object will then be available to other workers on the same machine via shared memory. If it is needed by workers on another machine, it will be shipped under the hood.

**When objects are passed into a remote function, Ray puts them in the object store under the hood.** That is, if `f` is a remote function, the code

```python
x = np.zeros(1000)
f.remote(x)
```

is essentially transformed under the hood to

```python
x = np.zeros(1000)
x_id = ray.put(x)
f.remote(x_id)
```

The call to `ray.put` copies the numpy array into the shared-memory object store, from where it can be read by all of the worker processes (without additional copying). However, if you do something like

```python
for i in range(10):
    f.remote(x)
```

then 10 copies of the array will be placed into the object store. This takes up more memory in the object store than is necessary, and it also takes time to copy the array into the object store over and over. This can be made more efficient by placing the array in the object store only once as follows.

```python
x_id = ray.put(x)
for i in range(10):
    f.remote(x_id)
```

In this exercise, you will speed up the code below and reduce the memory footprint by calling `ray.put` on the neural net weights before passing them into the remote functions.

**WARNING:** This exercise requires a lot of memory to run. If this notebook is running within a Docker container, then the docker container must be started with a large shared-memory file system. This can be done by starting the docker container with the `--shm-size` flag.

In [35]:
neural_net_weights = {'variable{}'.format(i): np.random.normal(size=1000000)
                      for i in range(50)}

**EXERCISE:** Compare the time required to serialize the neural net weights and copy them into the object store using Ray versus the time required to pickle and unpickle the weights. The big win should be with the time required for *deserialization*.

Note that when you call `ray.put`, in addition to serializing the object, we are copying it into shared memory where it can be efficiently accessed by other workers on the same machine.

**NOTE:** You don't actually have to do anything here other than run the cell below and read the output.

**NOTE:** Sometimes `ray.put` can be faster than `pickle.dumps`. This is because `ray.put` leverages multiple threads when serializing large objects. Note that this is not possible with `pickle`.

In [36]:
print('Ray - serializing')
%time x_id = ray.put(neural_net_weights)
print('\nRay - deserializing')
%time x_val = ray.get(x_id)

print('\npickle - serializing')
%time serialized = pickle.dumps(neural_net_weights)
print('\npickle - deserializing')
%time deserialized = pickle.loads(serialized)

Ray - serializing
CPU times: user 0 ns, sys: 592 ms, total: 592 ms
Wall time: 157 ms

Ray - deserializing
CPU times: user 0 ns, sys: 1.62 ms, total: 1.62 ms
Wall time: 1.16 ms

pickle - serializing
CPU times: user 245 ms, sys: 1.07 s, total: 1.31 s
Wall time: 1.32 s

pickle - deserializing
CPU times: user 144 ms, sys: 363 ms, total: 508 ms
Wall time: 510 ms


Define a remote function which uses the neural net weights.

In [37]:
@ray.remote
def use_weights(weights, i):
    len(weights)
    return i

**EXERCISE:** In the code below, use `ray.put` to avoid copying the neural net weights to the object store multiple times.

In [38]:
# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# results = ray.get([use_weights.remote(neural_net_weights, i)
#                    for i in range(20)])
neural_net_weights_id = ray.put(neural_net_weights)
results = ray.get([use_weights.remote(neural_net_weights_id, i) for i in range(20)])


end_time = time.time()
duration = end_time - start_time

**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass.

In [39]:
assert results == list(range(20))
assert duration < 1, ('The experiments ran in {} seconds. This is too '
                      'slow.'.format(duration))

print('Success! The example took {} seconds.'.format(duration))

Success! The example took 0.3575100898742676 seconds.


In [40]:
# Example: Parameter Server distributed application with Ray Actors
# Problem: We want to apply gradients, computed by workers, to weights held at a central server.
# Let's take a Python class and convert it into a remote actor class that serves as the Parameter Server.
# This is a common pattern in machine learning: a central parameter server
# applies the gradients computed independently by other worker processes.

print('parameter server')
@ray.remote
class ParameterSever:
    def __init__(self):
        # Initialize the parameters to zero
        self.params = np.zeros(10)

    def get_params(self):
        # Return the current parameters
        return self.params

    def update_params(self, grad):
        # Apply a gradient step to the parameters
        self.params -= grad

# Define a worker or task as a function for a remote Worker. This could be a
# machine learning function that computes gradients and sends them to
# the parameter server.

@ray.remote
def worker(ps):         # It takes an actor handle or instance as an argument
    # Iterate over some epoch
    for i in range(25):
        time.sleep(1.5)  # this could be your loss function computing gradients
        #grad = np.ones(10)
        from numpy import random
        grad = random.randint(30, size=(10))
        # update the gradients in the parameter server
        ps.update_params.remote(grad)

# Start our Parameter Server actor. This will be scheduled as a worker process
# on a remote Ray node. You invoke its ActorClass.remote(...) to instantiate an
# Actor instance of that type.

param_server = ParameterSever.remote()
print(param_server)

# Let's get the initial values of the parameter server
print(f"Initial params: {ray.get(param_server.get_params.remote())}")

# Create Workers remote tasks computing gradients
# Let's create three separate worker tasks as our machine learning tasks
# that compute gradients. These will be scheduled as tasks on a Ray cluster.

# You can use list comprehension.
# If we need more workers to scale, we can always bump them up.
# Note: We are sending the parameter_server as an argument to the remote worker task.

print([worker.remote(param_server) for _ in range(3)])

# Now, let's iterate over a loop and query the Parameter Server as the
# workers are running independently and updating the gradients

for _i in range(20):
    print(f"Updated params: {ray.get(param_server.get_params.remote())}")
    time.sleep(1)

parameter server
Actor(ParameterSever, 849c7c0b82728c37ec71b3bb01000000)
Initial params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[ObjectRef(5f70e045687d2f9affffffffffffffffffffffff0100000001000000), ObjectRef(a4dc031465f905f8ffffffffffffffffffffffff0100000001000000), ObjectRef(9e7872a82e7456d9ffffffffffffffffffffffff0100000001000000)]
Updated params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Updated params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Updated params: [-42. -51. -26. -51. -35. -46. -41. -33. -28. -60.]
Updated params: [-58. -67. -33. -63. -50. -63. -62. -40. -44. -87.]
Updated params: [ -78. -111.  -61.  -79.  -80.  -90.  -96.  -47.  -45. -123.]
Updated params: [-105. -153.  -93. -136. -114. -134. -151. -124.  -79. -177.]
Updated params: [-136. -181. -113. -150. -120. -169. -162. -145. -109. -184.]
Updated params: [-159. -189. -119. -158. -126. -171. -181. -164. -134. -194.]
Updated params: [-189. -232. -162. -196. -174. -208. -224. -203. -199. -225.]
Updated params: [-207. -261. -179. -220. -199. -2

## Part 6 - Compute Pi using Monte Carlo method
**Exercise:** Based on the previous example, create an actor-based system that can compute Pi using the Monte Carlo method. There should be one supervising actor and several computing tasks/actors.

Hint: take a look at https://docs.ray.io/en/latest/ray-core/examples/monte_carlo_pi.html . However, we do not want to list the progress. Instead, we want to see the estimate get closer to the value of Pi in each round.
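
Before distributing the work, it can help to see the estimator itself. The following is a minimal sequential sketch, assuming points drawn uniformly from the square [-1, 1] x [-1, 1]; the function name `estimate_pi_rounds`, the round structure, and the fixed seed are choices made for this illustration only.

```python
import math
import random

def estimate_pi_rounds(num_rounds, samples_per_round, seed=0):
    """Return the running estimate of pi after each round of sampling."""
    rng = random.Random(seed)  # fixed seed so the sketch is reproducible
    inside = 0
    total = 0
    estimates = []
    for _round in range(num_rounds):
        for _sample in range(samples_per_round):
            x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
            if math.hypot(x, y) <= 1:
                inside += 1
        total += samples_per_round
        # Area of the unit circle / area of the enclosing square = pi / 4.
        estimates.append(4 * inside / total)
    return estimates

estimates = estimate_pi_rounds(num_rounds=5, samples_per_round=20_000)
for round_index, value in enumerate(estimates, start=1):
    print(f"Round {round_index}: pi is approximately {value:.5f}")
```

In the distributed version, each sampling task would keep its own `inside` and `total` counts and report them to a supervising actor, which combines them in the same way.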


![monte-carlo](img/monte_carlo_pi_sma.png)


In [44]:
import ray
import math
import time
import random

# ray.init()

In [145]:
# implement here 
import ray.actor


@ray.remote
class ProgressActor:
    def __init__(self, total_num_samples: int):
        self.total_num_samples = total_num_samples
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )
    
@ray.remote
class SamplesActor:
    def __init__(self):
        self.num_samples_completed = {}
        self.num_samples_inside = {}

    def report_samples(self, task_id: int, num_samples_completed, num_samples_inside):
        self.num_samples_completed[task_id] = num_samples_completed
        self.num_samples_inside[task_id] = num_samples_inside

    def get_estimation(self):
        sum_num_samples_completed = sum(self.num_samples_completed.values())
        sum_num_samples_inside = sum(self.num_samples_inside.values())
        if sum_num_samples_completed == 0:
            return None
        return sum_num_samples_inside * 4 / sum_num_samples_completed
        
    
@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle,
                  samples_actor: ray.actor.ActorHandle) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        if (i + 1) % 1_000_000 == 0:
            progress_actor.report_progress.remote(task_id, i + 1)
            samples_actor.report_samples.remote(task_id, i+1, num_inside)

    progress_actor.report_progress.remote(task_id, num_samples)
    samples_actor.report_samples.remote(task_id, num_samples, num_inside)

In [147]:
NUM_SAMPLING_TASKS = 10
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK
SLEEP_PERIOD = 3

progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)
samples_actor = SamplesActor.remote()

for i in range(NUM_SAMPLING_TASKS):
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor, samples_actor)

while True:
    progress = ray.get(progress_actor.get_progress.remote())
    
    pi_estimation = ray.get(samples_actor.get_estimation.remote())
    print(f"Current estimation of π is: {pi_estimation}")

    if progress == 1:
        break

    time.sleep(SLEEP_PERIOD)

pi_estimation = ray.get(samples_actor.get_estimation.remote())
print(f"Final estimation of π is: {pi_estimation}")


Current estimation of π is: None
Current estimation of π is: 3.140769882352941
Current estimation of π is: 3.1412489411764706
Current estimation of π is: 3.1414493333333335
Current estimation of π is: 3.1415977777777777
Current estimation of π is: 3.141574684931507
Current estimation of π is: 3.141670313253012
Current estimation of π is: 3.141573022222222
Current estimation of π is: 3.141584244897959
Current estimation of π is: 3.14157056
Final estimation of π is: 3.14157056
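
The value returned by `SamplesActor.get_estimation` rests on a ratio of areas: a point drawn uniformly from the square [-1, 1] × [-1, 1] lands inside the unit circle with probability π/4, so π ≈ 4 · inside / total. Below is a minimal single-process sketch of the same estimator without Ray. The function name `estimate_pi` and the fixed seed are illustrative assumptions, not part of the lab code.

```python
import math
import random


def estimate_pi(num_samples: int, seed: int = 0) -> float:
    """Estimate pi from points sampled uniformly in the square [-1, 1] x [-1, 1]."""
    rng = random.Random(seed)  # local generator so that runs are reproducible
    num_inside = 0
    for _ in range(num_samples):
        x, y = rng.uniform(-1, 1), rng.uniform(-1, 1)
        # Count the point if it falls inside (or on) the unit circle.
        if math.hypot(x, y) <= 1:
            num_inside += 1
    # Area of circle / area of square = pi / 4.
    return 4 * num_inside / num_samples


print(estimate_pi(100_000))
```

The standard error of this estimator shrinks proportionally to 1/√N, so each additional correct digit costs roughly 100 times more samples. This is why splitting the sampling across tasks pays off.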


## Part 7 - Parallel merge sort (homework)
**Exercise:** Based on the previous examples, create a sequential and a parallel implementation of the merge sort algorithm. You may choose any variant of the parallel algorithm. Compare their performance.


In [61]:
import ray
import math
import time
import random

# ray.init()

2024-04-21 19:56:34,284	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265


Python version: 3.11.8
Ray version: 2.10.0
Dashboard: http://127.0.0.1:8265


In [98]:
# Sequential Merge Sort

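def merge(arr_left, arr_right):
    """Merge two sorted lists into one sorted list."""
    n = len(arr_left) + len(arr_right)
    result = [0] * n

    l, r = 0, 0
    for i in range(n):
        # Take from the left list when the right one is exhausted or the left
        # head is not larger; `<=` keeps equal elements in order (stable merge).
        if r >= len(arr_right) or (l < len(arr_left) and arr_left[l] <= arr_right[r]):
            result[i] = arr_left[l]
            l += 1
        else:
            result[i] = arr_right[r]
            r += 1
    return result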

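def merge_sort(arr, l, r):
    """Return a sorted copy of arr[l..r] (both bounds inclusive)."""
    if l > r:
        # Empty range, e.g. merge_sort([], 0, -1); without this guard the
        # recursion would never terminate.
        return []
    if l == r:
        return [arr[l]]
    
    m = (l + r) // 2
    arr_left = merge_sort(arr, l, m)
    arr_right = merge_sort(arr, m+1, r)
    return merge(arr_left, arr_right)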


# ls = [2,1,0,4,5,6,1,2,5,10,7,8]
# print(ls)
# print(merge_sort(ls, 0, len(ls)-1))

In [124]:
# Parallel Merge Sort
from itertools import chain

@ray.remote
def merge_sort_task(arr, l, r):
    if l > r:
        return []
    if l == r:
        return [arr[l]]
    
    m = (l + r) // 2
    arr_left = merge_sort(arr, l, m)
    arr_right = merge_sort(arr, m+1, r)
    return merge(arr_left, arr_right)


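def parallel_merge_sort(arr, n_tasks):
    # Range-partition the input so that the sorted buckets can simply be concatenated.
    if not arr:
        return []
    n_buckets = n_tasks
    buckets = [[] for _ in range(n_buckets)]
    min_val = min(arr)
    max_val = max(arr)
    if min_val == max_val:
        # All elements are equal; avoid dividing by a zero-width bucket range.
        return list(arr)
    bucket_range = (max_val - min_val) / n_buckets
    for num in arr:
        bucket_nr = int((num - min_val) // bucket_range)
        bucket_nr = min(bucket_nr, n_buckets-1)  # max_val belongs to the last bucket
        buckets[bucket_nr].append(num)
    
    # Sort every bucket in its own Ray task, then concatenate in bucket order.
    results = [merge_sort_task.remote(buckets[i], 0, len(buckets[i])-1) for i in range(n_buckets)]
    flatten_results = list(chain.from_iterable(ray.get(results)))
    return flatten_results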


# ls = [2,1,0,4,5,6,1,2,5,10,7, 8]
# print(ls)
# parallel_merge_sort(ls, 3)
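
`parallel_merge_sort` needs no final merge step because it partitions by value range rather than by position: every element of bucket k is no larger than any element of bucket k+1, so the sorted buckets can be concatenated directly. Below is a minimal sketch of that partitioning idea without Ray. The helper names `partition_by_range` and `bucket_sort` are hypothetical, and the built-in `sorted` stands in for the remote merge sort task.

```python
from itertools import chain


def partition_by_range(arr, n_buckets):
    """Split arr into n_buckets lists that cover consecutive value ranges."""
    if not arr:
        return [[] for _ in range(n_buckets)]
    min_val, max_val = min(arr), max(arr)
    if min_val == max_val:
        # All values are equal: a zero-width range would divide by zero below.
        return [list(arr)] + [[] for _ in range(n_buckets - 1)]
    width = (max_val - min_val) / n_buckets
    buckets = [[] for _ in range(n_buckets)]
    for num in arr:
        # The maximum value maps to index n_buckets, so clamp it to the last bucket.
        idx = min(int((num - min_val) // width), n_buckets - 1)
        buckets[idx].append(num)
    return buckets


def bucket_sort(arr, n_buckets):
    """Sort each bucket independently, then concatenate in bucket order."""
    buckets = partition_by_range(arr, n_buckets)
    return list(chain.from_iterable(sorted(b) for b in buckets))


data = [2, 1, 0, 4, 5, 6, 1, 2, 5, 10, 7, 8]
print(partition_by_range(data, 3))
print(bucket_sort(data, 3))
```

The trade-off of this design is load balance: with skewed input most elements can end up in a single bucket, leaving the other tasks with little work. A position-based split followed by a merge of the sorted chunks avoids that at the cost of an extra merge phase.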

In [140]:
import random

# Comparing performance

def generate_random_array(num_elements, num_range):
    return [random.randint(0, num_range) for _ in range(num_elements)]

def run_test(sequential_ms, parallel_ms, num_tasks, input_arr):
    # Test sequential merge sort
    time.sleep(1.0)
    start_time = time.time()

    sequential_result = sequential_ms(input_arr, 0, len(input_arr)-1)
    
    end_time = time.time()
    sequential_duration = end_time - start_time

    # Test parallel merge sort
    time.sleep(1.0)
    start_time = time.time()

    parallel_result = parallel_ms(input_arr, num_tasks)
    
    end_time = time.time()
    parallel_duration = end_time - start_time

    expected_result = sorted(input_arr)
    assert sequential_result == expected_result, ('Sequential Merge Sort cannot sort 😬')
    assert parallel_result == expected_result, ('Parallel Merge Sort cannot sort 😬')

    return sequential_duration, parallel_duration


def compare_performance(sequential_ms, parallel_ms, num_tests, num_elements, num_range, num_tasks):
    print(f'Performance Comparison: num_tests={num_tests}, num_elements={num_elements}, num_tasks={num_tasks}, time_unit=ms\n')

    sequential_total_time, parallel_total_time = 0, 0
    
    for test_id in range(num_tests):
        input_arr = generate_random_array(num_elements, num_range)
        
        sequential_duration, parallel_duration = run_test(sequential_ms, parallel_ms, num_tasks, input_arr)
        sequential_duration *= 1000
        parallel_duration *= 1000
        sequential_total_time += sequential_duration
        parallel_total_time += parallel_duration
        print(f'Test {test_id+1}/{num_tests}: Sequential={sequential_duration}, Parallel={parallel_duration}')

    print(f'\nSequential Total Time: {sequential_total_time}')
    print(f'Parallel Total Time: {parallel_total_time}')
    print(f'Difference: {abs(sequential_total_time - parallel_total_time)}')

    print(f'\nSequential Avg Time: {sequential_total_time/num_tests}')
    print(f'Parallel Avg Time: {parallel_total_time/num_tests}')
    print(f'Difference: {abs((sequential_total_time-parallel_total_time)/num_tests)}')
    

In [143]:
NUM_ELEMENTS = 10**3
NUM_RANGE = 10**6
NUM_TESTS = 10
NUM_SORTING_TASKS = 5

compare_performance(merge_sort, parallel_merge_sort, NUM_TESTS, NUM_ELEMENTS, NUM_RANGE, NUM_SORTING_TASKS)

Performance Comparison: num_tests=10, num_elements=1000, num_tasks=5, time_unit=ms

Test 1/10: Sequential=2.116680145263672, Parallel=6.580352783203125
Test 2/10: Sequential=5.080938339233398, Parallel=11.852264404296875
Test 3/10: Sequential=6.290912628173828, Parallel=9.03940200805664
Test 4/10: Sequential=6.404399871826172, Parallel=9.598016738891602
Test 5/10: Sequential=5.286693572998047, Parallel=10.04338264465332
Test 6/10: Sequential=5.538463592529297, Parallel=10.36214828491211
Test 7/10: Sequential=5.304574966430664, Parallel=10.686159133911133
Test 8/10: Sequential=5.866050720214844, Parallel=11.224746704101562
Test 9/10: Sequential=5.443096160888672, Parallel=6.968021392822266
Test 10/10: Sequential=1.970529556274414, Parallel=5.203723907470703

Sequential Total Time: 49.30233955383301
Parallel Total Time: 91.55821800231934
Difference: 42.25587844848633

Sequential Avg Time: 4.930233955383301
Parallel Avg Time: 9.155821800231934
Difference: 4.225587844848633


In [142]:
NUM_ELEMENTS = 10**6
NUM_RANGE = 10**6
NUM_TESTS = 10
NUM_SORTING_TASKS = 5

compare_performance(merge_sort, parallel_merge_sort, NUM_TESTS, NUM_ELEMENTS, NUM_RANGE, NUM_SORTING_TASKS)

Performance Comparison: num_tests=10, num_elements=1000000, num_tasks=5, time_unit=ms

Test 1/10: Sequential=3837.5906944274902, Parallel=1733.567237854004
Test 2/10: Sequential=3859.668731689453, Parallel=1528.7787914276123
Test 3/10: Sequential=3903.109312057495, Parallel=1640.1095390319824
Test 4/10: Sequential=3833.404064178467, Parallel=1665.5075550079346
Test 5/10: Sequential=4064.236640930176, Parallel=2553.3058643341064
Test 6/10: Sequential=4412.663459777832, Parallel=2204.9925327301025
Test 7/10: Sequential=4595.367908477783, Parallel=2456.7034244537354
Test 8/10: Sequential=4601.7961502075195, Parallel=2194.1299438476562
Test 9/10: Sequential=4788.10453414917, Parallel=2310.056686401367
Test 10/10: Sequential=4316.9190883636475, Parallel=2172.365188598633

Sequential Total Time: 42212.86058425903
Parallel Total Time: 20459.516763687134
Difference: 21753.3438205719

Sequential Avg Time: 4221.286058425903
Parallel Avg Time: 2045.9516763687134
Difference: 2175.33438205719


In [144]:
NUM_ELEMENTS = 10**6
NUM_RANGE = 10**6
NUM_TESTS = 10
NUM_SORTING_TASKS = 10

compare_performance(merge_sort, parallel_merge_sort, NUM_TESTS, NUM_ELEMENTS, NUM_RANGE, NUM_SORTING_TASKS)

Performance Comparison: num_tests=10, num_elements=1000000, num_tasks=10, time_unit=ms

Test 1/10: Sequential=4107.08475112915, Parallel=1443.6264038085938
Test 2/10: Sequential=3710.505962371826, Parallel=1425.4140853881836
Test 3/10: Sequential=4137.281656265259, Parallel=1549.0355491638184
Test 4/10: Sequential=4545.2117919921875, Parallel=2065.202474594116
Test 5/10: Sequential=4885.754585266113, Parallel=2010.8373165130615
Test 6/10: Sequential=4463.362455368042, Parallel=2115.036725997925
Test 7/10: Sequential=6003.6985874176025, Parallel=2072.6280212402344
Test 8/10: Sequential=5183.418273925781, Parallel=2143.045663833618
Test 9/10: Sequential=5040.796518325806, Parallel=2345.5400466918945
Test 10/10: Sequential=4801.0313510894775, Parallel=2818.5465335845947

Sequential Total Time: 46878.145933151245
Parallel Total Time: 19988.91282081604
Difference: 26889.233112335205

Sequential Avg Time: 4687.8145933151245
Parallel Avg Time: 1998.891282081604
Difference: 2688.9233112335205
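
The average times reported in the three runs above can be condensed into speedups (sequential time divided by parallel time). Below is a short sketch that uses the averages copied from those outputs. The dictionary layout and the function name `speedup` are illustrative assumptions.

```python
# Average times in milliseconds, copied from the three runs above:
# (num_elements, num_tasks) -> (sequential_avg_ms, parallel_avg_ms)
avg_times_ms = {
    (10**3, 5): (4.930233955383301, 9.155821800231934),
    (10**6, 5): (4221.286058425903, 2045.9516763687134),
    (10**6, 10): (4687.8145933151245, 1998.891282081604),
}


def speedup(sequential_ms: float, parallel_ms: float) -> float:
    """Return how many times faster the parallel run is (below 1 means slower)."""
    return sequential_ms / parallel_ms


for (num_elements, num_tasks), (seq_ms, par_ms) in avg_times_ms.items():
    print(f"n={num_elements}, tasks={num_tasks}: speedup = {speedup(seq_ms, par_ms):.2f}x")
```

For 1,000 elements the speedup is about 0.54, meaning the parallel version is slower, most likely because the overhead of scheduling tasks and moving data outweighs the small amount of sorting work. For 10^6 elements the speedup is roughly 2.06 with 5 tasks and 2.35 with 10 tasks. The sequential averages also differ between the last two runs, so the comparison across task counts is noisy.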


## Clean up - Clean up the environment

**GOAL:** The goal of this exercise is to shut down the Ray environment with `ray.shutdown()` once you have finished working with Ray.

In [148]:
ray.shutdown()