From 6a4d1e11e5748c818c2d9a50430530b68455d052 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 31 May 2019 17:32:36 +0800 Subject: [PATCH 01/19] Refactor multiphase --- src/sdk/pynni/nni/batch_tuner/batch_tuner.py | 4 +- src/sdk/pynni/nni/common.py | 7 + .../nni/evolution_tuner/evolution_tuner.py | 4 +- .../nni/gridsearch_tuner/gridsearch_tuner.py | 4 +- .../nni/hyperopt_tuner/hyperopt_tuner.py | 4 +- src/sdk/pynni/nni/metis_tuner/metis_tuner.py | 4 +- src/sdk/pynni/nni/msg_dispatcher.py | 30 ++- src/sdk/pynni/nni/multi_phase/__init__.py | 0 .../nni/multi_phase/multi_phase_dispatcher.py | 198 ------------------ .../nni/multi_phase/multi_phase_tuner.py | 106 ---------- .../networkmorphism_tuner.py | 4 +- src/sdk/pynni/nni/smac_tuner/smac_tuner.py | 6 +- src/sdk/pynni/nni/tuner.py | 12 +- 13 files changed, 51 insertions(+), 332 deletions(-) delete mode 100644 src/sdk/pynni/nni/multi_phase/__init__.py delete mode 100644 src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py delete mode 100644 src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py diff --git a/src/sdk/pynni/nni/batch_tuner/batch_tuner.py b/src/sdk/pynni/nni/batch_tuner/batch_tuner.py index 8e08fb3f82..4342c89ab1 100644 --- a/src/sdk/pynni/nni/batch_tuner/batch_tuner.py +++ b/src/sdk/pynni/nni/batch_tuner/batch_tuner.py @@ -78,7 +78,7 @@ def update_search_space(self, search_space): """ self.values = self.is_valid(search_space) - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """Returns a dict of trial (hyper-)parameters, as a serializable object. Parameters @@ -90,7 +90,7 @@ def generate_parameters(self, parameter_id): raise nni.NoMoreTrialError('no more parameters now.') return self.values[self.count] - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): pass def import_data(self, data): diff --git a/src/sdk/pynni/nni/common.py b/src/sdk/pynni/nni/common.py index 03c02a9725..02565ba062 100644 --- a/src/sdk/pynni/nni/common.py +++ b/src/sdk/pynni/nni/common.py @@ -76,3 +76,10 @@ def enable_multi_thread(): def multi_thread_enabled(): return _multi_thread + +def enable_multi_phase(): + global _multi_phase + _multi_phase = True + +def multi_phase_enabled(): + return _multi_phase diff --git a/src/sdk/pynni/nni/evolution_tuner/evolution_tuner.py b/src/sdk/pynni/nni/evolution_tuner/evolution_tuner.py index 8caf9c4a59..995322b21b 100644 --- a/src/sdk/pynni/nni/evolution_tuner/evolution_tuner.py +++ b/src/sdk/pynni/nni/evolution_tuner/evolution_tuner.py @@ -188,7 +188,7 @@ def update_search_space(self, search_space): self.searchspace_json, is_rand, self.random_state) self.population.append(Individual(config=config)) - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """Returns a dict of trial (hyper-)parameters, as a serializable object. 
Parameters @@ -232,7 +232,7 @@ def generate_parameters(self, parameter_id): config = split_index(total_config) return config - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): '''Record the result from a trial Parameters diff --git a/src/sdk/pynni/nni/gridsearch_tuner/gridsearch_tuner.py b/src/sdk/pynni/nni/gridsearch_tuner/gridsearch_tuner.py index fc5b520168..98c43d1cc6 100644 --- a/src/sdk/pynni/nni/gridsearch_tuner/gridsearch_tuner.py +++ b/src/sdk/pynni/nni/gridsearch_tuner/gridsearch_tuner.py @@ -137,7 +137,7 @@ def update_search_space(self, search_space): ''' self.expanded_search_space = self.json2parameter(search_space) - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): self.count += 1 while (self.count <= len(self.expanded_search_space)-1): _params_tuple = convert_dict2tuple(self.expanded_search_space[self.count]) @@ -147,7 +147,7 @@ def generate_parameters(self, parameter_id): return self.expanded_search_space[self.count] raise nni.NoMoreTrialError('no more parameters now.') - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): pass def import_data(self, data): diff --git a/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py b/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py index 0b203a8e73..a1f3bc37d5 100644 --- a/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py +++ b/src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py @@ -248,7 +248,7 @@ def update_search_space(self, search_space): verbose=0) self.rval.catch_eval_exceptions = False - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """ Returns a set of trial (hyper-)parameters, as a serializable object. @@ -269,7 +269,7 @@ def generate_parameters(self, parameter_id): params = split_index(total_params) return params - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """ Record an observation of the objective function diff --git a/src/sdk/pynni/nni/metis_tuner/metis_tuner.py b/src/sdk/pynni/nni/metis_tuner/metis_tuner.py index dd1e273280..95f32a7c92 100644 --- a/src/sdk/pynni/nni/metis_tuner/metis_tuner.py +++ b/src/sdk/pynni/nni/metis_tuner/metis_tuner.py @@ -174,7 +174,7 @@ def _pack_output(self, init_parameter): return output - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """Generate next parameter for trial If the number of trial result is lower than cold start number, metis will first random generate some parameters. @@ -205,7 +205,7 @@ def generate_parameters(self, parameter_id): return results - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """Tuner receive result from trial. Parameters diff --git a/src/sdk/pynni/nni/msg_dispatcher.py b/src/sdk/pynni/nni/msg_dispatcher.py index dfb3d60af7..eb337c7344 100644 --- a/src/sdk/pynni/nni/msg_dispatcher.py +++ b/src/sdk/pynni/nni/msg_dispatcher.py @@ -18,7 +18,6 @@ # OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
# ================================================================================================== -import os import logging from collections import defaultdict import json_tricks @@ -26,7 +25,7 @@ from .protocol import CommandType, send from .msg_dispatcher_base import MsgDispatcherBase from .assessor import AssessResult -from .common import multi_thread_enabled +from .common import multi_thread_enabled, multi_phase_enabled from .env_vars import dispatcher_env_vars _logger = logging.getLogger(__name__) @@ -61,13 +60,19 @@ def _create_parameter_id(): _next_parameter_id += 1 return _next_parameter_id - 1 -def _pack_parameter(parameter_id, params, customized=False): +def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, parameter_index=None): _trial_params[parameter_id] = params ret = { 'parameter_id': parameter_id, 'parameter_source': 'customized' if customized else 'algorithm', 'parameters': params } + if trial_job_id is not None: + ret['trial_job_id'] = trial_job_id + if parameter_index is not None: + ret['parameter_index'] = parameter_index + else: + ret['parameter_index'] = 0 return json_tricks.dumps(ret) class MsgDispatcher(MsgDispatcherBase): @@ -133,8 +138,13 @@ def handle_report_metric_data(self, data): elif data['type'] == 'PERIODICAL': if self.assessor is not None: self._handle_intermediate_metric_data(data) - else: - pass + elif data['type'] == 'REQUEST_PARAMETER': + assert multi_phase_enabled() + assert data['trial_job_id'] is not None + assert data['parameter_index'] is not None + param_id = _create_parameter_id() + param = self.tuner.generate_parameters(param_id, trial_job_id=data['trial_job_id']) + send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'], parameter_index=data['parameter_index'])) else: raise ValueError('Data type not supported: {}'.format(data['type'])) @@ -160,9 +170,15 @@ def _handle_final_metric_data(self, data): id_ = data['parameter_id'] value = data['value'] if id_ in _customized_parameter_ids: - self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value) + if multi_phase_enabled(): + self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value, trial_job_id=data['trial_job_id']) + else: + self.tuner.receive_customized_trial_result(id_, _trial_params[id_], value) else: - self.tuner.receive_trial_result(id_, _trial_params[id_], value) + if multi_phase_enabled(): + self.tuner.receive_trial_result(id_, _trial_params[id_], value, trial_job_id=data['trial_job_id']) + else: + self.tuner.receive_trial_result(id_, _trial_params[id_], value) def _handle_intermediate_metric_data(self, data): """Call assessor to process intermediate results diff --git a/src/sdk/pynni/nni/multi_phase/__init__.py b/src/sdk/pynni/nni/multi_phase/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py b/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py deleted file mode 100644 index a4a6dbecbe..0000000000 --- a/src/sdk/pynni/nni/multi_phase/multi_phase_dispatcher.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -# associated documentation files (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, publish, distribute, -# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all copies or -# substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT -# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT -# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -# ================================================================================================== - -import logging -from collections import defaultdict -import json_tricks - -from nni.protocol import CommandType, send -from nni.msg_dispatcher_base import MsgDispatcherBase -from nni.assessor import AssessResult - -_logger = logging.getLogger(__name__) - -# Assessor global variables -_trial_history = defaultdict(dict) -'''key: trial job ID; value: intermediate results, mapping from sequence number to data''' - -_ended_trials = set() -'''trial_job_id of all ended trials. -We need this because NNI manager may send metrics after reporting a trial ended. -TODO: move this logic to NNI manager -''' - -def _sort_history(history): - ret = [ ] - for i, _ in enumerate(history): - if i in history: - ret.append(history[i]) - else: - break - return ret - -# Tuner global variables -_next_parameter_id = 0 -_trial_params = {} -'''key: trial job ID; value: parameters''' -_customized_parameter_ids = set() - -def _create_parameter_id(): - global _next_parameter_id # pylint: disable=global-statement - _next_parameter_id += 1 - return _next_parameter_id - 1 - -def _pack_parameter(parameter_id, params, customized=False, trial_job_id=None, parameter_index=None): - _trial_params[parameter_id] = params - ret = { - 'parameter_id': parameter_id, - 'parameter_source': 'customized' if customized else 'algorithm', - 'parameters': params - } - if trial_job_id is not None: - ret['trial_job_id'] = trial_job_id - if parameter_index is not None: - ret['parameter_index'] = parameter_index - else: - ret['parameter_index'] = 0 - return json_tricks.dumps(ret) - -class MultiPhaseMsgDispatcher(MsgDispatcherBase): - def __init__(self, tuner, assessor=None): - super(MultiPhaseMsgDispatcher, self).__init__() - self.tuner = tuner - self.assessor = assessor - if assessor is None: - _logger.debug('Assessor is not configured') - - def load_checkpoint(self): - self.tuner.load_checkpoint() - if self.assessor is not None: - self.assessor.load_checkpoint() - - def save_checkpoint(self): - self.tuner.save_checkpoint() - if self.assessor is not None: - self.assessor.save_checkpoint() - - def handle_initialize(self, data): - ''' - data is search space - ''' - self.tuner.update_search_space(data) - send(CommandType.Initialized, '') - return True - - def handle_request_trial_jobs(self, data): - # data: number or trial jobs - ids = [_create_parameter_id() for _ in range(data)] - 
params_list = self.tuner.generate_multiple_parameters(ids) - assert len(ids) == len(params_list) - for i, _ in enumerate(ids): - send(CommandType.NewTrialJob, _pack_parameter(ids[i], params_list[i])) - return True - - def handle_update_search_space(self, data): - self.tuner.update_search_space(data) - return True - - def handle_import_data(self, data): - """import additional data for tuning - data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value' - """ - self.tuner.import_data(data) - return True - - def handle_add_customized_trial(self, data): - # data: parameters - id_ = _create_parameter_id() - _customized_parameter_ids.add(id_) - send(CommandType.NewTrialJob, _pack_parameter(id_, data, customized=True)) - return True - - def handle_report_metric_data(self, data): - trial_job_id = data['trial_job_id'] - if data['type'] == 'FINAL': - id_ = data['parameter_id'] - if id_ in _customized_parameter_ids: - self.tuner.receive_customized_trial_result(id_, _trial_params[id_], data['value'], trial_job_id) - else: - self.tuner.receive_trial_result(id_, _trial_params[id_], data['value'], trial_job_id) - elif data['type'] == 'PERIODICAL': - if self.assessor is not None: - self._handle_intermediate_metric_data(data) - else: - pass - elif data['type'] == 'REQUEST_PARAMETER': - assert data['trial_job_id'] is not None - assert data['parameter_index'] is not None - param_id = _create_parameter_id() - param = self.tuner.generate_parameters(param_id, trial_job_id) - send(CommandType.SendTrialJobParameter, _pack_parameter(param_id, param, trial_job_id=data['trial_job_id'], parameter_index=data['parameter_index'])) - else: - raise ValueError('Data type not supported: {}'.format(data['type'])) - - return True - - def handle_trial_end(self, data): - trial_job_id = data['trial_job_id'] - _ended_trials.add(trial_job_id) - if trial_job_id in _trial_history: - _trial_history.pop(trial_job_id) - if self.assessor is not None: - self.assessor.trial_end(trial_job_id, data['event'] == 'SUCCEEDED') - if self.tuner is not None: - self.tuner.trial_end(json_tricks.loads(data['hyper_params'])['parameter_id'], data['event'] == 'SUCCEEDED', trial_job_id) - return True - - def handle_import_data(self, data): - pass - - def _handle_intermediate_metric_data(self, data): - if data['type'] != 'PERIODICAL': - return True - if self.assessor is None: - return True - - trial_job_id = data['trial_job_id'] - if trial_job_id in _ended_trials: - return True - - history = _trial_history[trial_job_id] - history[data['sequence']] = data['value'] - ordered_history = _sort_history(history) - if len(ordered_history) < data['sequence']: # no user-visible update since last time - return True - - try: - result = self.assessor.assess_trial(trial_job_id, ordered_history) - except Exception as e: - _logger.exception('Assessor error') - - if isinstance(result, bool): - result = AssessResult.Good if result else AssessResult.Bad - elif not isinstance(result, AssessResult): - msg = 'Result of Assessor.assess_trial must be an object of AssessResult, not %s' - raise RuntimeError(msg % type(result)) - - if result is AssessResult.Bad: - _logger.debug('BAD, kill %s', trial_job_id) - send(CommandType.KillTrialJob, json_tricks.dumps(trial_job_id)) - else: - _logger.debug('GOOD') diff --git a/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py b/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py deleted file mode 100644 index e777dbe095..0000000000 --- a/src/sdk/pynni/nni/multi_phase/multi_phase_tuner.py +++ /dev/null @@ -1,106 
+0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -# associated documentation files (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, publish, distribute, -# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all copies or -# substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT -# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT -# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -# ================================================================================================== - - -import logging - -from nni.recoverable import Recoverable - -_logger = logging.getLogger(__name__) - - -class MultiPhaseTuner(Recoverable): - # pylint: disable=no-self-use,unused-argument - - def generate_parameters(self, parameter_id, trial_job_id=None): - """Returns a set of trial (hyper-)parameters, as a serializable object. - User code must override either this function or 'generate_multiple_parameters()'. - parameter_id: identifier of the parameter (int) - """ - raise NotImplementedError('Tuner: generate_parameters not implemented') - - def generate_multiple_parameters(self, parameter_id_list): - """Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects. - Call 'generate_parameters()' by 'count' times by default. - User code must override either this function or 'generate_parameters()'. - parameter_id_list: list of int - """ - return [self.generate_parameters(parameter_id) for parameter_id in parameter_id_list] - - def receive_trial_result(self, parameter_id, parameters, value, trial_job_id): - """Invoked when a trial reports its final result. Must override. - parameter_id: identifier of the parameter (int) - parameters: object created by 'generate_parameters()' - value: object reported by trial - trial_job_id: identifier of the trial (str) - """ - raise NotImplementedError('Tuner: receive_trial_result not implemented') - - def receive_customized_trial_result(self, parameter_id, parameters, value, trial_job_id): - """Invoked when a trial added by WebUI reports its final result. Do nothing by default. - parameter_id: identifier of the parameter (int) - parameters: object created by user - value: object reported by trial - trial_job_id: identifier of the trial (str) - """ - _logger.info('Customized trial job %s ignored by tuner', parameter_id) - - def trial_end(self, parameter_id, success, trial_job_id): - """Invoked when a trial is completed or terminated. Do nothing by default. - parameter_id: identifier of the parameter (int) - success: True if the trial successfully completed; False if failed or terminated - trial_job_id: identifier of the trial (str) - """ - pass - - def update_search_space(self, search_space): - """Update the search space of tuner. Must override. 
- search_space: JSON object - """ - raise NotImplementedError('Tuner: update_search_space not implemented') - - def import_data(self, data): - """Import additional data for tuning - data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value' - """ - pass - - def load_checkpoint(self): - """Load the checkpoint of tuner. - path: checkpoint directory for tuner - """ - checkpoin_path = self.get_checkpoint_path() - _logger.info('Load checkpoint ignored by tuner, checkpoint path: %s' % checkpoin_path) - - def save_checkpoint(self): - """Save the checkpoint of tuner. - path: checkpoint directory for tuner - """ - checkpoin_path = self.get_checkpoint_path() - _logger.info('Save checkpoint ignored by tuner, checkpoint path: %s' % checkpoin_path) - - def _on_exit(self): - pass - - def _on_error(self): - pass - - def import_data(self, data): - pass diff --git a/src/sdk/pynni/nni/networkmorphism_tuner/networkmorphism_tuner.py b/src/sdk/pynni/nni/networkmorphism_tuner/networkmorphism_tuner.py index 00d27b29f2..4a51fa5c22 100644 --- a/src/sdk/pynni/nni/networkmorphism_tuner/networkmorphism_tuner.py +++ b/src/sdk/pynni/nni/networkmorphism_tuner/networkmorphism_tuner.py @@ -123,7 +123,7 @@ def update_search_space(self, search_space): """ self.search_space = search_space - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """ Returns a set of trial neural architecture, as a serializable object. @@ -152,7 +152,7 @@ def generate_parameters(self, parameter_id): return json_out - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """ Record an observation of the objective function. Parameters diff --git a/src/sdk/pynni/nni/smac_tuner/smac_tuner.py b/src/sdk/pynni/nni/smac_tuner/smac_tuner.py index 5a334973fc..d3fe829516 100644 --- a/src/sdk/pynni/nni/smac_tuner/smac_tuner.py +++ b/src/sdk/pynni/nni/smac_tuner/smac_tuner.py @@ -151,7 +151,7 @@ def update_search_space(self, search_space): else: self.logger.warning('update search space is not supported.') - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """receive_trial_result Parameters @@ -209,7 +209,7 @@ def convert_loguniform_categorical(self, challenger_dict): converted_dict[key] = value return converted_dict - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """generate one instance of hyperparameters Parameters @@ -232,7 +232,7 @@ def generate_parameters(self, parameter_id): self.total_data[parameter_id] = challenger return self.convert_loguniform_categorical(challenger.get_dictionary()) - def generate_multiple_parameters(self, parameter_id_list): + def generate_multiple_parameters(self, parameter_id_list, **kwargs): """generate mutiple instances of hyperparameters Parameters diff --git a/src/sdk/pynni/nni/tuner.py b/src/sdk/pynni/nni/tuner.py index c2e1c59d01..0d995f94c5 100644 --- a/src/sdk/pynni/nni/tuner.py +++ b/src/sdk/pynni/nni/tuner.py @@ -30,14 +30,14 @@ class Tuner(Recoverable): # pylint: disable=no-self-use,unused-argument - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """Returns a set of trial (hyper-)parameters, as a serializable object. User code must override either this function or 'generate_multiple_parameters()'. 
parameter_id: int """ raise NotImplementedError('Tuner: generate_parameters not implemented') - def generate_multiple_parameters(self, parameter_id_list): + def generate_multiple_parameters(self, parameter_id_list, **kwargs): """Returns multiple sets of trial (hyper-)parameters, as iterable of serializable objects. Call 'generate_parameters()' by 'count' times by default. User code must override either this function or 'generate_parameters()'. @@ -49,13 +49,13 @@ def generate_multiple_parameters(self, parameter_id_list): for parameter_id in parameter_id_list: try: _logger.debug("generating param for {}".format(parameter_id)) - res = self.generate_parameters(parameter_id) + res = self.generate_parameters(parameter_id, **kwargs) except nni.NoMoreTrialError: return result result.append(res) return result - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): """Invoked when a trial reports its final result. Must override. parameter_id: int parameters: object created by 'generate_parameters()' @@ -63,7 +63,7 @@ def receive_trial_result(self, parameter_id, parameters, value): """ raise NotImplementedError('Tuner: receive_trial_result not implemented') - def receive_customized_trial_result(self, parameter_id, parameters, value): + def receive_customized_trial_result(self, parameter_id, parameters, value, **kwargs): """Invoked when a trial added by WebUI reports its final result. Do nothing by default. parameter_id: int parameters: object created by user @@ -71,7 +71,7 @@ def receive_customized_trial_result(self, parameter_id, parameters, value): """ _logger.info('Customized trial job %s ignored by tuner', parameter_id) - def trial_end(self, parameter_id, success): + def trial_end(self, parameter_id, success, **kwargs): """Invoked when a trial is completed or terminated. Do nothing by default. 
parameter_id: int success: True if the trial successfully completed; False if failed or terminated From c64fba11c509392c00e4ababfe1c61dd68c2c541 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 31 May 2019 18:05:50 +0800 Subject: [PATCH 02/19] updates --- src/sdk/pynni/nni/__main__.py | 10 ++++------ src/sdk/pynni/nni/common.py | 1 + 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sdk/pynni/nni/__main__.py b/src/sdk/pynni/nni/__main__.py index 46edf8661a..f5783f8797 100644 --- a/src/sdk/pynni/nni/__main__.py +++ b/src/sdk/pynni/nni/__main__.py @@ -28,9 +28,8 @@ import importlib from .constants import ModuleName, ClassName, ClassArgs, AdvisorModuleName, AdvisorClassName -from nni.common import enable_multi_thread +from nni.common import enable_multi_thread, enable_multi_phase from nni.msg_dispatcher import MsgDispatcher -from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher logger = logging.getLogger('nni.main') logger.debug('START') @@ -126,6 +125,8 @@ def main(): args = parse_args() if args.multi_thread: enable_multi_thread() + if args.multi_phase: + enable_multi_phase() if args.advisor_class_name: # advisor is enabled and starts to run @@ -180,10 +181,7 @@ def main(): if assessor is None: raise AssertionError('Failed to create Assessor instance') - if args.multi_phase: - dispatcher = MultiPhaseMsgDispatcher(tuner, assessor) - else: - dispatcher = MsgDispatcher(tuner, assessor) + dispatcher = MsgDispatcher(tuner, assessor) try: dispatcher.run() diff --git a/src/sdk/pynni/nni/common.py b/src/sdk/pynni/nni/common.py index 02565ba062..d9f380145c 100644 --- a/src/sdk/pynni/nni/common.py +++ b/src/sdk/pynni/nni/common.py @@ -69,6 +69,7 @@ def init_logger(logger_file_path, log_level_name='info'): sys.stdout = _LoggerFileWrapper(logger_file) _multi_thread = False +_multi_phase = False def enable_multi_thread(): global _multi_thread From c5db10447696c536ec910ab6235dbcd69fee7430 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 31 May 2019 18:18:28 +0800 Subject: [PATCH 03/19] updates --- src/sdk/pynni/tests/test_multi_phase_tuner.py | 110 ------------------ .../multi_phase/multi_phase.test.yml | 6 +- 2 files changed, 3 insertions(+), 113 deletions(-) delete mode 100644 src/sdk/pynni/tests/test_multi_phase_tuner.py diff --git a/src/sdk/pynni/tests/test_multi_phase_tuner.py b/src/sdk/pynni/tests/test_multi_phase_tuner.py deleted file mode 100644 index 5bf8eda3c5..0000000000 --- a/src/sdk/pynni/tests/test_multi_phase_tuner.py +++ /dev/null @@ -1,110 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -# associated documentation files (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, publish, distribute, -# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all copies or -# substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT -# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT -# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -# ================================================================================================== - - -import logging -import random -from io import BytesIO - -import nni -import nni.protocol -from nni.protocol import CommandType, send, receive -from nni.multi_phase.multi_phase_tuner import MultiPhaseTuner -from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher - -from unittest import TestCase, main - -class NaiveMultiPhaseTuner(MultiPhaseTuner): - ''' - supports only choices - ''' - def __init__(self): - self.search_space = None - - def generate_parameters(self, parameter_id, trial_job_id=None): - """Returns a set of trial (hyper-)parameters, as a serializable object. - User code must override either this function or 'generate_multiple_parameters()'. - parameter_id: int - """ - generated_parameters = {} - if self.search_space is None: - raise AssertionError('Search space not specified') - for k in self.search_space: - param = self.search_space[k] - if not param['_type'] == 'choice': - raise ValueError('Only choice type is supported') - param_values = param['_value'] - generated_parameters[k] = param_values[random.randint(0, len(param_values)-1)] - logging.getLogger(__name__).debug(generated_parameters) - return generated_parameters - - - def receive_trial_result(self, parameter_id, parameters, value, trial_job_id): - logging.getLogger(__name__).debug('receive_trial_result: {},{},{},{}'.format(parameter_id, parameters, value, trial_job_id)) - - def receive_customized_trial_result(self, parameter_id, parameters, value, trial_job_id): - pass - - def update_search_space(self, search_space): - self.search_space = search_space - - -_in_buf = BytesIO() -_out_buf = BytesIO() - -def _reverse_io(): - _in_buf.seek(0) - _out_buf.seek(0) - nni.protocol._out_file = _in_buf - nni.protocol._in_file = _out_buf - -def _restore_io(): - _in_buf.seek(0) - _out_buf.seek(0) - nni.protocol._in_file = _in_buf - nni.protocol._out_file = _out_buf - -def _test_tuner(): - _reverse_io() # now we are sending to Tuner's incoming stream - send(CommandType.UpdateSearchSpace, "{\"learning_rate\": {\"_value\": [0.0001, 0.001, 0.002, 0.005, 0.01], \"_type\": \"choice\"}, \"optimizer\": {\"_value\": [\"Adam\", \"SGD\"], \"_type\": \"choice\"}}") - send(CommandType.RequestTrialJobs, '2') - send(CommandType.ReportMetricData, '{"parameter_id":0,"type":"PERIODICAL","value":10,"trial_job_id":"abc"}') - send(CommandType.ReportMetricData, '{"parameter_id":1,"type":"FINAL","value":11,"trial_job_id":"abc"}') - send(CommandType.AddCustomizedTrialJob, '{"param":-1}') - send(CommandType.ReportMetricData, '{"parameter_id":2,"type":"FINAL","value":22,"trial_job_id":"abc"}') - send(CommandType.RequestTrialJobs, '1') - send(CommandType.TrialEnd, '{"trial_job_id":"abc"}') - _restore_io() - - tuner = NaiveMultiPhaseTuner() - dispatcher = MultiPhaseMsgDispatcher(tuner) - dispatcher.run() - - _reverse_io() # now we are receiving from Tuner's outgoing stream - - command, data = receive() # this one is customized - print(command, data) - -class MultiPhaseTestCase(TestCase): - def test_tuner(self): - _test_tuner() - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/test/config_test/multi_phase/multi_phase.test.yml 
b/test/config_test/multi_phase/multi_phase.test.yml index 46996a9680..af21870320 100644 --- a/test/config_test/multi_phase/multi_phase.test.yml +++ b/test/config_test/multi_phase/multi_phase.test.yml @@ -6,9 +6,9 @@ trialConcurrency: 4 searchSpacePath: ./search_space.json tuner: - codeDir: ../../../src/sdk/pynni/tests - classFileName: test_multi_phase_tuner.py - className: NaiveMultiPhaseTuner + builtinTunerName: TPE + classArgs: + optimize_mode: maximize trial: codeDir: . From 369c6700744f07b5f07e3d17b3b0b417393a967b Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 14 Jun 2019 20:50:44 +0800 Subject: [PATCH 04/19] Implement multiphase on PAI --- .../common/clusterJobRestServer.ts | 6 +- .../training_service/pai/paiData.ts | 6 +- .../training_service/pai/paiJobRestServer.ts | 46 +++++++++++-- .../pai/paiTrainingService.ts | 69 +++++++++++++++++-- tools/nni_trial_tool/constants.py | 4 +- tools/nni_trial_tool/trial_keeper.py | 21 +++++- tools/nni_trial_tool/url_utils.py | 8 ++- 7 files changed, 141 insertions(+), 19 deletions(-) diff --git a/src/nni_manager/training_service/common/clusterJobRestServer.ts b/src/nni_manager/training_service/common/clusterJobRestServer.ts index 1dc8fef585..e4f6785766 100644 --- a/src/nni_manager/training_service/common/clusterJobRestServer.ts +++ b/src/nni_manager/training_service/common/clusterJobRestServer.ts @@ -56,6 +56,10 @@ export abstract class ClusterJobRestServer extends RestServer{ this.port = basePort + 1; } + get apiRootUrl(): string { + return this.API_ROOT_URL; + } + public get clusterRestServerPort(): number { if(!this.port) { throw new Error('PAI Rest server port is undefined'); @@ -79,7 +83,7 @@ export abstract class ClusterJobRestServer extends RestServer{ this.app.use(this.API_ROOT_URL, this.createRestHandler()); } - private createRestHandler() : Router { + protected createRestHandler() : Router { const router: Router = Router(); // tslint:disable-next-line:typedef diff --git a/src/nni_manager/training_service/pai/paiData.ts b/src/nni_manager/training_service/pai/paiData.ts index 15dc133730..2f06ed24bd 100644 --- a/src/nni_manager/training_service/pai/paiData.ts +++ b/src/nni_manager/training_service/pai/paiData.ts @@ -61,10 +61,10 @@ else fi`; export const PAI_TRIAL_COMMAND_FORMAT: string = -`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} +`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} && cd $NNI_SYS_DIR && sh install_nni.sh -&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' ---pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --nni_manager_version '{12}' --log_collection '{13}'`; +&& python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' +--pai_hdfs_output_dir '{9}' --pai_hdfs_host '{10}' --pai_user_name {11} --nni_hdfs_exp_dir '{12}' --webhdfs_path '/webhdfs/api/v1' --nni_manager_version '{13}' --log_collection '{13}'`; export const PAI_OUTPUT_DIR_FORMAT: string = `hdfs://{0}:9000/`; diff --git a/src/nni_manager/training_service/pai/paiJobRestServer.ts b/src/nni_manager/training_service/pai/paiJobRestServer.ts index c551e1f575..76ac01cac9 100644 --- a/src/nni_manager/training_service/pai/paiJobRestServer.ts +++ b/src/nni_manager/training_service/pai/paiJobRestServer.ts @@ -19,17 +19,26 @@ 'use 
strict'; -import * as component from '../../common/component'; +import { Request, Response, Router } from 'express'; import { Inject } from 'typescript-ioc'; +import * as component from '../../common/component'; +import { ClusterJobRestServer } from '../common/clusterJobRestServer'; import { PAITrainingService } from './paiTrainingService'; -import { ClusterJobRestServer } from '../common/clusterJobRestServer' + +export interface ParameterFileMeta { + readonly experimentId: string; + readonly trialId: string; + readonly filePath: string; +} /** * PAI Training service Rest server, provides rest API to support pai job metrics update * */ @component.Singleton -export class PAIJobRestServer extends ClusterJobRestServer{ +export class PAIJobRestServer extends ClusterJobRestServer { + private parameterFileMetaList: ParameterFileMeta[] = []; + @Inject private readonly paiTrainingService : PAITrainingService; @@ -51,4 +60,33 @@ export class PAIJobRestServer extends ClusterJobRestServer{ }); } } -} \ No newline at end of file + + protected createRestHandler(): Router { + const router: Router = super.createRestHandler(); + + router.post(`/parameter-file-meta`, (req: Request, res: Response) => { + try { + this.log.info(`POST /parameter-file-meta, body is ${JSON.stringify(req.body)}`); + this.parameterFileMetaList.push(req.body); + res.send(); + } catch (err) { + this.log.error(`POST parameter-file-meta error: ${err}`); + res.status(500); + res.send(err.message); + } + }); + + router.get(`/parameter-file-meta`, (req: Request, res: Response) => { + try { + this.log.info(`GET /parameter-file-meta`); + res.send(this.parameterFileMetaList); + } catch (err) { + this.log.error(`GET parameter-file-meta error: ${err}`); + res.status(500); + res.send(err.message); + } + }); + + return router; + } +} diff --git a/src/nni_manager/training_service/pai/paiTrainingService.ts b/src/nni_manager/training_service/pai/paiTrainingService.ts index 9e3a681610..91b9882448 100644 --- a/src/nni_manager/training_service/pai/paiTrainingService.ts +++ b/src/nni_manager/training_service/pai/paiTrainingService.ts @@ -33,7 +33,7 @@ import { MethodNotImplementedError } from '../../common/errors'; import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; import { getLogger, Logger } from '../../common/log'; import { - JobApplicationForm, NNIManagerIpConfig, TrainingService, + HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService'; import { delay, generateParamFileName, @@ -46,7 +46,7 @@ import { HDFSClientUtility } from './hdfsClientUtility'; import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig'; import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData'; import { PAIJobInfoCollector } from './paiJobInfoCollector'; -import { PAIJobRestServer } from './paiJobRestServer'; +import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer'; const WebHDFS = require('webhdfs'); @@ -79,6 +79,7 @@ class PAITrainingService implements TrainingService { private copyExpCodeDirPromise?: Promise; private versionCheck: boolean = true; private logCollection: string; + private isMultiPhase: boolean = false; constructor() { this.log = getLogger(); @@ -179,12 +180,22 @@ class PAITrainingService implements TrainingService { return deferred.promise; } - public updateTrialJob(trialJobId: string, form: 
JobApplicationForm): Promise { - throw new MethodNotImplementedError(); + public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise { + const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId); + if (trialJobDetail === undefined) { + throw new Error(`updateTrialJob failed: ${trialJobId} not found`); + } + if (form.jobType === 'TRIAL') { + await this.writeParameterFile(trialJobId, (form).hyperParameters); + } else { + throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`); + } + + return trialJobDetail; } public get isMultiPhaseJobSupported(): boolean { - return false; + return true; } public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise { @@ -331,6 +342,9 @@ class PAITrainingService implements TrainingService { case TrialConfigMetadataKey.LOG_COLLECTION: this.logCollection = value; break; + case TrialConfigMetadataKey.MULTI_PHASE: + this.isMultiPhase = (value === 'true' || value === 'True'); + break; default: //Reject for unknown keys throw new Error(`Uknown key: ${key}`); @@ -437,6 +451,7 @@ class PAITrainingService implements TrainingService { trialJobId, this.experimentId, trialJobDetail.sequenceId, + this.isMultiPhase, this.paiTrialConfig.command, nniManagerIp, this.paiRestServerPort, @@ -618,6 +633,50 @@ class PAITrainingService implements TrainingService { return Promise.race([timeoutDelay, deferred.promise]) .finally(() => clearTimeout(timeoutId)); } + + private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise { + if (this.paiClusterConfig === undefined) { + throw new Error(''); + } + if (this.paiTrialConfig === undefined) { + throw new Error(''); + } + + const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); + const hpFileName: string = generateParamFileName(hyperParameters); + const localFilepath: string = path.join(trialLocalTempFolder, hpFileName); + await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' }); + const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId); + const hdfsHpFilePath: string = path.join(hdfsCodeDir, hpFileName); + + await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient); + + await this.postParameterFileMeta({ + experimentId: this.experimentId, + trialId: trialJobId, + filePath: hdfsHpFilePath + }); + } + + private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise { + const deferred : Deferred = new Deferred(); + const restServer: PAIJobRestServer = component.get(PAIJobRestServer); + const req: request.Options = { + uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`, + method: 'POST', + json: true, + body: parameterFileMeta + }; + request(req, (err: Error, res: request.Response) => { + if (err) { + deferred.reject(err); + } else { + deferred.resolve(); + } + }); + + return deferred.promise; + } } export { PAITrainingService }; diff --git a/tools/nni_trial_tool/constants.py b/tools/nni_trial_tool/constants.py index a55cc19abd..0013ee9984 100644 --- a/tools/nni_trial_tool/constants.py +++ b/tools/nni_trial_tool/constants.py @@ -36,6 +36,8 @@ STDOUT_API = '/stdout' VERSION_API = '/version' +PARAMETER_META_API = '/parameter-meta' NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] -NNI_EXP_ID = os.environ['NNI_EXP_ID'] \ No newline at end of file +NNI_EXP_ID = 
os.environ['NNI_EXP_ID'] +MULTI_PHASE = os.environ['MULTI_PHASE'] diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index 6a487758d5..d47ea67c44 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -28,12 +28,13 @@ import sys import select import json +import threading from pyhdfs import HdfsClient import pkg_resources -from .rest_utils import rest_post -from .url_utils import gen_send_stdout_url, gen_send_version_url +from .rest_utils import rest_post, rest_get +from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url -from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH +from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType @@ -138,6 +139,18 @@ def check_version(args): except AttributeError as err: nni_log(LogType.Error, err) +def is_multi_phase(): + return MULTI_PHASE and (MULTI_PHASE in ['True', 'true']) + +def fetch_parameter_file(args): + class FetchThread(threading.Thread): + def run(self): + meta_list = rest_get(gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port), 10) + print(meta_list) + + fetch_file_thread = FetchThread() + fetch_file_thread.start() + if __name__ == '__main__': '''NNI Trial Keeper main function''' PARSER = argparse.ArgumentParser() @@ -159,6 +172,8 @@ def check_version(args): exit(1) check_version(args) try: + if is_multi_phase(): + fetch_parameter_file(args) main_loop(args) except SystemExit as se: nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code)) diff --git a/tools/nni_trial_tool/url_utils.py b/tools/nni_trial_tool/url_utils.py index 94c37c0b03..988b330c02 100644 --- a/tools/nni_trial_tool/url_utils.py +++ b/tools/nni_trial_tool/url_utils.py @@ -18,7 +18,7 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
-from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API +from .constants import API_ROOT_URL, BASE_URL, STDOUT_API, NNI_TRIAL_JOB_ID, NNI_EXP_ID, VERSION_API, PARAMETER_META_API def gen_send_stdout_url(ip, port): '''Generate send stdout url''' @@ -26,4 +26,8 @@ def gen_send_stdout_url(ip, port): def gen_send_version_url(ip, port): '''Generate send error url''' - return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, VERSION_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID) \ No newline at end of file + return '{0}:{1}{2}{3}/{4}/{5}'.format(BASE_URL.format(ip), port, API_ROOT_URL, VERSION_API, NNI_EXP_ID, NNI_TRIAL_JOB_ID) + +def gen_parameter_meta_url(ip, port): + '''Generate send error url''' + return '{0}:{1}{2}{3}'.format(BASE_URL.format(ip), port, API_ROOT_URL, PARAMETER_META_API) From b47dc4cb8a7d938880bd0d2f3c653744e393746a Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 17:40:13 +0800 Subject: [PATCH 05/19] updates --- tools/nni_trial_tool/trial_keeper.py | 91 +++++++++++++++++++++------- 1 file changed, 68 insertions(+), 23 deletions(-) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index d47ea67c44..db156f4b9c 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -34,13 +34,42 @@ from .rest_utils import rest_post, rest_get from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url -from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE -from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal +from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE, NNI_TRIAL_JOB_ID +from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType logger = logging.getLogger('trial_keeper') regular = re.compile('v?(?P[0-9](\.[0-9]){0,1}).*') +_hdfs_client = None + +def get_hdfs_client(args): + global _hdfs_client + + if _hdfs_client is not None: + return _hdfs_client + # backward compatibility + hdfs_host = None + hdfs_output_dir = None + if args.hdfs_host: + hdfs_host = args.hdfs_host + elif args.pai_hdfs_host: + hdfs_host = args.pai_hdfs_host + else: + return None + + if hdfs_host is not None and args.nni_hdfs_exp_dir is not None: + try: + if args.webhdfs_path: + _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5) + else: + # backward compatibility + _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) + except Exception as e: + nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) + raise e + return _hdfs_client + def main_loop(args): '''main loop logic for trial keeper''' @@ -53,28 +82,14 @@ def main_loop(args): # redirect trial keeper's stdout and stderr to syslog trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection) sys.stdout = sys.stderr = trial_keeper_syslogger - # backward compatibility - hdfs_host = None - hdfs_output_dir = None - if args.hdfs_host: - hdfs_host = args.hdfs_host - elif args.pai_hdfs_host: - hdfs_host = args.pai_hdfs_host if args.hdfs_output_dir: hdfs_output_dir = args.hdfs_output_dir elif args.pai_hdfs_output_dir: hdfs_output_dir = 
args.pai_hdfs_output_dir - if hdfs_host is not None and args.nni_hdfs_exp_dir is not None: - try: - if args.webhdfs_path: - hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5) - else: - # backward compatibility - hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) - except Exception as e: - nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) - raise e + hdfs_client = get_hdfs_client(args) + + if hdfs_client is not None: copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client) # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior @@ -142,13 +157,43 @@ def check_version(args): def is_multi_phase(): return MULTI_PHASE and (MULTI_PHASE in ['True', 'true']) +def download_parameter(meta_list, args): + """ + Download parameter file to local working directory. + meta_list format is defined in paiJobRestServer.ts + example meta_list: + [ + {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"}, + {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"} + ] + """ + print(os.listdir('.')) + print('trial id:', NNI_TRIAL_JOB_ID) + print('meta_list:', meta_list) + for meta in meta_list: + if meta['trialId'] == NNI_TRIAL_JOB_ID: + fn = os.path.basename(meta) + if not os.path.exists(fn): + hdfs_client = get_hdfs_client(args) + copyHdfsFileToLocal(meta['filePath'], fn, hdfs_client, override=False) + def fetch_parameter_file(args): class FetchThread(threading.Thread): + def __init__(self, args): + super(FetchThread, self).__init__() + self.args = args def run(self): - meta_list = rest_get(gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port), 10) - print(meta_list) - - fetch_file_thread = FetchThread() + while True: + uri = gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port) + print('uri:', uri) + res = rest_get(uri, 10) + print('status code:', res.status_code) + if res.status_code == 200: + meta_list = res.json() + download_parameter(meta_list, self.args) + time.sleep(5) + + fetch_file_thread = FetchThread(args) fetch_file_thread.start() if __name__ == '__main__': From e503824b1b64a81aeac0a4ec3a581d89a82fac86 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 17:49:07 +0800 Subject: [PATCH 06/19] updates --- tools/nni_trial_tool/trial_keeper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index db156f4b9c..f375676835 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -172,7 +172,7 @@ def download_parameter(meta_list, args): print('meta_list:', meta_list) for meta in meta_list: if meta['trialId'] == NNI_TRIAL_JOB_ID: - fn = os.path.basename(meta) + fn = os.path.basename(meta['filePath']) if not os.path.exists(fn): hdfs_client = get_hdfs_client(args) copyHdfsFileToLocal(meta['filePath'], fn, hdfs_client, override=False) From 73ac91ec6db0128734afde003aa611e5ef51f3db Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 17:58:28 +0800 Subject: [PATCH 07/19] updates --- tools/nni_trial_tool/trial_keeper.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index 
f375676835..81244d373c 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -34,7 +34,7 @@ from .rest_utils import rest_post, rest_get from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url -from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE, NNI_TRIAL_JOB_ID +from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType @@ -167,15 +167,16 @@ def download_parameter(meta_list, args): {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"} ] """ - print(os.listdir('.')) + print('sysdir:', NNI_SYS_DIR) + print(os.listdir(NNI_SYS_DIR)) print('trial id:', NNI_TRIAL_JOB_ID) print('meta_list:', meta_list) for meta in meta_list: if meta['trialId'] == NNI_TRIAL_JOB_ID: - fn = os.path.basename(meta['filePath']) - if not os.path.exists(fn): + param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath'])) + if not os.path.exists(param_fp): hdfs_client = get_hdfs_client(args) - copyHdfsFileToLocal(meta['filePath'], fn, hdfs_client, override=False) + copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False) def fetch_parameter_file(args): class FetchThread(threading.Thread): From 505c0af860f5c2f108d4d73f40acd7a29ff14796 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 18:14:39 +0800 Subject: [PATCH 08/19] updates --- tools/nni_trial_tool/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/nni_trial_tool/constants.py b/tools/nni_trial_tool/constants.py index 0013ee9984..2dc25a8678 100644 --- a/tools/nni_trial_tool/constants.py +++ b/tools/nni_trial_tool/constants.py @@ -36,7 +36,7 @@ STDOUT_API = '/stdout' VERSION_API = '/version' -PARAMETER_META_API = '/parameter-meta' +PARAMETER_META_API = '/parameter-file-meta' NNI_SYS_DIR = os.environ['NNI_SYS_DIR'] NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID'] NNI_EXP_ID = os.environ['NNI_EXP_ID'] From a9d57c2a9731736434c726414ccc7daeb10e48d2 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 18:36:04 +0800 Subject: [PATCH 09/19] updates --- tools/nni_trial_tool/trial_keeper.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index 81244d373c..bd8d2467ee 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -34,7 +34,8 @@ from .rest_utils import rest_post, rest_get from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url -from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR +from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \ + MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType @@ -167,12 +168,11 @@ def download_parameter(meta_list, args): 
{"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"} ] """ - print('sysdir:', NNI_SYS_DIR) - print(os.listdir(NNI_SYS_DIR)) - print('trial id:', NNI_TRIAL_JOB_ID) - print('meta_list:', meta_list) + nni_log(LogType.Debug, str(meta_list)) + nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID)) + nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR))) for meta in meta_list: - if meta['trialId'] == NNI_TRIAL_JOB_ID: + if meta['experimentId'] == NNI_EXP_ID and meta['trialId'] == NNI_TRIAL_JOB_ID: param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath'])) if not os.path.exists(param_fp): hdfs_client = get_hdfs_client(args) @@ -183,12 +183,14 @@ class FetchThread(threading.Thread): def __init__(self, args): super(FetchThread, self).__init__() self.args = args + def run(self): + uri = gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port) + nni_log(LogType.Info, uri) + while True: - uri = gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port) - print('uri:', uri) res = rest_get(uri, 10) - print('status code:', res.status_code) + nni_log(LogType.Debug, 'status code: {}'.format(res.status_code)) if res.status_code == 200: meta_list = res.json() download_parameter(meta_list, self.args) From 1245091675d48db04059ae494738b442537877a3 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Thu, 20 Jun 2019 18:39:09 +0800 Subject: [PATCH 10/19] updates --- tools/nni_trial_tool/trial_keeper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index bd8d2467ee..5d16bdd0f2 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -185,7 +185,7 @@ def __init__(self, args): self.args = args def run(self): - uri = gen_parameter_meta_url(args.nnimanager_ip, args.nnimanager_port) + uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port) nni_log(LogType.Info, uri) while True: From ba36798949289c954e2f0fc9a3242755a2ae73c1 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 21 Jun 2019 11:34:07 +0800 Subject: [PATCH 11/19] add test cases --- .../multi_phase/multi_phase_batch.test.yml | 20 +++++++++++++++++ .../multi_phase_evolution.test.yml | 22 +++++++++++++++++++ .../multi_phase/multi_phase_grid.test.yml | 20 +++++++++++++++++ .../multi_phase/multi_phase_metis.test.yml | 22 +++++++++++++++++++ .../multi_phase/multi_phase_tpe.test.yml | 22 +++++++++++++++++++ 5 files changed, 106 insertions(+) create mode 100644 test/config_test/multi_phase/multi_phase_batch.test.yml create mode 100644 test/config_test/multi_phase/multi_phase_evolution.test.yml create mode 100644 test/config_test/multi_phase/multi_phase_grid.test.yml create mode 100644 test/config_test/multi_phase/multi_phase_metis.test.yml create mode 100644 test/config_test/multi_phase/multi_phase_tpe.test.yml diff --git a/test/config_test/multi_phase/multi_phase_batch.test.yml b/test/config_test/multi_phase/multi_phase_batch.test.yml new file mode 100644 index 0000000000..089cec9c04 --- /dev/null +++ b/test/config_test/multi_phase/multi_phase_batch.test.yml @@ -0,0 +1,20 @@ +authorName: nni +experimentName: default_test +maxExecDuration: 5m +maxTrialNum: 8 +trialConcurrency: 4 +searchSpacePath: ./search_space.json + +tuner: + builtinTunerName: BatchTuner + +trial: + codeDir: . 
+ command: python3 multi_phase.py + gpuNum: 0 + +useAnnotation: false +multiPhase: true +multiThread: false + +trainingServicePlatform: local diff --git a/test/config_test/multi_phase/multi_phase_evolution.test.yml b/test/config_test/multi_phase/multi_phase_evolution.test.yml new file mode 100644 index 0000000000..bc06b8a256 --- /dev/null +++ b/test/config_test/multi_phase/multi_phase_evolution.test.yml @@ -0,0 +1,22 @@ +authorName: nni +experimentName: default_test +maxExecDuration: 5m +maxTrialNum: 8 +trialConcurrency: 4 +searchSpacePath: ./search_space.json + +tuner: + builtinTunerName: Evolution + classArgs: + optimize_mode: maximize + +trial: + codeDir: . + command: python3 multi_phase.py + gpuNum: 0 + +useAnnotation: false +multiPhase: true +multiThread: false + +trainingServicePlatform: local diff --git a/test/config_test/multi_phase/multi_phase_grid.test.yml b/test/config_test/multi_phase/multi_phase_grid.test.yml new file mode 100644 index 0000000000..793224e40e --- /dev/null +++ b/test/config_test/multi_phase/multi_phase_grid.test.yml @@ -0,0 +1,20 @@ +authorName: nni +experimentName: default_test +maxExecDuration: 5m +maxTrialNum: 8 +trialConcurrency: 4 +searchSpacePath: ./search_space.json + +tuner: + builtinTunerName: GridSearch + +trial: + codeDir: . + command: python3 multi_phase.py + gpuNum: 0 + +useAnnotation: false +multiPhase: true +multiThread: false + +trainingServicePlatform: local diff --git a/test/config_test/multi_phase/multi_phase_metis.test.yml b/test/config_test/multi_phase/multi_phase_metis.test.yml new file mode 100644 index 0000000000..16b0c8a07a --- /dev/null +++ b/test/config_test/multi_phase/multi_phase_metis.test.yml @@ -0,0 +1,22 @@ +authorName: nni +experimentName: default_test +maxExecDuration: 5m +maxTrialNum: 8 +trialConcurrency: 4 +searchSpacePath: ./search_space.json + +tuner: + builtinTunerName: MetisTuner + classArgs: + optimize_mode: maximize + +trial: + codeDir: . + command: python3 multi_phase.py + gpuNum: 0 + +useAnnotation: false +multiPhase: true +multiThread: false + +trainingServicePlatform: local diff --git a/test/config_test/multi_phase/multi_phase_tpe.test.yml b/test/config_test/multi_phase/multi_phase_tpe.test.yml new file mode 100644 index 0000000000..af21870320 --- /dev/null +++ b/test/config_test/multi_phase/multi_phase_tpe.test.yml @@ -0,0 +1,22 @@ +authorName: nni +experimentName: default_test +maxExecDuration: 5m +maxTrialNum: 8 +trialConcurrency: 4 +searchSpacePath: ./search_space.json + +tuner: + builtinTunerName: TPE + classArgs: + optimize_mode: maximize + +trial: + codeDir: . + command: python3 multi_phase.py + gpuNum: 0 + +useAnnotation: false +multiPhase: true +multiThread: false + +trainingServicePlatform: local From bf9d8f810aef7bd0f4cd6caf30990d6c4b6abaa0 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 21 Jun 2019 14:11:38 +0800 Subject: [PATCH 12/19] updates --- src/sdk/pynni/tests/test_multi_phase_tuner.py | 110 ------------------ 1 file changed, 110 deletions(-) delete mode 100644 src/sdk/pynni/tests/test_multi_phase_tuner.py diff --git a/src/sdk/pynni/tests/test_multi_phase_tuner.py b/src/sdk/pynni/tests/test_multi_phase_tuner.py deleted file mode 100644 index aef611402c..0000000000 --- a/src/sdk/pynni/tests/test_multi_phase_tuner.py +++ /dev/null @@ -1,110 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. 
-# -# MIT License -# -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -# associated documentation files (the "Software"), to deal in the Software without restriction, -# including without limitation the rights to use, copy, modify, merge, publish, distribute, -# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all copies or -# substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT -# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT -# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -# ================================================================================================== - - -import logging -import random -from io import BytesIO - -import nni -import nni.protocol -from nni.protocol import CommandType, send, receive -from nni.multi_phase.multi_phase_tuner import MultiPhaseTuner -from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher - -from unittest import TestCase, main - -class NaiveMultiPhaseTuner(MultiPhaseTuner): - ''' - supports only choices - ''' - def __init__(self): - self.search_space = None - - def generate_parameters(self, parameter_id, trial_job_id=None): - """Returns a set of trial (hyper-)parameters, as a serializable object. - User code must override either this function or 'generate_multiple_parameters()'. 
- parameter_id: int - """ - generated_parameters = {} - if self.search_space is None: - raise AssertionError('Search space not specified') - for k in self.search_space: - param = self.search_space[k] - if not param['_type'] == 'choice': - raise ValueError('Only choice type is supported') - param_values = param['_value'] - generated_parameters[k] = param_values[random.randint(0, len(param_values)-1)] - logging.getLogger(__name__).debug(generated_parameters) - return generated_parameters - - - def receive_trial_result(self, parameter_id, parameters, value, trial_job_id): - logging.getLogger(__name__).debug('receive_trial_result: {},{},{},{}'.format(parameter_id, parameters, value, trial_job_id)) - - def receive_customized_trial_result(self, parameter_id, parameters, value, trial_job_id): - pass - - def update_search_space(self, search_space): - self.search_space = search_space - - -_in_buf = BytesIO() -_out_buf = BytesIO() - -def _reverse_io(): - _in_buf.seek(0) - _out_buf.seek(0) - nni.protocol._out_file = _in_buf - nni.protocol._in_file = _out_buf - -def _restore_io(): - _in_buf.seek(0) - _out_buf.seek(0) - nni.protocol._in_file = _in_buf - nni.protocol._out_file = _out_buf - -def _test_tuner(): - _reverse_io() # now we are sending to Tuner's incoming stream - send(CommandType.UpdateSearchSpace, "{\"learning_rate\": {\"_value\": [0.0001, 0.001, 0.002, 0.005, 0.01], \"_type\": \"choice\"}, \"optimizer\": {\"_value\": [\"Adam\", \"SGD\"], \"_type\": \"choice\"}}") - send(CommandType.RequestTrialJobs, '2') - send(CommandType.ReportMetricData, '{"parameter_id":0,"type":"PERIODICAL","value":10,"trial_job_id":"abc"}') - send(CommandType.ReportMetricData, '{"parameter_id":1,"type":"FINAL","value":11,"trial_job_id":"abc"}') - send(CommandType.AddCustomizedTrialJob, '{"param":-1}') - send(CommandType.ReportMetricData, '{"parameter_id":2,"type":"FINAL","value":22,"trial_job_id":"abc"}') - send(CommandType.RequestTrialJobs, '1') - send(CommandType.TrialEnd, '{"trial_job_id":"abc"}') - _restore_io() - - tuner = NaiveMultiPhaseTuner() - dispatcher = MultiPhaseMsgDispatcher(tuner) - dispatcher.run() - - _reverse_io() # now we are receiving from Tuner's outgoing stream - - command, data = receive() # this one is customized - print(command, data) - -class MultiPhaseTestCase(TestCase): - def test_tuner(self): - _test_tuner() - -if __name__ == '__main__': - main() \ No newline at end of file From 172d66864fe58e95dbc5c68a2c3f1e2ad0722305 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 21 Jun 2019 14:35:37 +0800 Subject: [PATCH 13/19] updates --- src/sdk/pynni/tests/test_tuner.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/sdk/pynni/tests/test_tuner.py b/src/sdk/pynni/tests/test_tuner.py index 7b44d60695..d27bd73247 100644 --- a/src/sdk/pynni/tests/test_tuner.py +++ b/src/sdk/pynni/tests/test_tuner.py @@ -103,11 +103,9 @@ def test_tuner(self): command, data = receive() # this one is customized data = json.loads(data) self.assertIs(command, CommandType.NewTrialJob) - self.assertEqual(data, { - 'parameter_id': 2, - 'parameter_source': 'customized', - 'parameters': { 'param': -1 } - }) + self.assertEqual(data['parameter_id'], 2) + self.assertEqual(data['parameter_source'], 'customized') + self.assertEqual(data['parameters'], { 'param': -1 }) self._assert_params(3, 6, [[1,4,11,False], [2,-1,22,True]], {'name':'SS0'}) From 2a6cecfe9f78349affc53ff9e65ebbf29b700a08 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 21 Jun 2019 15:49:04 +0800 Subject: [PATCH 14/19] update 
multiphase doc --- docs/en_US/MultiPhase.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/en_US/MultiPhase.md b/docs/en_US/MultiPhase.md index 6859144a0f..5c73fc6453 100644 --- a/docs/en_US/MultiPhase.md +++ b/docs/en_US/MultiPhase.md @@ -38,7 +38,15 @@ To enable multi-phase, you should also add `multiPhase: true` in your experiment ### Write a tuner that leverages multi-phase: -Before writing a multi-phase tuner, we highly suggest you to go through [Customize Tuner](https://nni.readthedocs.io/en/latest/Customize_Tuner.html). Different from writing a normal tuner, your tuner needs to inherit from `MultiPhaseTuner` (in nni.multi_phase_tuner). The key difference between `Tuner` and `MultiPhaseTuner` is that the methods in MultiPhaseTuner are aware of additional information, that is, `trial_job_id`. With this information, the tuner could know which trial is requesting a configuration, and which trial is reporting results. This information provides enough flexibility for your tuner to deal with different trials and different phases. For example, you may want to use the trial_job_id parameter of generate_parameters method to generate hyperparameters for a specific trial job. +Before writing a multi-phase tuner, we highly suggest you go through [Customize Tuner](https://nni.readthedocs.io/en/latest/Customize_Tuner.html). As with a normal tuner, your tuner needs to inherit from the `Tuner` class. When you enable multi-phase through configuration (set `multiPhase` to true), your tuner will receive an additional `trial_job_id` parameter through the following tuner methods:
+```
+generate_parameters
+generate_multiple_parameters
+receive_trial_result
+receive_customized_trial_result
+trial_end
+```
+With this information, the tuner knows which trial is requesting a configuration and which trial is reporting results. This provides enough flexibility for your tuner to deal with different trials and different phases. For example, you may want to use the `trial_job_id` parameter of the `generate_parameters` method to generate hyperparameters for a specific trial job. Of course, to use your multi-phase tuner, __you should add `multiPhase: true` in your experiment YAML configure file__. 
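As a minimal sketch of what the hunk above documents — assuming only that `trial_job_id` arrives through `**kwargs` when `multiPhase` is enabled, with the class name and the trivial parameter scheme invented here purely for illustration — a multi-phase tuner might look like this:

```python
from nni.tuner import Tuner


class ExamplePerTrialTuner(Tuner):
    """Illustration only: hands each trial job its own increasing value of x."""

    def __init__(self):
        self.next_x = {}          # trial_job_id -> next value to hand out
        self.search_space = None

    def update_search_space(self, search_space):
        # This sketch ignores the search space; a real tuner would sample from it.
        self.search_space = search_space

    def generate_parameters(self, parameter_id, **kwargs):
        # With multiPhase enabled, the requesting trial job is identified here.
        trial_job_id = kwargs.get('trial_job_id')
        x = self.next_x.get(trial_job_id, 0)
        self.next_x[trial_job_id] = x + 1
        return {'x': x}

    def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
        # The same id tells the tuner which trial (and thus which phase) reported this result.
        trial_job_id = kwargs.get('trial_job_id')
        print('trial {} reported {} for parameter {}'.format(trial_job_id, value, parameter_id))
```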
From 4fea76aa2a7f5d73b2917aedf15b18e2c1dc0a52 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Fri, 21 Jun 2019 17:15:27 +0800 Subject: [PATCH 15/19] updates --- src/nni_manager/training_service/pai/paiTrainingService.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/nni_manager/training_service/pai/paiTrainingService.ts b/src/nni_manager/training_service/pai/paiTrainingService.ts index dc7d4dcd8c..ad11cbfc98 100644 --- a/src/nni_manager/training_service/pai/paiTrainingService.ts +++ b/src/nni_manager/training_service/pai/paiTrainingService.ts @@ -650,10 +650,10 @@ class PAITrainingService implements TrainingService { private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise { if (this.paiClusterConfig === undefined) { - throw new Error(''); + throw new Error('PAI Cluster config is not initialized'); } if (this.paiTrialConfig === undefined) { - throw new Error(''); + throw new Error('PAI trial config is not initialized'); } const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); From 508e0e58a1fe56f5299b2d53346ff4d3ea3dc9da Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Mon, 24 Jun 2019 12:15:03 +0800 Subject: [PATCH 16/19] updates --- src/nni_manager/training_service/pai/paiData.ts | 2 +- tools/nni_trial_tool/trial_keeper.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/nni_manager/training_service/pai/paiData.ts b/src/nni_manager/training_service/pai/paiData.ts index 6c3c2d9869..8820f55cbd 100644 --- a/src/nni_manager/training_service/pai/paiData.ts +++ b/src/nni_manager/training_service/pai/paiData.ts @@ -68,7 +68,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string = && cd $NNI_SYS_DIR && sh install_nni.sh \ && python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' \ --pai_hdfs_output_dir '{9}' --pai_hdfs_host '{10}' --pai_user_name {11} --nni_hdfs_exp_dir '{12}' --webhdfs_path '/webhdfs/api/v1' \ ---nni_manager_version '{13}' --log_collection '{13}'`; +--nni_manager_version '{13}' --log_collection '{14}'`; export const PAI_OUTPUT_DIR_FORMAT: string = `hdfs://{0}:9000/`; diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index 5224a63a53..2c4d31139c 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -194,6 +194,8 @@ def run(self): if res.status_code == 200: meta_list = res.json() download_parameter(meta_list, self.args) + else: + nni_log(LogType.Warning, 'rest response: {}'.format(str(res))) time.sleep(5) fetch_file_thread = FetchThread(args) From cbba24e7c3506088ec8f5338599285c75861b9ce Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Mon, 24 Jun 2019 12:54:33 +0800 Subject: [PATCH 17/19] updates --- examples/tuners/ga_customer_tuner/customer_tuner.py | 4 ++-- examples/tuners/random_nas_tuner/random_nas_tuner.py | 4 ++-- .../tuners/weight_sharing/ga_customer_tuner/customer_tuner.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/tuners/ga_customer_tuner/customer_tuner.py b/examples/tuners/ga_customer_tuner/customer_tuner.py index fec2289a49..2620d86499 100644 --- a/examples/tuners/ga_customer_tuner/customer_tuner.py +++ b/examples/tuners/ga_customer_tuner/customer_tuner.py @@ -79,7 +79,7 @@ def __init__(self, optimize_mode, population_size = 32): logger.debug('init population done.') return - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, 
**kwargs): """Returns a set of trial graph config, as a serializable object. parameter_id : int """ @@ -109,7 +109,7 @@ def generate_parameters(self, parameter_id): return temp - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): ''' Record an observation of the objective function parameter_id : int diff --git a/examples/tuners/random_nas_tuner/random_nas_tuner.py b/examples/tuners/random_nas_tuner/random_nas_tuner.py index 30594e2ca6..f5e500120c 100644 --- a/examples/tuners/random_nas_tuner/random_nas_tuner.py +++ b/examples/tuners/random_nas_tuner/random_nas_tuner.py @@ -49,12 +49,12 @@ def update_search_space(self, search_space): self.searchspace_json = search_space self.random_state = np.random.RandomState() - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): '''generate ''' return random_archi_generator(self.searchspace_json, self.random_state) - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): '''receive ''' pass diff --git a/examples/tuners/weight_sharing/ga_customer_tuner/customer_tuner.py b/examples/tuners/weight_sharing/ga_customer_tuner/customer_tuner.py index 86328859c6..68b110d051 100644 --- a/examples/tuners/weight_sharing/ga_customer_tuner/customer_tuner.py +++ b/examples/tuners/weight_sharing/ga_customer_tuner/customer_tuner.py @@ -112,7 +112,7 @@ def init_population(self, population_size, graph_max_layer, graph_min_layer): population.append(Individual(indiv_id=self.generate_new_id(), graph_cfg=graph_tmp, result=None)) return population - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): """Returns a set of trial graph config, as a serializable object. An example configuration: ```json @@ -196,7 +196,7 @@ def generate_parameters(self, parameter_id): logger.debug("trial {} ready".format(indiv.indiv_id)) return param_json - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): ''' Record an observation of the objective function parameter_id : int From 611b8e9aea9d5552129f01f8eb7adce3dfc7ab01 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Mon, 24 Jun 2019 13:49:43 +0800 Subject: [PATCH 18/19] updates --- docs/en_US/CustomizeTuner.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en_US/CustomizeTuner.md b/docs/en_US/CustomizeTuner.md index 6b5f2b820c..8228b2dd5f 100644 --- a/docs/en_US/CustomizeTuner.md +++ b/docs/en_US/CustomizeTuner.md @@ -31,7 +31,7 @@ class CustomizedTuner(Tuner): def __init__(self, ...): ... - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): ''' Receive trial's final result. parameter_id: int @@ -41,7 +41,7 @@ class CustomizedTuner(Tuner): # your code implements here. ... - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): ''' Returns a set of trial (hyper-)parameters, as a serializable object parameter_id: int @@ -51,7 +51,7 @@ class CustomizedTuner(Tuner): ... ``` -`receive_trial_result` will receive the `parameter_id, parameters, value` as parameters input. Also, Tuner will receive the `value` object are exactly same value that Trial send. 
+`receive_trial_result` will receive the `parameter_id, parameters, value` as parameters input. Also, Tuner will receive the `value` object are exactly same value that Trial send. If `multiPhase` is set to `true` in the experiment configuration file, an additional `trial_job_id` parameter is passed to `receive_trial_result` and `generate_parameters` through the `**kwargs` parameter. The `your_parameters` return from `generate_parameters` function, will be package as json object by NNI SDK. NNI SDK will unpack json object so the Trial will receive the exact same `your_parameters` from Tuner. @@ -59,7 +59,7 @@ For example: If the you implement the `generate_parameters` like this: ```python -def generate_parameters(self, parameter_id): +def generate_parameters(self, parameter_id, **kwargs): ''' Returns a set of trial (hyper-)parameters, as a serializable object parameter_id: int From b19609cb7bd9c0e29e29d6f470a78504ab54a414 Mon Sep 17 00:00:00 2001 From: Chengmin Chi Date: Mon, 24 Jun 2019 13:57:01 +0800 Subject: [PATCH 19/19] updates --- src/sdk/pynni/tests/test_tuner.py | 4 ++-- test/async_sharing_test/simple_tuner.py | 4 ++-- test/config_test/multi_thread/multi_thread_tuner.py | 4 ++-- test/naive_test/naive_tuner.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/sdk/pynni/tests/test_tuner.py b/src/sdk/pynni/tests/test_tuner.py index d27bd73247..41e80cfa6d 100644 --- a/src/sdk/pynni/tests/test_tuner.py +++ b/src/sdk/pynni/tests/test_tuner.py @@ -35,7 +35,7 @@ def __init__(self): self.trial_results = [ ] self.search_space = None - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): # report Tuner's internal states to generated parameters, # so we don't need to pause the main loop self.param += 2 @@ -45,7 +45,7 @@ def generate_parameters(self, parameter_id): 'search_space': self.search_space } - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): reward = extract_scalar_reward(value) self.trial_results.append((parameter_id, parameters['param'], reward, False)) diff --git a/test/async_sharing_test/simple_tuner.py b/test/async_sharing_test/simple_tuner.py index de40ea9117..0252d4819a 100644 --- a/test/async_sharing_test/simple_tuner.py +++ b/test/async_sharing_test/simple_tuner.py @@ -22,7 +22,7 @@ def __init__(self): self.sig_event = Event() self.thread_lock = Lock() - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): if self.f_id is None: self.thread_lock.acquire() self.f_id = parameter_id @@ -50,7 +50,7 @@ def generate_parameters(self, parameter_id): self.thread_lock.release() return self.trial_meta[parameter_id] - def receive_trial_result(self, parameter_id, parameters, reward): + def receive_trial_result(self, parameter_id, parameters, reward, **kwargs): self.thread_lock.acquire() if parameter_id == self.f_id: self.trial_meta[parameter_id]['checksum'] = reward['checksum'] diff --git a/test/config_test/multi_thread/multi_thread_tuner.py b/test/config_test/multi_thread/multi_thread_tuner.py index 77fb3983be..e6db0d39a4 100644 --- a/test/config_test/multi_thread/multi_thread_tuner.py +++ b/test/config_test/multi_thread/multi_thread_tuner.py @@ -6,7 +6,7 @@ class MultiThreadTuner(Tuner): def __init__(self): self.parent_done = False - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): if parameter_id == 0: return {'x': 0} else: 
@@ -14,7 +14,7 @@ def generate_parameters(self, parameter_id): time.sleep(2) return {'x': 1} - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): if parameter_id == 0: self.parent_done = True diff --git a/test/naive_test/naive_tuner.py b/test/naive_test/naive_tuner.py index 3e5bb73df1..e5692fdcd7 100644 --- a/test/naive_test/naive_tuner.py +++ b/test/naive_test/naive_tuner.py @@ -16,12 +16,12 @@ def __init__(self, optimize_mode): self.cur = 0 _logger.info('init') - def generate_parameters(self, parameter_id): + def generate_parameters(self, parameter_id, **kwargs): self.cur += 1 _logger.info('generate parameters: %s' % self.cur) return { 'x': self.cur } - def receive_trial_result(self, parameter_id, parameters, value): + def receive_trial_result(self, parameter_id, parameters, value, **kwargs): reward = extract_scalar_reward(value) _logger.info('receive trial result: %s, %s, %s' % (parameter_id, parameters, reward)) _result.write('%d %d\n' % (parameters['x'], reward))
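The multi-phase test configurations added earlier in this series all run `multi_phase.py` as the trial command; its source is not shown here, but a minimal sketch of such a trial — assuming it simply requests a handful of configurations in sequence, with the phase count and metric as placeholders — could look like:

```python
import nni

# Illustration only: with multiPhase enabled, one trial job can ask the tuner
# for several configurations and report a result for each of them.
NUM_PHASES = 4  # placeholder; a real trial decides how many phases it runs

for _ in range(NUM_PHASES):
    params = nni.get_next_parameter()   # fetch the next configuration for this trial job
    # ... train / evaluate with `params` here ...
    metric = 0.0                        # placeholder metric
    nni.report_final_result(metric)
```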