In this notebook the performance of some Python scripts before/after Ray is investigated. 

The exerices are derived from https://github.com/ray-project/tutorial. Here some of them are solved, and have passed their timing requirements. The notebook will ideally be extended to more complex algorithms, preferrably RL algorithms. 

In [13]:
import ray
import pickle
import time
import numpy as np

In [None]:
ray.init(num_cpus=8, redirect_output=True)

### Exercise 1: Simple Data Parallel Example

#### BEFORE

In [5]:
# This function is a proxy for a more interesting and computationally
# intensive function.
def slow_function(i):
    time.sleep(1)
    return i

# Sleep a little to improve the accuracy of the timing measurements below.
# We do this because workers may still be starting up in the background.
time.sleep(2.0)
start_time = time.time()

results = []
for i in range(4):
    results.append(slow_function(i))

end_time = time.time()
duration_slow = end_time - start_time

print('result: {}'.format(results))
print('time in seconds without Ray: {}'.format(duration_slow))

result: [0, 1, 2, 3]
time in seconds without Ray: 4.004085063934326


#### AFTER

In [6]:
@ray.remote
def fast_function(i):
    time.sleep(1)
    return i

time.sleep(2.0)
start_time = time.time()

object_ids = [fast_function.remote(i) for i in range(4)]
results = ray.get(object_ids)

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(results))
print('time in seconds with Ray: {}'.format(duration_fast))

result: [0, 1, 2, 3]
time in seconds with Ray: 1.0028748512268066


### Exercise 2: Parallel Data Processing with Task Dependencies

#### BEFORE

