Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiphase refactor and support OpenPAI training service. #1138

Merged
merged 24 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/en_US/MultiPhase.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ To enable multi-phase, you should also add `multiPhase: true` in your experiment

### Write a tuner that leverages multi-phase:

Before writing a multi-phase tuner, we highly suggest you to go through [Customize Tuner](https://nni.readthedocs.io/en/latest/Customize_Tuner.html). Different from writing a normal tuner, your tuner needs to inherit from `MultiPhaseTuner` (in nni.multi_phase_tuner). The key difference between `Tuner` and `MultiPhaseTuner` is that the methods in MultiPhaseTuner are aware of additional information, that is, `trial_job_id`. With this information, the tuner could know which trial is requesting a configuration, and which trial is reporting results. This information provides enough flexibility for your tuner to deal with different trials and different phases. For example, you may want to use the trial_job_id parameter of generate_parameters method to generate hyperparameters for a specific trial job.
Before writing a multi-phase tuner, we highly suggest you to go through [Customize Tuner](https://nni.readthedocs.io/en/latest/Customize_Tuner.html). Same as writing a normal tuner, your tuner needs to inherit from `Tuner` class. When you enable multi-phase through configuration (set `multiPhase` to true), your tuner will get an additional parameter `trial_job_id` via tuner's following methods:
```
generate_parameters
generate_multiple_parameters
receive_trial_result
receive_customized_trial_result
trial_end
```
With this information, the tuner could know which trial is requesting a configuration, and which trial is reporting results. This information provides enough flexibility for your tuner to deal with different trials and different phases. For example, you may want to use the trial_job_id parameter of generate_parameters method to generate hyperparameters for a specific trial job.

Of course, to use your multi-phase tuner, __you should add `multiPhase: true` in your experiment YAML configure file__.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export abstract class ClusterJobRestServer extends RestServer {
this.port = basePort + 1;
}

get apiRootUrl(): string {
return this.API_ROOT_URL;
}

public get clusterRestServerPort(): number {
if (this.port === undefined) {
throw new Error('PAI Rest server port is undefined');
Expand Down Expand Up @@ -87,7 +91,7 @@ export abstract class ClusterJobRestServer extends RestServer {
protected abstract handleTrialMetrics(jobId : string, trialMetrics : any[]) : void;

// tslint:disable: no-unsafe-any no-any
private createRestHandler() : Router {
protected createRestHandler() : Router {
const router: Router = Router();

router.use((req: Request, res: Response, next: any) => {
Expand Down
8 changes: 4 additions & 4 deletions src/nni_manager/training_service/pai/paiData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ else
fi`;

export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} \
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& cd $NNI_SYS_DIR && sh install_nni.sh \
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' \
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' \
--nni_manager_version '{12}' --log_collection '{13}'`;
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' \
--pai_hdfs_output_dir '{9}' --pai_hdfs_host '{10}' --pai_user_name {11} --nni_hdfs_exp_dir '{12}' --webhdfs_path '/webhdfs/api/v1' \
--nni_manager_version '{13}' --log_collection '{13}'`;
chicm-ms marked this conversation as resolved.
Show resolved Hide resolved

export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`;
Expand Down
38 changes: 38 additions & 0 deletions src/nni_manager/training_service/pai/paiJobRestServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@

'use strict';

import { Request, Response, Router } from 'express';
import { Inject } from 'typescript-ioc';
import * as component from '../../common/component';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
import { PAITrainingService } from './paiTrainingService';

export interface ParameterFileMeta {
readonly experimentId: string;
readonly trialId: string;
readonly filePath: string;
}

/**
* PAI Training service Rest server, provides rest API to support pai job metrics update
*
*/
@component.Singleton
export class PAIJobRestServer extends ClusterJobRestServer {
private parameterFileMetaList: ParameterFileMeta[] = [];

@Inject
private readonly paiTrainingService : PAITrainingService;

Expand All @@ -52,4 +61,33 @@ export class PAIJobRestServer extends ClusterJobRestServer {
});
}
}

protected createRestHandler(): Router {
const router: Router = super.createRestHandler();

router.post(`/parameter-file-meta`, (req: Request, res: Response) => {
try {
this.log.info(`POST /parameter-file-meta, body is ${JSON.stringify(req.body)}`);
this.parameterFileMetaList.push(req.body);
res.send();
} catch (err) {
this.log.error(`POST parameter-file-meta error: ${err}`);
res.status(500);
res.send(err.message);
}
});

router.get(`/parameter-file-meta`, (req: Request, res: Response) => {
try {
this.log.info(`GET /parameter-file-meta`);
res.send(this.parameterFileMetaList);
} catch (err) {
this.log.error(`GET parameter-file-meta error: ${err}`);
res.status(500);
res.send(err.message);
}
});

return router;
}
}
70 changes: 64 additions & 6 deletions src/nni_manager/training_service/pai/paiTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { MethodNotImplementedError } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import {
JobApplicationForm, NNIManagerIpConfig, TrainingService,
HyperParameters, JobApplicationForm, NNIManagerIpConfig, TrainingService,
TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { delay, generateParamFileName,
Expand All @@ -45,7 +45,7 @@ import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer';
import { PAIJobRestServer, ParameterFileMeta } from './paiJobRestServer';

import * as WebHDFS from 'webhdfs';

Expand Down Expand Up @@ -79,6 +79,7 @@ class PAITrainingService implements TrainingService {
private copyExpCodeDirPromise?: Promise<void>;
private versionCheck: boolean = true;
private logCollection: string;
private isMultiPhase: boolean = false;

constructor() {
this.log = getLogger();
Expand Down Expand Up @@ -179,12 +180,22 @@ class PAITrainingService implements TrainingService {
return deferred.promise;
}

public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
throw new MethodNotImplementedError();
public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
}
if (form.jobType === 'TRIAL') {
await this.writeParameterFile(trialJobId, (<TrialJobApplicationForm>form).hyperParameters);
} else {
throw new Error(`updateTrialJob failed: jobType ${form.jobType} not supported.`);
}

return trialJobDetail;
}

public get isMultiPhaseJobSupported(): boolean {
return false;
return true;
}

// tslint:disable:no-http-string
Expand Down Expand Up @@ -336,6 +347,9 @@ class PAITrainingService implements TrainingService {
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
case TrialConfigMetadataKey.MULTI_PHASE:
this.isMultiPhase = (value === 'true' || value === 'True');
break;
default:
//Reject for unknown keys
throw new Error(`Uknown key: ${key}`);
Expand Down Expand Up @@ -445,6 +459,7 @@ class PAITrainingService implements TrainingService {
trialJobId,
this.experimentId,
trialJobDetail.sequenceId,
this.isMultiPhase,
this.paiTrialConfig.command,
nniManagerIp,
this.paiRestServerPort,
Expand Down Expand Up @@ -632,7 +647,50 @@ class PAITrainingService implements TrainingService {
return Promise.race([timeoutDelay, deferred.promise])
.finally(() => { clearTimeout(timeoutId); });
}
// tslint:enable:no-any no-unsafe-any no-http-string

private async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
if (this.paiClusterConfig === undefined) {
throw new Error('PAI Cluster config is not initialized');
}
if (this.paiTrialConfig === undefined) {
throw new Error('PAI trial config is not initialized');
}

const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const hpFileName: string = generateParamFileName(hyperParameters);
const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
const hdfsHpFilePath: string = path.join(hdfsCodeDir, hpFileName);

await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);

await this.postParameterFileMeta({
experimentId: this.experimentId,
trialId: trialJobId,
filePath: hdfsHpFilePath
});
}

private postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
const deferred : Deferred<void> = new Deferred<void>();
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
const req: request.Options = {
uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
method: 'POST',
json: true,
body: parameterFileMeta
};
request(req, (err: Error, res: request.Response) => {
if (err) {
deferred.reject(err);
} else {
deferred.resolve();
}
});

return deferred.promise;
}
}

export { PAITrainingService };
10 changes: 4 additions & 6 deletions src/sdk/pynni/nni/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
import importlib

from .constants import ModuleName, ClassName, ClassArgs, AdvisorModuleName, AdvisorClassName
from nni.common import enable_multi_thread
from nni.common import enable_multi_thread, enable_multi_phase
from nni.msg_dispatcher import MsgDispatcher
from nni.multi_phase.multi_phase_dispatcher import MultiPhaseMsgDispatcher
logger = logging.getLogger('nni.main')
logger.debug('START')

Expand Down Expand Up @@ -126,6 +125,8 @@ def main():
args = parse_args()
if args.multi_thread:
enable_multi_thread()
if args.multi_phase:
enable_multi_phase()

if args.advisor_class_name:
# advisor is enabled and starts to run
Expand Down Expand Up @@ -180,10 +181,7 @@ def main():
if assessor is None:
raise AssertionError('Failed to create Assessor instance')

if args.multi_phase:
dispatcher = MultiPhaseMsgDispatcher(tuner, assessor)
else:
dispatcher = MsgDispatcher(tuner, assessor)
dispatcher = MsgDispatcher(tuner, assessor)

try:
dispatcher.run()
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/pynni/nni/batch_tuner/batch_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def update_search_space(self, search_space):
"""
self.values = self.is_valid(search_space)

def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""Returns a dict of trial (hyper-)parameters, as a serializable object.

Parameters
Expand All @@ -90,7 +90,7 @@ def generate_parameters(self, parameter_id):
raise nni.NoMoreTrialError('no more parameters now.')
return self.values[self.count]

def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
pass

def import_data(self, data):
Expand Down
8 changes: 8 additions & 0 deletions src/sdk/pynni/nni/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,18 @@ def init_logger(logger_file_path, log_level_name='info'):
sys.stdout = _LoggerFileWrapper(logger_file)

_multi_thread = False
_multi_phase = False

def enable_multi_thread():
global _multi_thread
_multi_thread = True

def multi_thread_enabled():
return _multi_thread

def enable_multi_phase():
global _multi_phase
_multi_phase = True

def multi_phase_enabled():
return _multi_phase
4 changes: 2 additions & 2 deletions src/sdk/pynni/nni/evolution_tuner/evolution_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def update_search_space(self, search_space):
self.searchspace_json, is_rand, self.random_state)
self.population.append(Individual(config=config))

def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""Returns a dict of trial (hyper-)parameters, as a serializable object.

Parameters
Expand Down Expand Up @@ -232,7 +232,7 @@ def generate_parameters(self, parameter_id):
config = split_index(total_config)
return config

def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
'''Record the result from a trial

Parameters
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/pynni/nni/gridsearch_tuner/gridsearch_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def update_search_space(self, search_space):
'''
self.expanded_search_space = self.json2parameter(search_space)

def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
self.count += 1
while (self.count <= len(self.expanded_search_space)-1):
_params_tuple = convert_dict2tuple(self.expanded_search_space[self.count])
Expand All @@ -147,7 +147,7 @@ def generate_parameters(self, parameter_id):
return self.expanded_search_space[self.count]
raise nni.NoMoreTrialError('no more parameters now.')

def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
pass

def import_data(self, data):
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/pynni/nni/hyperopt_tuner/hyperopt_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def update_search_space(self, search_space):
verbose=0)
self.rval.catch_eval_exceptions = False

def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""
Returns a set of trial (hyper-)parameters, as a serializable object.

Expand All @@ -269,7 +269,7 @@ def generate_parameters(self, parameter_id):
params = split_index(total_params)
return params

def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""
Record an observation of the objective function

Expand Down
4 changes: 2 additions & 2 deletions src/sdk/pynni/nni/metis_tuner/metis_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _pack_output(self, init_parameter):
return output


def generate_parameters(self, parameter_id):
def generate_parameters(self, parameter_id, **kwargs):
"""Generate next parameter for trial
If the number of trial result is lower than cold start number,
metis will first random generate some parameters.
Expand Down Expand Up @@ -205,7 +205,7 @@ def generate_parameters(self, parameter_id):
return results


def receive_trial_result(self, parameter_id, parameters, value):
def receive_trial_result(self, parameter_id, parameters, value, **kwargs):
"""Tuner receive result from trial.

Parameters
Expand Down
Loading