diff --git a/openml/_api_calls.py b/openml/_api_calls.py index e059b06db..803dc6b42 100644 --- a/openml/_api_calls.py +++ b/openml/_api_calls.py @@ -80,7 +80,7 @@ def _read_url_files(url, data=None, file_elements=None): files=file_elements, ) if response.status_code != 200: - raise _parse_server_exception(response, url=url) + raise _parse_server_exception(response, url) if 'Content-Encoding' not in response.headers or \ response.headers['Content-Encoding'] != 'gzip': warnings.warn('Received uncompressed content from OpenML for {}.' @@ -95,7 +95,7 @@ def _read_url(url, request_method, data=None): response = send_request(request_method=request_method, url=url, data=data) if response.status_code != 200: - raise _parse_server_exception(response, url=url) + raise _parse_server_exception(response, url) if 'Content-Encoding' not in response.headers or \ response.headers['Content-Encoding'] != 'gzip': warnings.warn('Received uncompressed content from OpenML for {}.' @@ -137,15 +137,15 @@ def send_request( return response -def _parse_server_exception(response, url=None): +def _parse_server_exception(response, url): # OpenML has a sophisticated error system # where information about failures is provided. try to parse this try: server_exception = xmltodict.parse(response.text) except Exception: raise OpenMLServerError( - 'Unexpected server error. Please contact the developers!\n' - 'Status code: {}\n{}'.format(response.status_code, response.text)) + 'Unexpected server error when calling {}. 
Please contact the developers!\n' + 'Status code: {}\n{}'.format(url, response.status_code, response.text)) server_error = server_exception['oml:error'] code = int(server_error['oml:code']) diff --git a/openml/extensions/extension_interface.py b/openml/extensions/extension_interface.py index 0719ea574..6346cb0bf 100644 --- a/openml/extensions/extension_interface.py +++ b/openml/extensions/extension_interface.py @@ -1,12 +1,15 @@ from abc import ABC, abstractmethod from collections import OrderedDict # noqa: F401 -from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING +from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union + +import numpy as np +import scipy.sparse # Avoid import cycles: https://mypy.readthedocs.io/en/latest/common_issues.html#import-cycles if TYPE_CHECKING: from openml.flows import OpenMLFlow from openml.tasks.task import OpenMLTask - from openml.runs.trace import OpenMLRunTrace, OpenMLTraceIteration + from openml.runs.trace import OpenMLRunTrace, OpenMLTraceIteration # noqa F401 class Extension(ABC): @@ -147,15 +150,16 @@ def _run_model_on_fold( self, model: Any, task: 'OpenMLTask', + X_train: Union[np.ndarray, scipy.sparse.spmatrix], rep_no: int, fold_no: int, - sample_no: int, - add_local_measures: bool, - ) -> Tuple[List[List], List[List], 'OrderedDict[str, float]', Any]: + y_train: Optional[np.ndarray] = None, + X_test: Optional[Union[np.ndarray, scipy.sparse.spmatrix]] = None, + ) -> Tuple[np.ndarray, np.ndarray, 'OrderedDict[str, float]', Optional['OpenMLRunTrace']]: """Run a model on a repeat,fold,subsample triplet of the task and return prediction information. Returns the data that is necessary to construct the OpenML Run object. Is used by - run_task_get_arff_content. + :func:`openml.runs.run_flow_on_task`. Parameters ---------- @@ -163,31 +167,29 @@ def _run_model_on_fold( The UNTRAINED model to run. The model instance will be copied and not altered. task : OpenMLTask The task to run the model on. 
+ X_train : array-like + Training data for the given repetition and fold. rep_no : int The repeat of the experiment (0-based; in case of 1 time CV, always 0) fold_no : int The fold nr of the experiment (0-based; in case of holdout, always 0) - sample_no : int - In case of learning curves, the index of the subsample (0-based; in case of no - learning curve, always 0) - add_local_measures : bool - Determines whether to calculate a set of measures (i.e., predictive accuracy) locally, - to later verify server behaviour. + y_train : Optional[np.ndarray] (default=None) + Target attributes for supervised tasks. In case of classification, these are integer + indices to the potential classes specified by dataset. + X_test : Optional, array-like (default=None) + Test attributes to test for generalization in supervised tasks. Returns ------- - arff_datacontent : List[List] - Arff representation (list of lists) of the predictions that were - generated by this fold (required to populate predictions.arff) - arff_tracecontent : List[List] - Arff representation (list of lists) of the trace data that was generated by this fold - (will be used to populate trace.arff, leave it empty if the model did not perform any - hyperparameter optimization). + predictions : np.ndarray + Model predictions. + probabilities : Optional, np.ndarray + Predicted probabilities (only applicable for supervised classification tasks). user_defined_measures : OrderedDict[str, float] User defined measures that were generated on this fold - model : Any - The model trained on this repeat,fold,subsample triple. Will be used to generate trace - information later on (in ``obtain_arff_trace``). + trace : Optional, OpenMLRunTrace + Hyperparameter optimization trace (only applicable for supervised tasks with + hyperparameter optimization). 
""" @abstractmethod @@ -222,21 +224,6 @@ def obtain_parameter_values( ################################################################################################ # Abstract methods for hyperparameter optimization - def is_hpo_class(self, model: Any) -> bool: - """Check whether the model performs hyperparameter optimization. - - Used to check whether an optimization trace can be extracted from the model after running - it. - - Parameters - ---------- - model : Any - - Returns - ------- - bool - """ - @abstractmethod def instantiate_model_from_hpo_class( self, @@ -258,25 +245,3 @@ def instantiate_model_from_hpo_class( Any """ # TODO a trace belongs to a run and therefore a flow -> simplify this part of the interface! - - @abstractmethod - def obtain_arff_trace( - self, - model: Any, - trace_content: List[List], - ) -> 'OpenMLRunTrace': - """Create arff trace object from a fitted model and the trace content obtained by - repeatedly calling ``run_model_on_task``. - - Parameters - ---------- - model : Any - A fitted hyperparameter optimization model. - - trace_content : List[List] - Trace content obtained by ``openml.runs.run_flow_on_task``. 
- - Returns - ------- - OpenMLRunTrace - """ diff --git a/openml/extensions/sklearn/extension.py b/openml/extensions/sklearn/extension.py index 78263098c..f098a8f4e 100644 --- a/openml/extensions/sklearn/extension.py +++ b/openml/extensions/sklearn/extension.py @@ -12,7 +12,9 @@ import warnings import numpy as np +import pandas as pd import scipy.stats +import scipy.sparse import sklearn.base import sklearn.model_selection import sklearn.pipeline @@ -93,7 +95,7 @@ def flow_to_model(self, flow: 'OpenMLFlow', initialize_with_defaults: bool = Fal Parameters ---------- - o : mixed + flow : mixed the object to deserialize (can be flow object, or any serialized parameter value that is accepted by) @@ -468,7 +470,7 @@ def _check_multiple_occurence_of_component_in_flow( ) -> None: to_visit_stack = [] # type: List[OpenMLFlow] to_visit_stack.extend(sub_components.values()) - known_sub_components = set() # type: Set[OpenMLFlow] + known_sub_components = set() # type: Set[str] while len(to_visit_stack) > 0: visitee = to_visit_stack.pop() if visitee.name in known_sub_components: @@ -935,7 +937,7 @@ def _prevent_optimize_n_jobs(self, model): model: The model that will be fitted """ - if self.is_hpo_class(model): + if self._is_hpo_class(model): if isinstance(model, sklearn.model_selection.GridSearchCV): param_distributions = model.param_grid elif isinstance(model, sklearn.model_selection.RandomizedSearchCV): @@ -973,7 +975,7 @@ def _can_measure_cputime(self, model: Any) -> bool: True if all n_jobs parameters will be either set to None or 1, False otherwise """ if not ( - isinstance(model, sklearn.base.BaseEstimator) or self.is_hpo_class(model) + isinstance(model, sklearn.base.BaseEstimator) or self._is_hpo_class(model) ): raise ValueError('model should be BaseEstimator or BaseSearchCV') @@ -1000,7 +1002,7 @@ def _can_measure_wallclocktime(self, model: Any) -> bool: True if no n_jobs parameters is set to -1, False otherwise """ if not ( - isinstance(model, 
sklearn.base.BaseEstimator) or self.is_hpo_class(model) + isinstance(model, sklearn.base.BaseEstimator) or self._is_hpo_class(model) ): raise ValueError('model should be BaseEstimator or BaseSearchCV') @@ -1096,11 +1098,12 @@ def _run_model_on_fold( self, model: Any, task: 'OpenMLTask', + X_train: Union[np.ndarray, scipy.sparse.spmatrix, pd.DataFrame], rep_no: int, fold_no: int, - sample_no: int, - add_local_measures: bool, - ) -> Tuple[List[List], List[List], 'OrderedDict[str, float]', Any]: + y_train: Optional[np.ndarray] = None, + X_test: Optional[Union[np.ndarray, scipy.sparse.spmatrix, pd.DataFrame]] = None, + ) -> Tuple[np.ndarray, np.ndarray, 'OrderedDict[str, float]', Optional[OpenMLRunTrace]]: """Run a model on a repeat,fold,subsample triplet of the task and return prediction information. @@ -1119,17 +1122,17 @@ def _run_model_on_fold( The UNTRAINED model to run. The model instance will be copied and not altered. task : OpenMLTask The task to run the model on. + X_train : array-like + Training data for the given repetition and fold. rep_no : int The repeat of the experiment (0-based; in case of 1 time CV, always 0) fold_no : int The fold nr of the experiment (0-based; in case of holdout, always 0) - sample_no : int - In case of learning curves, the index of the subsample (0-based; in case of no - learning curve, always 0) - add_local_measures : bool - Determines whether to calculate a set of measures (i.e., predictive accuracy) - locally, - to later verify server behaviour. + y_train : Optional[np.ndarray] (default=None) + Target attributes for supervised tasks. In case of classification, these are integer + indices to the potential classes specified by dataset. + X_test : Optional, array-like (default=None) + Test attributes to test for generalization in supervised tasks. Returns ------- @@ -1150,10 +1153,7 @@ def _run_model_on_fold( information later on (in ``obtain_arff_trace``). 
""" - def _prediction_to_probabilities( - y: np.ndarray, - model_classes: List, - ) -> np.ndarray: + def _prediction_to_probabilities(y: np.ndarray, classes: List[Any]) -> np.ndarray: """Transforms predicted probabilities to match with OpenML class indices. Parameters @@ -1171,15 +1171,20 @@ def _prediction_to_probabilities( # y: list or numpy array of predictions # model_classes: sklearn classifier mapping from original array id to # prediction index id - if not isinstance(model_classes, list): + if not isinstance(classes, list): raise ValueError('please convert model classes to list prior to ' 'calling this fn') - result = np.zeros((len(y), len(model_classes)), dtype=np.float32) + result = np.zeros((len(y), len(classes)), dtype=np.float32) for obs, prediction_idx in enumerate(y): - array_idx = model_classes.index(prediction_idx) - result[obs][array_idx] = 1.0 + result[obs][prediction_idx] = 1.0 return result + if isinstance(task, OpenMLSupervisedTask): + if y_train is None: + raise TypeError('argument y_train must not be of type None') + if X_test is None: + raise TypeError('argument X_test must not be of type None') + # TODO: if possible, give a warning if model is already fitted (acceptable # in case of custom experimentation, # but not desirable if we want to upload to OpenML). 
@@ -1191,20 +1196,6 @@ def _prediction_to_probabilities( can_measure_cputime = self._can_measure_cputime(model_copy) can_measure_wallclocktime = self._can_measure_wallclocktime(model_copy) - train_indices, test_indices = task.get_train_test_split_indices( - repeat=rep_no, fold=fold_no, sample=sample_no) - if isinstance(task, OpenMLSupervisedTask): - x, y = task.get_X_and_y() - train_x = x[train_indices] - train_y = y[train_indices] - test_x = x[test_indices] - test_y = y[test_indices] - elif isinstance(task, OpenMLClusteringTask): - train_x = train_indices - test_x = test_indices - else: - raise NotImplementedError(task.task_type) - user_defined_measures = OrderedDict() # type: 'OrderedDict[str, float]' try: @@ -1213,9 +1204,9 @@ def _prediction_to_probabilities( modelfit_start_walltime = time.time() if isinstance(task, OpenMLSupervisedTask): - model_copy.fit(train_x, train_y) + model_copy.fit(X_train, y_train) elif isinstance(task, OpenMLClusteringTask): - model_copy.fit(train_x) + model_copy.fit(X_train) modelfit_dur_cputime = (time.process_time() - modelfit_start_cputime) * 1000 if can_measure_cputime: @@ -1229,11 +1220,6 @@ def _prediction_to_probabilities( # typically happens when training a regressor on classification task raise PyOpenMLError(str(e)) - # extract trace, if applicable - arff_tracecontent = [] # type: List[List] - if self.is_hpo_class(model_copy): - arff_tracecontent.extend(self._extract_trace_data(model_copy, rep_no, fold_no)) - if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): # search for model classes_ (might differ depending on modeltype) # first, pipelines are a special case (these don't have a classes_ @@ -1244,7 +1230,7 @@ def _prediction_to_probabilities( else: used_estimator = model_copy - if self.is_hpo_class(used_estimator): + if self._is_hpo_class(used_estimator): model_classes = used_estimator.best_estimator_.classes_ else: model_classes = used_estimator.classes_ @@ -1254,7 +1240,12 @@ def 
_prediction_to_probabilities( # In supervised learning this returns the predictions for Y, in clustering # it returns the clusters - pred_y = model_copy.predict(test_x) + if isinstance(task, OpenMLSupervisedTask): + pred_y = model_copy.predict(X_test) + elif isinstance(task, OpenMLClusteringTask): + pred_y = model_copy.predict(X_train) + else: + raise ValueError(task) if can_measure_cputime: modelpredict_duration_cputime = (time.process_time() @@ -1268,154 +1259,51 @@ def _prediction_to_probabilities( user_defined_measures['wall_clock_time_millis'] = (modelfit_dur_walltime + modelpredict_duration_walltime) - # add client-side calculated metrics. These is used on the server as - # consistency check, only useful for supervised tasks - def _calculate_local_measure(sklearn_fn, openml_name): - user_defined_measures[openml_name] = sklearn_fn(test_y, pred_y) - - # Task type specific outputs - arff_datacontent = [] - if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): try: - proba_y = model_copy.predict_proba(test_x) + proba_y = model_copy.predict_proba(X_test) except AttributeError: - proba_y = _prediction_to_probabilities(pred_y, list(model_classes)) + proba_y = _prediction_to_probabilities(pred_y, list(task.class_labels)) if proba_y.shape[1] != len(task.class_labels): - warnings.warn( - "Repeat %d Fold %d: estimator only predicted for %d/%d classes!" - % (rep_no, fold_no, proba_y.shape[1], len(task.class_labels)) - ) + # Remap the probabilities in case there was a class missing at training time + # By default, the classification targets are mapped to be zero-based indices to the + # actual classes. Therefore, the model_classes contain the correct indices to the + # correct probability array. 
Example: + # classes in the dataset: 0, 1, 2, 3, 4, 5 + # classes in the training set: 0, 1, 2, 4, 5 + # then we need to add a column full of zeros into the probabilities for class 3 + # (because the rest of the library expects that the probabilities are ordered the + # same way as the classes are ordered). + proba_y_new = np.zeros((proba_y.shape[0], len(task.class_labels))) + for idx, model_class in enumerate(model_classes): + proba_y_new[:, model_class] = proba_y[:, idx] + proba_y = proba_y_new - if add_local_measures: - _calculate_local_measure(sklearn.metrics.accuracy_score, - 'predictive_accuracy') - - for i in range(0, len(test_indices)): - arff_line = self._prediction_to_row( - rep_no=rep_no, - fold_no=fold_no, - sample_no=sample_no, - row_id=test_indices[i], - correct_label=task.class_labels[test_y[i]], - predicted_label=pred_y[i], - predicted_probabilities=proba_y[i], - class_labels=task.class_labels, - model_classes_mapping=model_classes, + if proba_y.shape[1] != len(task.class_labels): + message = "Estimator only predicted for {}/{} classes!".format( + proba_y.shape[1], len(task.class_labels), ) - arff_datacontent.append(arff_line) + warnings.warn(message) + openml.config.logger.warn(message) elif isinstance(task, OpenMLRegressionTask): - if add_local_measures: - _calculate_local_measure( - sklearn.metrics.mean_absolute_error, - 'mean_absolute_error', - ) - - for i in range(0, len(test_indices)): - arff_line = [rep_no, fold_no, test_indices[i], pred_y[i], test_y[i]] - arff_datacontent.append(arff_line) + proba_y = None elif isinstance(task, OpenMLClusteringTask): - for i in range(0, len(test_indices)): - arff_line = [test_indices[i], pred_y[i]] # row_id, cluster ID - arff_datacontent.append(arff_line) + proba_y = None else: raise TypeError(type(task)) - return arff_datacontent, arff_tracecontent, user_defined_measures, model_copy - - def _prediction_to_row( - self, - rep_no: int, - fold_no: int, - sample_no: int, - row_id: int, - correct_label: str, - 
predicted_label: int, - predicted_probabilities: np.ndarray, - class_labels: List, - model_classes_mapping: List, - ) -> List: - """Util function that turns probability estimates of a classifier for a - given instance into the right arff format to upload to openml. - - Parameters - ---------- - rep_no : int - The repeat of the experiment (0-based; in case of 1 time CV, - always 0) - fold_no : int - The fold nr of the experiment (0-based; in case of holdout, - always 0) - sample_no : int - In case of learning curves, the index of the subsample (0-based; - in case of no learning curve, always 0) - row_id : int - row id in the initial dataset - correct_label : str - original label of the instance - predicted_label : str - the label that was predicted - predicted_probabilities : array (size=num_classes) - probabilities per class - class_labels : array (size=num_classes) - model_classes_mapping : list - A list of classes the model produced. - Obtained by BaseEstimator.classes_ - - Returns - ------- - arff_line : list - representation of the current prediction in OpenML format - """ - if not isinstance(rep_no, (int, np.integer)): - raise ValueError('rep_no should be int') - if not isinstance(fold_no, (int, np.integer)): - raise ValueError('fold_no should be int') - if not isinstance(sample_no, (int, np.integer)): - raise ValueError('sample_no should be int') - if not isinstance(row_id, (int, np.integer)): - raise ValueError('row_id should be int') - if not len(predicted_probabilities) == len(model_classes_mapping): - raise ValueError('len(predicted_probabilities) != len(class_labels)') - - arff_line = [rep_no, fold_no, sample_no, row_id] # type: List[Any] - for class_label_idx in range(len(class_labels)): - if class_label_idx in model_classes_mapping: - index = np.where(model_classes_mapping == class_label_idx)[0][0] - # TODO: WHY IS THIS 2D??? 
- arff_line.append(predicted_probabilities[index]) - else: - arff_line.append(0.0) - - arff_line.append(class_labels[predicted_label]) - arff_line.append(correct_label) - return arff_line + if self._is_hpo_class(model_copy): + trace_data = self._extract_trace_data(model_copy, rep_no, fold_no) + trace = self._obtain_arff_trace(model_copy, trace_data) # type: Optional[OpenMLRunTrace] # noqa E501 + else: + trace = None - def _extract_trace_data(self, model, rep_no, fold_no): - arff_tracecontent = [] - for itt_no in range(0, len(model.cv_results_['mean_test_score'])): - # we use the string values for True and False, as it is defined in - # this way by the OpenML server - selected = 'false' - if itt_no == model.best_index_: - selected = 'true' - test_score = model.cv_results_['mean_test_score'][itt_no] - arff_line = [rep_no, fold_no, itt_no, test_score, selected] - for key in model.cv_results_: - if key.startswith('param_'): - value = model.cv_results_[key][itt_no] - if value is not np.ma.masked: - serialized_value = json.dumps(value) - else: - serialized_value = np.nan - arff_line.append(serialized_value) - arff_tracecontent.append(arff_line) - return arff_tracecontent + return pred_y, proba_y, user_defined_measures, trace def obtain_parameter_values( self, @@ -1594,7 +1482,7 @@ def _openml_param_name_to_sklearn( ################################################################################################ # Methods for hyperparameter optimization - def is_hpo_class(self, model: Any) -> bool: + def _is_hpo_class(self, model: Any) -> bool: """Check whether the model performs hyperparameter optimization. 
Used to check whether an optimization trace can be extracted from the model after @@ -1629,7 +1517,7 @@ def instantiate_model_from_hpo_class( ------- Any """ - if not self.is_hpo_class(model): + if not self._is_hpo_class(model): raise AssertionError( 'Flow model %s is not an instance of sklearn.model_selection._search.BaseSearchCV' % model @@ -1638,7 +1526,28 @@ def instantiate_model_from_hpo_class( base_estimator.set_params(**trace_iteration.get_parameters()) return base_estimator - def obtain_arff_trace( + def _extract_trace_data(self, model, rep_no, fold_no): + arff_tracecontent = [] + for itt_no in range(0, len(model.cv_results_['mean_test_score'])): + # we use the string values for True and False, as it is defined in + # this way by the OpenML server + selected = 'false' + if itt_no == model.best_index_: + selected = 'true' + test_score = model.cv_results_['mean_test_score'][itt_no] + arff_line = [rep_no, fold_no, itt_no, test_score, selected] + for key in model.cv_results_: + if key.startswith('param_'): + value = model.cv_results_[key][itt_no] + if value is not np.ma.masked: + serialized_value = json.dumps(value) + else: + serialized_value = np.nan + arff_line.append(serialized_value) + arff_tracecontent.append(arff_line) + return arff_tracecontent + + def _obtain_arff_trace( self, model: Any, trace_content: List, @@ -1658,7 +1567,7 @@ def obtain_arff_trace( ------- OpenMLRunTrace """ - if not self.is_hpo_class(model): + if not self._is_hpo_class(model): raise AssertionError( 'Flow model %s is not an instance of sklearn.model_selection._search.BaseSearchCV' % model diff --git a/openml/runs/functions.py b/openml/runs/functions.py index 6e89e40e1..df73c701d 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -1,9 +1,11 @@ from collections import OrderedDict import io +import itertools import os from typing import Any, List, Optional, Set, Tuple, Union, TYPE_CHECKING # noqa F401 import warnings +import sklearn.metrics import xmltodict import 
openml @@ -16,7 +18,8 @@ from ..flows import get_flow, flow_exists, OpenMLFlow from ..setups import setup_exists, initialize_model from ..exceptions import OpenMLCacheException, OpenMLServerException, OpenMLRunsExistError -from ..tasks import OpenMLTask +from ..tasks import OpenMLTask, OpenMLClassificationTask, OpenMLClusteringTask, \ + OpenMLRegressionTask, OpenMLSupervisedTask, OpenMLLearningCurveTask from .run import OpenMLRun from .trace import OpenMLRunTrace from ..tasks import TaskTypeEnum @@ -206,6 +209,7 @@ def run_flow_on_task( # execute the run res = _run_task_get_arffcontent( + flow=flow, model=flow.model, task=task, extension=flow.extension, @@ -366,6 +370,7 @@ def run_exists(task_id: int, setup_id: int) -> Set[int]: def _run_task_get_arffcontent( + flow: OpenMLFlow, model: Any, task: OpenMLTask, extension: 'Extension', @@ -377,7 +382,7 @@ def _run_task_get_arffcontent( 'OrderedDict[str, OrderedDict]', ]: arff_datacontent = [] # type: List[List] - arff_tracecontent = [] # type: List[List] + traces = [] # type: List[OpenMLRunTrace] # stores fold-based evaluation measures. 
In case of a sample based task, # this information is multiple times overwritten, but due to the ordering # of tne loops, eventually it contains the information based on the full @@ -392,50 +397,123 @@ def _run_task_get_arffcontent( # methods, less maintenance, less confusion) num_reps, num_folds, num_samples = task.get_split_dimensions() - for rep_no in range(num_reps): - for fold_no in range(num_folds): - for sample_no in range(num_samples): - ( - arff_datacontent_fold, - arff_tracecontent_fold, - user_defined_measures_fold, - model_fold, - ) = extension._run_model_on_fold( - model=model, - task=task, - rep_no=rep_no, - fold_no=fold_no, - sample_no=sample_no, - add_local_measures=add_local_measures, + for n_fit, (rep_no, fold_no, sample_no) in enumerate(itertools.product( + range(num_reps), + range(num_folds), + range(num_samples), + ), start=1): + + train_indices, test_indices = task.get_train_test_split_indices( + repeat=rep_no, fold=fold_no, sample=sample_no) + if isinstance(task, OpenMLSupervisedTask): + x, y = task.get_X_and_y(dataset_format='array') + train_x = x[train_indices] + train_y = y[train_indices] + test_x = x[test_indices] + test_y = y[test_indices] + elif isinstance(task, OpenMLClusteringTask): + x = task.get_X(dataset_format='array') + train_x = x[train_indices] + train_y = None + test_x = None + test_y = None + else: + raise NotImplementedError(task.task_type) + + config.logger.info( + "Going to execute flow '%s' on task %d for repeat %d fold %d sample %d.", + flow.name, task.task_id, rep_no, fold_no, sample_no, + ) + + ( + pred_y, + proba_y, + user_defined_measures_fold, + trace, + ) = extension._run_model_on_fold( + model=model, + task=task, + X_train=train_x, + y_train=train_y, + rep_no=rep_no, + fold_no=fold_no, + X_test=test_x, + ) + if trace is not None: + traces.append(trace) + + # add client-side calculated metrics. 
These is used on the server as + # consistency check, only useful for supervised tasks + def _calculate_local_measure(sklearn_fn, openml_name): + user_defined_measures_fold[openml_name] = sklearn_fn(test_y, pred_y) + + if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): + + for i, tst_idx in enumerate(test_indices): + + arff_line = [rep_no, fold_no, sample_no, tst_idx] # type: List[Any] + for j, class_label in enumerate(task.class_labels): + arff_line.append(proba_y[i][j]) + + arff_line.append(task.class_labels[pred_y[i]]) + arff_line.append(task.class_labels[test_y[i]]) + + arff_datacontent.append(arff_line) + + if add_local_measures: + _calculate_local_measure( + sklearn.metrics.accuracy_score, + 'predictive_accuracy', ) - arff_datacontent.extend(arff_datacontent_fold) - arff_tracecontent.extend(arff_tracecontent_fold) - - for measure in user_defined_measures_fold: - - if measure not in user_defined_measures_per_fold: - user_defined_measures_per_fold[measure] = OrderedDict() - if rep_no not in user_defined_measures_per_fold[measure]: - user_defined_measures_per_fold[measure][rep_no] = OrderedDict() - - if measure not in user_defined_measures_per_sample: - user_defined_measures_per_sample[measure] = OrderedDict() - if rep_no not in user_defined_measures_per_sample[measure]: - user_defined_measures_per_sample[measure][rep_no] = OrderedDict() - if fold_no not in user_defined_measures_per_sample[ - measure][rep_no]: - user_defined_measures_per_sample[measure][rep_no][fold_no] = OrderedDict() - - user_defined_measures_per_fold[measure][rep_no][ - fold_no] = user_defined_measures_fold[measure] - user_defined_measures_per_sample[measure][rep_no][fold_no][ - sample_no] = user_defined_measures_fold[measure] - - # Note that we need to use a fitted model (i.e., model_fold, and not model) - # here, to ensure it contains the hyperparameter data (in cv_results_) - if extension.is_hpo_class(model): - trace = extension.obtain_arff_trace(model_fold, 
arff_tracecontent) # type: Optional[OpenMLRunTrace] # noqa E501 + elif isinstance(task, OpenMLRegressionTask): + + for i in range(0, len(test_indices)): + arff_line = [rep_no, fold_no, test_indices[i], pred_y[i], test_y[i]] + arff_datacontent.append(arff_line) + + if add_local_measures: + _calculate_local_measure( + sklearn.metrics.mean_absolute_error, + 'mean_absolute_error', + ) + + elif isinstance(task, OpenMLClusteringTask): + for i in range(0, len(test_indices)): + arff_line = [test_indices[i], pred_y[i]] # row_id, cluster ID + arff_datacontent.append(arff_line) + + else: + raise TypeError(type(task)) + + for measure in user_defined_measures_fold: + + if measure not in user_defined_measures_per_fold: + user_defined_measures_per_fold[measure] = OrderedDict() + if rep_no not in user_defined_measures_per_fold[measure]: + user_defined_measures_per_fold[measure][rep_no] = OrderedDict() + + if measure not in user_defined_measures_per_sample: + user_defined_measures_per_sample[measure] = OrderedDict() + if rep_no not in user_defined_measures_per_sample[measure]: + user_defined_measures_per_sample[measure][rep_no] = OrderedDict() + if fold_no not in user_defined_measures_per_sample[measure][rep_no]: + user_defined_measures_per_sample[measure][rep_no][fold_no] = OrderedDict() + + user_defined_measures_per_fold[measure][rep_no][fold_no] = ( + user_defined_measures_fold[measure] + ) + user_defined_measures_per_sample[measure][rep_no][fold_no][sample_no] = ( + user_defined_measures_fold[measure] + ) + + if len(traces) > 0: + if len(traces) != n_fit: + raise ValueError( + 'Did not find enough traces (expected {}, found {})'.format(n_fit, len(traces)) + ) + else: + trace = OpenMLRunTrace.merge_traces(traces) else: trace = None @@ -467,13 +545,19 @@ def get_runs(run_ids): @openml.utils.thread_safe_if_oslo_installed -def get_run(run_id): +def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun: """Gets run corresponding to run_id. 
Parameters ---------- run_id : int + ignore_cache : bool + Whether to ignore the cache. If ``true`` this will download and overwrite the run xml + even if the requested run is already cached. + + ignore_cache + Returns ------- run : OpenMLRun @@ -487,11 +571,13 @@ def get_run(run_id): os.makedirs(run_dir) try: - return _get_cached_run(run_id) + if not ignore_cache: + return _get_cached_run(run_id) + else: + raise OpenMLCacheException(message='dummy') - except (OpenMLCacheException): - run_xml = openml._api_calls._perform_api_call("run/%d" % run_id, - 'get') + except OpenMLCacheException: + run_xml = openml._api_calls._perform_api_call("run/%d" % run_id, 'get') with io.open(run_file, "w", encoding='utf8') as fh: fh.write(run_xml) diff --git a/openml/runs/trace.py b/openml/runs/trace.py index 08fccaa61..42e89c50b 100644 --- a/openml/runs/trace.py +++ b/openml/runs/trace.py @@ -1,8 +1,10 @@ -import arff +from collections import OrderedDict import json import os +from typing import List, Tuple # noqa F401 + +import arff import xmltodict -from collections import OrderedDict PREFIX = 'parameter_' REQUIRED_ATTRIBUTES = [ @@ -281,7 +283,7 @@ def _trace_from_arff_struct(cls, attributes, content, error_message): setup_string=None, evaluation=evaluation, selected=selected, - paramaters=parameters, + parameters=parameters, ) trace[(repeat, fold, iteration)] = current @@ -346,9 +348,41 @@ def trace_from_xml(cls, xml): return cls(run_id, trace) + @classmethod + def merge_traces(cls, traces: List['OpenMLRunTrace']) -> 'OpenMLRunTrace': + + merged_trace = OrderedDict() # type: OrderedDict[Tuple[int, int, int], OpenMLTraceIteration] # noqa E501 + + previous_iteration = None + for trace in traces: + for iteration in trace: + key = (iteration.repeat, iteration.fold, iteration.iteration) + if previous_iteration is not None: + if ( + list(merged_trace[previous_iteration].parameters.keys()) + != list(iteration.parameters.keys()) + ): + raise ValueError( + 'Cannot merge traces because 
the parameters are not equal: {} vs {}'. + format( + list(merged_trace[previous_iteration].parameters.keys()), + list(iteration.parameters.keys()), + ) + ) + + if key in merged_trace: + raise ValueError( + "Cannot merge traces because key '{}' was encountered twice".format(key) + ) + + merged_trace[key] = iteration + previous_iteration = key + + return cls(None, merged_trace) + def __str__(self): - return '[Run id: %d, %d trace iterations]' % ( - self.run_id, + return '[Run id: %d, %d trace iterations]'.format( + -1 if self.run_id is None else self.run_id, len(self.trace_iterations), ) @@ -394,25 +428,25 @@ def __init__( setup_string, evaluation, selected, - paramaters=None, + parameters=None, ): if not isinstance(selected, bool): raise TypeError(type(selected)) - if setup_string and paramaters: + if setup_string and parameters: raise ValueError( 'Can only be instantiated with either ' 'setup_string or parameters argument.' ) - elif not setup_string and not paramaters: + elif not setup_string and not parameters: raise ValueError( 'Either setup_string or parameters needs to be passed as ' 'argument.' ) - if paramaters is not None and not isinstance(paramaters, OrderedDict): + if parameters is not None and not isinstance(parameters, OrderedDict): raise TypeError( 'argument parameters is not an instance of OrderedDict, but %s' - % str(type(paramaters)) + % str(type(parameters)) ) self.repeat = repeat @@ -421,7 +455,7 @@ def __init__( self.setup_string = setup_string self.evaluation = evaluation self.selected = selected - self.parameters = paramaters + self.parameters = parameters def get_parameters(self): result = {} diff --git a/openml/tasks/task.py b/openml/tasks/task.py index 7479bf36c..ab1dcae02 100644 --- a/openml/tasks/task.py +++ b/openml/tasks/task.py @@ -1,5 +1,10 @@ import io import os +from typing import Union + +import numpy as np +import pandas as pd +import scipy.sparse from .. 
import datasets from .split import OpenMLSplit @@ -108,9 +113,18 @@ def __init__(self, task_id, task_type_id, task_type, data_set_id, self.target_name = target_name self.split = None - def get_X_and_y(self): + def get_X_and_y( + self, + dataset_format: str = 'array', + ) -> Union[np.ndarray, pd.DataFrame, scipy.sparse.spmatrix]: """Get data associated with the current task. + Parameters + ---------- + dataset_format : str + Data structure of the returned data. See :meth:`openml.datasets.OpenMLDataset.get_data` + for possible options. + Returns ------- tuple - X and y @@ -120,7 +134,7 @@ def get_X_and_y(self): if self.task_type_id not in (1, 2, 3): raise NotImplementedError(self.task_type) X_and_y = dataset.get_data( - dataset_format='array', target=self.target_name + dataset_format=dataset_format, target=self.target_name, ) return X_and_y @@ -177,6 +191,29 @@ def __init__(self, task_id, task_type_id, task_type, data_set_id, ) self.number_of_clusters = number_of_clusters + def get_X( + self, + dataset_format: str = 'array', + ) -> Union[np.ndarray, pd.DataFrame, scipy.sparse.spmatrix]: + """Get data associated with the current task. + + Parameters + ---------- + dataset_format : str + Data structure of the returned data. See :meth:`openml.datasets.OpenMLDataset.get_data` + for possible options. 
+ + Returns + ------- + X - the data without a target column + + """ + dataset = self.get_dataset() + X_and_y = dataset.get_data( + dataset_format=dataset_format, target=None, + ) + return X_and_y + class OpenMLLearningCurveTask(OpenMLClassificationTask): def __init__(self, task_id, task_type_id, task_type, data_set_id, diff --git a/openml/testing.py b/openml/testing.py index a4fa9cc8b..1ce0862d0 100644 --- a/openml/testing.py +++ b/openml/testing.py @@ -144,6 +144,7 @@ def _check_fold_timing_evaluations( num_folds: int, max_time_allowed: float = 60000.0, task_type: int = TaskTypeEnum.SUPERVISED_CLASSIFICATION, + check_scores: bool = True, ): """ Checks whether the right timing measures are attached to the run @@ -167,10 +168,11 @@ def _check_fold_timing_evaluations( 'wall_clock_time_millis': (0, max_time_allowed), } - if task_type in (TaskTypeEnum.SUPERVISED_CLASSIFICATION, TaskTypeEnum.LEARNING_CURVE): - check_measures['predictive_accuracy'] = (0, 1.) - elif task_type == TaskTypeEnum.SUPERVISED_REGRESSION: - check_measures['mean_absolute_error'] = (0, float("inf")) + if check_scores: + if task_type in (TaskTypeEnum.SUPERVISED_CLASSIFICATION, TaskTypeEnum.LEARNING_CURVE): + check_measures['predictive_accuracy'] = (0, 1.)
+ elif task_type == TaskTypeEnum.SUPERVISED_REGRESSION: + check_measures['mean_absolute_error'] = (0, float("inf")) self.assertIsInstance(fold_evaluations, dict) if sys.version_info[:2] >= (3, 3): diff --git a/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py b/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py index ae5e1b576..aef064ad5 100644 --- a/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py +++ b/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py @@ -810,6 +810,25 @@ def test_serialize_advanced_grid(self): self.assertEqual(grid[1]['classify__C'], deserialized[1]['classify__C']) + def test_serialize_advanced_grid_fails(self): + # This unit test is checking that the test we skip above would actually fail + + param_grid = { + "base_estimator": [ + sklearn.tree.DecisionTreeClassifier(), + sklearn.tree.ExtraTreeClassifier()] + } + + clf = sklearn.model_selection.GridSearchCV( + sklearn.ensemble.BaggingClassifier(), + param_grid=param_grid, + ) + with self.assertRaisesRegex( + TypeError, + ".*OpenMLFlow.*is not JSON serializable", + ): + self.extension.model_to_flow(clf) + def test_serialize_resampling(self): kfold = sklearn.model_selection.StratifiedKFold( n_splits=4, shuffle=True) @@ -1254,101 +1273,280 @@ def test_seed_model_raises(self): with self.assertRaises(ValueError): self.extension.seed_model(model=clf, seed=42) - def test_run_model_on_fold(self): - task = openml.tasks.get_task(7) - num_instances = 320 - num_folds = 1 - num_repeats = 1 + def test_run_model_on_fold_classification_1(self): + task = openml.tasks.get_task(1) - clf = sklearn.linear_model.SGDClassifier(loss='log', random_state=1) + X, y = task.get_X_and_y() + train_indices, test_indices = task.get_train_test_split_indices( + repeat=0, fold=0, sample=0) + X_train = X[train_indices] + y_train = y[train_indices] + X_test = X[test_indices] + y_test = y[test_indices] + + pipeline = sklearn.pipeline.Pipeline(steps=[ 
+ ('imp', sklearn.preprocessing.Imputer()), + ('clf', sklearn.tree.DecisionTreeClassifier()), + ]) # TODO add some mocking here to actually test the innards of this function, too! res = self.extension._run_model_on_fold( - clf, task, 0, 0, 0, - add_local_measures=True) + model=pipeline, + task=task, + fold_no=0, + rep_no=0, + X_train=X_train, + y_train=y_train, + X_test=X_test, + ) + + y_hat, y_hat_proba, user_defined_measures, trace = res - arff_datacontent, arff_tracecontent, user_defined_measures, model = res # predictions - self.assertIsInstance(arff_datacontent, list) + self.assertIsInstance(y_hat, np.ndarray) + self.assertEqual(y_hat.shape, y_test.shape) + self.assertIsInstance(y_hat_proba, np.ndarray) + self.assertEqual(y_hat_proba.shape, (y_test.shape[0], 6)) + np.testing.assert_array_almost_equal(np.sum(y_hat_proba, axis=1), np.ones(y_test.shape)) + # The class '4' (at index 3) is not present in the training data. We check that the + # predicted probabilities for that class are zero! + np.testing.assert_array_almost_equal(y_hat_proba[:, 3], np.zeros(y_test.shape)) + for i in (0, 1, 2, 4, 5): + self.assertTrue(np.any(y_hat_proba[:, i] != np.zeros(y_test.shape))) + + # check user defined measures + fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict)) + for measure in user_defined_measures: + fold_evaluations[measure][0][0] = user_defined_measures[measure] + # trace. 
SGD does not produce any - self.assertIsInstance(arff_tracecontent, list) - self.assertEqual(len(arff_tracecontent), 0) + self.assertIsNone(trace) + + self._check_fold_timing_evaluations( + fold_evaluations, + num_repeats=1, + num_folds=1, + task_type=task.task_type_id, + check_scores=False, + ) - fold_evaluations = collections.defaultdict( - lambda: collections.defaultdict(dict)) + def test_run_model_on_fold_classification_2(self): + task = openml.tasks.get_task(7) + + X, y = task.get_X_and_y() + train_indices, test_indices = task.get_train_test_split_indices( + repeat=0, fold=0, sample=0) + X_train = X[train_indices] + y_train = y[train_indices] + X_test = X[test_indices] + y_test = y[test_indices] + + pipeline = sklearn.model_selection.GridSearchCV( + sklearn.tree.DecisionTreeClassifier(), + { + "max_depth": [1, 2], + }, + ) + # TODO add some mocking here to actually test the innards of this function, too! + res = self.extension._run_model_on_fold( + model=pipeline, + task=task, + fold_no=0, + rep_no=0, + X_train=X_train, + y_train=y_train, + X_test=X_test, + ) + + y_hat, y_hat_proba, user_defined_measures, trace = res + + # predictions + self.assertIsInstance(y_hat, np.ndarray) + self.assertEqual(y_hat.shape, y_test.shape) + self.assertIsInstance(y_hat_proba, np.ndarray) + self.assertEqual(y_hat_proba.shape, (y_test.shape[0], 2)) + np.testing.assert_array_almost_equal(np.sum(y_hat_proba, axis=1), np.ones(y_test.shape)) + for i in (0, 1): + self.assertTrue(np.any(y_hat_proba[:, i] != np.zeros(y_test.shape))) + + # check user defined measures + fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict)) for measure in user_defined_measures: fold_evaluations[measure][0][0] = user_defined_measures[measure] - self._check_fold_timing_evaluations(fold_evaluations, num_repeats, num_folds, - task_type=task.task_type_id) - - # 10 times 10 fold CV of 150 samples - self.assertEqual(len(arff_datacontent), num_instances * num_repeats) - for arff_line in 
arff_datacontent: - # check number columns - self.assertEqual(len(arff_line), 8) - # check repeat - self.assertGreaterEqual(arff_line[0], 0) - self.assertLessEqual(arff_line[0], num_repeats - 1) - # check fold - self.assertGreaterEqual(arff_line[1], 0) - self.assertLessEqual(arff_line[1], num_folds - 1) - # check row id - self.assertGreaterEqual(arff_line[2], 0) - self.assertLessEqual(arff_line[2], num_instances - 1) - # check confidences - self.assertAlmostEqual(sum(arff_line[4:6]), 1.0) - self.assertIn(arff_line[6], ['won', 'nowin']) - self.assertIn(arff_line[7], ['won', 'nowin']) - - def test__prediction_to_row(self): - repeat_nr = 0 - fold_nr = 0 - clf = sklearn.pipeline.Pipeline(steps=[ - ('Imputer', Imputer(strategy='mean')), - ('VarianceThreshold', sklearn.feature_selection.VarianceThreshold(threshold=0.05)), - ('Estimator', sklearn.naive_bayes.GaussianNB())] + # check that it produced and returned a trace object of the correct length + self.assertIsInstance(trace, OpenMLRunTrace) + self.assertEqual(len(trace.trace_iterations), 2) + + self._check_fold_timing_evaluations( + fold_evaluations, + num_repeats=1, + num_folds=1, + task_type=task.task_type_id, + check_scores=False, ) - task = openml.tasks.get_task(20) - train, test = task.get_train_test_split_indices(repeat_nr, fold_nr) - X, y = task.get_X_and_y() - clf.fit(X[train], y[train]) - - test_X = X[test] - test_y = y[test] - - probaY = clf.predict_proba(test_X) - predY = clf.predict(test_X) - sample_nr = 0 # default for this task - for idx in range(0, len(test_X)): - arff_line = self.extension._prediction_to_row( - rep_no=repeat_nr, - fold_no=fold_nr, - sample_no=sample_nr, - row_id=idx, - correct_label=task.class_labels[test_y[idx]], - predicted_label=predY[idx], - predicted_probabilities=probaY[idx], - class_labels=task.class_labels, - model_classes_mapping=clf.classes_, + + def test_run_model_on_fold_classification_3(self): + + class HardNaiveBayes(sklearn.naive_bayes.GaussianNB): + # class for testing 
a naive bayes classifier that does not allow soft + # predictions + def __init__(self, priors=None): + super(HardNaiveBayes, self).__init__(priors) + + def predict_proba(*args, **kwargs): + raise AttributeError('predict_proba is not available when ' + 'probability=False') + + # task 1 (test server) is important: it is a task with an unused class + tasks = [1, 3, 115] + flow = unittest.mock.Mock() + flow.name = 'dummy' + + for task_id in tasks: + task = openml.tasks.get_task(task_id) + X, y = task.get_X_and_y() + train_indices, test_indices = task.get_train_test_split_indices( + repeat=0, fold=0, sample=0) + X_train = X[train_indices] + y_train = y[train_indices] + X_test = X[test_indices] + clf1 = sklearn.pipeline.Pipeline(steps=[ + ('imputer', sklearn.preprocessing.Imputer()), + ('estimator', sklearn.naive_bayes.GaussianNB()) + ]) + clf2 = sklearn.pipeline.Pipeline(steps=[ + ('imputer', sklearn.preprocessing.Imputer()), + ('estimator', HardNaiveBayes()) + ]) + + pred_1, proba_1, _, _ = self.extension._run_model_on_fold( + model=clf1, + task=task, + X_train=X_train, + y_train=y_train, + X_test=X_test, + fold_no=0, + rep_no=0, + ) + pred_2, proba_2, _, _ = self.extension._run_model_on_fold( + model=clf2, + task=task, + X_train=X_train, + y_train=y_train, + X_test=X_test, + fold_no=0, + rep_no=0, ) - self.assertIsInstance(arff_line, list) - self.assertEqual(len(arff_line), 6 + len(task.class_labels)) - self.assertEqual(arff_line[0], repeat_nr) - self.assertEqual(arff_line[1], fold_nr) - self.assertEqual(arff_line[2], sample_nr) - self.assertEqual(arff_line[3], idx) - sum_ = 0.0 - for att_idx in range(4, 4 + len(task.class_labels)): - self.assertIsInstance(arff_line[att_idx], float) - self.assertGreaterEqual(arff_line[att_idx], 0.0) - self.assertLessEqual(arff_line[att_idx], 1.0) - sum_ += arff_line[att_idx] - self.assertAlmostEqual(sum_, 1.0) - - self.assertIn(arff_line[-1], task.class_labels) - self.assertIn(arff_line[-2], task.class_labels) - pass + # verifies that 
the predictions are identical + np.testing.assert_array_equal(pred_1, pred_2) + np.testing.assert_array_almost_equal(np.sum(proba_1, axis=1), np.ones(X_test.shape[0])) + # Test that there are predictions other than ones and zeros + self.assertLess( + np.sum(proba_1 == 0) + np.sum(proba_1 == 1), + X_test.shape[0] * len(task.class_labels), + ) + + np.testing.assert_array_almost_equal(np.sum(proba_2, axis=1), np.ones(X_test.shape[0])) + # Test that there are only ones and zeros predicted + self.assertEqual( + np.sum(proba_2 == 0) + np.sum(proba_2 == 1), + X_test.shape[0] * len(task.class_labels), + ) + + def test_run_model_on_fold_regression(self): + # There aren't any regression tasks on the test server + openml.config.server = self.production_server + task = openml.tasks.get_task(2999) + + X, y = task.get_X_and_y() + train_indices, test_indices = task.get_train_test_split_indices( + repeat=0, fold=0, sample=0) + X_train = X[train_indices] + y_train = y[train_indices] + X_test = X[test_indices] + y_test = y[test_indices] + + pipeline = sklearn.pipeline.Pipeline(steps=[ + ('imp', sklearn.preprocessing.Imputer()), + ('clf', sklearn.tree.DecisionTreeRegressor()), + ]) + # TODO add some mocking here to actually test the innards of this function, too! + res = self.extension._run_model_on_fold( + model=pipeline, + task=task, + fold_no=0, + rep_no=0, + X_train=X_train, + y_train=y_train, + X_test=X_test, + ) + + y_hat, y_hat_proba, user_defined_measures, trace = res + + # predictions + self.assertIsInstance(y_hat, np.ndarray) + self.assertEqual(y_hat.shape, y_test.shape) + self.assertIsNone(y_hat_proba) + + # check user defined measures + fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict)) + for measure in user_defined_measures: + fold_evaluations[measure][0][0] = user_defined_measures[measure] + + # trace. 
SGD does not produce any + self.assertIsNone(trace) + + self._check_fold_timing_evaluations( + fold_evaluations, + num_repeats=1, + num_folds=1, + task_type=task.task_type_id, + check_scores=False, + ) + + def test_run_model_on_fold_clustering(self): + # The clustering task used here (126033) is fetched from the production server + openml.config.server = self.production_server + task = openml.tasks.get_task(126033) + + X = task.get_X(dataset_format='array') + + pipeline = sklearn.pipeline.Pipeline(steps=[ + ('imp', sklearn.preprocessing.Imputer()), + ('clf', sklearn.cluster.KMeans()), + ]) + # TODO add some mocking here to actually test the innards of this function, too! + res = self.extension._run_model_on_fold( + model=pipeline, + task=task, + fold_no=0, + rep_no=0, + X_train=X, + ) + + y_hat, y_hat_proba, user_defined_measures, trace = res + + # predictions + self.assertIsInstance(y_hat, np.ndarray) + self.assertEqual(y_hat.shape, (X.shape[0], )) + self.assertIsNone(y_hat_proba) + + # check user defined measures + fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict)) + for measure in user_defined_measures: + fold_evaluations[measure][0][0] = user_defined_measures[measure] + + # trace.
SGD does not produce any + self.assertIsNone(trace) + + self._check_fold_timing_evaluations( + fold_evaluations, + num_repeats=1, + num_folds=1, + task_type=task.task_type_id, + check_scores=False, + ) def test__extract_trace_data(self): @@ -1363,7 +1561,7 @@ def test__extract_trace_data(self): param_grid, num_iters, ) - # just run the task + # just run the task on the model (without invoking any fancy extension & openml code) train, _ = task.get_train_test_split_indices(0, 0) X, y = task.get_X_and_y() with warnings.catch_warnings(): @@ -1374,7 +1572,7 @@ def test__extract_trace_data(self): self.assertIn(clf.best_estimator_.hidden_layer_sizes, param_grid['hidden_layer_sizes']) trace_list = self.extension._extract_trace_data(clf, rep_no=0, fold_no=0) - trace = self.extension.obtain_arff_trace(clf, trace_list) + trace = self.extension._obtain_arff_trace(clf, trace_list) self.assertIsInstance(trace, OpenMLRunTrace) self.assertIsInstance(trace_list, list) diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py index cf8094a97..05cd953a8 100644 --- a/tests/test_runs/test_run_functions.py +++ b/tests/test_runs/test_run_functions.py @@ -4,6 +4,7 @@ import random import time import sys +import unittest.mock import numpy as np @@ -25,7 +26,7 @@ from sklearn.naive_bayes import GaussianNB from sklearn.model_selection._search import BaseSearchCV -from sklearn.tree import DecisionTreeClassifier, ExtraTreeClassifier +from sklearn.tree import DecisionTreeClassifier from sklearn.preprocessing.imputation import Imputer from sklearn.dummy import DummyClassifier from sklearn.preprocessing import StandardScaler @@ -39,17 +40,6 @@ from sklearn.pipeline import Pipeline -class HardNaiveBayes(GaussianNB): - # class for testing a naive bayes classifier that does not allow soft - # predictions - def __init__(self, priors=None): - super(HardNaiveBayes, self).__init__(priors) - - def predict_proba(*args, **kwargs): - raise AttributeError('predict_proba is 
not available when ' - 'probability=False') - - class TestRun(TestBase): _multiprocess_can_split_ = True # diabetis dataset, 768 observations, 0 missing vals, 33% holdout set @@ -83,11 +73,13 @@ def _wait_for_processed_run(self, run_id, max_waiting_time_seconds): # time.time() works in seconds start_time = time.time() while time.time() - start_time < max_waiting_time_seconds: - run = openml.runs.get_run(run_id) + run = openml.runs.get_run(run_id, ignore_cache=True) if len(run.evaluations) > 0: return else: time.sleep(10) + raise RuntimeError('Could not find any evaluations! Please check whether run {} was ' + 'evaluated correctly on the server'.format(run_id)) def _compare_predictions(self, predictions, predictions_prime): self.assertEqual(np.array(predictions_prime['data']).shape, @@ -437,7 +429,7 @@ def determine_grid_size(param_grid): # todo: check if runtime is present self._check_fold_timing_evaluations(run.fold_evaluations, 1, num_folds, task_type=task_type) - pass + return run def _run_and_upload_classification(self, clf, task_id, n_missing_vals, n_test_obs, flow_expected_rsv, @@ -448,11 +440,19 @@ def _run_and_upload_classification(self, clf, task_id, n_missing_vals, metric_name = 'predictive_accuracy' # openml metric name task_type = TaskTypeEnum.SUPERVISED_CLASSIFICATION # task type - self._run_and_upload(clf, task_id, n_missing_vals, n_test_obs, - flow_expected_rsv, num_folds=num_folds, - num_iterations=num_iterations, - metric=metric, metric_name=metric_name, - task_type=task_type, sentinel=sentinel) + return self._run_and_upload( + clf=clf, + task_id=task_id, + n_missing_vals=n_missing_vals, + n_test_obs=n_test_obs, + flow_expected_rsv=flow_expected_rsv, + num_folds=num_folds, + num_iterations=num_iterations, + metric=metric, + metric_name=metric_name, + task_type=task_type, + sentinel=sentinel, + ) def _run_and_upload_regression(self, clf, task_id, n_missing_vals, n_test_obs, flow_expected_rsv, @@ -463,11 +463,19 @@ def 
_run_and_upload_regression(self, clf, task_id, n_missing_vals, metric_name = 'mean_absolute_error' # openml metric name task_type = TaskTypeEnum.SUPERVISED_REGRESSION # task type - self._run_and_upload(clf, task_id, n_missing_vals, n_test_obs, - flow_expected_rsv, num_folds=num_folds, - num_iterations=num_iterations, - metric=metric, metric_name=metric_name, - task_type=task_type, sentinel=sentinel) + return self._run_and_upload( + clf=clf, + task_id=task_id, + n_missing_vals=n_missing_vals, + n_test_obs=n_test_obs, + flow_expected_rsv=flow_expected_rsv, + num_folds=num_folds, + num_iterations=num_iterations, + metric=metric, + metric_name=metric_name, + task_type=task_type, + sentinel=sentinel, + ) def test_run_and_upload_logistic_regression(self): lr = LogisticRegression(solver='lbfgs') @@ -559,9 +567,14 @@ def test_run_and_upload_gridsearch(self): task_id = self.TEST_SERVER_TASK_SIMPLE[0] n_missing_vals = self.TEST_SERVER_TASK_SIMPLE[1] n_test_obs = self.TEST_SERVER_TASK_SIMPLE[2] - self._run_and_upload_classification(gridsearch, task_id, - n_missing_vals, n_test_obs, - '62501') + run = self._run_and_upload_classification( + clf=gridsearch, + task_id=task_id, + n_missing_vals=n_missing_vals, + n_test_obs=n_test_obs, + flow_expected_rsv='62501', + ) + self.assertEqual(len(run.trace.trace_iterations), 9) def test_run_and_upload_randomsearch(self): randomsearch = RandomizedSearchCV( @@ -580,9 +593,14 @@ def test_run_and_upload_randomsearch(self): task_id = self.TEST_SERVER_TASK_SIMPLE[0] n_missing_vals = self.TEST_SERVER_TASK_SIMPLE[1] n_test_obs = self.TEST_SERVER_TASK_SIMPLE[2] - self._run_and_upload_classification(randomsearch, task_id, - n_missing_vals, n_test_obs, - '12172') + run = self._run_and_upload_classification( + clf=randomsearch, + task_id=task_id, + n_missing_vals=n_missing_vals, + n_test_obs=n_test_obs, + flow_expected_rsv='12172', + ) + self.assertEqual(len(run.trace.trace_iterations), 5) def test_run_and_upload_maskedarrays(self): # This testcase 
is important for 2 reasons: @@ -896,21 +914,6 @@ def test__run_exists(self): run_ids = run_exists(task.task_id, setup_exists) self.assertTrue(run_ids, msg=(run_ids, clf)) - def test_run_with_classifiers_in_param_grid(self): - task = openml.tasks.get_task(115) - - param_grid = { - "base_estimator": [DecisionTreeClassifier(), ExtraTreeClassifier()] - } - - clf = GridSearchCV(BaggingClassifier(), param_grid=param_grid) - with self.assertRaises(TypeError): - openml.runs.run_model_on_task( - task=task, - model=clf, - avoid_duplicate_runs=False, - ) - def test_run_with_illegal_flow_id(self): # check the case where the user adds an illegal flow id to a # non-existing flow @@ -1026,8 +1029,11 @@ def test__run_task_get_arffcontent(self): num_folds = 10 num_repeats = 1 + flow = unittest.mock.Mock() + flow.name = 'dummy' clf = SGDClassifier(loss='log', random_state=1) res = openml.runs.functions._run_task_get_arffcontent( + flow=flow, extension=self.extension, model=clf, task=task, @@ -1220,12 +1226,15 @@ def test_run_on_dataset_with_missing_labels(self): # labels only declared in the arff file, but is not present in the # actual data + flow = unittest.mock.Mock() + flow.name = 'dummy' task = openml.tasks.get_task(2) model = Pipeline(steps=[('Imputer', Imputer(strategy='median')), ('Estimator', DecisionTreeClassifier())]) data_content, _, _, _ = _run_task_get_arffcontent( + flow=flow, model=model, task=task, extension=self.extension, @@ -1238,42 +1247,6 @@ def test_run_on_dataset_with_missing_labels(self): # repeat, fold, row_id, 6 confidences, prediction and correct label self.assertEqual(len(row), 12) - def test_predict_proba_hardclassifier(self): - # task 1 (test server) is important: it is a task with an unused class - tasks = [1, 3, 115] - - for task_id in tasks: - task = openml.tasks.get_task(task_id) - clf1 = sklearn.pipeline.Pipeline(steps=[ - ('imputer', sklearn.preprocessing.Imputer()), - ('estimator', GaussianNB()) - ]) - clf2 = sklearn.pipeline.Pipeline(steps=[ - 
('imputer', sklearn.preprocessing.Imputer()), - ('estimator', HardNaiveBayes()) - ]) - - arff_content1, _, _, _ = _run_task_get_arffcontent( - model=clf1, - task=task, - extension=self.extension, - add_local_measures=True, - ) - arff_content2, _, _, _ = _run_task_get_arffcontent( - model=clf2, - task=task, - extension=self.extension, - add_local_measures=True, - ) - - # verifies last two arff indices (predict and correct) - # TODO: programmatically check wether these are indeed features - # (predict, correct) - predictionsA = np.array(arff_content1)[:, -2:] - predictionsB = np.array(arff_content2)[:, -2:] - - np.testing.assert_array_equal(predictionsA, predictionsB) - def test_get_cached_run(self): openml.config.cache_directory = self.static_cache_dir openml.runs.functions._get_cached_run(1) diff --git a/tests/test_runs/test_trace.py b/tests/test_runs/test_trace.py index c322343e5..29f3a1554 100644 --- a/tests/test_runs/test_trace.py +++ b/tests/test_runs/test_trace.py @@ -15,7 +15,7 @@ def test_get_selected_iteration(self): setup_string='parameter_%d%d%d' % (i, j, k), evaluation=1.0 * i + 0.1 * j + 0.01 * k, selected=(i == j and i == k and i == 2), - paramaters=None, + parameters=None, ) trace_iterations[(i, j, k)] = t