[DO NOT MERGE] Cleaned up version of SGD #8

Closed
wants to merge 18 commits
8 changes: 8 additions & 0 deletions README.md
@@ -1,3 +1,11 @@
# OSDI 2018 (Distributed Training Experiments)

For Horovod, Distributed TF, and Ray experiments

## As of Aug 9, 2018, to run the SGD experiments:

Run `./resnet101-ray-sgd.py --devices-per-actor=8 --plasma-op --max-bytes=100000000 --ps --warmup --ps-spinwait --num-actors=2 --cluster --debug_ps`

Launch the cluster with `sgd.yaml`. You may need to reset the conda environment once you connect to the cluster.

The Ray commit used here is https://github.com/pcmoritz/ray-1/commit/136568f07c59f353dbfdac8e13baf3ba6839df98.
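
For orientation, here is a minimal, self-contained sketch of the actor layout that command exercises: worker actors produce per-shard gradients, one parameter-server actor averages each shard, and the driver runs synchronous steps. Toy numpy arrays stand in for the real TF models, and every name below (`ToyWorker`, `ToyPS`, the shard sizes) is illustrative, not code from this repo.

```python
import numpy as np
import ray

ray.init()

@ray.remote
class ToyWorker(object):
    def __init__(self, shard_sizes):
        self.weights = [np.zeros(s, dtype=np.float32) for s in shard_sizes]

    def compute_gradients(self):
        # Stand-in gradient: one random array per weight shard.
        return [np.random.rand(*w.shape).astype(np.float32) for w in self.weights]

    def apply(self, shards):
        for w, g in zip(self.weights, shards):
            w -= 0.01 * g

@ray.remote
class ToyPS(object):
    def __init__(self):
        self.acc = None

    def add(self, grad):
        self.acc = grad if self.acc is None else self.acc + grad

    def get(self):
        out, self.acc = self.acc, None
        return out

shard_sizes, num_workers = [4, 6], 2
workers = [ToyWorker.remote(shard_sizes) for _ in range(num_workers)]
ps_list = [ToyPS.remote() for _ in shard_sizes]

for step in range(3):
    # Each worker produces per-shard gradients; each PS averages one shard.
    grads = ray.get([w.compute_gradients.remote() for w in workers])
    for per_worker in grads:
        for ps, shard in zip(ps_list, per_worker):
            ps.add.remote(shard)
    averaged = [ray.get(ps.get.remote()) / num_workers for ps in ps_list]
    ray.get([w.apply.remote(averaged) for w in workers])
print("done")
```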
4 changes: 2 additions & 2 deletions chrome_timeline.py
@@ -15,10 +15,10 @@ def __init__(self, tid):
self.tid = tid

def patch_ray(self):
orig_log = ray.worker.log
# orig_log = ray.worker.log

def custom_log(event_type, kind, *args, **kwargs):
orig_log(event_type, kind, *args, **kwargs)
# orig_log(event_type, kind, *args, **kwargs)
if kind == ray.worker.LOG_SPAN_START:
self.start(event_type)
elif kind == ray.worker.LOG_SPAN_END:
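
The `custom_log` wrapper above routes Ray span events into the timeline's `start`/`end` calls. As a hedged illustration only (the real `Timeline` class in chrome_timeline.py presumably differs), this is roughly what such span bookkeeping can look like using the Chrome trace "B"/"E" event format that chrome://tracing accepts:

```python
import json
import time

class TinyTimeline(object):
    def __init__(self, tid):
        self.tid = tid
        self.events = []

    def start(self, name):
        # "B" = begin event; timestamps are in microseconds.
        self.events.append({"name": name, "ph": "B", "pid": 0,
                            "tid": self.tid, "ts": time.time() * 1e6})

    def end(self, name):
        # "E" = end event matching the most recent begin with the same name.
        self.events.append({"name": name, "ph": "E", "pid": 0,
                            "tid": self.tid, "ts": time.time() * 1e6})

    def dump(self, path):
        with open(path, "w") as f:
            json.dump(self.events, f)

t = TinyTimeline(tid=1)
t.start("get_buffers"); t.end("get_buffers")
t.dump("/tmp/trace.json")  # open in chrome://tracing
```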
86 changes: 51 additions & 35 deletions resnet101-ray-sgd.py
@@ -17,12 +17,15 @@
import os
import ray
import time
import pyarrow.plasma as plasma
import logging


def fetch(oids):
local_sched_client = ray.worker.global_worker.local_scheduler_client
for o in oids:
plasma_id = ray.pyarrow.plasma.ObjectID(o)
ray.worker.global_worker.plasma_client.fetch([plasma_id])
ray_obj_id = ray.ObjectID(o)
local_sched_client.reconstruct_objects([ray_obj_id], True)


def run_timeline(sess, ops, feed_dict={}, write_timeline=False, name=""):
@@ -86,25 +89,30 @@ def __init__(self,
num_devices=1,
use_cpus=False,
max_bytes=0,
use_xray=True,
# use_xray=True,
plasma_op=False,
verbose=False):
# TODO - just port VariableMgrLocalReplicated
if use_xray:
if num_devices == 4:
gpu0 = FileLock("/tmp/gpu0")
gpu1 = FileLock("/tmp/gpu1")
try:
gpu0.acquire(timeout=0)
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
except:
gpu1.acquire(timeout=0)
os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
else:
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"
print("CUDA VISIBLES", os.environ["CUDA_VISIBLE_DEVICES"])
# if use_xray:
# print(os.environ["CUDA_VISIBLE_DEVICES"])
# if num_devices == 4:
# gpu0 = FileLock("/tmp/gpu0")
# gpu1 = FileLock("/tmp/gpu1")
# try:
# gpu0.acquire(timeout=0)
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# except:
# gpu1.acquire(timeout=0)
# os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
# else:
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"
# print("CUDA VISIBLES", os.environ["CUDA_VISIBLE_DEVICES"])
self.logger = logging.getLogger("SGDWorker")
logging.basicConfig(level=logging.DEBUG)

self.i = i
assert num_devices > 0
self.logger.debug("Starting Session.")
tf_session_args = {
"device_count": {"CPU": num_devices},
"log_device_placement": False,
@@ -113,6 +121,7 @@ }
}
config_proto = tf.ConfigProto(**tf_session_args)
self.sess = tf.Session(config=config_proto)
self.logger.debug("Created Session.")
models = []
grad_ops = []
self.iter = 0
@@ -125,14 +134,16 @@ def __init__(self,
device = device_tmpl % device_idx
with tf.device(device):
with tf.variable_scope("device_%d" % device_idx):
print("DEVICE: ", device)
self.logger.debug("Creating Model for Device {}.".format(device))
model = model_cls(batch=batch_size, use_cpus=use_cpus, device=device)
models += [model]
model.grads = [
t for t in model.optimizer.compute_gradients(model.loss)
if t[0] is not None]
grad_ops.append(model.grads)

self.logger.debug("Created all gradient ops.")

self.models = models
if num_devices == 1:
assert not max_bytes, "Not supported with 1 GPU"
@@ -151,7 +162,7 @@ def __init__(self,
self.num_grads = num_grads = len(self.packed_grads_and_vars[0])
if max_bytes:
assert(num_grads < 314)
print("Packed grads => {} tensors".format(num_grads))
self.logger.debug("Packed grads => {} tensors".format(num_grads))
else:
assert(num_grads == 314)

@@ -166,7 +177,10 @@ def __init__(self,

round_robin_devices = False
if args.plasma_op:
memcpy_plasma_module = tf.load_op_library("/home/ubuntu/osdi2018/ops/memcpy_plasma_op.so")
self.logger.debug("** Building Plasma Op **")
plasma.build_plasma_tensorflow_op()
# memcpy_plasma_module = tf.load_op_library("/home/ubuntu/osdi2018/ops/memcpy_plasma_op.so")
memcpy_plasma_module = plasma.tf_plasma_op

# For fetching grads -> plasma
self.plasma_in_grads = []
@@ -197,6 +211,7 @@ def __init__(self,
with tf.control_dependencies([self.plasma_in_grads[j]]):
grad_ph = memcpy_plasma_module.plasma_to_tensor(
self.plasma_out_grads_oids[j],
dtype=tf.float32,
plasma_store_socket_name=ray.worker.global_worker.plasma_client.store_socket_name,
plasma_manager_socket_name=ray.worker.global_worker.plasma_client.manager_socket_name)
grad_ph = tf.reshape(grad_ph, self.packed_grads_and_vars[0][j][0].shape)
@@ -366,8 +381,8 @@ def add_spinwait(self, grad_shard_ids):
for p in plasma_ids:
if ray.worker.global_worker.plasma_client.contains(p):
self.timeline.start("get_buffers")
[raw_grads] = ray.worker.global_worker.plasma_client.get_buffers([p])
grads = np.frombuffer(raw_grads, dtype=np.float32)
# [raw_grads] = ray.worker.global_worker.plasma_client.get_buffers([p])
grads = ray.worker.global_worker.plasma_client.get(p)
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("get_buffers")
@@ -382,8 +397,9 @@ def add(self, grad_shard_id):
# self.timeline.end("add_wait")
self.timeline.start("get_buffers")
oid = ray.pyarrow.plasma.ObjectID(grad_shard_id)
[raw_grads] = ray.worker.global_worker.plasma_client.get_buffers([oid])
grads = np.frombuffer(raw_grads, dtype=np.float32)
# [raw_grads] = ray.worker.global_worker.plasma_client.get_buffers([oid])
# grads = np.frombuffer(raw_grads, dtype=np.float32)
grads = ray.worker.global_worker.plasma_client.get(oid)
self.timeline.end("get_buffers")
self.accumulated += grads
self.acc_counter += 1
@@ -394,11 +410,12 @@ def get(self, object_id):
client = ray.worker.global_worker.plasma_client
assert self.acc_counter == self.num_sgd_workers, self.acc_counter
oid = ray.pyarrow.plasma.ObjectID(object_id)
buff = client.create(
oid, self.accumulated.nbytes)
wrapper = np.frombuffer(buff, dtype=np.float32)
np.copyto(wrapper, self.accumulated)
client.seal(oid)
client.put(self.accumulated.flatten(), object_id=oid)
# buff = client.create(
# oid, self.accumulated.nbytes)
# wrapper = np.frombuffer(buff, dtype=np.float32)
# np.copyto(wrapper, self.accumulated)
# client.seal(oid)
self.accumulated = np.zeros_like(self.accumulated)
self.acc_counter = 0
self.timeline.end("get")
@@ -608,7 +625,6 @@ def roundrobin_ps(ps_cls, sgd_workers, shard_shapes, spread_ps):

def create_ps():
tid_counter[0] += 1
time.sleep(1) # needed because resource tracking is faulty
return RemotePS.remote(num_workers, tid_counter[0])

ip_mapping = defaultdict(list)
@@ -634,7 +650,7 @@ def create_ps():

for ps in sum(candidates, []):
if ps not in final_list:
ps.__ray_terminate__.remote(ps._ray_actor_id.id())
ps.__ray_terminate__.remote()
print("removing a ps...")
else:
print("saving ps...")
@@ -652,13 +668,12 @@ def create_at(ips, actor_class):
while len(assigned) < len(ips):
i += 1
print("Try", i)
time.sleep(1)
cand = actor_class.remote()
c_ip = ray.get(cand.ip.remote())
if c_ip in ips and c_ip not in assigned:
assigned[c_ip] = cand
else:
cand.__ray_terminate__.remote(cand._ray_actor_id.id())
cand.__ray_terminate__.remote()
print("Progress so far", assigned)
return [assigned[ip] for ip in ips]
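
As a hedged, single-machine sketch of the placement trick `create_at` relies on: spawn candidate actors until every desired IP has one, keep the first actor seen on each IP, and terminate the extras with `__ray_terminate__.remote()` as the code above does. The `Placeable` actor and the use of `socket` to obtain the node IP are illustrative stand-ins, not code from this repo.

```python
import socket
import ray

ray.init()

@ray.remote
class Placeable(object):
    def ip(self):
        # Illustrative: report this node's IP.
        return socket.gethostbyname(socket.gethostname())

def create_at(ips, actor_class):
    # Assumes a cluster where candidates eventually land on each desired IP;
    # on a mismatch the candidate actor is simply told to exit.
    assigned = {}
    while len(assigned) < len(ips):
        cand = actor_class.remote()
        c_ip = ray.get(cand.ip.remote())
        if c_ip in ips and c_ip not in assigned:
            assigned[c_ip] = cand
        else:
            cand.__ray_terminate__.remote()
    return [assigned[ip] for ip in ips]

# On one machine this returns immediately: the local IP is the only candidate.
local_ip = socket.gethostbyname(socket.gethostname())
actors = create_at([local_ip], Placeable)
```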

@@ -757,7 +772,8 @@ def allreduce_sgd_step(actors, allreduce_actors_by_shard, shard_shapes, args):
max_bytes=args.max_bytes, plasma_op=args.plasma_op,
verbose=args.verbose)]
print("Creating an actor")
time.sleep(1)
print("SLEEPING")
time.sleep(5)

print("Test config: " + str(args))
results = []
@@ -803,7 +819,7 @@ def allreduce_sgd_step(actors, allreduce_actors_by_shard, shard_shapes, args):
assert len(actors) > 1, "Need more than 1 node for round robin!"
ps_list = roundrobin_ps(RemotePS, actors, shard_shapes, args.spread_ps)
else:
ps_list = [RemotePS.remote(len(actors), i)
ps_list = [RemotePS.remote(len(actors), i)
for i, s in enumerate(shard_shapes)]
[ps.initialize.remote(s) for ps, s in zip(ps_list, shard_shapes)]

@@ -823,7 +839,7 @@ def allreduce_sgd_step(actors, allreduce_actors_by_shard, shard_shapes, args):
assert args.num_actors == 1
step_fn = lambda: do_sgd_step(actors, args)

for i in range(20):
for i in range(100):
start = time.time()
print("Sgd step", i)
step_fn()
6 changes: 3 additions & 3 deletions richard/rliaw-sgdlaunch.yaml
@@ -1,6 +1,6 @@
cluster_name: sgd
min_workers: 1
max_workers: 1
min_workers: 3
max_workers: 3
target_utilization_fraction: 0.8
idle_timeout_minutes: 5
provider:
@@ -42,5 +42,5 @@ worker_start_ray_commands:
# ./resnet101-ray-sgd.py --verbose --devices-per-actor=1 --num-actors=2 --use-cpus --plasma-op --max-bytes=10000000 --ps --cluster

- source activate tensorflow_p27 && ray stop
# - source activate tensorflow_p27 && ray start --redis-address=$RAY_HEAD_IP:6379 --use-raylet --max-sends 8 --max-receives 8 --object-chunk-size 20000000
- source activate tensorflow_p27 && ray start --redis-address=$RAY_HEAD_IP:6379 --use-raylet

77 changes: 77 additions & 0 deletions sgd.yaml
@@ -0,0 +1,77 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: sgd

# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers default to 0.
min_workers: 1
max_workers: 1

# docker:
# image: tensorflow/tensorflow:1.5.0-py3
# container_name: ray_docker

# Cloud-provider specific configuration.
provider:
type: aws
region: us-east-1
availability_zone: us-east-1a

# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu

head_node:
InstanceType: p2.8xlarge
ImageId: ami-076fb2ffd49699eea
InstanceMarketOptions:
MarketType: spot
SpotOptions:
MaxPrice: "9.0"


worker_nodes:
InstanceType: p2.8xlarge
ImageId: ami-076fb2ffd49699eea
InstanceMarketOptions:
MarketType: spot
SpotOptions:
MaxPrice: "9.0"

#
# # Run workers on spot by default. Comment this out to use on-demand.
# InstanceMarketOptions:
# MarketType: spot

setup_commands:
- source activate tensorflow_p27
- echo "source activate tensorflow_p27; export RAY_USE_XRAY=1" >> ~/.bashrc
- pip install -U https://s3.us-east-2.amazonaws.com/richardresults/ray-wheels/ray-0.5.0-cp27-cp27mu-manylinux1_x86_64.whl


file_mounts: {
~/osdi/: /Users/rliaw/Research/riselab/osdi/
#"/home/ubuntu/raycode/actor.py": "/Users/rliaw/Research/riselab/ray/python/ray/actor.py",
#"/home/ubuntu/raycode/tune/": "/Users/rliaw/Research/riselab/ray/python/ray/tune/",
# "/home/ubuntu/raycode/rllib/": "/Users/rliaw/Research/riselab/ray/python/ray/rllib/",
}

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []
# - cp /home/ubuntu/raycode/actor.py /home/ubuntu/anaconda3/envs/py27/lib/python2.7/site-packages/ray/actor.py
#- cp -r /home/ubuntu/raycode/tune/ /home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/ray/
#- cp -r /home/ubuntu/raycode/rllib/ /home/ubuntu/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages/ray/
# - cp -r /home/ubuntu/raycode/rllib/ /home/ubuntu/anaconda3/envs/tensorflow_p27/lib/python2.7/site-packages/ray/

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# # Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ray start --head --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ray start --redis-address=$RAY_HEAD_IP:6379 --object-manager-port=8076

21 changes: 21 additions & 0 deletions test_code/test_parameter_plasma.py
@@ -0,0 +1,21 @@
from resnet import ParameterServer

import ray; ray.shutdown()
ray.init()
import numpy as np
put_oid_bin = np.random.bytes(20)
get_oid_bin = np.random.bytes(20)

random_array = np.random.rand(5,5)
print random_array

put_oid = ray.pyarrow.plasma.ObjectID(put_oid_bin)
ray.worker.global_worker.plasma_client.put(random_array.flatten(), object_id=put_oid)

ps = ParameterServer(1, 1)
ps.initialize(25)
ps.add(put_oid_bin)
ps.get(get_oid_bin)

get_oid = ray.pyarrow.plasma.ObjectID(get_oid_bin)
print ray.worker.global_worker.plasma_client.get(get_oid)