Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from . import tasks
from . import runs
from . import flows
from . import setups
from .runs import OpenMLRun
from .tasks import OpenMLTask, OpenMLSplit
from .flows import OpenMLFlow
Expand Down Expand Up @@ -66,4 +67,4 @@ def populate_cache(task_ids=None, dataset_ids=None, flow_ids=None,

__all__ = ['OpenMLDataset', 'OpenMLDataFeature', 'OpenMLRun',
'OpenMLSplit', 'datasets', 'OpenMLTask', 'OpenMLFlow',
'config', 'runs', 'flows', 'tasks']
'config', 'runs', 'flows', 'tasks', 'setups']
4 changes: 2 additions & 2 deletions openml/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
class PyOpenMLError(Exception):
    """Exception that keeps its message on the ``message`` attribute.

    Parent class of ``OpenMLServerError``.
    """

    def __init__(self, message):
        # Stored explicitly so that handlers can read ``e.message``.
        self.message = message
        super(PyOpenMLError, self).__init__(message)


class OpenMLServerError(PyOpenMLError):
    """Raised when something is really wrong on the server.

    The result did not parse to a dict; the exception carries the unparsed
    error text.
    """

    def __init__(self, message):
        # Prefix the text before handing it on to PyOpenMLError.
        message = "OpenML Server error: " + message
        super(OpenMLServerError, self).__init__(message)

#
Expand All @@ -18,7 +19,6 @@ class OpenMLServerException(OpenMLServerError):
def __init__(self, code, message, additional=None):
self.code = code
self.additional = additional
message = "OpenML Server exception: " + message
super(OpenMLServerException, self).__init__(message)


Expand Down
2 changes: 1 addition & 1 deletion openml/flows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .flow import OpenMLFlow
from .flow import OpenMLFlow, _copy_server_fields

from .sklearn_converter import sklearn_to_flow, flow_to_sklearn, _check_n_jobs
from .functions import get_flow, list_flows, flow_exists, assert_flows_equal
Expand Down
48 changes: 36 additions & 12 deletions openml/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,21 @@ def _from_dict(cls, xml_dict):
arguments['tags'] = tags

arguments['model'] = None
return cls(**arguments)
flow = cls(**arguments)

# try to parse to a model because not everything that can be
# deserialized has to come from scikit-learn. If it can't be
# serialized, but comes from scikit-learn this is worth an exception
try:
from .sklearn_converter import flow_to_sklearn
model = flow_to_sklearn(flow)
except Exception as e:
if arguments['external_version'].startswith('sklearn'):
raise e
model = None
flow.model = model

return flow

def publish(self):
"""Publish flow to OpenML server.
Expand All @@ -332,32 +346,42 @@ def publish(self):
self : OpenMLFlow

"""
# Import at top not possible because of cyclic dependencies. In
# particular, flow.py tries to import functions.py in order to call
# get_flow(), while functions.py tries to import flow.py in order to
# instantiate an OpenMLFlow.
import openml.flows.functions

xml_description = self._to_xml()

file_elements = {'description': xml_description}
return_value = _perform_api_call("flow/", file_elements=file_elements)
self.flow_id = int(xmltodict.parse(return_value)['oml:upload_flow']['oml:id'])
flow_id = int(xmltodict.parse(return_value)['oml:upload_flow']['oml:id'])
flow = openml.flows.functions.get_flow(flow_id)
_copy_server_fields(flow, self)
try:
_check_flow(self)
openml.flows.functions.assert_flows_equal(self, flow, flow.upload_date)
except ValueError as e:
message = e.args[0]
raise ValueError("Flow was not stored correctly on the server. "
"New flow ID is %d. Please check manually and "
"remove the flow if necessary! Error is:\n'%s'" %
(self.flow_id, message))
(flow_id, message))
return self


def _copy_server_fields(source_flow, target_flow):
fields_added_by_the_server = ['flow_id', 'uploader', 'version',
'upload_date']
for field in fields_added_by_the_server:
setattr(target_flow, field, getattr(source_flow, field))

for name, component in source_flow.components.items():
assert name in target_flow.components
_copy_server_fields(component, target_flow.components[name])


def _add_if_nonempty(dic, key, value):
if value is not None:
dic[key] = value


def _check_flow(flow):
    """Fetch the flow with ``flow.flow_id`` and compare it against ``flow``.

    The comparison is delegated to ``assert_flows_equal``.
    """
    # A module-level import would create an import cycle and fail with an
    # ImportError, hence the import inside the function.
    import openml.flows.functions

    flow_functions = openml.flows.functions
    server_copy = flow_functions.get_flow(flow.flow_id)
    flow_functions.assert_flows_equal(flow, server_copy)
59 changes: 51 additions & 8 deletions openml/flows/functions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import dateutil.parser

import xmltodict
import six

