From 8fadddc94b9f9fd67ea762669ba2f8ff19357873 Mon Sep 17 00:00:00 2001
From: Matthias Feurer
Date: Thu, 11 May 2017 17:09:34 +0200
Subject: [PATCH 1/9] split function run_task into two functions

also parse parameters when running a flow on a task
fix publishing error
---
 openml/__init__.py                        |   3 +-
 openml/flows/__init__.py                  |   2 +-
 openml/flows/flow.py                      |  39 +++++---
 openml/flows/functions.py                 |   8 +-
 openml/runs/__init__.py                   |   8 +-
 openml/runs/functions.py                  |  60 +++++++-----
 openml/runs/run.py                        |  62 +++++++++----
 openml/setups/functions.py                |  52 ++++++-----
 tests/test_flows/test_flow.py             |   7 --
 tests/test_runs/test_run_functions.py     | 106 +++++++++++++++-------
 tests/test_setups/test_setup_functions.py |  18 ++--
 11 files changed, 230 insertions(+), 135 deletions(-)

diff --git a/openml/__init__.py b/openml/__init__.py
index 1e10edba6..a17999db0 100644
--- a/openml/__init__.py
+++ b/openml/__init__.py
@@ -21,6 +21,7 @@
 from . import tasks
 from . import runs
 from . import flows
+from . import setups
 from .runs import OpenMLRun
 from .tasks import OpenMLTask, OpenMLSplit
 from .flows import OpenMLFlow
@@ -66,4 +67,4 @@ def populate_cache(task_ids=None, dataset_ids=None, flow_ids=None,
 
 __all__ = ['OpenMLDataset', 'OpenMLDataFeature', 'OpenMLRun',
            'OpenMLSplit', 'datasets', 'OpenMLTask', 'OpenMLFlow',
-           'config', 'runs', 'flows', 'tasks']
+           'config', 'runs', 'flows', 'tasks', 'setups']

diff --git a/openml/flows/__init__.py b/openml/flows/__init__.py
index f28e4994a..2d70e9e32 100644
--- a/openml/flows/__init__.py
+++ b/openml/flows/__init__.py
@@ -1,4 +1,4 @@
-from .flow import OpenMLFlow
+from .flow import OpenMLFlow, _copy_server_fields
 from .sklearn_converter import sklearn_to_flow, flow_to_sklearn, _check_n_jobs
 from .functions import get_flow, list_flows, flow_exists, assert_flows_equal

diff --git a/openml/flows/flow.py b/openml/flows/flow.py
index 03a9d112d..f85dacf70 100644
--- a/openml/flows/flow.py
+++ b/openml/flows/flow.py
@@ -322,7 +322,16 @@ def _from_dict(cls, xml_dict):
         arguments['tags'] = tags
         arguments['model'] = None
 
-        return cls(**arguments)
+        flow = cls(**arguments)
+
+        if 'sklearn' in arguments['external_version']:
+            from .sklearn_converter import flow_to_sklearn
+            model = flow_to_sklearn(flow)
+        else:
+            model = None
+        flow.model = model
+
+        return flow
 
     def publish(self):
         """Publish flow to OpenML server.
@@ -332,32 +341,38 @@ def publish(self):
         self : OpenMLFlow
         """
+        import openml.flows.functions
 
         xml_description = self._to_xml()
         file_elements = {'description': xml_description}
         return_value = _perform_api_call("flow/", file_elements=file_elements)
-        self.flow_id = int(xmltodict.parse(return_value)['oml:upload_flow']['oml:id'])
+        flow_id = int(xmltodict.parse(return_value)['oml:upload_flow']['oml:id'])
+        flow = openml.flows.functions.get_flow(flow_id)
         try:
-            _check_flow(self)
+            openml.flows.functions.assert_flows_equal(self, flow)
         except ValueError as e:
             message = e.args[0]
             raise ValueError("Flow was not stored correctly on the server. "
                              "New flow ID is %d. Please check manually and "
                              "remove the flow if necessary! Error is:\n'%s'" %
-                             (self.flow_id, message))
+                             (flow_id, message))
 
+        _copy_server_fields(flow, self)
         return self
 
 
+def _copy_server_fields(source_flow, target_flow):
+    fields_added_by_the_server = ['flow_id', 'uploader', 'version',
+                                  'upload_date']
+    for field in fields_added_by_the_server:
+        setattr(target_flow, field, getattr(source_flow, field))
+
+    for name, component in source_flow.components.items():
+        assert name in target_flow.components
+        _copy_server_fields(component, target_flow.components[name])
+
+
 def _add_if_nonempty(dic, key, value):
     if value is not None:
         dic[key] = value
-
-
-def _check_flow(flow):
-    # Import is not possible at the top of the file as this would cause an
-    # ImportError due to an import cycle.
-    import openml.flows.functions
-
-    flow_copy = openml.flows.functions.get_flow(flow.flow_id)
-    openml.flows.functions.assert_flows_equal(flow, flow_copy)
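The reworked publish() above now verifies the upload via assert_flows_equal() and copies the server-assigned fields back onto the local flow. A minimal sketch of the resulting contract, assuming a configured API key; the id and date shown are illustrative only:

    import openml
    from sklearn.tree import DecisionTreeClassifier

    # Convert a scikit-learn model to a flow and publish it.
    flow = openml.flows.sklearn_to_flow(DecisionTreeClassifier(max_depth=2))
    flow = flow.publish()

    # Fields filled in by _copy_server_fields() after a successful upload:
    print(flow.flow_id)      # e.g. 4242, assigned by the server
    print(flow.version)      # server-side version counter
    print(flow.upload_date)  # e.g. '2017-05-11T17:09:34'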
diff --git a/openml/flows/functions.py b/openml/flows/functions.py
index d2da8a9bc..429a50c28 100644
--- a/openml/flows/functions.py
+++ b/openml/flows/functions.py
@@ -2,7 +2,7 @@
 import six
 
 from openml._api_calls import _perform_api_call
-from . import OpenMLFlow, flow_to_sklearn
+from . import OpenMLFlow
 
 
 def get_flow(flow_id):
@@ -24,9 +24,6 @@ def get_flow(flow_id):
     flow_dict = xmltodict.parse(flow_xml)
     flow = OpenMLFlow._from_dict(flow_dict)
 
-    if 'sklearn' in flow.external_version:
-        flow.model = flow_to_sklearn(flow)
-
     return flow
 
 
@@ -144,8 +141,7 @@ def assert_flows_equal(flow1, flow2):
         raise TypeError('Argument 2 must be of type OpenMLFlow, but is %s' %
                         type(flow2))
 
-    generated_by_the_server = ['flow_id', 'uploader', 'version',
-                               'upload_date', ]
+    generated_by_the_server = ['flow_id', 'uploader', 'version', 'upload_date']
     ignored_by_python_API = ['binary_url', 'binary_format', 'binary_md5',
                              'model']

diff --git a/openml/runs/__init__.py b/openml/runs/__init__.py
index deb045d4a..628ccf93b 100644
--- a/openml/runs/__init__.py
+++ b/openml/runs/__init__.py
@@ -1,6 +1,8 @@
 from .run import OpenMLRun
 from .trace import OpenMLRunTrace, OpenMLTraceIteration
-from .functions import (run_task, get_run, list_runs, get_runs, get_run_trace,
-                        initialize_model_from_run, initialize_model_from_trace)
+from .functions import (run_model_on_task, run_flow_on_task, get_run, list_runs,
+                        get_runs, get_run_trace, initialize_model_from_run,
+                        initialize_model_from_trace)
 
-__all__ = ['OpenMLRun', 'run_task', 'get_run', 'list_runs', 'get_runs']
+__all__ = ['OpenMLRun', 'run_model_on_task', 'run_flow_on_task', 'get_run',
+           'list_runs', 'get_runs']
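The re-exported entry points above replace the old run_task(). A hedged usage sketch of the split API; task 115 (diabetes) is the task used throughout the tests below and assumes the test server:

    import openml
    from sklearn.linear_model import LogisticRegression

    task = openml.tasks.get_task(115)

    # Variant 1: start from a plain scikit-learn estimator; the flow is
    # converted and published (or matched on the server) internally.
    run = openml.runs.run_model_on_task(task, LogisticRegression(),
                                        avoid_duplicate_runs=False)
    run.publish()

    # Variant 2: start from an already-converted and published flow,
    # e.g. to reuse one flow_id across several tasks.
    flow = openml.flows.sklearn_to_flow(LogisticRegression())
    flow = flow.publish()
    run = openml.runs.run_flow_on_task(task, flow, avoid_duplicate_runs=False)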
diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index 9e57a0fcb..aadaecae3 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -13,10 +13,11 @@
 from ..exceptions import PyOpenMLError
 from .. import config
-from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs
+from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs, \
+    _copy_server_fields
 from ..setups import setup_exists, initialize_model
 from ..exceptions import OpenMLCacheException, OpenMLServerException
-from .._api_calls import _perform_api_call, _file_id_to_url
+from .._api_calls import _perform_api_call
 from .run import OpenMLRun, _get_version_information
 from .trace import OpenMLRunTrace, OpenMLTraceIteration
 
@@ -25,7 +26,31 @@
 # circular imports
 
 
-def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):
+def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
+                      seed=None):
+    flow = sklearn_to_flow(model)
+
+    # returns the flow id if the flow exists on the server, False otherwise
+    flow_id = flow_exists(flow.name, flow.external_version)
+
+    if flow_id is False:
+        # TODO this is a potential race condition! Someone could upload the
+        # same flow in the meantime.
+        # The flow does not exist yet; as we can run it, publish it now.
+        flow = flow.publish()
+    else:
+        # The flow already exists; download it from the server.
+        # TODO (necessary? is this a post condition of this function?)
+        flow_from_server = get_flow(flow_id)
+        _copy_server_fields(flow_from_server, flow)
+
+    return run_flow_on_task(task=task, flow=flow,
+                            avoid_duplicate_runs=avoid_duplicate_runs,
+                            flow_tags=flow_tags, seed=seed)
+
+
+def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
+                     seed=None):
     """Performs a CV run on the dataset of the given task, using the split.
 
     Parameters
@@ -51,23 +76,18 @@ def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):
     """
     if flow_tags is not None and not isinstance(flow_tags, list):
         raise ValueError("flow_tags should be list")
-    # TODO move this into its onwn module. While it somehow belongs here, it
-    # adds quite a lot of functionality which is better suited in other places!
-    # TODO why doesn't this accept a flow as input? - this would make this more flexible!
-    model = _get_seeded_model(model, seed)
-    flow = sklearn_to_flow(model)
 
-    # returns flow id if the flow exists on the server, False otherwise
-    flow_id = flow_exists(flow.name, flow.external_version)
+    flow.model = _get_seeded_model(flow.model, seed=seed)
 
     # skips the run if it already exists and the user opts for this in the config file.
     # also, if the flow is not present on the server, the check is not needed.
-    if avoid_duplicate_runs and flow_id:
-        flow = get_flow(flow_id)
-        setup_id = setup_exists(flow, model)
+    if avoid_duplicate_runs:
+        flow_from_server = get_flow(flow.flow_id)
+        setup_id = setup_exists(flow_from_server)
         ids = _run_exists(task.task_id, setup_id)
         if ids:
             raise PyOpenMLError("Run already exists in server. Run id(s): %s" % str(ids))
+        _copy_server_fields(flow_from_server, flow)
 
     dataset = task.get_dataset()
 
@@ -79,18 +99,12 @@ def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):
     run_environment = _get_version_information()
     tags = ['openml-python', run_environment[1]]
     # execute the run
-    run = OpenMLRun(task_id=task.task_id, flow_id=None, dataset_id=dataset.dataset_id, model=model, tags=tags)
-    res = _run_task_get_arffcontent(model, task, class_labels)
+    run = OpenMLRun(task_id=task.task_id, flow_id=None, dataset_id=dataset.dataset_id,
+                    model=flow.model, tags=tags)
+    run.parameter_settings = OpenMLRun._parse_parameters(flow)
+    res = _run_task_get_arffcontent(flow.model, task, class_labels)
 
     run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res
-    if flow_id == False:
-        # means the flow did not exists. As we could run it, publish it now
-        flow = flow.publish()
-    else:
-        # flow already existed, download it from server
-        # TODO (neccessary? is this a post condition of this function)
-        flow = get_flow(flow_id)
-
     run.flow_id = flow.flow_id
 
     config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id))
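The avoid_duplicate_runs branch above chains three server lookups; spelled out, it is roughly equivalent to the following sketch (flow and task are assumed to already exist on the server):

    import openml
    from openml.exceptions import PyOpenMLError

    # Rough equivalent of the duplicate check in run_flow_on_task().
    flow_from_server = openml.flows.get_flow(flow.flow_id)
    setup_id = openml.setups.setup_exists(flow_from_server)
    run_ids = openml.runs.functions._run_exists(task.task_id, setup_id)
    if run_ids:
        raise PyOpenMLError("Run already exists in server. Run id(s): %s"
                            % str(run_ids))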
diff --git a/openml/runs/run.py b/openml/runs/run.py
index de487660d..4a6bc1ea7 100644
--- a/openml/runs/run.py
+++ b/openml/runs/run.py
@@ -1,4 +1,5 @@
 from collections import OrderedDict
+import json
 import sys
 import time
 
@@ -143,11 +144,6 @@ def _create_description_xml(self):
         XML description of run.
         """
 
-        # TODO: don't we have flow object in data structure? Use this one
-        downloaded_flow = openml.flows.get_flow(self.flow_id)
-
-        openml_param_settings = OpenMLRun._parse_parameters(self.model, downloaded_flow)
-
         # as a tag, it must be of the form ([a-zA-Z0-9_\-\.])+
         # so we format time from 'mm/dd/yy hh:mm:ss' to 'mm-dd-yy_hh.mm.ss'
         # well_formatted_time = time.strftime("%c").replace(
@@ -156,7 +152,7 @@ def _create_description_xml(self):
         # [self.model.__module__ + "." + self.model.__class__.__name__]
         description = _to_dict(taskid=self.task_id, flow_id=self.flow_id,
                                setup_string=_create_setup_string(self.model),
-                               parameter_settings=openml_param_settings,
+                               parameter_settings=self.parameter_settings,
                                error_message=self.error_message,
                                detailed_evaluations=self.detailed_evaluations,
                                tags=self.tags)
@@ -164,19 +160,28 @@ def _create_description_xml(self):
         return description_xml
 
     @staticmethod
-    def _parse_parameters(model, server_flow):
-        """Extracts all parameter settings from a model in OpenML format.
+    def _parse_parameters(flow):
+        """Extracts all parameter settings from the model inside a flow in
+        OpenML format.
 
         Parameters
         ----------
-        model
-            the scikit-learn model (fitted)
         flow
             openml flow object (containing flow ids, i.e., it has to be
            downloaded from the server)
 
         """
-        if server_flow.flow_id is None:
-            raise ValueError("The flow parameter needs to be downloaded from server")
+
+        # Depth-first search to check if all components were uploaded to the
+        # server before parsing the parameters
+        stack = list()
+        stack.append(flow)
+        while len(stack) > 0:
+            current = stack.pop()
+            if current.flow_id is None:
+                raise ValueError("Flow %s has no flow_id!" % current.name)
+            else:
+                for component in current.components.values():
+                    stack.append(component)
 
         def get_flow_dict(_flow):
             flow_map = {_flow.name: _flow.flow_id}
@@ -184,28 +189,45 @@ def get_flow_dict(_flow):
                 flow_map.update(get_flow_dict(_flow.components[subflow]))
             return flow_map
 
-        def extract_parameters(_flow, _param_dict, _main_call=False, main_id=None):
+        def extract_parameters(_flow, _flow_dict, _main_call=False, main_id=None):
             # _flow is openml flow object, _param dict maps from flow name to flow id
             # for the main call, the param dict can be overridden (useful for unit tests / sentinels)
-            # this way, for flows without subflows we do not have to rely on _param_dict
+            # this way, for flows without subflows we do not have to rely on _flow_dict
             _params = []
             for _param_name in _flow.parameters:
                 _current = OrderedDict()
                 _current['oml:name'] = _param_name
-                _current['oml:value'] = _flow.parameters[_param_name]
+
+                _tmp = openml.flows.sklearn_to_flow(_flow.model.get_params()[_param_name])
+
+                # Try to filter out components which are handled further down!
+                if isinstance(_tmp, openml.flows.OpenMLFlow):
+                    continue
+                try:
+                    _tmp = json.dumps(_tmp)
+                except TypeError as e:
+                    # Python 3.5 exception message:
+                    # <object> is not JSON serializable
+                    # Python 3.6 exception message:
+                    # Object of type 'OpenMLFlow' is not JSON serializable
+                    if 'OpenMLFlow' in e.args[0] and \
+                            'is not JSON serializable' in e.args[0]:
+                        continue
+
+                _current['oml:value'] = _tmp
                 if _main_call:
                     _current['oml:component'] = main_id
                 else:
-                    _current['oml:component'] = _param_dict[_flow.name]
+                    _current['oml:component'] = _flow_dict[_flow.name]
                 _params.append(_current)
+
             for _identifier in _flow.components:
-                _params.extend(extract_parameters(_flow.components[_identifier], _param_dict))
+                _params.extend(extract_parameters(_flow.components[_identifier], _flow_dict))
             return _params
 
-        flow_dict = get_flow_dict(server_flow)
-        local_flow = openml.flows.sklearn_to_flow(model)
+        flow_dict = get_flow_dict(flow)
+        parameters = extract_parameters(flow, flow_dict, True, flow.flow_id)
 
-        parameters = extract_parameters(local_flow, flow_dict, True, server_flow.flow_id)
         return parameters
 
 ################################################################################
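_parse_parameters() above serializes each hyperparameter value with json.dumps() and records which component owns it. The resulting run.parameter_settings is a list shaped as follows; the flow id and values are illustrative:

    from collections import OrderedDict

    # One entry per hyperparameter; 'oml:component' holds the flow id of
    # the (sub)flow owning the parameter, looked up via get_flow_dict().
    parameter_settings = [
        OrderedDict([('oml:name', 'max_depth'),
                     ('oml:value', '2'),        # json.dumps(2)
                     ('oml:component', 4242)]),
        OrderedDict([('oml:name', 'criterion'),
                     ('oml:value', '"gini"'),   # json.dumps('gini')
                     ('oml:component', 4242)]),
    ]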
diff --git a/openml/setups/functions.py b/openml/setups/functions.py
index cb3ddd6e9..9d8cc4bd4 100644
--- a/openml/setups/functions.py
+++ b/openml/setups/functions.py
@@ -2,18 +2,18 @@
 import openml
 import xmltodict
+import copy
 
 from .setup import OpenMLSetup, OpenMLParameter
 
-
-def setup_exists(downloaded_flow, sklearn_model):
+def setup_exists(flow):
     '''
     Checks whether a flow / hyperparameter configuration already exists on the server
 
     Parameter
     ---------
-    downloaded_flow : flow
+    flow : flow
         the openml flow object (should be downloaded from server)
     sklearn_model : BaseEstimator
         The base estimator that was used to create the flow.
Will @@ -26,9 +26,11 @@ def setup_exists(downloaded_flow, sklearn_model): ''' # sadly, this api call relies on a run object - openml_param_settings = openml.runs.OpenMLRun._parse_parameters(sklearn_model, downloaded_flow) - description = xmltodict.unparse(_to_dict(downloaded_flow.flow_id, openml_param_settings), pretty=True) - file_elements = {'description': ('description.arff',description)} + openml_param_settings = openml.runs.OpenMLRun._parse_parameters(flow) + description = xmltodict.unparse(_to_dict(flow.flow_id, + openml_param_settings), + pretty=True) + file_elements = {'description': ('description.arff', description)} result = openml._api_calls._perform_api_call('/setup/exists/', file_elements=file_elements) @@ -75,37 +77,39 @@ def initialize_model(setup_id): model : sklearn model the scikitlearn model with all parameters initailized ''' - def _to_dict_of_dicts(_params): - # this subfunction transforms an openml setup object into - # a dict of dicts, structured: flow_id maps to dict of - # parameter_names mapping to parameter_value - _res = {} - for _param in _params: - _flow_id = _params[_param].flow_id - _param_name = _params[_param].parameter_name - _param_value = _params[_param].value - if _flow_id not in _res: - _res[_flow_id] = {} - _res[_flow_id][_param_name] = _param_value - return _res + + # transform an openml setup object into + # a dict of dicts, structured: flow_id maps to dict of + # parameter_names mapping to parameter_value + + setup = get_setup(setup_id) + parameters = {} + for _param in setup.parameters: + _flow_id = setup.parameters[_param].flow_id + _param_name = setup.parameters[_param].parameter_name + _param_value = setup.parameters[_param].value + if _flow_id not in parameters: + parameters[_flow_id] = {} + parameters[_flow_id][_param_name] = _param_value def _reconstruct_flow(_flow, _params): - # sets the values of flow parameters (and subflows) to + # recursively set the values of flow parameters (and subflows) to # the specific values from a setup. 
_params is a dict of # dicts, mapping from flow id to param name to param value # (obtained by using the subfunction _to_dict_of_dicts) for _param in _flow.parameters: + if _flow.flow_id not in _params: + continue + if _param not in _params[_flow.flow_id]: + continue _flow.parameters[_param] = _params[_flow.flow_id][_param] for _identifier in _flow.components: _flow.components[_identifier] = _reconstruct_flow(_flow.components[_identifier], _params) return _flow - setup = get_setup(setup_id) - parameters = _to_dict_of_dicts(setup.parameters) - flow = openml.flows.get_flow(setup.flow_id) - # now we 'abuse' the parameter object by passing in the # parameters obtained from the setup + flow = openml.flows.get_flow(setup.flow_id) flow = _reconstruct_flow(flow, parameters) return openml.flows.flow_to_sklearn(flow) diff --git a/tests/test_flows/test_flow.py b/tests/test_flows/test_flow.py index b9496858c..fdd032366 100644 --- a/tests/test_flows/test_flow.py +++ b/tests/test_flows/test_flow.py @@ -202,7 +202,6 @@ def test_existing_flow_exists(self): downloaded_flow_id = openml.flows.flow_exists(flow.name, flow.external_version) self.assertEquals(downloaded_flow_id, flow.flow_id) - def test_sklearn_to_upload_to_flow(self): iris = sklearn.datasets.load_iris() X = iris.data @@ -243,12 +242,6 @@ def test_sklearn_to_upload_to_flow(self): local_xml = flow._to_xml() server_xml = new_flow._to_xml() - local_xml = re.sub('[0-9]+', '', local_xml) - server_xml = re.sub('[0-9]+', '', server_xml) - server_xml = re.sub('[0-9]+', '', server_xml) - server_xml = re.sub('[0-9]+', '', server_xml) - server_xml = re.sub('[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}', '', server_xml) - for i in range(10): # Make sure that we replace all occurences of two newlines local_xml = local_xml.replace(sentinel, '') diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index d8eb47f94..8dfb5161a 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -15,6 +15,7 @@ from openml.runs.functions import _run_task_get_arffcontent, \ _get_seeded_model, _run_exists, _extract_arfftrace, \ _extract_arfftrace_attributes, _prediction_to_row, _check_n_jobs +from openml.flows.sklearn_converter import sklearn_to_flow from sklearn.naive_bayes import GaussianNB from sklearn.model_selection._search import BaseSearchCV @@ -70,7 +71,9 @@ def _check_serialized_optimized_run(self, run_id): self._wait_for_processed_run(run_id, 80) model_prime = openml.runs.initialize_model_from_trace(run_id, 0, 0) - run_prime = openml.runs.run_task(task, model_prime, avoid_duplicate_runs=False) + run_prime = openml.runs.run_model_on_task(task, model_prime, + avoid_duplicate_runs=False, + seed=1) predictions_prime = run_prime._generate_arff_dict() self.assertEquals(len(predictions_prime['data']), len(predictions['data'])) @@ -87,10 +90,22 @@ def _check_serialized_optimized_run(self, run_id): return True + def _perform_run(self, task_id, num_instances, clf, + random_state_value=None, check_setup=True): + + def _remove_random_state(flow): + if 'random_state' in flow.parameters: + del flow.parameters['random_state'] + for component in flow.components.values(): + _remove_random_state(component) + + flow = sklearn_to_flow(clf) + flow, _ = self._add_sentinel_to_flow_name(flow, None) + flow.publish() - def _perform_run(self, task_id, num_instances, clf, check_setup=True): task = openml.tasks.get_task(task_id) - run = openml.runs.run_task(task, clf, openml.config.avoid_duplicate_runs) + run = 
openml.runs.run_flow_on_task(task, flow, seed=1, + avoid_duplicate_runs=openml.config.avoid_duplicate_runs) run_ = run.publish() self.assertEqual(run_, run) self.assertIsInstance(run.dataset_id, int) @@ -107,11 +122,32 @@ def _perform_run(self, task_id, num_instances, clf, check_setup=True): flow_local = openml.flows.sklearn_to_flow(clf) flow_server = openml.flows.sklearn_to_flow(clf_server) + if flow.class_name not in \ + ['sklearn.model_selection._search.GridSearchCV', + 'sklearn.pipeline.Pipeline']: + # If the flow is initialized from a model without a random state, + # the flow is on the server without any random state + self.assertEqual(flow.parameters['random_state'], 'null') + # As soon as a flow is run, a random state is set in the model. + # If a flow is re-instantiated + self.assertEqual(flow_local.parameters['random_state'], + random_state_value) + self.assertEqual(flow_server.parameters['random_state'], + random_state_value) + _remove_random_state(flow_local) + _remove_random_state(flow_server) openml.flows.assert_flows_equal(flow_local, flow_server) # and test the initialize setup from run function clf_server2 = openml.runs.initialize_model_from_run(run_server.run_id) flow_server2 = openml.flows.sklearn_to_flow(clf_server2) + if flow.class_name not in \ + ['sklearn.model_selection._search.GridSearchCV', + 'sklearn.pipeline.Pipeline']: + self.assertEqual(flow_server2.parameters['random_state'], + random_state_value) + + _remove_random_state(flow_server2) openml.flows.assert_flows_equal(flow_local, flow_server2) #self.assertEquals(clf.get_params(), clf_prime.get_params()) @@ -155,19 +191,21 @@ def test_run_regression_on_classif_task(self): clf = LinearRegression() task = openml.tasks.get_task(task_id) - self.assertRaises(AttributeError, openml.runs.run_task, + self.assertRaises(AttributeError, openml.runs.run_model_on_task, task=task, model=clf, avoid_duplicate_runs=False) - @mock.patch('openml.flows.sklearn_to_flow') - def test_check_erronous_sklearn_flow_fails(self, sklearn_to_flow_mock): + def test_check_erronous_sklearn_flow_fails(self): task_id = 115 task = openml.tasks.get_task(task_id) # Invalid parameter values clf = LogisticRegression(C='abc') - self.assertEqual(sklearn_to_flow_mock.call_count, 0) - self.assertRaisesRegexp(ValueError, "Penalty term must be positive; got \(C='abc'\)", - openml.runs.run_task, task=task, model=clf) + self.assertRaisesRegexp(ValueError, + "Penalty term must be positive; got " + # u? 
for 2.7/3.4-6 compability + "\(C=u?'abc'\)", + openml.runs.run_model_on_task, task=task, + model=clf) def test_run_and_upload(self): # This unit test is ment to test the following functions, using a varity of flows: @@ -182,16 +220,15 @@ def test_run_and_upload(self): num_iterations = 5 # for base search classifiers clfs = [LogisticRegression(), - Pipeline(steps=(('scaler', StandardScaler(with_mean=False)), - ('dummy', DummyClassifier(strategy='prior')))), + Pipeline(steps=[('scaler', StandardScaler(with_mean=False)), + ('dummy', DummyClassifier(strategy='prior'))]), Pipeline(steps=[('Imputer', Imputer(strategy='median')), ('VarianceThreshold', VarianceThreshold()), - ('Estimator', RandomizedSearchCV(DecisionTreeClassifier(), - {'min_samples_split': [2 ** x for x in - range(1, 7 + 1)], - 'min_samples_leaf': [2 ** x for x in - range(0, 6 + 1)]}, - cv=3, n_iter=10))]), + ('Estimator', RandomizedSearchCV( + DecisionTreeClassifier(), + {'min_samples_split': [2 ** x for x in range(1, 7 + 1)], + 'min_samples_leaf': [2 ** x for x in range(0, 6 + 1)]}, + cv=3, n_iter=10))]), GridSearchCV(BaggingClassifier(base_estimator=SVC()), {"base_estimator__C": [0.01, 0.1, 10], "base_estimator__gamma": [0.01, 0.1, 10]}), @@ -202,12 +239,18 @@ def test_run_and_upload(self): "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "bootstrap": [True, False], "criterion": ["gini", "entropy"]}, - cv=StratifiedKFold(n_splits=2), - n_iter=num_iterations) - ] - - for clf in clfs: - run = self._perform_run(task_id, num_test_instances, clf) + cv=StratifiedKFold(n_splits=2, + random_state=1), + n_iter=num_iterations)] + + # The random states for the RandomizedSearchCV is set after the + # random state of the RandomForestClassifier is set, therefore, + # it has a different value than the other examples before + random_state_values = ['62501'] * (len(clfs) - 1) + ['33003'] + + for clf, rsv in zip(clfs, random_state_values): + run = self._perform_run(task_id, num_test_instances, clf, + random_state_value=rsv) if isinstance(clf, BaseSearchCV): if isinstance(clf, GridSearchCV): grid_iterations = 1 @@ -229,7 +272,7 @@ def test_initialize_model_from_run(self): ('VarianceThreshold', VarianceThreshold(threshold=0.05)), ('Estimator', GaussianNB())]) task = openml.tasks.get_task(11) - run = openml.runs.run_task(task, clf, avoid_duplicate_runs=False) + run = openml.runs.run_model_on_task(task, clf, avoid_duplicate_runs=False) run_ = run.publish() run = openml.runs.get_run(run_.run_id) @@ -265,7 +308,7 @@ def test_get_run_trace(self): # [SPEED] make unit test faster by exploiting run information from the past try: # in case the run did not exists yet - run = openml.runs.run_task(task, clf, avoid_duplicate_runs=True) + run = openml.runs.run_model_on_task(task, clf, avoid_duplicate_runs=True) run = run.publish() self._wait_for_processed_run(run.run_id, 80) run_id = run.run_id @@ -275,7 +318,7 @@ def test_get_run_trace(self): flow_exists = openml.flows.flow_exists(flow.name, flow.external_version) self.assertIsInstance(flow_exists, int) downloaded_flow = openml.flows.get_flow(flow_exists) - setup_exists = openml.setups.setup_exists(downloaded_flow, clf) + setup_exists = openml.setups.setup_exists(downloaded_flow) self.assertIsInstance(setup_exists, int) run_ids = _run_exists(task.task_id, setup_exists) run_id = random.choice(list(run_ids)) @@ -294,13 +337,15 @@ def test__run_exists(self): sklearn.pipeline.Pipeline(steps=[('Imputer', Imputer(strategy='most_frequent')), ('VarianceThreshold', VarianceThreshold(threshold=0.1)), ('Estimator', 
DecisionTreeClassifier(max_depth=4))])] - task = openml.tasks.get_task(1) + + + task = openml.tasks.get_task(115) for clf in clfs: try: # first populate the server with this run. # skip run if it was already performed. - run = openml.runs.run_task(task, clf, avoid_duplicate_runs=True) + run = openml.runs.run_model_on_task(task, clf, avoid_duplicate_runs=True) run.publish() except openml.exceptions.PyOpenMLError: # run already existed. Great. @@ -310,12 +355,11 @@ def test__run_exists(self): flow_exists = openml.flows.flow_exists(flow.name, flow.external_version) self.assertIsInstance(flow_exists, int) downloaded_flow = openml.flows.get_flow(flow_exists) - setup_exists = openml.setups.setup_exists(downloaded_flow, clf) + setup_exists = openml.setups.setup_exists(downloaded_flow) self.assertIsInstance(setup_exists, int) run_ids = _run_exists(task.task_id, setup_exists) self.assertGreater(len(run_ids), 0) - def test__get_seeded_model(self): # randomized models that are initialized without seeds, can be seeded randomized_clfs = [ @@ -457,7 +501,7 @@ def test_run_with_classifiers_in_param_grid(self): } clf = GridSearchCV(BaggingClassifier(), param_grid=param_grid) - self.assertRaises(TypeError, openml.runs.run_task, + self.assertRaises(TypeError, openml.runs.run_model_on_task, task=task, model=clf, avoid_duplicate_runs=False) def test__run_task_get_arffcontent(self): diff --git a/tests/test_setups/test_setup_functions.py b/tests/test_setups/test_setup_functions.py index 29e116f7c..434525865 100644 --- a/tests/test_setups/test_setup_functions.py +++ b/tests/test_setups/test_setup_functions.py @@ -41,30 +41,34 @@ def test_nonexisting_setup_exists(self): # although the flow exists (created as of previous statement), # we can be sure there are no setups (yet) as it was just created # and hasn't been ran - setup_id = openml.setups.setup_exists(flow, dectree) + setup_id = openml.setups.setup_exists(flow) self.assertFalse(setup_id) - def test_existing_setup_exists(self): # first publish a nonexiting flow # because of the sentinel, we can not use flows that contain subflows classif = DecisionTreeClassifier(max_depth=5, - min_samples_split=3) + min_samples_split=3, + # Not setting the random state will + # make this flow fail as running it + # will add a random random_state. 
+ random_state=1) flow = openml.flows.sklearn_to_flow(classif) flow.name = 'TEST%s%s' % (get_sentinel(), flow.name) + # Replace the flow by a flow in which the ID got set up correctly flow = flow.publish() flow = openml.flows.get_flow(flow.flow_id) # although the flow exists, we can be sure there are no # setups (yet) as it hasn't been ran - setup_id = openml.setups.setup_exists(flow, classif) + setup_id = openml.setups.setup_exists(flow) self.assertFalse(setup_id) # now run the flow on an easy task: - task = openml.tasks.get_task(115) #diabetes - run = openml.runs.run_task(task, classif) + task = openml.tasks.get_task(115) # diabetes + run = openml.runs.run_flow_on_task(task, flow) # spoof flow id, otherwise the sentinel is ignored run.flow_id = flow.flow_id run = run.publish() @@ -72,7 +76,7 @@ def test_existing_setup_exists(self): run = openml.runs.get_run(run.run_id) # execute the function we are interested in - setup_id = openml.setups.setup_exists(flow, classif) + setup_id = openml.setups.setup_exists(flow) self.assertEquals(setup_id, run.setup_id) def test_get_setup(self): From 9d6bea1cf3c1a65f8feb4dbf5c68e323b3bba1e1 Mon Sep 17 00:00:00 2001 From: Matthias Feurer Date: Tue, 16 May 2017 16:24:02 +0200 Subject: [PATCH 2/9] make unit test stricter --- openml/flows/functions.py | 16 +++++ openml/runs/run.py | 36 ++++++------ openml/setups/functions.py | 30 +++++++--- tests/test_runs/test_run.py | 49 ++++++++++++++++ tests/test_runs/test_run_functions.py | 84 +++++++++++++++------------ 5 files changed, 153 insertions(+), 62 deletions(-) create mode 100644 tests/test_runs/test_run.py diff --git a/openml/flows/functions.py b/openml/flows/functions.py index 429a50c28..993fbbcd1 100644 --- a/openml/flows/functions.py +++ b/openml/flows/functions.py @@ -127,6 +127,22 @@ def _list_flows(api_call): return flows +def _check_flow_for_server_id(flow): + """Check if the given flow and it's components have a flow_id.""" + + # Depth-first search to check if all components were uploaded to the + # server before parsing the parameters + stack = list() + stack.append(flow) + while len(stack) > 0: + current = stack.pop() + if current.flow_id is None: + raise ValueError("Flow %s has no flow_id!" % current.name) + else: + for component in current.components.values(): + stack.append(component) + + def assert_flows_equal(flow1, flow2): """Check equality of two flows. diff --git a/openml/runs/run.py b/openml/runs/run.py index 4a6bc1ea7..430537d84 100644 --- a/openml/runs/run.py +++ b/openml/runs/run.py @@ -160,28 +160,24 @@ def _create_description_xml(self): return description_xml @staticmethod - def _parse_parameters(flow): + def _parse_parameters(flow, model=None): """Extracts all parameter settings from the model inside a flow in OpenML format. Parameters ---------- - flow + flow : OpenMLFlow openml flow object (containing flow ids, i.e., it has to be downloaded from the server) + model : BaseEstimator, optional + If not given, the parameters are extracted from ``flow.model``. + """ - # Depth-first search to check if all components were uploaded to the - # server before parsing the parameters - stack = list() - stack.append(flow) - while len(stack) > 0: - current = stack.pop() - if current.flow_id is None: - raise ValueError("Flow %s has no flow_id!" 
% current.name) - else: - for component in current.components.values(): - stack.append(component) + if model is None: + model = flow.model + + openml.flows.functions._check_flow_for_server_id(flow) def get_flow_dict(_flow): flow_map = {_flow.name: _flow.flow_id} @@ -189,7 +185,8 @@ def get_flow_dict(_flow): flow_map.update(get_flow_dict(_flow.components[subflow])) return flow_map - def extract_parameters(_flow, _flow_dict, _main_call=False, main_id=None): + def extract_parameters(_flow, _flow_dict, component_model, + _main_call=False, main_id=None): # _flow is openml flow object, _param dict maps from flow name to flow id # for the main call, the param dict can be overridden (useful for unit tests / sentinels) # this way, for flows without subflows we do not have to rely on _flow_dict @@ -198,7 +195,8 @@ def extract_parameters(_flow, _flow_dict, _main_call=False, main_id=None): _current = OrderedDict() _current['oml:name'] = _param_name - _tmp = openml.flows.sklearn_to_flow(_flow.model.get_params()[_param_name]) + _tmp = openml.flows.sklearn_to_flow( + component_model.get_params()[_param_name]) # Try to filter out components which are handled further down! if isinstance(_tmp, openml.flows.OpenMLFlow): @@ -222,14 +220,18 @@ def extract_parameters(_flow, _flow_dict, _main_call=False, main_id=None): _params.append(_current) for _identifier in _flow.components: - _params.extend(extract_parameters(_flow.components[_identifier], _flow_dict)) + subcomponent_model = component_model.get_params()[_identifier] + _params.extend(extract_parameters(_flow.components[_identifier], + _flow_dict, subcomponent_model)) return _params flow_dict = get_flow_dict(flow) - parameters = extract_parameters(flow, flow_dict, True, flow.flow_id) + parameters = extract_parameters(flow, flow_dict, model, + True, flow.flow_id) return parameters + ################################################################################ # Functions which cannot be in runs/functions due to circular imports diff --git a/openml/setups/functions.py b/openml/setups/functions.py index 9d8cc4bd4..bf69039e2 100644 --- a/openml/setups/functions.py +++ b/openml/setups/functions.py @@ -2,22 +2,25 @@ import openml import xmltodict -import copy from .setup import OpenMLSetup, OpenMLParameter +from openml.flows import sklearn_to_flow, flow_exists -def setup_exists(flow): + +def setup_exists(flow, model=None): ''' - Checks whether a flow / hyperparameter configuration already exists on the server + Checks whether a hyperparameter configuration already exists on the server. Parameter --------- flow : flow - the openml flow object (should be downloaded from server) - sklearn_model : BaseEstimator - The base estimator that was used to create the flow. Will - be used to extract parameter settings from. + The openml flow object. + + sklearn_model : BaseEstimator, optional + If given, the parameters are parsed from this model instead of the + model in the flow. If not given, parameters are parsed from + ``flow.model``. 
Returns ------- @@ -26,7 +29,18 @@ def setup_exists(flow): ''' # sadly, this api call relies on a run object - openml_param_settings = openml.runs.OpenMLRun._parse_parameters(flow) + openml.flows.functions._check_flow_for_server_id(flow) + + if model is None: + model = flow.model + else: + converted_flow = sklearn_to_flow(model) + exists = flow_exists(converted_flow.name, + converted_flow.external_version) + if exists != flow.flow_id: + raise ValueError('This should not happen!') + + openml_param_settings = openml.runs.OpenMLRun._parse_parameters(flow, model) description = xmltodict.unparse(_to_dict(flow.flow_id, openml_param_settings), pretty=True) diff --git a/tests/test_runs/test_run.py b/tests/test_runs/test_run.py new file mode 100644 index 000000000..6a47a6670 --- /dev/null +++ b/tests/test_runs/test_run.py @@ -0,0 +1,49 @@ +from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier +from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold + +from openml.testing import TestBase +from openml.flows.sklearn_converter import sklearn_to_flow +from openml import OpenMLRun + + +class TestRun(TestBase): + + def test_parse_parameters_flow_not_on_server(self): + + model = LogisticRegression() + flow = sklearn_to_flow(model) + self.assertRaisesRegexp(ValueError, + 'Flow sklearn.linear_model.logistic.LogisticRegression ' + 'has no flow_id!', + OpenMLRun._parse_parameters, flow) + + model = AdaBoostClassifier(base_estimator=LogisticRegression()) + flow = sklearn_to_flow(model) + flow.flow_id = 1 + self.assertRaisesRegexp(ValueError, + 'Flow sklearn.linear_model.logistic.LogisticRegression ' + 'has no flow_id!', + OpenMLRun._parse_parameters, flow) + + def test_parse_parameters(self): + + model = RandomizedSearchCV( + estimator=RandomForestClassifier(n_estimators=5), + param_distributions={"max_depth": [3, None], + "max_features": [1, 2, 3, 4], + "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10], + "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "bootstrap": [True, False], + "criterion": ["gini", "entropy"]}, + cv=StratifiedKFold(n_splits=2, random_state=1), + n_iter=5) + flow = sklearn_to_flow(model) + flow.flow_id = 1 + flow.components['estimator'].flow_id = 2 + parameters = OpenMLRun._parse_parameters(flow) + for parameter in parameters: + self.assertIsNotNone(parameter['oml:component'], msg=parameter) + if parameter['oml:name'] == 'n_estimators': + self.assertEqual(parameter['oml:value'], '5') + self.assertEqual(parameter['oml:component'], 2) diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index 8dfb5161a..d7ab72de4 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -32,11 +32,6 @@ StratifiedKFold from sklearn.pipeline import Pipeline -if sys.version_info[0] >= 3: - from unittest import mock -else: - import mock - class TestRun(TestBase): @@ -219,34 +214,50 @@ def test_run_and_upload(self): num_folds = 1 # because of holdout num_iterations = 5 # for base search classifiers - clfs = [LogisticRegression(), - Pipeline(steps=[('scaler', StandardScaler(with_mean=False)), - ('dummy', DummyClassifier(strategy='prior'))]), - Pipeline(steps=[('Imputer', Imputer(strategy='median')), - ('VarianceThreshold', VarianceThreshold()), - ('Estimator', RandomizedSearchCV( - DecisionTreeClassifier(), - {'min_samples_split': [2 ** x for x in range(1, 7 + 1)], - 'min_samples_leaf': [2 ** x for x in range(0, 6 + 1)]}, - cv=3, n_iter=10))]), 
- GridSearchCV(BaggingClassifier(base_estimator=SVC()), - {"base_estimator__C": [0.01, 0.1, 10], - "base_estimator__gamma": [0.01, 0.1, 10]}), - RandomizedSearchCV(RandomForestClassifier(n_estimators=5), - {"max_depth": [3, None], - "max_features": [1, 2, 3, 4], - "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10], - "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - "bootstrap": [True, False], - "criterion": ["gini", "entropy"]}, - cv=StratifiedKFold(n_splits=2, - random_state=1), - n_iter=num_iterations)] - + clfs = [] + random_state_values = [] + + lr = LogisticRegression() + clfs.append(lr) + random_state_values.append('62501') + + pipeline1 = Pipeline(steps=[('scaler', StandardScaler(with_mean=False)), + ('dummy', DummyClassifier(strategy='prior'))]) + clfs.append(pipeline1) + random_state_values.append('62501') + + pipeline2 = Pipeline(steps=[('Imputer', Imputer(strategy='median')), + ('VarianceThreshold', VarianceThreshold()), + ('Estimator', RandomizedSearchCV( + DecisionTreeClassifier(), + {'min_samples_split': [2 ** x for x in range(1, 7 + 1)], + 'min_samples_leaf': [2 ** x for x in range(0, 6 + 1)]}, + cv=3, n_iter=10))]) + clfs.append(pipeline2) + random_state_values.append('62501') + + gridsearch = GridSearchCV(BaggingClassifier(base_estimator=SVC()), + {"base_estimator__C": [0.01, 0.1, 10], + "base_estimator__gamma": [0.01, 0.1, 10]}) + clfs.append(gridsearch) + random_state_values.append('62501') + + randomsearch = RandomizedSearchCV( + RandomForestClassifier(n_estimators=5), + {"max_depth": [3, None], + "max_features": [1, 2, 3, 4], + "min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10], + "min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "bootstrap": [True, False], + "criterion": ["gini", "entropy"]}, + cv=StratifiedKFold(n_splits=2, random_state=1), + n_iter=num_iterations) + + clfs.append(randomsearch) # The random states for the RandomizedSearchCV is set after the # random state of the RandomForestClassifier is set, therefore, # it has a different value than the other examples before - random_state_values = ['62501'] * (len(clfs) - 1) + ['33003'] + random_state_values.append('33003') for clf, rsv in zip(clfs, random_state_values): run = self._perform_run(task_id, num_test_instances, clf, @@ -333,12 +344,11 @@ def test__run_exists(self): # and can just check their status on line clfs = [sklearn.pipeline.Pipeline(steps=[('Imputer', Imputer(strategy='mean')), ('VarianceThreshold', VarianceThreshold(threshold=0.05)), - ('Estimator', GaussianNB())]), + ('Estimator', DecisionTreeClassifier(max_depth=4))]), sklearn.pipeline.Pipeline(steps=[('Imputer', Imputer(strategy='most_frequent')), ('VarianceThreshold', VarianceThreshold(threshold=0.1)), ('Estimator', DecisionTreeClassifier(max_depth=4))])] - task = openml.tasks.get_task(115) for clf in clfs: @@ -347,18 +357,18 @@ def test__run_exists(self): # skip run if it was already performed. run = openml.runs.run_model_on_task(task, clf, avoid_duplicate_runs=True) run.publish() - except openml.exceptions.PyOpenMLError: + except openml.exceptions.PyOpenMLError as e: # run already existed. Great. 
pass flow = openml.flows.sklearn_to_flow(clf) flow_exists = openml.flows.flow_exists(flow.name, flow.external_version) - self.assertIsInstance(flow_exists, int) + self.assertGreater(flow_exists, 0) downloaded_flow = openml.flows.get_flow(flow_exists) - setup_exists = openml.setups.setup_exists(downloaded_flow) - self.assertIsInstance(setup_exists, int) + setup_exists = openml.setups.setup_exists(downloaded_flow, clf) + self.assertGreater(setup_exists, 0) run_ids = _run_exists(task.task_id, setup_exists) - self.assertGreater(len(run_ids), 0) + self.assertTrue(run_ids, msg=(run_ids, clf)) def test__get_seeded_model(self): # randomized models that are initialized without seeds, can be seeded From 9a9917a3459b13cd52ff399245adb81cea50a56c Mon Sep 17 00:00:00 2001 From: Matthias Feurer Date: Tue, 16 May 2017 16:35:30 +0200 Subject: [PATCH 3/9] incorporate comments from Jan --- openml/flows/flow.py | 2 +- openml/runs/functions.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/openml/flows/flow.py b/openml/flows/flow.py index f85dacf70..850fb16eb 100644 --- a/openml/flows/flow.py +++ b/openml/flows/flow.py @@ -324,7 +324,7 @@ def _from_dict(cls, xml_dict): arguments['model'] = None flow = cls(**arguments) - if 'sklearn' in arguments['external_version']: + if arguments['external_version'].startswith('sklearn'): from .sklearn_converter import flow_to_sklearn model = flow_to_sklearn(flow) else: diff --git a/openml/runs/functions.py b/openml/runs/functions.py index aadaecae3..13ef49f41 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -99,13 +99,12 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None, run_environment = _get_version_information() tags = ['openml-python', run_environment[1]] # execute the run - run = OpenMLRun(task_id=task.task_id, flow_id=None, dataset_id=dataset.dataset_id, - model=flow.model, tags=tags) + run = OpenMLRun(task_id=task.task_id, flow_id=flow.flow_id, + dataset_id=dataset.dataset_id, model=flow.model, tags=tags) run.parameter_settings = OpenMLRun._parse_parameters(flow) res = _run_task_get_arffcontent(flow.model, task, class_labels) run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res - run.flow_id = flow.flow_id config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id)) return run From beaa04696fb44778b407e05b8a61bd103b6132fd Mon Sep 17 00:00:00 2001 From: Matthias Feurer Date: Tue, 16 May 2017 22:18:25 +0200 Subject: [PATCH 4/9] improve testing of setup_exists --- openml/runs/functions.py | 2 +- openml/setups/functions.py | 4 +- tests/test_setups/test_setup_functions.py | 98 ++++++++++++++--------- 3 files changed, 62 insertions(+), 42 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 13ef49f41..dcd1ded80 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -7,7 +7,7 @@ import warnings import numpy as np -import sklearn +import sklearn.pipeline import six import xmltodict diff --git a/openml/setups/functions.py b/openml/setups/functions.py index bf69039e2..4eb2610eb 100644 --- a/openml/setups/functions.py +++ b/openml/setups/functions.py @@ -34,9 +34,7 @@ def setup_exists(flow, model=None): if model is None: model = flow.model else: - converted_flow = sklearn_to_flow(model) - exists = flow_exists(converted_flow.name, - converted_flow.external_version) + exists = flow_exists(flow.name, flow.external_version) if exists != flow.flow_id: raise ValueError('This should not happen!') diff 
--git a/tests/test_setups/test_setup_functions.py b/tests/test_setups/test_setup_functions.py index 434525865..1082ddfb5 100644 --- a/tests/test_setups/test_setup_functions.py +++ b/tests/test_setups/test_setup_functions.py @@ -8,11 +8,9 @@ from sklearn.ensemble import BaggingClassifier from sklearn.tree import DecisionTreeClassifier - -if sys.version_info[0] >= 3: - from unittest import mock -else: - import mock +from sklearn.linear_model import LogisticRegression +from sklearn.naive_bayes import GaussianNB +from sklearn.base import BaseEstimator, ClassifierMixin def get_sentinel(): @@ -26,6 +24,29 @@ def get_sentinel(): return sentinel +class ParameterFreeClassifier(BaseEstimator, ClassifierMixin): + def __init__(self): + self.estimator = None + + def fit(self, X, y): + self.estimator = DecisionTreeClassifier() + self.estimator.fit(X, y) + self.classes_ = self.estimator.classes_ + return self + + def predict(self, X): + return self.estimator.predict(X) + + def predict_proba(self, X): + return self.estimator.predict_proba(X) + + def set_params(self, **params): + pass + + def get_params(self, deep=True): + return {} + + class TestRun(TestBase): @@ -45,39 +66,40 @@ def test_nonexisting_setup_exists(self): self.assertFalse(setup_id) def test_existing_setup_exists(self): - # first publish a nonexiting flow - - # because of the sentinel, we can not use flows that contain subflows - classif = DecisionTreeClassifier(max_depth=5, - min_samples_split=3, - # Not setting the random state will - # make this flow fail as running it - # will add a random random_state. - random_state=1) - flow = openml.flows.sklearn_to_flow(classif) - flow.name = 'TEST%s%s' % (get_sentinel(), flow.name) - - # Replace the flow by a flow in which the ID got set up correctly - flow = flow.publish() - flow = openml.flows.get_flow(flow.flow_id) - - # although the flow exists, we can be sure there are no - # setups (yet) as it hasn't been ran - setup_id = openml.setups.setup_exists(flow) - self.assertFalse(setup_id) - - # now run the flow on an easy task: - task = openml.tasks.get_task(115) # diabetes - run = openml.runs.run_flow_on_task(task, flow) - # spoof flow id, otherwise the sentinel is ignored - run.flow_id = flow.flow_id - run = run.publish() - # download the run, as it contains the right setup id - run = openml.runs.get_run(run.run_id) - - # execute the function we are interested in - setup_id = openml.setups.setup_exists(flow) - self.assertEquals(setup_id, run.setup_id) + clfs = [ParameterFreeClassifier(), # zero hyperparemeters + GaussianNB(), # one hyperparameter + DecisionTreeClassifier(max_depth=5, # many hyperparameters + min_samples_split=3, + # Not setting the random state will + # make this flow fail as running it + # will add a random random_state. 
+ random_state=1)] + + for classif in clfs: + # first publish a nonexiting flow + flow = openml.flows.sklearn_to_flow(classif) + flow.name = 'TEST%s%s' % (get_sentinel(), flow.name) + flow.publish() + + # although the flow exists, we can be sure there are no + # setups (yet) as it hasn't been ran + setup_id = openml.setups.setup_exists(flow) + self.assertFalse(setup_id) + setup_id = openml.setups.setup_exists(flow, classif) + self.assertFalse(setup_id) + + # now run the flow on an easy task: + task = openml.tasks.get_task(115) # diabetes + run = openml.runs.run_flow_on_task(task, flow) + # spoof flow id, otherwise the sentinel is ignored + run.flow_id = flow.flow_id + run.publish() + # download the run, as it contains the right setup id + run = openml.runs.get_run(run.run_id) + + # execute the function we are interested in + setup_id = openml.setups.setup_exists(flow) + self.assertEquals(setup_id, run.setup_id) def test_get_setup(self): # no setups in default test server From c46f5b70bdde9bf0d897e3b05f4188566a65324d Mon Sep 17 00:00:00 2001 From: Matthias Feurer Date: Wed, 17 May 2017 22:00:14 +0200 Subject: [PATCH 5/9] only check parameters of component if it is not older than the parent flow --- openml/flows/flow.py | 4 ++-- openml/flows/functions.py | 27 +++++++++++++++++++-- tests/test_flows/test_flow.py | 45 +++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/openml/flows/flow.py b/openml/flows/flow.py index 850fb16eb..1339bf125 100644 --- a/openml/flows/flow.py +++ b/openml/flows/flow.py @@ -349,15 +349,15 @@ def publish(self): return_value = _perform_api_call("flow/", file_elements=file_elements) flow_id = int(xmltodict.parse(return_value)['oml:upload_flow']['oml:id']) flow = openml.flows.functions.get_flow(flow_id) + _copy_server_fields(flow, self) try: - openml.flows.functions.assert_flows_equal(self, flow) + openml.flows.functions.assert_flows_equal(self, flow, flow.upload_date) except ValueError as e: message = e.args[0] raise ValueError("Flow was not stored correctly on the server. " "New flow ID is %d. Please check manually and " "remove the flow if necessary! Error is:\n'%s'" % (flow_id, message)) - _copy_server_fields(flow, self) return self diff --git a/openml/flows/functions.py b/openml/flows/functions.py index 993fbbcd1..a560f7f8b 100644 --- a/openml/flows/functions.py +++ b/openml/flows/functions.py @@ -1,3 +1,5 @@ +import dateutil.parser + import xmltodict import six @@ -143,11 +145,21 @@ def _check_flow_for_server_id(flow): stack.append(component) -def assert_flows_equal(flow1, flow2): +def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None): """Check equality of two flows. Two flows are equal if their all keys which are not set by the server are equal, as well as all their parameters and components. + + Parameters + ---------- + flow1 : OpenMLFlow + + flow2 : OpenMLFlow + + ignore_parameters_on_older_children : str + If set to ``OpenMLFlow.upload_date``, ignores parameters in a child + flow if it's upload date predates the upload date of the parent flow. """ if not isinstance(flow1, OpenMLFlow): raise TypeError('Argument 1 must be of type OpenMLFlow, but is %s' % @@ -157,6 +169,8 @@ def assert_flows_equal(flow1, flow2): raise TypeError('Argument 2 must be of type OpenMLFlow, but is %s' % type(flow2)) + # TODO as they are actually now saved during publish, it might be good to + # check for the equality of these as well. 
generated_by_the_server = ['flow_id', 'uploader', 'version', 'upload_date'] ignored_by_python_API = ['binary_url', 'binary_format', 'binary_md5', 'model'] @@ -174,9 +188,18 @@ def assert_flows_equal(flow1, flow2): if not name in attr2: raise ValueError('Component %s only available in ' 'argument2, but not in argument1.' % name) - assert_flows_equal(attr1[name], attr2[name]) + assert_flows_equal(attr1[name], attr2[name], ignore_parameters_on_older_children) else: + if key == 'parameters': + if ignore_parameters_on_older_children: + upload_date_current_flow = dateutil.parser.parse( + flow1.upload_date) + upload_date_parent_flow = dateutil.parser.parse( + ignore_parameters_on_older_children) + if upload_date_current_flow < upload_date_parent_flow: + continue + if attr1 != attr2: raise ValueError("Flow %s: values for attribute '%s' differ: " "'%s' vs '%s'." % diff --git a/tests/test_flows/test_flow.py b/tests/test_flows/test_flow.py index fdd032366..1b40d4f7e 100644 --- a/tests/test_flows/test_flow.py +++ b/tests/test_flows/test_flow.py @@ -28,6 +28,7 @@ from openml._api_calls import _perform_api_call import openml from openml.flows.sklearn_converter import _format_external_version +import openml.exceptions class TestFlow(TestBase): @@ -120,6 +121,50 @@ def test_publish_flow(self): flow.publish() self.assertIsInstance(flow.flow_id, int) + def test_publish_existing_flow(self): + clf = sklearn.tree.DecisionTreeClassifier(max_depth=2) + flow = openml.flows.sklearn_to_flow(clf) + flow, _ = self._add_sentinel_to_flow_name(flow, None) + flow.publish() + self.assertRaisesRegexp(openml.exceptions.OpenMLServerException, + 'flow already exists', flow.publish) + + def test_publish_flow_with_similar_components(self): + clf = sklearn.ensemble.VotingClassifier( + [('lr', sklearn.linear_model.LogisticRegression())]) + flow = openml.flows.sklearn_to_flow(clf) + flow, _ = self._add_sentinel_to_flow_name(flow, None) + flow.publish() + # For a flow where both components are published together, the upload + # date should be equal + self.assertEqual(flow.upload_date, + flow.components['lr'].upload_date, + (flow.name, flow.flow_id, + flow.components['lr'].name, flow.components['lr'].flow_id)) + + clf1 = sklearn.tree.DecisionTreeClassifier(max_depth=2) + flow1 = openml.flows.sklearn_to_flow(clf1) + flow1, sentinel = self._add_sentinel_to_flow_name(flow1, None) + flow1.publish() + + clf2 = sklearn.ensemble.VotingClassifier( + [('dt', sklearn.tree.DecisionTreeClassifier(max_depth=2))]) + flow2 = openml.flows.sklearn_to_flow(clf2) + flow2, _ = self._add_sentinel_to_flow_name(flow2, sentinel) + flow2.publish() + # If one component was published before the other, the components in + # the flow should have different upload dates + self.assertNotEqual(flow2.upload_date, + flow2.components['dt'].upload_date) + + clf3 = sklearn.ensemble.AdaBoostClassifier( + sklearn.tree.DecisionTreeClassifier(max_depth=3)) + flow3 = openml.flows.sklearn_to_flow(clf3) + flow3, _ = self._add_sentinel_to_flow_name(flow3, sentinel) + # Child flow has different parameter. Check for storing the flow + # correctly on the server should thus not check the child's parameters! + flow3.publish() + def test_semi_legal_flow(self): # TODO: Test if parameters are set correctly! 
         # should not throw error as it contains two differentiable forms of Bagging

From 00be4019b38f693d51d7f971a8820d6e3cdd3636 Mon Sep 17 00:00:00 2001
From: Matthias Feurer
Date: Wed, 17 May 2017 22:39:40 +0200
Subject: [PATCH 6/9] publish flow after running it

---
 openml/exceptions.py                  |  4 +--
 openml/flows/functions.py             |  8 ++++-
 openml/runs/functions.py              | 45 ++++++++++++++++++---------
 tests/test_runs/test_run_functions.py | 27 +++++++++++++++-
 4 files changed, 65 insertions(+), 19 deletions(-)

diff --git a/openml/exceptions.py b/openml/exceptions.py
index f6eb75bd6..ae6f6be32 100644
--- a/openml/exceptions.py
+++ b/openml/exceptions.py
@@ -1,13 +1,14 @@
 class PyOpenMLError(Exception):
     def __init__(self, message):
+        self.message = message
         super(PyOpenMLError, self).__init__(message)
 
+
 class OpenMLServerError(PyOpenMLError):
     """class for when something is really wrong on the server
        (result did not parse to dict), contains unparsed error."""
 
     def __init__(self, message):
-        message = "OpenML Server error: " + message
         super(OpenMLServerError, self).__init__(message)
 
 #
@@ -18,7 +19,6 @@ class OpenMLServerException(OpenMLServerError):
     def __init__(self, code, message, additional=None):
         self.code = code
         self.additional = additional
-        message = "OpenML Server exception: " + message
         super(OpenMLServerException, self).__init__(message)

diff --git a/openml/flows/functions.py b/openml/flows/functions.py
index a560f7f8b..5ed44aaea 100644
--- a/openml/flows/functions.py
+++ b/openml/flows/functions.py
@@ -145,7 +145,8 @@ def _check_flow_for_server_id(flow):
             stack.append(component)
 
 
-def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None):
+def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
+                       ignore_parameters=False):
     """Check equality of two flows.
 
     Two flows are equal if their all keys which are not set by the server
@@ -160,6 +161,9 @@ def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
     ignore_parameters_on_older_children : str
         If set to ``OpenMLFlow.upload_date``, ignores parameters in a child
         flow if it's upload date predates the upload date of the parent flow.
+
+    ignore_parameters : bool
+        Whether to ignore parameter values when comparing flows.
     """
@@ -199,6 +203,8 @@ def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
                     ignore_parameters_on_older_children)
                 if upload_date_current_flow < upload_date_parent_flow:
                     continue
+            elif ignore_parameters:
+                continue
 
             if attr1 != attr2:
                 raise ValueError("Flow %s: values for attribute '%s' differ: "
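The new ignore_parameters switch allows comparing a freshly converted flow against a server copy whose stored parameter values may legitimately differ. A sketch, with an illustrative flow id:

    import openml
    from sklearn.tree import DecisionTreeClassifier

    local_flow = openml.flows.sklearn_to_flow(DecisionTreeClassifier())
    server_flow = openml.flows.get_flow(4242)  # illustrative id

    # Still raises ValueError on structural differences, but no longer
    # on differing parameter values:
    openml.flows.assert_flows_equal(local_flow, server_flow,
                                    ignore_parameters=True)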
diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index dcd1ded80..0f9cca047 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -11,6 +11,7 @@
 import six
 import xmltodict
 
+import openml
 from ..exceptions import PyOpenMLError
 from .. import config
 from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs, \
@@ -30,20 +31,6 @@ def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
                       seed=None):
     flow = sklearn_to_flow(model)
 
-    # returns flow id if the flow exists on the server, False otherwise
-    flow_id = flow_exists(flow.name, flow.external_version)
-
-    if flow_id == False:
-        # TODO this is a potential race condition! Someone could upload the
-        # same flow in the meantime!
-        # The flow did not exist; as we could run it, publish it now
-        flow = flow.publish()
-    else:
-        # flow already existed, download it from the server
-        # TODO (necessary? is this a post-condition of this function?)
-        flow_from_server = get_flow(flow_id)
-        _copy_server_fields(flow_from_server, flow)
-
     return run_flow_on_task(task=task, flow=flow,
                             avoid_duplicate_runs=avoid_duplicate_runs,
                             flow_tags=flow_tags, seed=seed)
@@ -82,6 +69,9 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
     # skips the run if it already exists and the user opts for this in the config file.
     # also, if the flow is not present on the server, the check is not needed.
     if avoid_duplicate_runs:
+        if flow.flow_id is None:
+            raise ValueError('Cannot check if a run exists if the '
+                             'corresponding flow has not been published yet!')
         flow_from_server = get_flow(flow.flow_id)
         setup_id = setup_exists(flow_from_server)
         ids = _run_exists(task.task_id, setup_id)
@@ -98,11 +88,17 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
     run_environment = _get_version_information()
     tags = ['openml-python', run_environment[1]]
 
+    # execute the run
+    res = _run_task_get_arffcontent(flow.model, task, class_labels)
+
+    if flow.flow_id is None:
+        _publish_flow_if_necessary(flow)
+
     run = OpenMLRun(task_id=task.task_id, flow_id=flow.flow_id,
                     dataset_id=dataset.dataset_id, model=flow.model, tags=tags)
     run.parameter_settings = OpenMLRun._parse_parameters(flow)
-    res = _run_task_get_arffcontent(flow.model, task, class_labels)
+
     run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res
 
     config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id))
@@ -110,6 +106,25 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
     return run
 
 
+def _publish_flow_if_necessary(flow):
+    # Try publishing the flow, under the assumption that it does not exist
+    # on the server yet. Publishing might fail because the flow already
+    # exists; in that case the server's copy is downloaded and its
+    # server-assigned fields are copied over, so the existing flow is reused.
+    try:
+        flow.publish()
+    except OpenMLServerException as e:
+        if e.message == "flow already exists":
+            flow_id = openml.flows.flow_exists(flow.name,
+                                               flow.external_version)
+            server_flow = get_flow(flow_id)
+            openml.flows.flow._copy_server_fields(server_flow, flow)
+            openml.flows.assert_flows_equal(flow, server_flow,
+                                            ignore_parameters=True)
+        else:
+            raise e
+
+
 def get_run_trace(run_id):
     """Get the optimization trace object for a given run id.
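[Reviewer note] The publish-or-reuse behaviour added here is an optimistic-publish pattern. The following condensed sketch restates ``_publish_flow_if_necessary`` with fully qualified names; it is a paraphrase for discussion, not the patched code itself:

    import openml
    import openml.exceptions

    def publish_or_reuse(flow):
        try:
            flow.publish()  # optimistic: assume the flow is new
        except openml.exceptions.OpenMLServerException as e:
            if e.message != "flow already exists":
                raise
            # Someone (possibly we ourselves) published it earlier: adopt
            # the server's ids instead of failing.
            flow_id = openml.flows.flow_exists(flow.name,
                                               flow.external_version)
            server_flow = openml.flows.get_flow(flow_id)
            openml.flows.flow._copy_server_fields(server_flow, flow)
        return flow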
diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py
index d7ab72de4..12cc99988 100644
--- a/tests/test_runs/test_run_functions.py
+++ b/tests/test_runs/test_run_functions.py
@@ -180,6 +180,17 @@ def _check_detailed_evaluations(self, detailed_evaluations, num_repeats, num_fol
             self.assertGreater(evaluation, 0)  # should take at least one millisecond (?)
             self.assertLess(evaluation, max_time_allowed)
 
+    def test_avoid_duplicate_runs_with_unpublished_flow(self):
+        task_id = 115
+
+        clf = LogisticRegression()
+        flow = sklearn_to_flow(clf)
+        flow, _ = self._add_sentinel_to_flow_name(flow, None)
+        task = openml.tasks.get_task(task_id)
+
+        self.assertRaisesRegexp(ValueError, 'Cannot check if a run exists if the corresponding flow has not been published yet!',
+                                openml.runs.run_flow_on_task, task=task,
+                                flow=flow, avoid_duplicate_runs=True)
 
     def test_run_regression_on_classif_task(self):
         task_id = 115
 
@@ -202,6 +213,21 @@ def test_check_erronous_sklearn_flow_fails(self):
                           openml.runs.run_model_on_task, task=task, model=clf)
 
+    def test__publish_flow_if_necessary(self):
+        task_id = 115
+        task = openml.tasks.get_task(task_id)
+
+        clf = LogisticRegression()
+        flow = sklearn_to_flow(clf)
+        flow, sentinel = self._add_sentinel_to_flow_name(flow, None)
+        openml.runs.functions._publish_flow_if_necessary(flow)
+        self.assertIsNotNone(flow.flow_id)
+
+        flow2 = sklearn_to_flow(clf)
+        flow2, _ = self._add_sentinel_to_flow_name(flow2, sentinel)
+        openml.runs.functions._publish_flow_if_necessary(flow2)
+        self.assertEqual(flow2.flow_id, flow.flow_id)
+
     def test_run_and_upload(self):
         # This unit test is meant to test the following functions, using a variety of flows:
         # - openml.runs.run_task()
@@ -277,7 +303,6 @@ def test_run_and_upload(self):
             self._check_detailed_evaluations(run.detailed_evaluations, 1, num_folds)
 
         pass
-
     def test_initialize_model_from_run(self):
         clf = sklearn.pipeline.Pipeline(steps=[('Imputer', Imputer(strategy='median')),
                                                ('VarianceThreshold', VarianceThreshold(threshold=0.05)),

From 57666fa3242cdf2479b7854feda09bfd611fde6e Mon Sep 17 00:00:00 2001
From: Matthias Feurer
Date: Thu, 18 May 2017 10:10:49 +0200
Subject: [PATCH 7/9] add comment on in-function import

---
 openml/flows/flow.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/openml/flows/flow.py b/openml/flows/flow.py
index 1339bf125..f17d6aff8 100644
--- a/openml/flows/flow.py
+++ b/openml/flows/flow.py
@@ -341,6 +341,10 @@ def publish(self):
         self : OpenMLFlow
 
         """
+        # Import at top not possible because of cyclic dependencies. In
+        # particular, flow.py tries to import functions.py in order to call
+        # get_flow(), while functions.py tries to import flow.py in order to
+        # instantiate an OpenMLFlow.
         import openml.flows.functions
 
         xml_description = self._to_xml()
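[Reviewer note] The cycle described in the comment is the generic import-cycle problem; a small sketch with two hypothetical modules (``a.py`` and ``b.py``, not part of this repo) shows why the import must be deferred into the function body:

    # a.py
    import b  # safe: b.py does not import a.py at module level

    class Thing(object):
        pass

    def make_thing():
        return b.build()

    # b.py
    def build():
        # Deferred import: a module-level "import a" here would close the
        # cycle a -> b -> a while a.py is still initialising, and fail.
        import a
        return a.Thing()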

From 706489980005d851924652e22e9187026ca5f0d9 Mon Sep 17 00:00:00 2001
From: Matthias Feurer
Date: Thu, 18 May 2017 10:37:48 +0200
Subject: [PATCH 8/9] fix unit tests

---
 openml/flows/functions.py             |  4 +++-
 openml/runs/functions.py              |  8 +++-----
 tests/test_flows/test_flow.py         |  3 +++
 tests/test_runs/test_run_functions.py | 12 ------------
 tests/test_setups/__init__.py         |  3 +++
 5 files changed, 12 insertions(+), 18 deletions(-)
 create mode 100644 tests/test_setups/__init__.py

diff --git a/openml/flows/functions.py b/openml/flows/functions.py
index 5ed44aaea..7b3bd2d96 100644
--- a/openml/flows/functions.py
+++ b/openml/flows/functions.py
@@ -192,7 +192,9 @@ def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
                     if not name in attr2:
                         raise ValueError('Component %s only available in '
                                          'argument1, but not in argument2.'
                                          % name)
-                    assert_flows_equal(attr1[name], attr2[name], ignore_parameters_on_older_children)
+                    assert_flows_equal(attr1[name], attr2[name],
+                                       ignore_parameters_on_older_children,
+                                       ignore_parameters)
             else:
                 if key == 'parameters':
diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index 0f9cca047..a46c03446 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -68,11 +68,9 @@ def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
 
     # skips the run if it already exists and the user opts for this in the config file.
     # also, if the flow is not present on the server, the check is not needed.
-    if avoid_duplicate_runs:
-        if flow.flow_id is None:
-            raise ValueError('Cannot check if a run exists if the '
-                             'corresponding flow has not been published yet!')
-        flow_from_server = get_flow(flow.flow_id)
+    flow_id = flow_exists(flow.name, flow.external_version)
+    if avoid_duplicate_runs and flow_id:
+        flow_from_server = get_flow(flow_id)
         setup_id = setup_exists(flow_from_server)
         ids = _run_exists(task.task_id, setup_id)
         if ids:
diff --git a/tests/test_flows/test_flow.py b/tests/test_flows/test_flow.py
index 1b40d4f7e..f73de3a76 100644
--- a/tests/test_flows/test_flow.py
+++ b/tests/test_flows/test_flow.py
@@ -147,6 +147,9 @@ def test_publish_flow_with_similar_components(self):
         flow1, sentinel = self._add_sentinel_to_flow_name(flow1, None)
         flow1.publish()
 
+        # In order to assign different upload times to the flows!
+        time.sleep(1)
+
         clf2 = sklearn.ensemble.VotingClassifier(
             [('dt', sklearn.tree.DecisionTreeClassifier(max_depth=2))])
         flow2 = openml.flows.sklearn_to_flow(clf2)
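[Reviewer note] After this change, duplicate detection hinges on ``flow_exists``, which (per the comment removed in patch 6) returns the flow id if the name/external-version pair is known to the server and ``False`` otherwise. A hypothetical caller-side sketch:

    import sklearn.tree
    import openml

    flow = openml.flows.sklearn_to_flow(sklearn.tree.DecisionTreeClassifier())
    # flow_exists keys the lookup on name plus external_version.
    flow_id = openml.flows.flow_exists(flow.name, flow.external_version)
    if flow_id:
        print('Flow already on the server as %d; duplicates detectable.' % flow_id)
    else:
        print('Flow unknown to the server; it will be published after the run.')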
diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py
index 12cc99988..f3530a9f0 100644
--- a/tests/test_runs/test_run_functions.py
+++ b/tests/test_runs/test_run_functions.py
@@ -180,18 +180,6 @@ def _check_detailed_evaluations(self, detailed_evaluations, num_repeats, num_fol
             self.assertGreater(evaluation, 0)  # should take at least one millisecond (?)
             self.assertLess(evaluation, max_time_allowed)
 
-    def test_avoid_duplicate_runs_with_unpublished_flow(self):
-        task_id = 115
-
-        clf = LogisticRegression()
-        flow = sklearn_to_flow(clf)
-        flow, _ = self._add_sentinel_to_flow_name(flow, None)
-        task = openml.tasks.get_task(task_id)
-
-        self.assertRaisesRegexp(ValueError, 'Cannot check if a run exists if the corresponding flow has not been published yet!',
-                                openml.runs.run_flow_on_task, task=task,
-                                flow=flow, avoid_duplicate_runs=True)
-
     def test_run_regression_on_classif_task(self):
         task_id = 115
 
diff --git a/tests/test_setups/__init__.py b/tests/test_setups/__init__.py
new file mode 100644
index 000000000..d6b0c7b1a
--- /dev/null
+++ b/tests/test_setups/__init__.py
@@ -0,0 +1,3 @@
+# Dummy to allow mock classes in the test files to have a version number for
+# their parent module
+__version__ = '0.1'
\ No newline at end of file

From faf5b261407d86638dd598faa1ef11e6c67fd229 Mon Sep 17 00:00:00 2001
From: Matthias Feurer
Date: Thu, 18 May 2017 14:41:44 +0200
Subject: [PATCH 9/9] work on comments from Jan

---
 openml/flows/flow.py                  |  9 +++++++--
 openml/runs/functions.py              | 20 ++++++++++++++------
 openml/runs/run.py                    | 27 ++++++++++++++++++++++++++-
 openml/setups/functions.py            |  6 ++++++
 tests/test_runs/test_run_functions.py | 15 +++++++--------
 5 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/openml/flows/flow.py b/openml/flows/flow.py
index f17d6aff8..f57b03a61 100644
--- a/openml/flows/flow.py
+++ b/openml/flows/flow.py
@@ -324,10 +324,15 @@ def _from_dict(cls, xml_dict):
         arguments['model'] = None
 
         flow = cls(**arguments)
-        if arguments['external_version'].startswith('sklearn'):
+        # Try to parse the flow to a model, because not everything that can
+        # be deserialized has to come from scikit-learn. If a flow cannot be
+        # deserialized but does come from scikit-learn, this is worth an
+        # exception.
+        try:
             from .sklearn_converter import flow_to_sklearn
             model = flow_to_sklearn(flow)
-        else:
+        except Exception as e:
+            if arguments['external_version'].startswith('sklearn'):
+                raise e
             model = None
         flow.model = model
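[Reviewer note] The new control flow amounts to "attempt deserialization, re-raise only for flows that claim a scikit-learn external version". As a standalone policy it looks roughly like this (the function name is illustrative):

    def model_from_flow(flow):
        # Only flows claiming a scikit-learn external_version are required
        # to deserialize; any other flow may legitimately fail and yields
        # model=None instead of an exception.
        try:
            from openml.flows.sklearn_converter import flow_to_sklearn
            return flow_to_sklearn(flow)
        except Exception:
            if flow.external_version.startswith('sklearn'):
                raise
            return None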
diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index a46c03446..9b33a5ec2 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -29,6 +29,8 @@
 
 def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
                       seed=None):
+    """See ``run_flow_on_task`` for documentation."""
+
     flow = sklearn_to_flow(model)
 
     return run_flow_on_task(task=task, flow=flow,
@@ -38,23 +40,29 @@ def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
 
 def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
                      seed=None):
-    """Performs a CV run on the dataset of the given task, using the split.
+    """Run the model provided by the flow on the dataset defined by the task.
+
+    Takes the flow and repeat information into account. In case a flow is
+    not yet published, it is published after executing the run (this
+    requires an internet connection).
 
     Parameters
     ----------
     task : OpenMLTask
         Task to perform.
     model : sklearn model
-        a model which has a function fit(X,Y) and predict(X),
+        A model which has a function fit(X,Y) and predict(X);
         all supervised estimators of scikit-learn follow this definition of a model [1]
         [1](http://scikit-learn.org/stable/tutorial/statistical_inference/supervised_learning.html)
     avoid_duplicate_runs : bool
-        if this flag is set to True, the run will throw an error if the
-        setup/task combination is already present on the server.
+        If this flag is set to True, the run will throw an error if the
+        setup/task combination is already present on the server. Works only
+        if the flow is already published on the server. This feature
+        requires an internet connection.
     flow_tags : list(str)
-        a list of tags that the flow should have at creation
+        A list of tags that the flow should have at creation.
     seed: int
-        the models that are not seeded will get this seed
+        Models that are not seeded will get this seed.
 
     Returns
     -------
diff --git a/openml/runs/run.py b/openml/runs/run.py
index 430537d84..70077b856 100644
--- a/openml/runs/run.py
+++ b/openml/runs/run.py
@@ -190,6 +190,17 @@ def extract_parameters(_flow, _flow_dict, component_model,
         # _flow is an OpenML flow object, _param dict maps from flow name to flow id
         # for the main call, the param dict can be overridden (useful for unit tests / sentinels)
         # this way, for flows without subflows we do not have to rely on _flow_dict
+        expected_parameters = set(_flow.parameters)
+        expected_components = set(_flow.components)
+        model_parameters = set([mp for mp in component_model.get_params()
+                                if '__' not in mp])
+        if len((expected_parameters | expected_components) ^ model_parameters) != 0:
+            raise ValueError('Parameters of the model do not match the '
+                             'parameters expected by the '
+                             'flow:\nexpected flow parameters: '
+                             '%s\nmodel parameters: %s' %
+                             (sorted(expected_parameters | expected_components),
+                              sorted(model_parameters)))
+
         _params = []
         for _param_name in _flow.parameters:
             _current = OrderedDict()
@@ -198,7 +209,9 @@ def extract_parameters(_flow, _flow_dict, component_model,
                 _tmp = openml.flows.sklearn_to_flow(
                     component_model.get_params()[_param_name])
 
-                # Try to filter out components which are handled further down!
+                # Try to filter out components (a.k.a. subflows) which are
+                # handled further down in the code (by recursively calling
+                # this function)!
                 if isinstance(_tmp, openml.flows.OpenMLFlow):
                     continue
                 try:
@@ -210,7 +223,19 @@ def extract_parameters(_flow, _flow_dict, component_model,
                     # Object of type 'OpenMLFlow' is not JSON serializable
                     if 'OpenMLFlow' in e.args[0] and \
                             'is not JSON serializable' in e.args[0]:
+                        # Additional check that the parameter which could not
+                        # be parsed is actually a list/tuple as used inside a
+                        # feature union or pipeline
+                        if not isinstance(_tmp, (list, tuple)):
+                            raise e
+                        for step_name, step in _tmp:
+                            if isinstance(step_name, openml.flows.OpenMLFlow):
+                                raise e
+                            elif not isinstance(step, openml.flows.OpenMLFlow):
+                                raise e
                         continue
+                    else:
+                        raise e
 
                 _current['oml:value'] = _tmp
                 if _main_call:
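[Reviewer note] The sanity check added to ``extract_parameters`` boils down to a symmetric set difference over top-level names; nested parameters (those containing ``__``) belong to subcomponents and are filtered out first. A toy illustration with made-up parameter names:

    expected_parameters = {'max_depth', 'criterion'}
    expected_components = {'base_estimator'}
    model_parameters = {'max_depth', 'criterion', 'base_estimator', 'extra'}

    # Non-empty iff model and flow disagree on at least one top-level name.
    mismatch = (expected_parameters | expected_components) ^ model_parameters
    assert mismatch == {'extra'}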
diff --git a/openml/setups/functions.py b/openml/setups/functions.py
index 4eb2610eb..b8f80ad67 100644
--- a/openml/setups/functions.py
+++ b/openml/setups/functions.py
@@ -110,8 +110,14 @@ def _reconstruct_flow(_flow, _params):
     #  dicts, mapping from flow id to param name to param value
     #  (obtained by using the subfunction _to_dict_of_dicts)
     for _param in _flow.parameters:
+        # It can happen that no parameters of a flow are in a setup; then
+        # the flow_id is not in _params. This usually happens for a
+        # sklearn.pipeline.Pipeline object, where the steps parameter is
+        # not in the setup
        if _flow.flow_id not in _params:
             continue
+        # It is not guaranteed that a setup on OpenML has all parameter
+        # settings of a flow, thus a parameter need not be in _params!
        if _param not in _params[_flow.flow_id]:
             continue
         _flow.parameters[_param] = _params[_flow.flow_id][_param]
diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py
index f3530a9f0..41ebe16bd 100644
--- a/tests/test_runs/test_run_functions.py
+++ b/tests/test_runs/test_run_functions.py
@@ -229,16 +229,16 @@ def test_run_and_upload(self):
         num_iterations = 5  # for base search classifiers
 
         clfs = []
-        random_state_values = []
+        random_state_fixtures = []
 
         lr = LogisticRegression()
         clfs.append(lr)
-        random_state_values.append('62501')
+        random_state_fixtures.append('62501')
 
         pipeline1 = Pipeline(steps=[('scaler', StandardScaler(with_mean=False)),
                                     ('dummy', DummyClassifier(strategy='prior'))])
         clfs.append(pipeline1)
-        random_state_values.append('62501')
+        random_state_fixtures.append('62501')
 
         pipeline2 = Pipeline(steps=[('Imputer', Imputer(strategy='median')),
                                     ('VarianceThreshold', VarianceThreshold()),
                                     ('Estimator', RandomizedSearchCV(DecisionTreeClassifier(),
                                                                     {'min_samples_split': [2 ** x for x in range(1, 7 + 1)],
                                                                      'min_samples_leaf': [2 ** x for x in range(0, 6 + 1)]},
                                                                     cv=3, n_iter=10))])
         clfs.append(pipeline2)
-        random_state_values.append('62501')
+        random_state_fixtures.append('62501')
 
         gridsearch = GridSearchCV(BaggingClassifier(base_estimator=SVC()),
                                   {"base_estimator__C": [0.01, 0.1, 10],
                                    "base_estimator__gamma": [0.01, 0.1, 10]})
         clfs.append(gridsearch)
-        random_state_values.append('62501')
+        random_state_fixtures.append('62501')
 
         randomsearch = RandomizedSearchCV(
             RandomForestClassifier(n_estimators=5),
@@ -271,9 +271,9 @@ def test_run_and_upload(self):
         # The random state for the RandomizedSearchCV is set after the
         # random state of the RandomForestClassifier is set; therefore,
         # it has a different value than in the other examples before
-        random_state_values.append('33003')
+        random_state_fixtures.append('33003')
 
-        for clf, rsv in zip(clfs, random_state_values):
+        for clf, rsv in zip(clfs, random_state_fixtures):
             run = self._perform_run(task_id, num_test_instances, clf,
                                     random_state_value=rsv)
             if isinstance(clf, BaseSearchCV):
@@ -311,7 +311,6 @@ def test_initialize_model_from_run(self):
         self.assertEquals(flowS.components['Imputer'].parameters['strategy'], '"median"')
         self.assertEquals(flowS.components['VarianceThreshold'].parameters['threshold'], '0.05')
-        pass
 
     def test_get_run_trace(self):
         # get_run_trace is already tested implicitly in test_run_and_upload
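[Reviewer note] Taken together, the series leaves the user-facing workflow in roughly the following shape. This is a hedged end-to-end sketch, not part of the patch; task 115 is the id used by the tests above, and publishing the run assumes a configured OpenML API key:

    import sklearn.tree
    import openml

    task = openml.tasks.get_task(115)
    clf = sklearn.tree.DecisionTreeClassifier(max_depth=2)

    # The model is converted to a flow internally; if that flow is not on
    # the server yet, it is published right after the run has been executed
    # locally.
    run = openml.runs.run_model_on_task(task, clf, avoid_duplicate_runs=False)
    run.publish()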