Skip to content

Commit

Permalink
Now pipeline function takes direct default values rather than dsp.Pip…
Browse files Browse the repository at this point in the history
…elineParam. (#110)

* Now pipeline function takes direct default values rather than dsp.PipelineParam. It simplifies the sample code a lot.

* Remove extraneous parenthesis.

* Follow up CR comments.

* Change Dockerfile (not done).

* Fix dockerfile.

* Fix Dockerfile again.

* Remove unneeded installation of packages in Dockerfile.
  • Loading branch information
qimingj authored and k8s-ci-robot committed Nov 27, 2018
1 parent cc38ca0 commit 0b7120c
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 86 deletions.
18 changes: 14 additions & 4 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ COPY . .
RUN apk add --update gcc musl-dev
RUN go build -o /bin/apiserver backend/src/apiserver/*.go

FROM python:3.5.0-slim as compiler
FROM python:3.5 as compiler

RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q default-jdk wget

RUN pip3 install setuptools==40.5.0

RUN wget http://central.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O /tmp/swagger-codegen-cli.jar

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
WORKDIR /go/src/github.com/kubeflow/pipelines/sdk/python
RUN ./build.sh /kfp.tar.gz
RUN pip3 install /kfp.tar.gz

# This is hard coded to 0.0.26. Once kfp DSK release process is automated,
# we can dynamically refer to the version from same commit SHA.
RUN pip install https://storage.googleapis.com/ml-pipeline/release/0.0.26/kfp-0.0.26.tar.gz --upgrade
WORKDIR /samples
COPY ./samples .
RUN find . -maxdepth 2 -name "*.py" -exec dsl-compile --py {} --output {}.tar.gz \;
Expand Down
3 changes: 1 addition & 2 deletions samples/basic/exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
name='Exit Handler',
description='Download a message and print it out. Exit Handler will run at the end.'
)
def download_and_print(
url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
def download_and_print(url='gs://ml-pipeline-playground/shakespeare1.txt'):
"""A sample pipeline showing exit handler."""

exit_op = dsl.ContainerOp(
Expand Down
4 changes: 2 additions & 2 deletions samples/basic/parallel_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
description='Download two messages in parallel and print the concatenated result.'
)
def download_and_join(
url1=dsl.PipelineParam(name='url1', value='gs://ml-pipeline-playground/shakespeare1.txt'),
url2=dsl.PipelineParam(name='url2', value='gs://ml-pipeline-playground/shakespeare2.txt')):
url1='gs://ml-pipeline-playground/shakespeare1.txt',
url2='gs://ml-pipeline-playground/shakespeare2.txt'):
"""A three-step pipeline with first two running in parallel."""

download1 = dsl.ContainerOp(
Expand Down
3 changes: 1 addition & 2 deletions samples/basic/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
name='Sequential',
description='A pipeline with two sequential steps.'
)
def sequential_pipeline(
url=dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')):
def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt'):
"""A pipeline with two sequential steps."""

op1 = dsl.ContainerOp(
Expand Down
24 changes: 12 additions & 12 deletions samples/kubeflow-tf/kubeflow-training-classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ def confusion_matrix_op(predictions, output, step_name='confusionmatrix'):
name='Pipeline TFJob',
description='Demonstrate the DSL for TFJob'
)
def kubeflow_training( output: dsl.PipelineParam, project: dsl.PipelineParam,
evaluation: dsl.PipelineParam=dsl.PipelineParam(name='evaluation', value='gs://ml-pipeline-playground/flower/eval100.csv'),
train: dsl.PipelineParam=dsl.PipelineParam(name='train', value='gs://ml-pipeline-playground/flower/train200.csv'),
schema: dsl.PipelineParam=dsl.PipelineParam(name='schema', value='gs://ml-pipeline-playground/flower/schema.json'),
learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learningrate', value=0.1),
hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hiddenlayersize', value='100,50'),
steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=2000),
target: dsl.PipelineParam=dsl.PipelineParam(name='target', value='label'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=0),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=0),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocessmode', value='local'),
predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predictmode', value='local')):
def kubeflow_training(output, project,
evaluation='gs://ml-pipeline-playground/flower/eval100.csv',
train='gs://ml-pipeline-playground/flower/train200.csv',
schema='gs://ml-pipeline-playground/flower/schema.json',
learning_rate=0.1,
hidden_layer_size='100,50',
steps=2000,
target='label',
workers=0,
pss=0,
preprocess_mode='local',
predict_mode='local'):
# TODO: use the argo job name as the workflow
workflow = '{{workflow.name}}'

Expand Down
48 changes: 17 additions & 31 deletions samples/tfx/taxi-cab-classification-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,37 +119,23 @@ def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='dep
description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
output: dsl.PipelineParam,
project: dsl.PipelineParam,

column_names: dsl.PipelineParam=dsl.PipelineParam(
name='column-names',
value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json'),
key_columns: dsl.PipelineParam=dsl.PipelineParam(
name='key-columns',
value='trip_start_timestamp'),
train: dsl.PipelineParam=dsl.PipelineParam(
name='train',
value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv'),
evaluation: dsl.PipelineParam=dsl.PipelineParam(
name='evaluation',
value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv'),
validation_mode: dsl.PipelineParam=dsl.PipelineParam(
name='validation-mode', value='local'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(
name='preprocess-mode', value='local'),
preprocess_module: dsl.PipelineParam=dsl.PipelineParam(
name='preprocess-module',
value='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py'),
target: dsl.PipelineParam=dsl.PipelineParam(
name='target', value='tips'),
learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learning-rate', value=0.1),
hidden_layer_size: dsl.PipelineParam=dsl.PipelineParam(name='hidden-layer-size', value='1500'),
steps: dsl.PipelineParam=dsl.PipelineParam(name='steps', value=3000),
predict_mode: dsl.PipelineParam=dsl.PipelineParam(name='predict-mode', value='local'),
analyze_mode: dsl.PipelineParam=dsl.PipelineParam(name='analyze-mode', value='local'),
analyze_slice_column: dsl.PipelineParam=dsl.PipelineParam(
name='analyze-slice-column', value='trip_start_hour')):
output,
project,
column_names='gs://ml-pipeline-playground/tfx/taxi-cab-classification/column-names.json',
key_columns='trip_start_timestamp',
train='gs://ml-pipeline-playground/tfx/taxi-cab-classification/train.csv',
evaluation='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv',
validation_mode='local',
preprocess_mode='local',
preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',
target='tips',
learning_rate=0.1,
hidden_layer_size='1500',
steps=3000,
predict_mode='local',
analyze_mode='local',
analyze_slice_column='trip_start_hour'):

validation_output = '%s/{{workflow.name}}/validation' % output
transform_output = '%s/{{workflow.name}}/transformed' % output
training_output = '%s/{{workflow.name}}/train' % output
Expand Down
10 changes: 1 addition & 9 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,6 @@ def _create_pipeline_workflow(self, args, pipeline):
workflow['spec']['volumes'] = volumes
return workflow

def _validate_args(self, argspec):
if argspec.defaults:
for value in argspec.defaults:
if not issubclass(type(value), dsl.PipelineParam):
raise ValueError(
'Default values of argument has to be type dsl.PipelineParam or its child.')

def _validate_exit_handler(self, pipeline):
"""Makes sure there is only one global exit handler.
Expand All @@ -471,7 +464,6 @@ def _compile(self, pipeline_func):
"""Compile the given pipeline function into workflow."""

argspec = inspect.getfullargspec(pipeline_func)
self._validate_args(argspec)

registered_pipeline_functions = dsl.Pipeline.get_pipeline_functions()
if pipeline_func not in registered_pipeline_functions:
Expand All @@ -494,7 +486,7 @@ def _compile(self, pipeline_func):
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
arg.value = default.value
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default

workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow
Expand Down
19 changes: 0 additions & 19 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,25 +168,6 @@ def test_composing_workflow(self):
shutil.rmtree(tmpdir)
# print(tmpdir)

def test_invalid_pipelines(self):
"""Test invalid pipelines."""

@dsl.pipeline(
name='name',
description='description'
)
def invalid_param_defaults(message, outputpath='something'):
pass

with self.assertRaises(ValueError):
compiler.Compiler()._compile(invalid_param_defaults)

def missing_decoration(message: dsl.PipelineParam):
pass

with self.assertRaises(ValueError):
compiler.Compiler()._compile(missing_decoration)

def test_package_compile(self):
"""Test compiling python packages."""

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, name):
name='Save Most Frequent',
description='Get Most Frequent Word and Save to GCS'
)
def save_most_frequent_word(message: dsl.PipelineParam, outputpath: dsl.PipelineParam):
def save_most_frequent_word(message: str, outputpath: str):
"""A pipeline function describing the orchestration of the workflow."""

exit_op = ExitHandlerOp('exiting')
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/compiler/testdata/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ def __init__(self, name, url):
name='Download and Save Most Frequent',
description='Download and Get Most Frequent Word and Save to GCS'
)
def download_save_most_frequent_word(url: dsl.PipelineParam, outputpath: dsl.PipelineParam):
def download_save_most_frequent_word(url: str, outputpath: str):
downloader = DownloadMessageOp('download', url)
save_most_frequent_word(downloader.output, outputpath)
3 changes: 1 addition & 2 deletions sdk/python/tests/compiler/testdata/default_value.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
name='Default Value',
description='A pipeline with parameter and default value.'
)
def default_value_pipeline(
url=dsl.PipelineParam(name='url', value='gs://ml-pipeline/shakespeare1.txt')):
def default_value_pipeline(url='gs://ml-pipeline/shakespeare1.txt'):

# "url" is a pipeline parameter, meaning users can provide values when running the
# pipeline using UI, CLI, or API to override the default value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ def __init__(self, name, url):
name='Download and Save Most Frequent',
description='Download and Get Most Frequent Word and Save to GCS'
)
def download_save_most_frequent_word(url: dsl.PipelineParam, outputpath: dsl.PipelineParam):
def download_save_most_frequent_word(url: str, outputpath: str):
downloader = DownloadMessageOp('download', url)
save_most_frequent_word(downloader.output, outputpath)

0 comments on commit 0b7120c

Please sign in to comment.