In [7]:
def load_data(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

def normalize_data(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

def extract_features(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

def compute_loss(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

time.sleep(2.0)
start_time = time.time()

losses = []
for filename in ['file1', 'file2', 'file3', 'file4']:
    data = load_data(filename)
    normalized_data = normalize_data(data)
    features = extract_features(normalized_data)
    loss = compute_loss(features)
    losses.append(loss)

loss = sum(losses)

end_time = time.time()
duration_slow = end_time - start_time

print('result: {}'.format(loss))
print('time in seconds without Ray: {}'.format(duration_slow))

result: 4000.0
time in seconds without Ray: 1.6527800559997559


#### AFTER

In [11]:
@ray.remote
def load_data_fast(filename):
    time.sleep(0.1)
    return np.ones((1000, 100))

@ray.remote
def normalize_data_fast(data):
    time.sleep(0.1)
    return data - np.mean(data, axis=0)

@ray.remote
def extract_features_fast(normalized_data):
    time.sleep(0.1)
    return np.hstack([normalized_data, normalized_data ** 2])

@ray.remote
def compute_loss_fast(features):
    num_data, dim = features.shape
    time.sleep(0.1)
    return np.sum((np.dot(features, np.ones(dim)) - np.ones(num_data)) ** 2)

time.sleep(2.0)
start_time = time.time()
files = ['file1', 'file2', 'file3', 'file4']

data_obj = [load_data_fast.remote(filename) for filename in files]
normalized_data_obj = [normalize_data_fast.remote(dt) for dt in data_obj]
features_obj = [extract_features_fast.remote(norm_dt) for norm_dt in normalized_data_obj]
losses_obj = [compute_loss_fast.remote(feature) for feature in features_obj]

loss = sum(ray.get(losses_obj))

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(loss))
print('time in seconds with Ray: {}'.format(duration_fast))

result: 4000.0
time in seconds with Ray: 0.4228091239929199


### Exercise 3: Tree of tasks executed in parallel (Tree Reduce)

#### BEFORE

In [12]:
# This is a proxy for a function which generates some data.
def create_data(i):
    time.sleep(0.3)
    return i * np.ones(10000)

# This is a proxy for an expensive aggregation step (which is also
# commutative and associative so it can be used in a tree-reduce).
def aggregate_data(x, y):
    time.sleep(0.3)
    return x * y

time.sleep(2.0)
start_time = time.time()

vectors = [create_data(i + 1) for i in range(8)]

# For clarity the aggregation below is written as seperate functions,
# but it should be done using a while loop
result = aggregate_data(vectors[0], vectors[1])
result = aggregate_data(result, vectors[2])
result = aggregate_data(result, vectors[3])
result = aggregate_data(result, vectors[4])
result = aggregate_data(result, vectors[5])
result = aggregate_data(result, vectors[6])
result = aggregate_data(result, vectors[7])

end_time = time.time()
duration_slow = end_time - start_time

print('result: {}'.format(result))
print('time in seconds without Ray: {}'.format(duration_slow))

result: [ 40320.  40320.  40320. ...,  40320.  40320.  40320.]
time in seconds without Ray: 4.524723052978516


#### AFTER

In [4]:
# This is a proxy for a function which generates some data.
@ray.remote
def create_data_fast(i):
    time.sleep(0.3)
    return i * np.ones(10000)

# This is a proxy for an expensive aggregation step (which is also
# commutative and associative so it can be used in a tree-reduce).
@ray.remote
def aggregate_data_fast(x, y):
    time.sleep(0.3)
    return x * y

time.sleep(2.0)
start_time = time.time()

# data is generated in parallel
vectors_obj = [create_data_fast.remote(i+1) for i in range(8)]

# Again for clarity separate functions are written, but while loop
# should ideally be used. 
# Speeding up tree aggregation below by using Ray:
result01 = aggregate_data_fast.remote(vectors_obj[0], vectors_obj[1])
result23 = aggregate_data_fast.remote(vectors_obj[2], vectors_obj[3])
result45 = aggregate_data_fast.remote(vectors_obj[4], vectors_obj[5])
result67 = aggregate_data_fast.remote(vectors_obj[6], vectors_obj[7])

result0123 = aggregate_data_fast.remote(result01, result23)
result4567 = aggregate_data_fast.remote(result45, result67)

final_result = aggregate_data_fast.remote(result0123, result4567)

result = ray.get(final_result)

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(result))
print('time in seconds with Ray: {}'.format(duration_fast))

result: [ 40320.  40320.  40320. ...,  40320.  40320.  40320.]
time in seconds with Ray: 1.2474958896636963


### Exercise 4: Hyper Parameter sweep by Nested Parallelism

#### BEFORE

In [5]:
def compute_gradient(data):
    time.sleep(0.03)
    return 1

def train_model(hyperparameters):
    result = 0
    for i in range(10):
        result += sum([compute_gradient(j) for j in range(2)])
    return result

time.sleep(2.0)
start_time = time.time()

# Run some hyperparaameter experiments.
results = []
for hyperparameters in [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]:
    results.append(train_model(hyperparameters))

end_time = time.time()
duration_slow = end_time - start_time

print('result: {}'.format(results))
print('time in seconds without Ray: {}'.format(duration_slow))

result: [20, 20, 20]
time in seconds without Ray: 1.949739933013916


#### AFTER

In [7]:
@ray.remote
def compute_gradient_fast(data):
    time.sleep(0.03)
    return 1

@ray.remote
def train_model_fast(hyperparameters):
    # EXERCISE: After you turn "compute_gradient" into a remote function,
    # you will need to call it with ".remote". The results must be retrieved
    # with "ray.get" before "sum" is called.
    grad = sum(ray.get([compute_gradient_fast.remote(j) for j in range(2)]))
    return [grad for _ in range(10)]

# Sleep a little to improve the accuracy of the timing measurements below.
time.sleep(2.0)
start_time = time.time()

# Run some hyperparaameter experiments.
results = []
hyperparameters_list = [{'learning_rate': 1e-1, 'batch_size': 100},
                        {'learning_rate': 1e-2, 'batch_size': 100},
                        {'learning_rate': 1e-3, 'batch_size': 100}]

results_obj = [train_model_fast.remote(params) for params in hyperparameters_list]
results = [sum(res) for res in ray.get(results_obj)]

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(results))
print('time in seconds with Ray: {}'.format(duration_fast))

result: [20, 20, 20]
time in seconds with Ray: 0.03414201736450195


### Exercise 7: Using Actors

From Ray Doc: Sometimes you need a "worker" process to have "state". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.

Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.

This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.


#### BEFORE

In [9]:
class Foo(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter
    
# Create two Foo objects.
f1 = Foo()
f2 = Foo()

time.sleep(2.0)
start_time = time.time()

f1.reset()
f2.reset()

results = []
for _ in range(5):
    results.append(f1.increment())
    results.append(f2.increment())

end_time = time.time()
duration_slow = end_time - start_time

print('result: {}'.format(results))
print('time in seconds without Ray: {}'.format(duration_slow))

result: [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
time in seconds without Ray: 5.021368980407715


#### AFTER

In [10]:
@ray.remote
class Foo_fast(object):
    def __init__(self):
        self.counter = 0

    def reset(self):
        self.counter = 0

    def increment(self):
        time.sleep(0.5)
        self.counter += 1
        return self.counter
    
# Create two Foo objects.
f1 = Foo_fast.remote()
f2 = Foo_fast.remote()

time.sleep(2.0)
start_time = time.time()

# Reset the actor state so that we can run this cell multiple times without
# changing the results.
f1.reset.remote()
f2.reset.remote()

# We want to parallelize this code. However, it is not straightforward to
# make "increment" a remote function, because state is shared (the value of
# "self.counter") between subsequent calls to "increment". In this case, it
# makes sense to use actors.
results_obj = [[f1.increment.remote(), f2.increment.remote()] for _ in range(5)]
results = ray.get(sum(results_obj,[]))

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(results))
print('time in seconds with Ray: {}'.format(duration_fast))

result: [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
time in seconds with Ray: 2.524989128112793


### Exercise 8: Speed up Serialization

In [11]:
neural_net_weights = {'variable{}'.format(i): np.random.normal(size=1000000)
                      for i in range(50)}

Compare the time required to serialize the neural net weights and copy them into the object store using Ray versus the time required to pickle and unpickle the weights. 

The big win should be with the time required for deserialization.

ray.put leverages multiple threads when serializing large objects. Note that this is not possible with pickle.

In [14]:
print('Ray - serializing')
%time x_id = ray.put(neural_net_weights)
print('\nRay - deserializing')
%time x_val = ray.get(x_id)

print('\npickle - serializing')
%time serialized = pickle.dumps(neural_net_weights)
print('\npickle - deserializing')
%time deserialized = pickle.loads(serialized)

Ray - serializing
CPU times: user 280 ms, sys: 965 ms, total: 1.25 s
Wall time: 1.01 s

Ray - deserializing
CPU times: user 719 µs, sys: 628 µs, total: 1.35 ms
Wall time: 1.25 ms

pickle - serializing
CPU times: user 296 ms, sys: 405 ms, total: 701 ms
Wall time: 1.06 s

pickle - deserializing
CPU times: user 174 ms, sys: 141 ms, total: 315 ms
Wall time: 316 ms


Use ray.put to avoid copying the neural net weights to the object store multiple times:

In [18]:
@ray.remote
def use_weights(weights, i):
    return i

time.sleep(2.0)
start_time = time.time()

weights_id = ray.put(neural_net_weights)
results = ray.get([use_weights.remote(weights_id, i)
                   for i in range(20)])

end_time = time.time()
duration_fast = end_time - start_time

print('result: {}'.format(results))
print('time in seconds with Ray: {}'.format(duration_fast))

result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
time in seconds with Ray: 0.5597741603851318
