
[Experimental] Add experimental distributed SGD API #2858

Merged (20 commits) on Sep 20, 2018

Conversation

@ericl (Contributor) commented Sep 11, 2018

No description provided.


@robertnishihara (Collaborator) left a comment:

We're copying a lot of code from TF in this PR. Can you say why? Some of it is just to get a Resnet model, right? What about the allreduce stuff?

import time


class Timeline(object):
A Collaborator commented:

Let's remove this from this PR.

A Collaborator commented:

and run_timeline from sgd.py

@@ -0,0 +1,504 @@
from __future__ import absolute_import


assert(len(self.per_device_grads) == num_devices)
self.num_grads = num_grads = len(self.packed_grads_and_vars[0])
if max_bytes:
print("Packed grads => {} tensors".format(num_grads))
A Collaborator commented:

use logger
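For context, the change the reviewer is asking for is small: route diagnostic output through a module-level logger rather than print. A minimal sketch (the helper name is illustrative, not the PR's actual code):

```python
import logging

logger = logging.getLogger(__name__)

def report_packing(num_grads):
    # Emit through the module logger instead of print(), so verbosity
    # can be controlled centrally via the logging configuration.
    message = "Packed grads => {} tensors".format(num_grads)
    logger.info(message)
    return message
```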

plasma_manager_socket_name=manager_socket)
grad_ph = tf.reshape(
grad_ph, self.packed_grads_and_vars[0][j][0].shape)
print("Packed tensor", grad_ph)
@robertnishihara (Collaborator) commented Sep 11, 2018:

use logger, same with all other prints

@robertnishihara (Collaborator) commented Sep 11, 2018

Using the default model (but with batch size 64) model_creator = lambda worker_idx, device_idx: TFBenchModel(batch=64, use_cpus=False), I see the following times for sgd.step(). This is all without the Plasma op.

  • 1 machine, 1 GPU: ~0.8s
  • 1 machine, 8 GPUs: ~0.9s
  • 2 machines, 8 GPUs each: ~2s
  • 3 machines, 8 GPUs each: ~2.1s
  • 4 machines, 8 GPUs each: ~2.4s

So, scaling from 1 GPU to 8 GPUs is quite good.
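Timings like the ones above are typically collected by averaging repeated sgd.step() calls after a few warmup iterations. A hypothetical measurement helper (the function name and warmup/iteration counts are illustrative, not part of the PR):

```python
import time

def mean_step_time(step_fn, warmup=2, iters=10):
    # Run warmup steps first so one-time costs (graph construction,
    # GPU memory allocation) do not skew the average.
    for _ in range(warmup):
        step_fn()
    start = time.time()
    for _ in range(iters):
        step_fn()
    # Average wall-clock seconds per step.
    return (time.time() - start) / iters
```

For example, mean_step_time(sgd.step) would yield the kind of per-step averages quoted above.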

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8162/

@AmplabJenkins
Test FAILed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8193/

@AmplabJenkins
Test FAILed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8196/

@AmplabJenkins
Test FAILed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8199/

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8198/

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8197/

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8200/

@ericl (Contributor, Author) commented Sep 14, 2018

I did some reorganization of the code.

  • Moved the benchmark code to tfbench. We'll need this if we want to run standard ImageNet benchmarks without too much pain.
  • Moved the timeline code to a util.py file. This is needed to provide proper performance instrumentation for Ray and TF<->GPU scheduling.

I also have started fixing the plasma op code. Currently it crashes since I left out the parameter server actor setup code. We can either try to merge this first, and merge the PS code after, or do both here.

@pcmoritz (Contributor) left a comment:

LGTM (also the plasma op changes). Happy to merge this after the linting error is fixed and do the parameter server changes as a followup.

self.start_time = self.time()
self.tid = tid

def patch_ray(self):
A Collaborator commented:

Why patch the Ray logging? As opposed to just using the existing logging?

@ericl (author) replied:

The Ray logging functionality doesn't provide the fine-grained control we have here to capture just one SGD iteration, or to add additional event types.

Note that this still calls the original log call, so you can still get overall timelines that way.

A Collaborator replied:

Hm, you can just do (assuming xray)

with ray.profile("custom_event"):
    # do one SGD iteration

to add the event to the timeline. Is that what you want? Anyway, if there's additional functionality needed here for profiling, then in a follow-up PR we should just extend the profiling API (so it's available more generally, outside of SGD).
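For readers unfamiliar with the suggestion, a profiling context manager of this shape can be sketched in a few lines. This is an illustrative stand-in, not Ray's actual implementation:

```python
import time
from contextlib import contextmanager

@contextmanager
def profile(event_name, events):
    # Record a (name, start, end) tuple for the wrapped block, mimicking
    # how a profiling context manager attaches custom events to a timeline.
    start = time.time()
    try:
        yield
    finally:
        events.append((event_name, start, time.time()))
```

Usage mirrors the reviewer's snippet: with profile("custom_event", events): run one SGD iteration.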

from __future__ import division
from __future__ import print_function

import ray
A Collaborator commented:

The ray import should be in a separate group, below the standard library imports: https://github.com/google/styleguide/blob/gh-pages/pyguide.md#313-imports-formatting

This applies in other places too.

@ericl (author) replied:

Fixed.

@@ -0,0 +1,627 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
A Collaborator commented:

Can you add a comment at the top of all the copied files saying something like "This file was copied from [RELEVANT URL]"

@ericl (author) replied:

Done

@richardliaw (Contributor) left a comment:

I think we should scope the imports correctly (there are places where we import from . and from files by name...).

import tensorflow.contrib.nccl as nccl
import tensorflow.contrib.slim as slim

from util import Timeline, fetch, run_timeline
@richardliaw (Contributor) commented Sep 14, 2018:

do we want all of these to be local imports (rather than global)?

@ericl (author) replied:

Fixed

ray.worker.global_worker.plasma_client.fetch([plasma_id])


def run_timeline(sess, ops, feed_dict={}, write_timeline=False, name=""):
A Collaborator commented:

Shouldn't use a mutable value as a default argument.
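The pitfall the reviewer is pointing at: a mutable default argument is created once, at function definition time, and then shared across every call. A minimal illustration with the conventional None-sentinel fix (function names here are illustrative):

```python
def bad_append(item, acc=[]):
    # The default list is created once and shared by all calls,
    # so mutations leak between invocations.
    acc.append(item)
    return acc

def good_append(item, acc=None):
    # Use None as the sentinel and build a fresh list per call.
    if acc is None:
        acc = []
    acc.append(item)
    return acc
```

Applied to the snippet above, run_timeline's feed_dict={} default would become feed_dict=None, with a fresh dict created inside the body.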

@ericl (author) replied:

Fixed.

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8247/

@robertnishihara (Collaborator) left a comment:

Looks good to me (pending tests passing; it looks like there is a linting error).

Currently this code is not touched by any of our tests. Can we add simple tests (just to touch the code) in a follow-up PR (probably on Jenkins)?

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8273/

@robertnishihara (Collaborator)

Looks like we need to exclude the copied TF files from the flake8 test.

@ericl (Contributor, Author) commented Sep 18, 2018

Let me add that.

@robertnishihara (Collaborator)

@ericl there is a merge conflict.

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8304/

@AmplabJenkins
Test FAILed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8305/

@robertnishihara (Collaborator)

jenkins, retest this please

@robertnishihara (Collaborator)

The test failure was

ERROR: testTrainMultiCartpoleSinglePolicy (__main__.TestMultiAgentEnv)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/ray/python/ray/rllib/test/test_multi_agent_env.py", line 364, in testTrainMultiCartpoleSinglePolicy
    raise Exception("failed to improve reward")
Exception: failed to improve reward

----------------------------------------------------------------------

@AmplabJenkins
Test PASSed. Build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/8306/

@pcmoritz pcmoritz merged commit 3267676 into ray-project:master Sep 20, 2018
@pcmoritz pcmoritz deleted the sgd branch September 20, 2018 04:12
@robertnishihara (Collaborator)

Progress on #1945.


5 participants