From f2670c3201f4c2bffca9f28a09c027104dafc50d Mon Sep 17 00:00:00 2001 From: "kkm (aka Kirill Katsnelson)" Date: Mon, 6 May 2019 22:35:57 -0700 Subject: [PATCH] [scripts] Add trainer option --trainer.optimization.num-jobs-step (#3205) --- egs/wsj/s5/steps/libs/nnet3/train/common.py | 69 ++++++++++++++++--- egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py | 29 ++++---- egs/wsj/s5/steps/nnet3/chain/train.py | 25 ++++--- egs/wsj/s5/steps/nnet3/train_dnn.py | 25 ++++--- egs/wsj/s5/steps/nnet3/train_raw_dnn.py | 25 ++++--- egs/wsj/s5/steps/nnet3/train_raw_rnn.py | 25 ++++--- egs/wsj/s5/steps/nnet3/train_rnn.py | 28 ++++---- 7 files changed, 145 insertions(+), 81 deletions(-) diff --git a/egs/wsj/s5/steps/libs/nnet3/train/common.py b/egs/wsj/s5/steps/libs/nnet3/train/common.py index 1a038cc23f2..f230e12e96f 100644 --- a/egs/wsj/s5/steps/libs/nnet3/train/common.py +++ b/egs/wsj/s5/steps/libs/nnet3/train/common.py @@ -269,8 +269,7 @@ def validate_minibatch_size_str(minibatch_size_str): return False # check that the thing before the '=' sign is a positive integer try: - i = b[0] - if i <= 0: + if int(b[0]) <= 0: return False except: return False # not an integer at all. @@ -602,6 +601,16 @@ def get_model_combine_iters(num_iters, num_epochs, return models_to_combine +def get_current_num_jobs(it, num_it, start, step, end): + "Get number of jobs for iteration number 'it' of range('num_it')" + + ideal = float(start) + (end - start) * float(it) / num_it + if ideal < step: + return int(0.5 + ideal) + else: + return int(0.5 + ideal / step) * step + + def get_learning_rate(iter, num_jobs, num_iters, num_archives_processed, num_archives_to_process, initial_effective_lrate, final_effective_lrate): @@ -682,13 +691,11 @@ def remove_model(nnet_dir, iter, num_iters, models_to_combine=None, os.remove(file_name) -def self_test(): - assert halve_minibatch_size_str('64') == '32' - assert halve_minibatch_size_str('64,16:32') == '32,8:16' - assert halve_minibatch_size_str('1') == '1' - assert halve_minibatch_size_str('128=64/256=40,80:100') == '128=32/256=20,40:50' - assert validate_chunk_width('64') - assert validate_chunk_width('64,25,128') +def positive_int(arg): + val = int(arg) + if (val <= 0): + raise argparse.ArgumentTypeError("must be positive int: '%s'" % arg) + return val class CommonParser(object): @@ -845,6 +852,10 @@ def __init__(self, type=int, dest='num_jobs_final', default=8, help="Number of neural net jobs to run in " "parallel at the end of training") + self.parser.add_argument("--trainer.optimization.num-jobs-step", + type=positive_int, metavar='N', dest='num_jobs_step', default=1, + help="""Number of jobs increment, when exceeds this number. For + example, if N=3, the number of jobs may progress as 1, 2, 3, 6, 9...""") self.parser.add_argument("--trainer.optimization.max-models-combine", "--trainer.max-models-combine", type=int, dest='max_models_combine', @@ -983,5 +994,43 @@ def __init__(self, then only failure notifications are sent""") +import unittest + +class SelfTest(unittest.TestCase): + + def test_halve_minibatch_size_str(self): + self.assertEqual('32', halve_minibatch_size_str('64')) + self.assertEqual('32,8:16', halve_minibatch_size_str('64,16:32')) + self.assertEqual('1', halve_minibatch_size_str('1')) + self.assertEqual('128=32/256=20,40:50', halve_minibatch_size_str('128=64/256=40,80:100')) + + + def test_validate_chunk_width(self): + for s in [ '64', '64,25,128' ]: + self.assertTrue(validate_chunk_width(s), s) + + + def test_validate_minibatch_size_str(self): + # Good descriptors. + for s in [ '32', '32,64', '1:32', '1:32,64', '64,1:32', '1:5,10:15', + '128=64:128/256=32,64', '1=2/3=4', '1=1/2=2/3=3/4=4' ]: + self.assertTrue(validate_minibatch_size_str(s), s) + # Bad descriptors. + for s in [ None, 42, (43,), '', '1:', ':2', '3,', ',4', '5:6,', ',7:8', + '9=', '10=10/', '11=11/11', '12=1:2//13=1:3' '14=/15=15', + '16/17=17', '/18=18', '/18', '//19', '/' ]: + self.assertFalse(validate_minibatch_size_str(s), s) + + + def test_get_current_num_jobs(self): + niters = 12 + self.assertEqual([2, 3, 3, 4, 4, 5, 6, 6, 7, 7, 8, 8], + [get_current_num_jobs(i, niters, 2, 1, 9) + for i in range(niters)]) + self.assertEqual([2, 3, 3, 3, 3, 6, 6, 6, 6, 6, 9, 9], + [get_current_num_jobs(i, niters, 2, 3, 9) + for i in range(niters)]) + + if __name__ == '__main__': - _self_test() + unittest.main() diff --git a/egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py b/egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py index e96f2a10820..d5fa89f3ce0 100755 --- a/egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py +++ b/egs/wsj/s5/steps/nnet3/chain/e2e/train_e2e.py @@ -202,11 +202,10 @@ def process_args(args): "--trainer.deriv-truncate-margin.".format( args.deriv_truncate_margin)) - if (not os.path.exists(args.dir) - or not os.path.exists(args.dir+"/configs")): - raise Exception("This scripts expects {0} to exist and have a configs " - "directory which is the output of " - "make_configs.py script") + if (not os.path.exists(args.dir + "/configs")): + raise Exception("This scripts expects the directory specified with " + "--dir={0} to exist and have a configs/ directory which " + "is the output of make_configs.py script".format(args.dir)) # set the options corresponding to args.use_gpu run_opts = common_train_lib.RunOpts() @@ -423,9 +422,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter) @@ -451,12 +451,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) chain_lib.train_one_iteration( dir=args.dir, diff --git a/egs/wsj/s5/steps/nnet3/chain/train.py b/egs/wsj/s5/steps/nnet3/chain/train.py index 40b65afe273..67cb9f90620 100755 --- a/egs/wsj/s5/steps/nnet3/chain/train.py +++ b/egs/wsj/s5/steps/nnet3/chain/train.py @@ -219,8 +219,9 @@ def process_args(args): args.deriv_truncate_margin)) if (not os.path.exists(args.dir)): - raise Exception("This script expects --dir={0} to exist.") - if (not os.path.exists(args.dir+"/configs") and + raise Exception("Directory specified with --dir={0} " + "does not exist.".format(args.dir)) + if (not os.path.exists(args.dir + "/configs") and (args.input_model is None or not os.path.exists(args.input_model))): raise Exception("Either --trainer.input-model option should be supplied, " "and exist; or the {0}/configs directory should exist." @@ -470,9 +471,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter) @@ -501,12 +503,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) chain_lib.train_one_iteration( dir=args.dir, diff --git a/egs/wsj/s5/steps/nnet3/train_dnn.py b/egs/wsj/s5/steps/nnet3/train_dnn.py index e72b29297a4..84817446b6e 100755 --- a/egs/wsj/s5/steps/nnet3/train_dnn.py +++ b/egs/wsj/s5/steps/nnet3/train_dnn.py @@ -117,8 +117,9 @@ def process_args(args): raise Exception("--trainer.rnn.num-chunk-per-minibatch has an invalid value") if (not os.path.exists(args.dir)): - raise Exception("This script expects --dir={0} to exist.") - if (not os.path.exists(args.dir+"/configs") and + raise Exception("Directory specified with --dir={0} " + "does not exist.".format(args.dir)) + if (not os.path.exists(args.dir + "/configs") and (args.input_model is None or not os.path.exists(args.input_model))): raise Exception("Either --trainer.input-model option should be supplied, " "and exist; or the {0}/configs directory should exist." @@ -321,9 +322,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: lrate = common_train_lib.get_learning_rate(iter, current_num_jobs, @@ -344,12 +346,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) train_lib.common.train_one_iteration( dir=args.dir, diff --git a/egs/wsj/s5/steps/nnet3/train_raw_dnn.py b/egs/wsj/s5/steps/nnet3/train_raw_dnn.py index ffccf443b99..af921048bb5 100755 --- a/egs/wsj/s5/steps/nnet3/train_raw_dnn.py +++ b/egs/wsj/s5/steps/nnet3/train_raw_dnn.py @@ -135,8 +135,9 @@ def process_args(args): raise Exception("--trainer.optimization.minibatch-size has an invalid value") if (not os.path.exists(args.dir)): - raise Exception("This script expects --dir={0} to exist.") - if (not os.path.exists(args.dir+"/configs") and + raise Exception("Directory specified with --dir={0} " + "does not exist.".format(args.dir)) + if (not os.path.exists(args.dir + "/configs") and (args.input_model is None or not os.path.exists(args.input_model))): raise Exception("Either --trainer.input-model option should be supplied, " "and exist; or the {0}/configs directory should exist." @@ -356,9 +357,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: lrate = common_train_lib.get_learning_rate(iter, current_num_jobs, @@ -380,12 +382,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) train_lib.common.train_one_iteration( dir=args.dir, diff --git a/egs/wsj/s5/steps/nnet3/train_raw_rnn.py b/egs/wsj/s5/steps/nnet3/train_raw_rnn.py index c704b0725d3..b2d55ac20e7 100755 --- a/egs/wsj/s5/steps/nnet3/train_raw_rnn.py +++ b/egs/wsj/s5/steps/nnet3/train_raw_rnn.py @@ -181,8 +181,9 @@ def process_args(args): raise Exception("--egs.chunk-right-context should be non-negative") if (not os.path.exists(args.dir)): - raise Exception("This script expects --dir={0} to exist.") - if (not os.path.exists(args.dir+"/configs") and + raise Exception("Directory specified with --dir={0} " + "does not exist.".format(args.dir)) + if (not os.path.exists(args.dir + "/configs") and (args.input_model is None or not os.path.exists(args.input_model))): raise Exception("Either --trainer.input-model option should be supplied, " "and exist; or the {0}/configs directory should exist." @@ -411,9 +412,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: model_file = "{dir}/{iter}.raw".format(dir=args.dir, iter=iter) @@ -445,12 +447,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) train_lib.common.train_one_iteration( dir=args.dir, diff --git a/egs/wsj/s5/steps/nnet3/train_rnn.py b/egs/wsj/s5/steps/nnet3/train_rnn.py index ab2aa0c4d8d..6ed7197c22b 100755 --- a/egs/wsj/s5/steps/nnet3/train_rnn.py +++ b/egs/wsj/s5/steps/nnet3/train_rnn.py @@ -172,12 +172,12 @@ def process_args(args): raise Exception("--egs.chunk-right-context should be non-negative") if (not os.path.exists(args.dir)): - raise Exception("This script expects --dir={0} to exist.") - - if (not os.path.exists(args.dir+"/configs") and + raise Exception("Directory specified with --dir={0} " + "does not exist.".format(args.dir)) + if (not os.path.exists(args.dir + "/configs") and (args.input_model is None or not os.path.exists(args.input_model))): raise Exception("Either --trainer.input-model option should be supplied, " - "and exist; or the {0}/configs directory should exist." + "and exist; or the {0}/configs directory should exist. " "{0}/configs is the output of make_configs.py" "".format(args.dir)) @@ -396,9 +396,10 @@ def train(args, run_opts): if (args.exit_stage is not None) and (iter == args.exit_stage): logger.info("Exiting early due to --exit-stage {0}".format(iter)) return - current_num_jobs = int(0.5 + args.num_jobs_initial - + (args.num_jobs_final - args.num_jobs_initial) - * float(iter) / num_iters) + + current_num_jobs = common_train_lib.get_current_num_jobs( + iter, num_iters, + args.num_jobs_initial, args.num_jobs_step, args.num_jobs_final) if args.stage <= iter: model_file = "{dir}/{iter}.mdl".format(dir=args.dir, iter=iter) @@ -428,12 +429,13 @@ def train(args, run_opts): shrink_info_str = '' if shrinkage_value != 1.0: shrink_info_str = 'shrink: {0:0.5f}'.format(shrinkage_value) - logger.info("Iter: {0}/{1} " - "Epoch: {2:0.2f}/{3:0.1f} ({4:0.1f}% complete) " - "lr: {5:0.6f} {6}".format(iter, num_iters - 1, - epoch, args.num_epochs, - percent, - lrate, shrink_info_str)) + logger.info("Iter: {0}/{1} Jobs: {2} " + "Epoch: {3:0.2f}/{4:0.1f} ({5:0.1f}% complete) " + "lr: {6:0.6f} {7}".format(iter, num_iters - 1, + current_num_jobs, + epoch, args.num_epochs, + percent, + lrate, shrink_info_str)) train_lib.common.train_one_iteration( dir=args.dir,