[scripts] Add trainer option --trainer.optimization.num-jobs-step (#3205)
kkm (aka Kirill Katsnelson) authored and danpovey committed May 7, 2019
1 parent 155c658 commit f2670c3
Showing 7 changed files with 145 additions and 81 deletions.
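
In brief: the commit adds a helper, get_current_num_jobs(), to common.py and switches every nnet3 trainer to it. With the default --trainer.optimization.num-jobs-step=1 the job count still ramps linearly from --trainer.optimization.num-jobs-initial to --trainer.optimization.num-jobs-final; with a larger step, the count is rounded to a multiple of the step once it exceeds it. A standalone sketch of the behavior (not part of the commit; the function body is copied from the diff below, and the expected lists come from the new unit test):

    def get_current_num_jobs(it, num_it, start, step, end):
        """Number of jobs for iteration 'it' of range(num_it).
        Copied from the new helper in common.py below."""
        ideal = float(start) + (end - start) * float(it) / num_it
        if ideal < step:
            return int(0.5 + ideal)                # below the step: plain rounding
        else:
            return int(0.5 + ideal / step) * step  # at/above: round to a multiple of step

    # num-jobs-step=1 (the default) keeps the old linear ramp from 2 to 9 jobs:
    assert [get_current_num_jobs(i, 12, 2, 1, 9) for i in range(12)] == \
        [2, 3, 3, 4, 4, 5, 6, 6, 7, 7, 8, 8]

    # num-jobs-step=3 snaps the job count to multiples of 3 once it exceeds 3:
    assert [get_current_num_jobs(i, 12, 2, 3, 9) for i in range(12)] == \
        [2, 3, 3, 3, 3, 6, 6, 6, 6, 6, 9, 9]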
69 changes: 59 additions & 10 deletions egs/wsj/s5/steps/libs/nnet3/train/common.py
@@ -269,8 +269,7 @@ def validate_minibatch_size_str(minibatch_size_str):
             return False
         # check that the thing before the '=' sign is a positive integer
         try:
-            i = b[0]
-            if i <= 0:
+            if int(b[0]) <= 0:
                 return False
         except:
             return False  # not an integer at all.
@@ -602,6 +601,16 @@ def get_model_combine_iters(num_iters, num_epochs,
     return models_to_combine
 
 
+def get_current_num_jobs(it, num_it, start, step, end):
+    "Get number of jobs for iteration number 'it' of range('num_it')"
+
+    ideal = float(start) + (end - start) * float(it) / num_it
+    if ideal < step:
+        return int(0.5 + ideal)
+    else:
+        return int(0.5 + ideal / step) * step
+
+
 def get_learning_rate(iter, num_jobs, num_iters, num_archives_processed,
                       num_archives_to_process,
                       initial_effective_lrate, final_effective_lrate):
@@ -682,13 +691,11 @@ def remove_model(nnet_dir, iter, num_iters, models_to_combine=None,
         os.remove(file_name)
 
 
-def self_test():
-    assert halve_minibatch_size_str('64') == '32'
-    assert halve_minibatch_size_str('64,16:32') == '32,8:16'
-    assert halve_minibatch_size_str('1') == '1'
-    assert halve_minibatch_size_str('128=64/256=40,80:100') == '128=32/256=20,40:50'
-    assert validate_chunk_width('64')
-    assert validate_chunk_width('64,25,128')
+def positive_int(arg):
+    val = int(arg)
+    if (val <= 0):
+        raise argparse.ArgumentTypeError("must be positive int: '%s'" % arg)
+    return val
 
 
 class CommonParser(object):
@@ -845,6 +852,10 @@ def __init__(self,
                                  type=int, dest='num_jobs_final', default=8,
                                  help="Number of neural net jobs to run in "
                                  "parallel at the end of training")
+        self.parser.add_argument("--trainer.optimization.num-jobs-step",
+                                 type=positive_int, metavar='N', dest='num_jobs_step', default=1,
+                                 help="""Granularity of the number of jobs: once the
+                                 interpolated job count exceeds N, it is rounded to a
+                                 multiple of N. For example, if N=3, the number of jobs
+                                 may progress as 1, 2, 3, 6, 9...""")
         self.parser.add_argument("--trainer.optimization.max-models-combine",
                                  "--trainer.max-models-combine",
                                  type=int, dest='max_models_combine',
@@ -983,5 +994,43 @@ def __init__(self,
                                  then only failure notifications are sent""")
 
 
+import unittest
+
+class SelfTest(unittest.TestCase):
+
+    def test_halve_minibatch_size_str(self):
+        self.assertEqual('32', halve_minibatch_size_str('64'))
+        self.assertEqual('32,8:16', halve_minibatch_size_str('64,16:32'))
+        self.assertEqual('1', halve_minibatch_size_str('1'))
+        self.assertEqual('128=32/256=20,40:50', halve_minibatch_size_str('128=64/256=40,80:100'))
+
+
+    def test_validate_chunk_width(self):
+        for s in [ '64', '64,25,128' ]:
+            self.assertTrue(validate_chunk_width(s), s)
+
+
+    def test_validate_minibatch_size_str(self):
+        # Good descriptors.
+        for s in [ '32', '32,64', '1:32', '1:32,64', '64,1:32', '1:5,10:15',
+                   '128=64:128/256=32,64', '1=2/3=4', '1=1/2=2/3=3/4=4' ]:
+            self.assertTrue(validate_minibatch_size_str(s), s)
+        # Bad descriptors.
+        for s in [ None, 42, (43,), '', '1:', ':2', '3,', ',4', '5:6,', ',7:8',
+                   '9=', '10=10/', '11=11/11', '12=1:2//13=1:3' '14=/15=15',
+                   '16/17=17', '/18=18', '/18', '//19', '/' ]:
+            self.assertFalse(validate_minibatch_size_str(s), s)
+
+
+    def test_get_current_num_jobs(self):
+        niters = 12
+        self.assertEqual([2, 3, 3, 4, 4, 5, 6, 6, 7, 7, 8, 8],
+                         [get_current_num_jobs(i, niters, 2, 1, 9)
+                          for i in range(niters)])
+        self.assertEqual([2, 3, 3, 3, 3, 6, 6, 6, 6, 6, 9, 9],
+                         [get_current_num_jobs(i, niters, 2, 3, 9)
+                          for i in range(niters)])
+
+
 if __name__ == '__main__':
-    _self_test()
+    unittest.main()
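
The positive_int helper introduced above is a standard argparse type callback; raising argparse.ArgumentTypeError makes argparse report a clean usage error instead of a traceback. A minimal, self-contained sketch of how the new option behaves at the command line (illustrative only; the real parser in CommonParser defines many more options):

    import argparse

    def positive_int(arg):
        val = int(arg)
        if val <= 0:
            raise argparse.ArgumentTypeError("must be positive int: '%s'" % arg)
        return val

    parser = argparse.ArgumentParser()
    parser.add_argument("--trainer.optimization.num-jobs-step",
                        type=positive_int, metavar='N', dest='num_jobs_step', default=1)

    args = parser.parse_args(["--trainer.optimization.num-jobs-step", "3"])
    print(args.num_jobs_step)   # -> 3
    # Passing "0" instead would exit with a usage error along the lines of:
    #   error: argument --trainer.optimization.num-jobs-step: must be positive int: '0'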
29 changes: 15 additions & 14 deletions egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py
@@ -202,11 +202,10 @@ def process_args(args):
"--trainer.deriv-truncate-margin.".format(
args.deriv_truncate_margin))

if (not os.path.exists(args.dir)
or not os.path.exists(args.dir+"/configs")):
raise Exception("This scripts expects {0} to exist and have a configs "
"directory which is the output of "
"make_configs.py script")
if (not os.path.exists(args.dir + "/configs")):
raise Exception("This scripts expects the directory specified with "
"--dir={0} to exist and have a configs/ directory which "
"is the output of make_configs.py script".format(args.dir))

# set the options corresponding to args.use_gpu
run_opts = common_train_lib.RunOpts()
@@ -423,9 +422,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter)
@@ -451,12 +451,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             chain_lib.train_one_iteration(
                 dir=args.dir,
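
Each of the remaining trainers (train.py, train_dnn.py, train_raw_dnn.py, train_raw_rnn.py, train_rnn.py) receives the same substitution shown above for train_e2e.py: the inline int(0.5 + ...) interpolation is replaced by the shared helper. A quick sanity check (a sketch, not part of the commit) that the helper reduces to the old formula when num-jobs-step is left at its default of 1:

    def old_num_jobs(it, num_it, start, end):
        # the inline expression deleted from each trainer
        return int(0.5 + start + (end - start) * float(it) / num_it)

    def new_num_jobs(it, num_it, start, step, end):
        # get_current_num_jobs() from common.py
        ideal = float(start) + (end - start) * float(it) / num_it
        return int(0.5 + ideal) if ideal < step else int(0.5 + ideal / step) * step

    # With step == 1 both branches reduce to int(0.5 + ideal), so results agree:
    assert all(old_num_jobs(i, 12, 2, 9) == new_num_jobs(i, 12, 2, 1, 9)
               for i in range(12))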
25 changes: 14 additions & 11 deletions egs/wsj/s5/steps/nnet3/chain/train.py
@@ -219,8 +219,9 @@ def process_args(args):
                             args.deriv_truncate_margin))
 
     if (not os.path.exists(args.dir)):
-        raise Exception("This script expects --dir={0} to exist.")
-    if (not os.path.exists(args.dir+"/configs") and
+        raise Exception("Directory specified with --dir={0} "
+                        "does not exist.".format(args.dir))
+    if (not os.path.exists(args.dir + "/configs") and
             (args.input_model is None or not os.path.exists(args.input_model))):
         raise Exception("Either --trainer.input-model option should be supplied, "
                         "and exist; or the {0}/configs directory should exist."
@@ -470,9 +471,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter)
@@ -501,12 +503,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             chain_lib.train_one_iteration(
                 dir=args.dir,
25 changes: 14 additions & 11 deletions egs/wsj/s5/steps/nnet3/train_dnn.py
@@ -117,8 +117,9 @@ def process_args(args):
raise Exception("--trainer.rnn.num-chunk-per-minibatch has an invalid value")

if (not os.path.exists(args.dir)):
raise Exception("This script expects --dir={0} to exist.")
if (not os.path.exists(args.dir+"/configs") and
raise Exception("Directory specified with --dir={0} "
"does not exist.".format(args.dir))
if (not os.path.exists(args.dir + "/configs") and
(args.input_model is None or not os.path.exists(args.input_model))):
raise Exception("Either --trainer.input-model option should be supplied, "
"and exist; or the {0}/configs directory should exist."
@@ -321,9 +322,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             lrate = common_train_lib.get_learning_rate(iter, current_num_jobs,
@@ -344,12 +346,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             train_lib.common.train_one_iteration(
                 dir=args.dir,
25 changes: 14 additions & 11 deletions egs/wsj/s5/steps/nnet3/train_raw_dnn.py
@@ -135,8 +135,9 @@ def process_args(args):
raise Exception("--trainer.optimization.minibatch-size has an invalid value")

if (not os.path.exists(args.dir)):
raise Exception("This script expects --dir={0} to exist.")
if (not os.path.exists(args.dir+"/configs") and
raise Exception("Directory specified with --dir={0} "
"does not exist.".format(args.dir))
if (not os.path.exists(args.dir + "/configs") and
(args.input_model is None or not os.path.exists(args.input_model))):
raise Exception("Either --trainer.input-model option should be supplied, "
"and exist; or the {0}/configs directory should exist."
@@ -356,9 +357,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             lrate = common_train_lib.get_learning_rate(iter, current_num_jobs,
@@ -380,12 +382,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             train_lib.common.train_one_iteration(
                 dir=args.dir,
25 changes: 14 additions & 11 deletions egs/wsj/s5/steps/nnet3/train_raw_rnn.py
@@ -181,8 +181,9 @@ def process_args(args):
raise Exception("--egs.chunk-right-context should be non-negative")

if (not os.path.exists(args.dir)):
raise Exception("This script expects --dir={0} to exist.")
if (not os.path.exists(args.dir+"/configs") and
raise Exception("Directory specified with --dir={0} "
"does not exist.".format(args.dir))
if (not os.path.exists(args.dir + "/configs") and
(args.input_model is None or not os.path.exists(args.input_model))):
raise Exception("Either --trainer.input-model option should be supplied, "
"and exist; or the {0}/configs directory should exist."
@@ -411,9 +412,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             model_file = "{dir}/{iter}.raw".format(dir=args.dir, iter=iter)
@@ -445,12 +447,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             train_lib.common.train_one_iteration(
                 dir=args.dir,
28 changes: 15 additions & 13 deletions egs/wsj/s5/steps/nnet3/train_rnn.py
@@ -172,12 +172,12 @@ def process_args(args):
raise Exception("--egs.chunk-right-context should be non-negative")

if (not os.path.exists(args.dir)):
raise Exception("This script expects --dir={0} to exist.")

if (not os.path.exists(args.dir+"/configs") and
raise Exception("Directory specified with --dir={0} "
"does not exist.".format(args.dir))
if (not os.path.exists(args.dir + "/configs") and
(args.input_model is None or not os.path.exists(args.input_model))):
raise Exception("Either --trainer.input-model option should be supplied, "
"and exist; or the {0}/configs directory should exist."
"and exist; or the {0}/configs directory should exist. "
"{0}/configs is the output of make_configs.py"
"".format(args.dir))

@@ -396,9 +396,10 @@ def train(args, run_opts):
         if (args.exit_stage is not None) and (iter == args.exit_stage):
             logger.info("Exiting early due to --exit-stage {0}".format(iter))
             return
-        current_num_jobs = int(0.5 + args.num_jobs_initial
-                               + (args.num_jobs_final - args.num_jobs_initial)
-                               * float(iter) / num_iters)
+
+        current_num_jobs = common_train_lib.get_current_num_jobs(
+            iter, num_iters,
+            args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final)
 
         if args.stage <= iter:
             model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter)
@@ -428,12 +429,13 @@ def train(args, run_opts):
             shrink_info_str = ''
             if shrinkage_value != 1.0:
                 shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value)
-            logger.info("Iter: {0}/{1} "
-                        "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) "
-                        "lr: {5:0.6f} {6}".format(iter, num_iters - 1,
-                                                  epoch, args.num_epochs,
-                                                  percent,
-                                                  lrate, shrink_info_str))
+            logger.info("Iter: {0}/{1} Jobs: {2} "
+                        "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) "
+                        "lr: {6:0.6f} {7}".format(iter, num_iters - 1,
+                                                  current_num_jobs,
+                                                  epoch, args.num_epochs,
+                                                  percent,
+                                                  lrate, shrink_info_str))
 
             train_lib.common.train_one_iteration(
                 dir=args.dir,
