In [27]:
import ray
import random
import time
import math
from fractions import Fraction


# Initialize Ray
# Tear down any runtime left over from an earlier execution of this cell and
# start a fresh one, so the notebook behaves the same under
# Restart Kernel -> Run All. Ray is started explicitly because the
# state-inspection call in the next cell (ray.cluster_resources()) expects a
# running runtime rather than starting one on demand.
# To use the dashboard, pass include_dashboard=True to ray.init() and read
# context.dashboard_url.
if ray.is_initialized():
    ray.shutdown()
context = ray.init()

In [12]:
# Display the resources Ray reports for the cluster (CPU count, memory,
# object store memory, node entries).
ray.cluster_resources()

{'memory': 7543000269.0,
 'CPU': 8.0,
 'object_store_memory': 2147483648.0,
 'node:__internal_head__': 1.0,
 'node:127.0.0.1': 1.0}

In [3]:
import time

# Toy "database": a list of words addressed by integer position.
database = [
    "Learning",
    "Ray",
    "Flexible",
    "Distributed",
    "Python",
    "for",
    "Machine",
    "Learning",
]


def retrieve(item):
    """Look up one entry of ``database`` with an artificial delay.

    Sleeps for ``item / 10`` seconds to mimic a slow data source, then
    returns the pair ``(item, database[item])``.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)
    entry = database[item]
    return item, entry

# # Basic Example

In [4]:
def print_runtime(input_data, start_time):
    """Report elapsed time and the collected data.

    Prints the wall-clock seconds passed since ``start_time`` (a
    ``time.time()`` timestamp) with two decimals, followed by every element
    of ``input_data`` on its own line.
    """
    elapsed = time.time() - start_time
    print(f'Runtime: {elapsed:.2f} seconds, data:')
    print("\n".join(map(str, input_data)))


# Sequential baseline: fetch the eight items one after another, so the
# individual delays add up.
start = time.time()
data = list(map(retrieve, range(8)))
print_runtime(data, start)

Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


# # Parallelism

In [5]:
import ray 


@ray.remote
def retrieve_task(item):
    """Ray task that delegates to ``retrieve`` and returns its result."""
    looked_up = retrieve(item)
    return looked_up

# Submit all eight lookups as Ray tasks first, then block once on the whole
# list of references to collect the results.
start = time.time()
object_references = [retrieve_task.remote(index) for index in range(8)]
data = ray.get(object_references)
print_runtime(data, start)

Runtime: 1.24 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


# # With Object Stores

In [6]:
# Put the database list into Ray's object store; the returned reference is
# what the following cells pass to remote tasks instead of the list itself.
db_object_ref = ray.put(database)


@ray.remote
def retrieve_task(item, db):
    """Ray task: delayed lookup of ``item`` in an explicitly supplied ``db``.

    Sleeps ``item / 10`` seconds, then returns ``(item, db[item])``. Unlike
    the earlier version, the data source is passed in as an argument instead
    of being read from a global.
    """
    delay_seconds = item / 10.
    time.sleep(delay_seconds)
    entry = db[item]
    return item, entry


# Same fan-out as before, but every task receives the object-store reference
# to the database as its second argument.
start = time.time()
object_references = [
    retrieve_task.remote(index, db_object_ref)
    for index in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)



Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


# # Non-blocking calls

In [7]:
start = time.time()
object_references = [
    retrieve_task.remote(index, db_object_ref)
    for index in range(8)
]
all_data = []

# Drain the pending references step by step: ray.wait splits them into the
# ones that are ready and the ones still outstanding (giving up after 7
# seconds), so results can be handled as they arrive.
while object_references:
    finished, object_references = ray.wait(object_references, timeout=7.0)
    data = ray.get(finished)
    print_runtime(data, start)
    all_data += data

# Final summary over everything collected.
print_runtime(all_data, start)


Runtime: 0.04 seconds, data:
(0, 'Learning')
Runtime: 0.14 seconds, data:
(1, 'Ray')
Runtime: 0.25 seconds, data:
(2, 'Flexible')
Runtime: 0.35 seconds, data:
(3, 'Distributed')
Runtime: 0.44 seconds, data:
(4, 'Python')
Runtime: 0.54 seconds, data:
(5, 'for')
Runtime: 0.64 seconds, data:
(6, 'Machine')
Runtime: 0.75 seconds, data:
(7, 'Learning')
Runtime: 0.75 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


# # Task Dependencies

In [8]:
@ray.remote
def follow_up_task(retrieve_result):
    """Ray task that fetches the item following an earlier lookup.

    ``retrieve_result`` is an ``(item, value)`` pair; the task retrieves
    ``item + 1`` and returns ``(retrieve_result, follow_up_result)``.
    """
    previous_item, _value = retrieve_result
    next_result = retrieve(previous_item + 1)
    return retrieve_result, next_result


# First stage: look up the even-numbered items only.
retrieve_refs = [
    retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]
]
# Second stage: hand each first-stage reference straight to a follow-up task,
# without fetching the intermediate results on the driver.
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]

# Print one result per line. A plain print call replaces the former list
# comprehension that was evaluated only for its print side effect and bound
# a throw-away list of None values.
print(*ray.get(follow_up_refs), sep="\n")

((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))


# # Ray Actors

In [10]:
@ray.remote
class DataTracker:
    """Ray actor that keeps a running count of ``increment`` calls."""

    def __init__(self):
        # Number of increments recorded so far.
        self._counts = 0

    def increment(self):
        """Add one to the counter."""
        self._counts = self._counts + 1

    def counts(self):
        """Return the current counter value."""
        return self._counts
    

@ray.remote
def retrieve_tracker_task(item, tracker, db):
    """Ray task: delayed lookup that also records itself on a tracker actor.

    Args:
        item: Index to look up; also determines the sleep of ``item / 10``
            seconds.
        tracker: Actor handle exposing an ``increment`` method.
        db: Indexable data source.

    Returns:
        The pair ``(item, db[item])``.
    """
    time.sleep(item / 10.)
    # Wait until the actor has applied the increment before this task
    # finishes. A fire-and-forget call gives no ordering guarantee relative
    # to a later counts() request sent by a different caller (the driver),
    # so the reported count could otherwise miss this increment.
    ray.get(tracker.increment.remote())
    return item, db[item]


# One shared tracker actor; its handle is passed to every task.
tracker = DataTracker.remote()

object_references = [
    retrieve_tracker_task.remote(index, tracker, db_object_ref)
    for index in range(8)
]
data = ray.get(object_references)

print(data)
# Ask the actor how many increments it has recorded.
print(ray.get(tracker.counts.remote()))

[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8


# # Highly Parallel Example

In [22]:
@ray.remote
def pi4_sample(sample_count):
    """Monte Carlo estimate of pi/4.

    Draws ``sample_count`` random points (x, y) with both coordinates from
    ``random.random()`` and returns, as an exact ``Fraction``, the share of
    points satisfying ``x*x + y*y <= 1``.
    """
    hits = 0
    for _ in range(sample_count):
        x, y = random.random(), random.random()
        # A bool adds as 0 or 1, so this counts the points inside the circle.
        hits += (x * x + y * y <= 1)
    return Fraction(hits, sample_count)

In [23]:
# Size of one sampling batch (one million points).
SAMPLE_COUNT = 1_000_000
# Time a single remote batch end to end, including scheduling overhead.
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

Running 1000000 tests took 0.3192629814147949 seconds


In [24]:
# pi4 is the sampled estimate of pi/4, so scale it by four.
pi = 4 * pi4
print(float(pi))
# Deviation from math.pi, expressed relative to the estimate itself.
print(abs(pi - math.pi) / pi)


3.14244
0.0002696460108091184


In [None]:
FULL_SAMPLE_COUNT = 100 * 1000 * 1000 * 1000  # 100 billion samples!
# Floor division keeps the batch count an exact integer instead of taking a
# detour through float division and int() truncation.
BATCHES = FULL_SAMPLE_COUNT // SAMPLE_COUNT
print(f'Doing {BATCHES} batches')
# Submit one remote sampling task per batch. The comprehension keeps its
# loop variable local, so the notebook-level `_` is not overwritten.
results = [
    pi4_sample.remote(sample_count=SAMPLE_COUNT) for _ in range(BATCHES)
]
# Block until every batch has finished; each entry is one batch's Fraction.
output = ray.get(results)

Doing 100000 batches
