Skip to content

Commit

Permalink
check in experiment code (#289)
Browse files Browse the repository at this point in the history
* fix flags

* add log

* log

* sync step

* log

* _sync_state_op
  • Loading branch information
lgarithm committed May 27, 2020
1 parent 6986cd9 commit f101146
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 5 deletions.
13 changes: 12 additions & 1 deletion benchmarks/scaling/benchmark_kungfu_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def parse_args():
p.add_argument('--train-steps', type=int, default=10, help='')
p.add_argument('--epochs', type=int, default=10, help='')
p.add_argument('--epoch-size', type=int, default=10, help='')
p.add_argument('--sync-step', action='store_true', default=False, help='')
p.add_argument('--resize-schedule',
type=str,
default='10:2,100:0',
Expand Down Expand Up @@ -170,10 +171,13 @@ def parse_scheule(schedule):

def run_with_estimator(args):
_log_event('BEGIN :: run_with_estimator')

_log_event('BEGIN :: build_estimator')
classifier = build_estimator(args)
_log_event('END :: build_estimator')

hooks = [
# debug_hooks.LogStepHook(),
debug_hooks.LogStepHook(),
]

if args.show_training_throughput:
Expand All @@ -193,7 +197,14 @@ def run_with_estimator(args):
classifier.train(input_fn, hooks=hooks)
else:
input_fn = build_input_fn(args.batch_size, args.train_steps)

sync_step_hook = debug_hooks.SyncStepHook()
if args.sync_step:
hooks.append(sync_step_hook)

_log_event('BEGIN :: classifier.train')
classifier.train(input_fn, hooks=hooks, max_steps=args.train_steps)
_log_event('END :: classifier.train')

_log_event('END :: run_with_estimator')

Expand Down
37 changes: 37 additions & 0 deletions benchmarks/scaling/debug_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
from kungfu._utils import (_log_event, _since_proc_start, one_based_range,
show_duration)
from kungfu.tensorflow.ops import all_reduce

import tensorflow as tf

Expand Down Expand Up @@ -63,7 +64,16 @@ class LogStepHook(tf.train.SessionRunHook):
def __init__(self):
self._step = 0

def begin(self):
print('%s::%s %d steps' % ('LogStepHook', 'begin', self._step))

def after_create_session(self, sess, coord):
print('%s::%s %d steps' %
('LogStepHook', 'after_create_session', self._step))

def before_run(self, run_context):
if self._step == 0:
_log_event('before_run_step_0')
print('%s::%s %d steps' % ('LogStepHook', 'before_run', self._step))

def after_run(self, run_context, run_values):
Expand Down Expand Up @@ -100,3 +110,30 @@ def after_run(self, run_context, run_values):

def end(self, run_context):
pass


class SyncStepHook(tf.train.SessionRunHook):
def __init__(self):
pass

def begin(self):
global_step = tf.train.get_or_create_global_step()
new_global_step = all_reduce(global_step, op='max')
self._sync_step_op = tf.assign(global_step, new_global_step)
from kungfu.tensorflow.initializer import BroadcastGlobalVariablesOp
self._sync_state_op = BroadcastGlobalVariablesOp()

def after_create_session(self, sess, coord):
gs = sess.run(self._sync_step_op)
sess.run(self._sync_state_op)
print('_sync_step_op result %d' % (gs))
_log_event('AFTER _sync_step_op')

def before_run(self, run_context):
pass

def after_run(self, run_context, run_values):
pass

def end(self, run_context):
_log_event('SyncStepHook::end')
4 changes: 2 additions & 2 deletions srcs/go/utils/runner/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func RunStaticKungFuJob(ctx context.Context, j job.Job, sp runtime.SystemParamet
`PYTHONWARNINGS=ignore`,
`TF_CPP_MIN_LOG_LEVEL=2`,
runnerProg,
// `-q`,
`-np`, strconv.Itoa(sp.ClusterSize),
`-H`, hl.String(),
`-port-range`, sp.WorkerPortRange.String(),
`-nic`, sp.Nic,
`-strategy`, j.Strategy.String(),
`-logdir`, j.LogDir,
}
if quiet {
Expand Down Expand Up @@ -103,14 +103,14 @@ func RunElasticKungFuJob(ctx context.Context, j job.Job, sp runtime.SystemParame
`TF_CPP_MIN_LOG_LEVEL=2`,

runnerProg,
`-q`,
`-w`,
`-k`,
`-config-server`, j.ConfigServer,
`-np`, strconv.Itoa(sp.ClusterSize),
`-H`, hl.String(),
`-port-range`, sp.WorkerPortRange.String(),
`-nic`, sp.Nic,
`-strategy`, j.Strategy.String(),
`-logdir`, j.LogDir,
}
if quiet {
Expand Down
8 changes: 6 additions & 2 deletions srcs/python/kungfu/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,9 @@ def _since_proc_start():


def _log_event(name):
d = _since_proc_start()
print('%s :: %s since proc started' % (name, show_duration(d)))
t0 = os.getenv('KUNGFU_PROC_START_TIMESTAMP') or '0'
t0 = int(t0)
t1 = time.time()
d = t1 - t0
# d = _since_proc_start()
print('TS=%f %s :: %s since proc started' % (t1, name, show_duration(d)))

0 comments on commit f101146

Please sign in to comment.