from openml._api_calls import _perform_api_call
from . import OpenMLFlow, flow_to_sklearn
from . import OpenMLFlow


def get_flow(flow_id):
Expand All @@ -24,9 +26,6 @@ def get_flow(flow_id):
flow_dict = xmltodict.parse(flow_xml)
flow = OpenMLFlow._from_dict(flow_dict)

if 'sklearn' in flow.external_version:
flow.model = flow_to_sklearn(flow)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this use `startswith` (again)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of code is removed, I can't add anything here.


return flow


Expand Down Expand Up @@ -130,11 +129,41 @@ def _list_flows(api_call):
return flows


def assert_flows_equal(flow1, flow2):
def _check_flow_for_server_id(flow):
"""Check if the given flow and it's components have a flow_id."""

# Depth-first search to check if all components were uploaded to the
# server before parsing the parameters
stack = list()
stack.append(flow)
while len(stack) > 0:
current = stack.pop()
if current.flow_id is None:
raise ValueError("Flow %s has no flow_id!" % current.name)
else:
for component in current.components.values():
stack.append(component)


def assert_flows_equal(flow1, flow2, ignore_parameters_on_older_children=None,
ignore_parameters=False):
"""Check equality of two flows.

Two flows are equal if their all keys which are not set by the server
are equal, as well as all their parameters and components.

Parameters
----------
flow1 : OpenMLFlow

flow2 : OpenMLFlow

ignore_parameters_on_older_children : str
If set to ``OpenMLFlow.upload_date``, ignores parameters in a child
flow if it's upload date predates the upload date of the parent flow.

ignore_parameters : bool
Whether to ignore parameter values when comparing flows.
"""
if not isinstance(flow1, OpenMLFlow):
raise TypeError('Argument 1 must be of type OpenMLFlow, but is %s' %
Expand All @@ -144,8 +173,9 @@ def assert_flows_equal(flow1, flow2):
raise TypeError('Argument 2 must be of type OpenMLFlow, but is %s' %
type(flow2))

generated_by_the_server = ['flow_id', 'uploader', 'version',
'upload_date', ]
# TODO as they are actually now saved during publish, it might be good to
# check for the equality of these as well.
generated_by_the_server = ['flow_id', 'uploader', 'version', 'upload_date']
ignored_by_python_API = ['binary_url', 'binary_format', 'binary_md5',
'model']

Expand All @@ -162,9 +192,22 @@ def assert_flows_equal(flow1, flow2):
if not name in attr2:
raise ValueError('Component %s only available in '
'argument2, but not in argument1.' % name)
assert_flows_equal(attr1[name], attr2[name])
assert_flows_equal(attr1[name], attr2[name],
ignore_parameters_on_older_children,
ignore_parameters)

else:
if key == 'parameters':
if ignore_parameters_on_older_children:
upload_date_current_flow = dateutil.parser.parse(
flow1.upload_date)
upload_date_parent_flow = dateutil.parser.parse(
ignore_parameters_on_older_children)
if upload_date_current_flow < upload_date_parent_flow:
continue
elif ignore_parameters:
continue

if attr1 != attr2:
raise ValueError("Flow %s: values for attribute '%s' differ: "
"'%s' vs '%s'." %
Expand Down
8 changes: 5 additions & 3 deletions openml/runs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from .run import OpenMLRun
from .trace import OpenMLRunTrace, OpenMLTraceIteration
from .functions import (run_task, get_run, list_runs, get_runs, get_run_trace,
initialize_model_from_run, initialize_model_from_trace)
from .functions import (run_model_on_task, run_flow_on_task, get_run, list_runs,
get_runs, get_run_trace, initialize_model_from_run,
initialize_model_from_trace)

__all__ = ['OpenMLRun', 'run_task', 'get_run', 'list_runs', 'get_runs']
__all__ = ['OpenMLRun', 'run_model_on_task', 'run_flow_on_task', 'get_run',
'list_runs', 'get_runs']
94 changes: 64 additions & 30 deletions openml/runs/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
import warnings

import numpy as np
import sklearn
import sklearn.pipeline
import six
import xmltodict

import openml
from ..exceptions import PyOpenMLError
from .. import config
from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs
from ..flows import sklearn_to_flow, get_flow, flow_exists, _check_n_jobs, \
_copy_server_fields
from ..setups import setup_exists, initialize_model
from ..exceptions import OpenMLCacheException, OpenMLServerException
from .._api_calls import _perform_api_call, _file_id_to_url
from .._api_calls import _perform_api_call
from .run import OpenMLRun, _get_version_information
from .trace import OpenMLRunTrace, OpenMLTraceIteration

Expand All @@ -25,24 +27,42 @@
# circular imports


def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):
"""Performs a CV run on the dataset of the given task, using the split.
def run_model_on_task(task, model, avoid_duplicate_runs=True, flow_tags=None,
                      seed=None):
    """Convert ``model`` to a flow and run that flow on ``task``.

    See ``run_flow_on_task`` for the documentation of the remaining
    arguments and of the return value.
    """
    run_options = dict(avoid_duplicate_runs=avoid_duplicate_runs,
                       flow_tags=flow_tags, seed=seed)
    return run_flow_on_task(task=task, flow=sklearn_to_flow(model),
                            **run_options)


def run_flow_on_task(task, flow, avoid_duplicate_runs=True, flow_tags=None,
seed=None):
"""Run the model provided by the flow on the dataset defined by task.

Takes the flow and repeat information into account. In case a flow is not
yet published, it is published after executing the run (requires
internet connection).

Parameters
----------
task : OpenMLTask
Task to perform.
model : sklearn model
a model which has a function fit(X,Y) and predict(X),
A model which has a function fit(X,Y) and predict(X),
all supervised estimators of scikit learn follow this definition of a model [1]
[1](http://scikit-learn.org/stable/tutorial/statistical_inference/supervised_learning.html)
avoid_duplicate_runs : bool
if this flag is set to True, the run will throw an error if the
setup/task combination is already present on the server.
If this flag is set to True, the run will throw an error if the
setup/task combination is already present on the server. Works only
if the flow is already published on the server. This feature requires an
internet connection.
flow_tags : list(str)
a list of tags that the flow should have at creation
A list of tags that the flow should have at creation.
seed: int
the models that are not seeded will get this seed
Models that are not seeded will get this seed.

Returns
-------
Expand All @@ -51,23 +71,19 @@ def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):
"""
if flow_tags is not None and not isinstance(flow_tags, list):
raise ValueError("flow_tags should be list")
# TODO move this into its onwn module. While it somehow belongs here, it
# adds quite a lot of functionality which is better suited in other places!
# TODO why doesn't this accept a flow as input? - this would make this more flexible!
model = _get_seeded_model(model, seed)
flow = sklearn_to_flow(model)

# returns flow id if the flow exists on the server, False otherwise
flow_id = flow_exists(flow.name, flow.external_version)
flow.model = _get_seeded_model(flow.model, seed=seed)

# skips the run if it already exists and the user opts for this in the config file.
# also, if the flow is not present on the server, the check is not needed.
flow_id = flow_exists(flow.name, flow.external_version)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document somewhere that 'avoid_duplicate_runs' must be set to False if we want to run experiments offline.

if avoid_duplicate_runs and flow_id:
flow = get_flow(flow_id)
setup_id = setup_exists(flow, model)
flow_from_server = get_flow(flow_id)
setup_id = setup_exists(flow_from_server)
ids = _run_exists(task.task_id, setup_id)
if ids:
raise PyOpenMLError("Run already exists in server. Run id(s): %s" %str(ids))
_copy_server_fields(flow_from_server, flow)

dataset = task.get_dataset()

Expand All @@ -78,25 +94,43 @@ def run_task(task, model, avoid_duplicate_runs=True, flow_tags=None, seed=None):

run_environment = _get_version_information()
tags = ['openml-python', run_environment[1]]

# execute the run
run = OpenMLRun(task_id=task.task_id, flow_id=None, dataset_id=dataset.dataset_id, model=model, tags=tags)
res = _run_task_get_arffcontent(model, task, class_labels)
run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res
res = _run_task_get_arffcontent(flow.model, task, class_labels)

if flow_id == False:
# means the flow did not exists. As we could run it, publish it now
flow = flow.publish()
else:
# flow already existed, download it from server
# TODO (neccessary? is this a post condition of this function)
flow = get_flow(flow_id)
if flow.flow_id is None:
_publish_flow_if_necessary(flow)

run = OpenMLRun(task_id=task.task_id, flow_id=flow.flow_id,
dataset_id=dataset.dataset_id, model=flow.model, tags=tags)
run.parameter_settings = OpenMLRun._parse_parameters(flow)

run.data_content, run.trace_content, run.trace_attributes, run.detailed_evaluations = res

run.flow_id = flow.flow_id
config.logger.info('Executed Task %d with Flow id: %d' % (task.task_id, run.flow_id))

return run


def _publish_flow_if_necessary(flow):
# try publishing the flow if one has to assume it doesn't exist yet. It
# might fail because it already exists, then the flow is currently not
# reused

try:
flow.publish()
except OpenMLServerException as e:
if e.message == "flow already exists":
flow_id = openml.flows.flow_exists(flow.name,
flow.external_version)
server_flow = get_flow(flow_id)
openml.flows.flow._copy_server_fields(server_flow, flow)
openml.flows.assert_flows_equal(flow, server_flow,
ignore_parameters=True)
else:
raise e


def get_run_trace(run_id):
"""Get the optimization trace object for a given run id.

Expand Down
Loading