feat(sdk.v2): surface metrics output artifacts to pipeline outputs (#5445)

* populates metrics outputs in root

* avoid kfp import in io_types.py

* populates outputs following the right path
chensun committed Apr 9, 2021
1 parent d8b2b24 commit a804211
Showing 15 changed files with 807 additions and 702 deletions.
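At a glance: for each Metrics or ClassificationMetrics output an op produces, the compiler now registers that artifact as an output of every ancestor DAG up to the pipeline root, using an artifact selector that points back at the producing subtask. A minimal sketch of the proto the new code builds ('train-op' and 'metrics' are placeholder names, not from this diff):

    from kfp.pipeline_spec import pipeline_spec_pb2

    # Placeholder task name and output key; the compiler derives the real
    # values from each op's task spec.
    selector = pipeline_spec_pb2.DagOutputsSpec.ArtifactSelectorSpec(
        producer_subtask='train-op',
        output_artifact_key='metrics',
    )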
5 changes: 5 additions & 0 deletions sdk/python/kfp/dsl/artifact.py
@@ -184,6 +184,11 @@ def get_artifact_type(cls) -> str:

return serialization_utils.yaml_dump(result_map)

+  @classmethod
+  def get_ir_type(cls) -> pipeline_spec_pb2.ArtifactTypeSchema:
+    return pipeline_spec_pb2.ArtifactTypeSchema(
+        instance_schema=cls.get_artifact_type())
+
@classmethod
def get_from_runtime_artifact(
cls, artifact: pipeline_spec_pb2.RuntimeArtifact) -> Any:
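For reference, a minimal usage sketch of the new get_ir_type() helper, assuming an Artifact subclass such as ontology_artifacts.Model:

    from kfp.dsl import ontology_artifacts

    # get_ir_type() wraps the YAML schema from get_artifact_type() into an
    # IR ArtifactTypeSchema message via its instance_schema field.
    ir_type = ontology_artifacts.Model.get_ir_type()
    assert 'title: kfp.Model' in ir_type.instance_schema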
16 changes: 8 additions & 8 deletions sdk/python/kfp/dsl/component_spec.py
@@ -61,8 +61,8 @@ def build_component_spec_from_structure(
input_spec.name].type = type_utils.get_parameter_type(input_spec.type)
else:
result.input_definitions.artifacts[
-        input_spec.name].artifact_type.instance_schema = (
-            type_utils.get_artifact_type_schema(input_spec.type))
+        input_spec.name].artifact_type.CopyFrom(
+            type_utils.get_artifact_type_schema_message(input_spec.type))

for output_spec in component_spec.outputs or []:
if type_utils.is_parameter_type(output_spec.type):
@@ -71,8 +71,8 @@ def build_component_spec_from_structure(
output_spec.type)
else:
result.output_definitions.artifacts[
-        output_spec.name].artifact_type.instance_schema = (
-            type_utils.get_artifact_type_schema(output_spec.type))
+        output_spec.name].artifact_type.CopyFrom(
+            type_utils.get_artifact_type_schema_message(output_spec.type))

return result

@@ -100,8 +100,8 @@ def build_component_inputs_spec(
elif input_name not in getattr(component_spec.input_definitions,
'parameters', []):
component_spec.input_definitions.artifacts[
-        input_name].artifact_type.instance_schema = (
-            type_utils.get_artifact_type_schema(param.param_type))
+        input_name].artifact_type.CopyFrom(
+            type_utils.get_artifact_type_schema_message(param.param_type))


def build_component_outputs_spec(
@@ -122,8 +122,8 @@ def build_component_outputs_spec(
elif output_name not in getattr(component_spec.output_definitions,
'parameters', []):
component_spec.output_definitions.artifacts[
-        output_name].artifact_type.instance_schema = (
-            type_utils.get_artifact_type_schema(param.param_type))
+        output_name].artifact_type.CopyFrom(
+            type_utils.get_artifact_type_schema_message(param.param_type))


def build_task_inputs_spec(
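The switch from assigning artifact_type.instance_schema to artifact_type.CopyFrom(...) matters because ArtifactTypeSchema keeps its kind in a oneof (see the WhichOneof('kind') check in compiler.py below): the helper may now return either a schema_title or an instance_schema, and CopyFrom preserves whichever field it set. A sketch of the pattern:

    # The helper decides which oneof field to populate; copying the whole
    # message preserves that choice instead of forcing instance_schema.
    schema = type_utils.get_artifact_type_schema_message(output_spec.type)
    result.output_definitions.artifacts[
        output_spec.name].artifact_type.CopyFrom(schema)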
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/type_schemas/classification_metrics.yaml
@@ -1,4 +1,4 @@
-title: kfp.ClassificationMetrics
+title: system.ClassificationMetrics
type: object
properties:
auPrc:
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/type_schemas/metrics.yaml
@@ -1,4 +1,4 @@
-title: kfp.Metrics
+title: system.Metrics
type: object
properties:
accuracy:
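The title rename from the kfp. namespace to the system. namespace pairs with the new io_types classes. Judging from this commit's tests and the compiler check below (not stated explicitly anywhere in the diff), their TYPE_NAME carries the namespaced title:

    from kfp.dsl import io_types

    # Inferred relationship; see type_utils_test.py below.
    assert io_types.Metrics.TYPE_NAME == 'system.Metrics'
    assert io_types.ClassificationMetrics.TYPE_NAME == 'system.ClassificationMetrics'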
21 changes: 15 additions & 6 deletions sdk/python/kfp/dsl/type_utils.py
@@ -17,20 +17,23 @@
from kfp.components import structures
from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.dsl import artifact
+from kfp.dsl import artifact_utils
from kfp.dsl import ontology_artifacts
+from kfp.dsl import io_types

# ComponentSpec I/O types to (IR) PipelineTaskSpec I/O types mapping.
# The keys are normalized (lowercased). These are types viewed as Artifacts.
# The values are the corresponding IR artifact ontology types.
+# TODO: migrate/merge other ontology_artifacts types to io_types
_ARTIFACT_TYPES_MAPPING = {
'model':
ontology_artifacts.Model.get_artifact_type(),
'dataset':
ontology_artifacts.Dataset.get_artifact_type(),
'metrics':
-        ontology_artifacts.Metrics.get_artifact_type(),
+        artifact_utils.read_schema_file('metrics.yaml'),
    'classificationmetrics':
-        ontology_artifacts.ClassificationMetrics.get_artifact_type(),
+        artifact_utils.read_schema_file('classification_metrics.yaml'),
'slicedclassificationmetrics':
ontology_artifacts.SlicedClassificationMetrics.get_artifact_type(),
}
@@ -42,9 +45,9 @@
'dataset':
ontology_artifacts.Dataset,
'metrics':
-        ontology_artifacts.Metrics,
+        io_types.Metrics,
    'classificationmetrics':
-        ontology_artifacts.ClassificationMetrics,
+        io_types.ClassificationMetrics,
'slicedclassificationmetrics':
ontology_artifacts.SlicedClassificationMetrics,
}
@@ -106,8 +109,14 @@ def get_artifact_type_schema_message(
type_name: str) -> pipeline_spec_pb2.ArtifactTypeSchema:
"""Gets the IR I/O artifact type msg for the given ComponentSpec I/O type."""
if isinstance(type_name, str):
-    return _ARTIFACT_CLASSES_MAPPING.get(type_name.lower(),
-                                         artifact.Artifact).get_ir_type()
+    artifact_class = _ARTIFACT_CLASSES_MAPPING.get(type_name.lower(),
+                                                   artifact.Artifact)
+    # TODO: migrate all types to system. namespace.
+    if artifact_class.TYPE_NAME.startswith('system.'):
+      return pipeline_spec_pb2.ArtifactTypeSchema(
+          schema_title=artifact_class.TYPE_NAME)
+    else:
+      return artifact_class.get_ir_type()
else:
return artifact.Artifact.get_ir_type()

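Taken together, the new dispatch splits along the TYPE_NAME namespace; a sketch of the expected behavior (values follow from the tests below):

    # 'Metrics' now maps to io_types.Metrics, whose TYPE_NAME starts with
    # 'system.', so the IR schema uses schema_title...
    metrics_schema = type_utils.get_artifact_type_schema_message('Metrics')
    assert metrics_schema.schema_title == 'system.Metrics'

    # ...while 'Model' still goes through get_ir_type() and keeps the full
    # YAML instance_schema.
    model_schema = type_utils.get_artifact_type_schema_message('Model')
    assert 'title: kfp.Model' in model_schema.instance_schema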
13 changes: 7 additions & 6 deletions sdk/python/kfp/dsl/type_utils_test.py
@@ -40,9 +40,10 @@ def test_get_artifact_type_schema(self):
'title: kfp.Model' in type_utils.get_artifact_type_schema('Model'))
self.assertTrue(
'title: kfp.Dataset' in type_utils.get_artifact_type_schema('Dataset'))
+    print(type_utils.get_artifact_type_schema('Metrics'))
    self.assertTrue(
-        'title: kfp.Metrics' in type_utils.get_artifact_type_schema('Metrics'))
-    self.assertTrue('title: kfp.ClassificationMetrics' in type_utils
+        'system.Metrics' in type_utils.get_artifact_type_schema('Metrics'))
+    self.assertTrue('system.ClassificationMetrics' in type_utils
.get_artifact_type_schema('ClassificationMetrics'))
self.assertTrue('title: kfp.SlicedClassificationMetrics' in type_utils
.get_artifact_type_schema('SlicedClassificationMetrics'))
@@ -61,11 +61,11 @@ def test_get_artifact_type_schema_message(self):
'title: kfp.Dataset' in type_utils.get_artifact_type_schema_message(
'Dataset').instance_schema)
self.assertTrue(
-        'title: kfp.Metrics' in type_utils.get_artifact_type_schema_message(
-            'Metrics').instance_schema)
-    self.assertTrue('title: kfp.ClassificationMetrics' in
+        'system.Metrics' in type_utils.get_artifact_type_schema_message(
+            'Metrics').schema_title)
+    self.assertTrue('system.ClassificationMetrics' in
                    type_utils.get_artifact_type_schema_message(
-                        'ClassificationMetrics').instance_schema)
+                        'ClassificationMetrics').schema_title)
self.assertTrue('title: kfp.SlicedClassificationMetrics' in
type_utils.get_artifact_type_schema_message(
'SlicedClassificationMetrics').instance_schema)
87 changes: 78 additions & 9 deletions sdk/python/kfp/v2/compiler/compiler.py
@@ -35,6 +35,7 @@
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import dsl_utils
from kfp.dsl import importer_node
+from kfp.dsl import io_types
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2

@@ -66,8 +67,8 @@ def my_pipeline(a: int = 1, b: str = "default value"):
kfp.v2.compiler.Compiler().compile(my_pipeline, 'path/to/pipeline.json')
"""

-  def _get_groups_for_ops(
-      self, root_group: dsl.OpsGroup) -> Dict[str, List[dsl.OpsGroup]]:
+  def _get_groups_for_ops(self,
+                          root_group: dsl.OpsGroup) -> Dict[str, List[str]]:
"""Helper function to get groups that contain the specified ops.
Each pipeline has a root group. Each group has a list of operators (leaf)
@@ -107,7 +108,7 @@ def _get_op_groups_helper(

#TODO: combine with the _get_groups_for_ops
def _get_groups_for_opsgroups(
-      self, root_group: dsl.OpsGroup) -> Dict[str, List[dsl.OpsGroup]]:
+      self, root_group: dsl.OpsGroup) -> Dict[str, List[str]]:
"""Helper function to get groups that contain the specified opsgroup.
Each pipeline has a root group. Each group has a list of operators (leaf)
@@ -160,8 +161,8 @@ def _get_groups_helper(group):

def _get_uncommon_ancestors(
self,
-      op_groups: Dict[str, List[dsl.OpsGroup]],
-      opsgroup_groups: Dict[str, List[dsl.OpsGroup]],
+      op_groups: Dict[str, List[str]],
+      opsgroup_groups: Dict[str, List[str]],
op1: dsl.BaseOp,
op2: dsl.BaseOp,
) -> Tuple[List[_GroupOrOp], List[_GroupOrOp]]:
@@ -260,8 +261,8 @@ def _get_inputs_outputs(
pipeline: dsl.Pipeline,
args: List[dsl.PipelineParam],
root_group: dsl.OpsGroup,
-      op_groups: Dict[str, List[dsl.OpsGroup]],
-      opsgroup_groups: Dict[str, List[dsl.OpsGroup]],
+      op_groups: Dict[str, List[str]],
+      opsgroup_groups: Dict[str, List[str]],
condition_params: Dict[str, dsl.PipelineParam],
op_name_to_for_loop_op: Dict[str, dsl.ParallelFor],
) -> Tuple[Dict[str, List[Tuple[dsl.PipelineParam, str]]], Dict[
@@ -405,8 +406,8 @@ def _get_dependencies(
self,
pipeline: dsl.Pipeline,
root_group: dsl.OpsGroup,
-      op_groups: Dict[str, dsl.OpsGroup],
-      opsgroups_groups: Dict[str, dsl.OpsGroup],
+      op_groups: Dict[str, List[str]],
+      opsgroups_groups: Dict[str, List[str]],
opsgroups: Dict[str, dsl.OpsGroup],
condition_params: Dict[str, dsl.PipelineParam],
) -> Dict[str, List[_GroupOrOp]]:
@@ -625,6 +626,60 @@ def _update_loop_specs(
dsl_component_spec.pop_input_from_component_spec(group_component_spec,
loop_argument_name)

+  def _populate_metrics_in_dag_outputs(
+      self,
+      ops: List[dsl.ContainerOp],
+      op_to_parent_groups: Dict[str, List[str]],
+      pipeline_spec: pipeline_spec_pb2.PipelineSpec,
+  ) -> None:
+    """Populates metrics artifacts in dag outputs.
+
+    Args:
+      ops: The list of ops that may produce metrics outputs.
+      op_to_parent_groups: The dict of op name to parent groups. Key is the
+        op's name. Value is a list of ancestor groups including the op itself.
+        The list for a given op is sorted so that the farthest group comes
+        first and the op itself comes last.
+      pipeline_spec: The pipeline_spec to update in-place.
+    """
+    for op in ops:
+      op_task_spec = getattr(op, 'task_spec',
+                             pipeline_spec_pb2.PipelineTaskSpec())
+      op_component_spec = getattr(op, 'component_spec',
+                                  pipeline_spec_pb2.ComponentSpec())
+
+      # Get the component spec for all its parent groups.
+      parent_groups_component_specs = [pipeline_spec.root]
+      # Skip the op itself and the root group, which cannot be retrieved via
+      # name.
+      for group_name in op_to_parent_groups[op.name][1:-1]:
+        component_name = dsl_utils.sanitize_component_name(group_name)
+        parent_groups_component_specs.append(
+            pipeline_spec.components[component_name])
+      # Reverse the order so the farthest group comes last.
+      parent_groups_component_specs.reverse()
+
+      for output_name, artifact_spec in \
+          op_component_spec.output_definitions.artifacts.items():
+
+        if artifact_spec.artifact_type.WhichOneof(
+            'kind'
+        ) == 'schema_title' and artifact_spec.artifact_type.schema_title in [
+            io_types.Metrics.TYPE_NAME,
+            io_types.ClassificationMetrics.TYPE_NAME,
+        ]:
+          unique_output_name = '{}-{}'.format(op_task_spec.task_info.name,
+                                              output_name)
+
+          for group_component_spec in parent_groups_component_specs:
+            group_component_spec.output_definitions.artifacts[
+                unique_output_name].CopyFrom(artifact_spec)
+            group_component_spec.dag.outputs.artifacts[
+                unique_output_name].artifact_selectors.append(
+                    pipeline_spec_pb2.DagOutputsSpec.ArtifactSelectorSpec(
+                        producer_subtask=op_task_spec.task_info.name,
+                        output_artifact_key=output_name,
+                    ))

def _group_to_dag_spec(
self,
group: dsl.OpsGroup,
@@ -634,6 +689,7 @@ def _group_to_dag_spec(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
deployment_config: pipeline_spec_pb2.PipelineDeploymentConfig,
rootgroup_name: str,
+      op_to_parent_groups: Dict[str, List[str]],
) -> None:
"""Generate IR spec given an OpsGroup.
@@ -649,6 +705,10 @@
deployment_config: The deployment_config to hold all executors.
rootgroup_name: The name of the group root. Used to determine whether the
component spec for the current group should be the root dag.
+      op_to_parent_groups: The dict of op name to parent groups. Key is the
+        op's name. Value is a list of ancestor groups including the op itself.
+        The list for a given op is sorted so that the farthest group comes
+        first and the op itself comes last.
"""
group_component_name = dsl_utils.sanitize_component_name(group.name)

@@ -664,6 +724,7 @@
pipeline_spec_pb2.PipelineTaskSpec())
subgroup_component_spec = getattr(subgroup, 'component_spec',
pipeline_spec_pb2.ComponentSpec())

is_loop_subgroup = (isinstance(group, dsl.ParallelFor))
is_recursive_subgroup = (
isinstance(subgroup, dsl.OpsGroup) and subgroup.recursive_ref)
@@ -874,6 +935,13 @@ def _group_to_dag_spec(
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))

+    # Surface metrics outputs to the top.
+    self._populate_metrics_in_dag_outputs(
+        group.ops,
+        op_to_parent_groups,
+        pipeline_spec,
+    )

def _create_pipeline_spec(
self,
args: List[dsl.PipelineParam],
@@ -940,6 +1008,7 @@ def _create_pipeline_spec(
pipeline_spec,
deployment_config,
root_group.name,
+        op_name_to_parent_groups,
)

return pipeline_spec
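To make the new surfacing pass concrete, a hypothetical shape for op_to_parent_groups and its effect, following the docstrings above (all names illustrative):

    # Farthest group first, the op itself last.
    op_to_parent_groups = {
        'train-op': ['my-pipeline', 'condition-1', 'train-op'],
    }
    # For a system.Metrics output named 'metrics' on 'train-op', the pass
    # adds an output artifact 'train-op-metrics' to both the 'condition-1'
    # component and the root DAG, each selecting
    # producer_subtask='train-op', output_artifact_key='metrics'.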
(Diffs for the remaining 8 changed files are not shown.)
