Merged
77 changes: 58 additions & 19 deletions openml/runs/functions.py
@@ -4,8 +4,7 @@
import xmltodict
import numpy as np
import warnings
import openml
from sklearn.model_selection._search import BaseSearchCV
import sklearn

from ..exceptions import PyOpenMLError
from .. import config
@@ -59,7 +58,6 @@ def run_task(task, model, avoid_duplicate_runs=True):
raise PyOpenMLError("Run already exists in server. Run id(s): %s" %str(ids))

dataset = task.get_dataset()
X, Y = dataset.get_data(target=task.target_name)

class_labels = task.class_labels
if class_labels is None:
@@ -68,19 +66,19 @@

# execute the run
run = OpenMLRun(task_id=task.task_id, flow_id=None, dataset_id=dataset.dataset_id, model=model)
run.data_content, run.trace_content = _run_task_get_arffcontent(model, task, class_labels)
run.data_content, run.trace_content, run.trace_attributes = _run_task_get_arffcontent(model, task, class_labels)


if flow_id == False:
# means the flow did not exists.
# As we could run it, publish it now
# means the flow did not exist. As we could run it, publish it now
flow = flow.publish()
else:
# flow already existed, download it from server
# TODO (necessary? is this a postcondition of this function?)
flow = get_flow(flow_id)

run.flow_id = flow.flow_id
config.logger.info('Executed Task %d with Flow id: %d' %(task.task_id, run.flow_id))
config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id))

return run

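For context, a minimal usage sketch of the new trace path (not part of this diff; the task id, estimator, and parameter grid are assumptions). Running a BaseSearchCV model through run_task should now fill both trace fields, which publish() later turns into trace.arff:

import openml
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier

task = openml.tasks.get_task(11)  # hypothetical classification task id
clf = GridSearchCV(DecisionTreeClassifier(),
                   param_grid={'max_depth': [1, 2, 4]})
run = openml.runs.run_task(task, clf, avoid_duplicate_runs=False)
# for a search model, both new fields should be set and mutually consistent
assert run.trace_content is not None
assert len(run.trace_attributes) == len(run.trace_content[0])
run.publish()  # uploads predictions.arff plus trace.arff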
@@ -160,22 +158,27 @@ def _run_task_get_arffcontent(model, task, class_labels):
for rep in task.iterate_repeats():
fold_no = 0
for fold in rep:
model_fold = sklearn.base.clone(model, safe=True)
train_indices, test_indices = fold
trainX = X[train_indices]
trainY = Y[train_indices]
testX = X[test_indices]
testY = Y[test_indices]

model.fit(trainX, trainY)
try:
model_fold.fit(trainX, trainY)

if isinstance(model, BaseSearchCV):
_add_results_to_arfftrace(arff_tracecontent, fold_no, model, rep_no)
model_classes = model.best_estimator_.classes_
else:
model_classes = model.classes_
if isinstance(model_fold, sklearn.model_selection._search.BaseSearchCV):
arff_tracecontent.extend(_extract_arfftrace(model_fold, rep_no, fold_no))
model_classes = model_fold.best_estimator_.classes_
else:
model_classes = model_fold.classes_
except AttributeError as e:
# typically happens when training a regressor on a classification task
raise PyOpenMLError(str(e))

ProbaY = model.predict_proba(testX)
PredY = model.predict(testX)
ProbaY = model_fold.predict_proba(testX)
PredY = model_fold.predict(testX)
if ProbaY.shape[1] != len(class_labels):
warnings.warn("Repeat %d Fold %d: estimator only predicted for %d/%d classes!" %(rep_no, fold_no, ProbaY.shape[1], len(class_labels)))

@@ -186,13 +189,18 @@ def _run_task_get_arffcontent(model, task, class_labels):
fold_no = fold_no + 1
rep_no = rep_no + 1

if not isinstance(model, BaseSearchCV):
if isinstance(model_fold, sklearn.model_selection._search.BaseSearchCV):
# arff_tracecontent is already set
arff_trace_attributes = _extract_arfftrace_attributes(model_fold)
else:
arff_tracecontent = None
arff_trace_attributes = None

return arff_datacontent, arff_tracecontent
return arff_datacontent, arff_tracecontent, arff_trace_attributes


def _add_results_to_arfftrace(arff_tracecontent, fold_no, model, rep_no):
def _extract_arfftrace(model, rep_no, fold_no):
arff_tracecontent = []
for itt_no in range(0, len(model.cv_results_['mean_test_score'])):
# use the string values 'true' and 'false' for booleans, as defined by the OpenML server
selected = 'false'
@@ -204,6 +212,30 @@ def _add_results_to_arfftrace(arff_tracecontent, fold_no, model, rep_no):
if key.startswith("param_"):
arff_line.append(str(model.cv_results_[key][itt_no]))
arff_tracecontent.append(arff_line)
return arff_tracecontent

def _extract_arfftrace_attributes(model):
# attributes that will be in trace arff, regardless of the model
trace_attributes = [('repeat', 'NUMERIC'),
('fold', 'NUMERIC'),
('iteration', 'NUMERIC'),
('evaluation', 'NUMERIC'),
('selected', ['true', 'false'])]

# model dependent attributes for trace arff
for key in model.cv_results_:
if key.startswith("param_"):
if all(isinstance(i, (bool)) for i in model.cv_results_[key]):
type = ['True', 'False']
elif all(isinstance(i, (int, float)) for i in model.cv_results_[key]):
type = 'NUMERIC'
else:
values = list(set(model.cv_results_[key])) # unique values
type = [str(i) for i in values]

attribute = ("parameter_" + key[6:], type)
trace_attributes.append(attribute)
return trace_attributes

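A hedged sketch of how the two helpers above are meant to stay in sync (not part of this diff; the dataset and grid are assumptions): every trace row carries one value per trace attribute, five fixed columns plus one per param_* key of cv_results_.

from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier
from openml.runs.functions import _extract_arfftrace, _extract_arfftrace_attributes

X, y = load_iris(return_X_y=True)
search = GridSearchCV(DecisionTreeClassifier(),
                      param_grid={'max_depth': [1, 2],
                                  'criterion': ['gini', 'entropy']})
search.fit(X, y)

rows = _extract_arfftrace(search, rep_no=0, fold_no=0)
attrs = _extract_arfftrace_attributes(search)
# same length on both sides: repeat, fold, iteration, evaluation, selected,
# followed by one parameter_* column per searched parameter
assert all(len(row) == len(attrs) for row in rows)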

def get_runs(run_ids):
@@ -306,9 +338,16 @@ def _create_run_from_xml(xml):
dataset_id = int(run['oml:input_data']['oml:dataset']['oml:did'])

predictions_url = None
for file_dict in run['oml:output_data']['oml:file']:
if isinstance(run['oml:output_data']['oml:file'], dict):
# only one result, probably due to an upload error
file_dict = run['oml:output_data']['oml:file']
if file_dict['oml:name'] == 'predictions':
predictions_url = file_dict['oml:url']
else:
# multiple files, the normal case
for file_dict in run['oml:output_data']['oml:file']:
if file_dict['oml:name'] == 'predictions':
predictions_url = file_dict['oml:url']
if predictions_url is None:
raise ValueError('No URL to download predictions for run %d in run '
'description XML' % run_id)
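A hedged illustration (not from this PR) of why the dict/list branch above is needed: xmltodict collapses a single repeated element into a dict and only produces a list when the element occurs more than once. The URLs below are made up.

import xmltodict

one = xmltodict.parse(
    '<oml:output_data><oml:file>'
    '<oml:name>predictions</oml:name><oml:url>http://example.org/p.arff</oml:url>'
    '</oml:file></oml:output_data>')
many = xmltodict.parse(
    '<oml:output_data>'
    '<oml:file><oml:name>description</oml:name><oml:url>http://example.org/d.xml</oml:url></oml:file>'
    '<oml:file><oml:name>predictions</oml:name><oml:url>http://example.org/p.arff</oml:url></oml:file>'
    '</oml:output_data>')

assert isinstance(one['oml:output_data']['oml:file'], dict)   # single file: plain dict
assert isinstance(many['oml:output_data']['oml:file'], list)  # several files: list of dicts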
37 changes: 9 additions & 28 deletions openml/runs/run.py
@@ -5,7 +5,6 @@
import arff
import xmltodict
from sklearn.base import BaseEstimator
from sklearn.model_selection._search import BaseSearchCV

import openml
from ..tasks import get_task
@@ -23,8 +22,8 @@ class OpenMLRun(object):
def __init__(self, task_id, flow_id, dataset_id, setup_string=None,
files=None, setup_id=None, tags=None, uploader=None, uploader_name=None,
evaluations=None, detailed_evaluations=None,
data_content=None, trace_content=None, model=None, task_type=None,
task_evaluation_measure=None, flow_name=None,
data_content=None, trace_attributes=None, trace_content=None,
model=None, task_type=None, task_evaluation_measure=None, flow_name=None,
parameter_settings=None, predictions_url=None, task=None,
flow=None, run_id=None):
self.uploader = uploader
@@ -42,6 +41,7 @@ def __init__(self, task_id, flow_id, dataset_id, setup_string=None,
self.evaluations = evaluations
self.detailed_evaluations = detailed_evaluations
self.data_content = data_content
self.trace_attributes = trace_attributes
self.trace_content = trace_content
self.error_message = None
self.task = task
@@ -80,7 +80,7 @@ def _generate_arff_dict(self):
arff_dict['relation'] = 'openml_task_' + str(task.task_id) + '_predictions'
return arff_dict

def _generate_trace_arff_dict(self, model):
def _generate_trace_arff_dict(self):
"""Generates the arff dictionary for uploading predictions to the server.

Assumes that the run has been executed.
@@ -91,32 +91,13 @@ def _generate_trace_arff_dict(self, model):
Dictionary representation of the ARFF file that will be uploaded.
Contains information about the optimization trace.
"""
if self.trace_content is None:
if self.trace_content is None or len(self.trace_content) == 0:
raise ValueError('No trace content available.')
if not isinstance(model, BaseSearchCV):
raise PyOpenMLError('Cannot generate trace on provided classifier. (This should never happen.)')
if len(self.trace_attributes) != len(self.trace_content[0]):
raise ValueError('trace_attributes and trace_content are not compatible')

arff_dict = {}
arff_dict['attributes'] = [('repeat', 'NUMERIC'),
('fold', 'NUMERIC'),
('iteration', 'NUMERIC'),
('evaluation', 'NUMERIC'),
('selected', ['true', 'false'])]
for key in model.cv_results_:
if key.startswith("param_"):
type = 'STRING'
if all(isinstance(i, (bool)) for i in model.cv_results_[key]):
type = ['True', 'False']
elif all(isinstance(i, (int, float)) for i in model.cv_results_[key]):
type = 'NUMERIC'
else:
values = list(set(model.cv_results_[key])) # unique values
type = [str(i) for i in values]
print(key + ": " + str(type))

attribute = ("parameter_" + key[6:], type)
arff_dict['attributes'].append(attribute)

arff_dict['attributes'] = self.trace_attributes
arff_dict['data'] = self.trace_content
arff_dict['relation'] = 'openml_task_' + str(self.task_id) + '_predictions'

@@ -145,7 +126,7 @@ def publish(self):
file_elements['predictions'] = ("predictions.arff", predictions)

if self.trace_content is not None:
trace_arff = arff.dumps(self._generate_trace_arff_dict(self.model))
trace_arff = arff.dumps(self._generate_trace_arff_dict())
file_elements['trace'] = ("trace.arff", trace_arff)

return_value = _perform_api_call("/run/", file_elements=file_elements)
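For reference, a hedged sketch (not part of this diff; all values are made up) of the dictionary shape that _generate_trace_arff_dict() now assembles from the run's own trace_attributes and trace_content, and that publish() serializes with arff.dumps():

import arff  # liac-arff

trace_dict = {
    'relation': 'openml_task_11_predictions',
    'attributes': [('repeat', 'NUMERIC'),
                   ('fold', 'NUMERIC'),
                   ('iteration', 'NUMERIC'),
                   ('evaluation', 'NUMERIC'),
                   ('selected', ['true', 'false']),
                   ('parameter_max_depth', 'NUMERIC')],
    'data': [[0, 0, 0, 0.93, 'false', 1],
             [0, 0, 1, 0.96, 'true', 2]],
}
trace_arff = arff.dumps(trace_dict)  # the string uploaded as the trace.arff file element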
9 changes: 4 additions & 5 deletions tests/test_runs/test_run_functions.py
@@ -41,9 +41,8 @@ def test_run_regression_on_classif_task(self):

clf = LinearRegression()
task = openml.tasks.get_task(task_id)
self.assertRaisesRegexp(AttributeError,
"'LinearRegression' object has no attribute 'classes_'",
openml.runs.run_task, task=task, model=clf)
self.assertRaises(openml.exceptions.PyOpenMLError, openml.runs.run_task,
task=task, model=clf, avoid_duplicate_runs=False)

@mock.patch('openml.flows.sklearn_to_flow')
def test_check_erronous_sklearn_flow_fails(self, sklearn_to_flow_mock):
@@ -124,7 +123,7 @@ def test__run_task_get_arffcontent(self):
clf, task, class_labels)

clf = SGDClassifier(loss='log', random_state=1)
arff_datacontent, arff_tracecontent = openml.runs.functions._run_task_get_arffcontent(
arff_datacontent, arff_tracecontent, _ = openml.runs.functions._run_task_get_arffcontent(
clf, task, class_labels)
# predictions
self.assertIsInstance(arff_datacontent, list)
@@ -291,7 +290,7 @@ def test_run_on_dataset_with_missing_labels(self):
model = Pipeline(steps=[('Imputer', Imputer(strategy='median')),
('Estimator', DecisionTreeClassifier())])

data_content, _ = _run_task_get_arffcontent(model, task, class_labels)
data_content, _, _ = _run_task_get_arffcontent(model, task, class_labels)
# 2 folds, 5 repeats; keep in mind that this task comes from the test
# server, the task on the live server is different
self.assertEqual(len(data_content), 4490)