# Ray: Task Programming with Remote Task/Actor Functions

To install Ray on an AWS EC2 instance, run:
```sh
$ pip3 install ray
```

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
import time
import math
import random
from fractions import Fraction
import numpy as np

print("Successfully imported ray!")

Successfully imported ray!


This initializes a Ray instance that runs locally on this EC2 instance.

In [2]:
#ray.init(address='172.31.32.99:6379', num_cpus=2, ignore_reinit_error=True)
ray.init(num_cpus=2, ignore_reinit_error=True)

time.sleep(2.0)

2023-04-03 02:42:05,408	INFO worker.py:1553 -- Started a local Ray instance.


# Remote tasks

These simple examples demonstrate Ray's (**stateless**) Tasks API.
The Ray-specific items are:
* The `@ray.remote` decorator marks a function as a remote function.
* `dot.remote(id1, id2)` invokes the function asynchronously and immediately returns an object reference.
* `ray.get(id3)` blocks until the remote function finishes and returns its value.

In [3]:
@ray.remote
def zeros(shape):
    return np.zeros(shape)

@ray.remote
def randoms(shape):
    return np.random.randint(10, size=shape)

In [4]:
@ray.remote
def dot(a, b):
    return np.dot(a, b)

In [5]:
id1 = zeros.remote([4,4])
id2 = zeros.remote([4,4])
id3 = dot.remote(id1, id2)

In [6]:
result = ray.get(id3)
print(result)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]


## The task DAG

```text
randoms   randoms
   |         |
  \|/       \|/
  id1       id2
    \       / 
     \     /
      \   /
       \ /
       dot
        |
       \|/
       id3
```

In [7]:
id1 = randoms.remote([5,5])
id2 = randoms.remote([5,5])
id3 = dot.remote(id1, id2)

In [8]:
obj1 = ray.get(id1)
obj2 = ray.get(id2)
obj3 = ray.get(id3)

In [9]:
print(obj1)
print(obj2)
print(obj3)

[[3 1 1 0 6]
 [7 4 8 2 9]
 [8 8 0 0 9]
 [1 5 8 0 9]
 [7 0 5 0 1]]
[[5 1 2 5 9]
 [5 9 0 9 8]
 [9 1 4 2 3]
 [1 9 9 9 5]
 [8 0 8 9 6]]
[[ 77  13  58  80  74]
 [201  69 136 186 183]
 [152  80  88 193 190]
 [174  54 106 147 127]
 [ 88  12  42  54  84]]


# Remote actors

These are simple examples to demonstrate the use of Ray's (**stateful**) Actors API.

An actor is an instance of a class decorated with `@ray.remote`. When you create an actor, you get a runtime handle (object identifier) to it. That handle lets you call the class's methods, which in turn read and update the state held by the actor.

All of the methods within the class are invoked with `.remote()`. You should think of this as follows:

* the actor is assigned to a compute resource
* each method call packages its arguments (and other needed state) and ships them to the remote resource

When working with actors, keep your remote calls as small as possible and keep state within the actor at all times; that is, avoid sending a lot of state between actors.

In [10]:
@ray.remote
class Counter(object):
    def __init__(self, arg1):
        self.value = arg1

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

# Create an actor from this class.
counter = Counter.remote(5)

## The actor DAG

```text
   counter <-- 5
      |
     \|/
  increment --> id1
      |
     \|/
  increment --> id2
      |
     \|/
  increment --> id3
```

In [11]:
id1 = counter.increment.remote()
id2 = counter.increment.remote()
id3 = counter.increment.remote()

In [12]:
result = ray.get([id1, id2, id3])
print(result) # prints [6, 7, 8]

[6, 7, 8]


In [13]:
# Create ten Counter actors, each initialized with 0.
counters = [Counter.remote(0) for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]



[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]


## Ray is a parallel computing framework

Run a trivial program in serial to show that it is slow.

In [14]:
# A function simulating a more interesting computation that takes one second.
def slow_function(i):
    time.sleep(1)
    return i

start_time = time.time()
results = []
for i in range(8):
    results.append(slow_function(i))
    
duration = time.time() - start_time
print("Executing the for loop took {:.3f} seconds.".format(duration))
print("The results are: ", results)

Executing the for loop took 8.008 seconds.
The results are:  [0, 1, 2, 3, 4, 5, 6, 7]


Parallelize the same computation with remote task functions.
A trivially parallelizable program has a speedup of 2 - ε on two cores, where ε is the small loss due to overhead.

In [15]:
@ray.remote
def slow_function(i):
    time.sleep(1)
    return i

start_time = time.time()

results = []
oids = []

for i in range(8):
    oids.append(slow_function.remote(i))
    
for i in range(8):
    results.append(ray.get(oids[i]))

duration = time.time() - start_time
print("Executing the for loop took {:.3f} seconds.".format(duration))
print("The results are: ", results)



Executing the for loop took 5.235 seconds.
The results are:  [0, 1, 2, 3, 4, 5, 6, 7]
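
To put the two timings above side by side, the following sketch computes the ideal time for eight one-second tasks on two workers and compares it with the measured runs. The helper `ideal_parallel_time` is a hypothetical name introduced here, and the measured values are copied from the outputs above.

```python
import math

def ideal_parallel_time(num_tasks, task_seconds, num_workers):
    # Tasks run in waves of at most num_workers at a time.
    return math.ceil(num_tasks / num_workers) * task_seconds

serial_measured = 8.008    # seconds, from the serial run above
parallel_measured = 5.235  # seconds, from the Ray run above

ideal = ideal_parallel_time(num_tasks=8, task_seconds=1.0, num_workers=2)
ideal_speedup = (8 * 1.0) / ideal
measured_speedup = serial_measured / parallel_measured

print("ideal time: {:.1f} s, ideal speedup: {:.2f}".format(ideal, ideal_speedup))
print("measured speedup: {:.2f}".format(measured_speedup))
```

The measured speedup of about 1.53 falls short of the ideal 2. A plausible explanation, though not one verified in this notebook, is the one-time cost of starting worker processes plus scheduling overhead, which weighs heavily on a run this short.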
