
[sdk] Cannot pass a GCSPath output as an input to a python function component yaml spec #8936

Closed
EdMorrell opened this issue Mar 7, 2023 · 4 comments
Labels: area/sdk, kind/bug, lifecycle/stale

@EdMorrell
Environment

  • KFP version: 1.8.11

Steps to reproduce

I have a Docker container component with the following YAML specification:

name: Training
description: <Component-Description>
inputs:
  - {name: query_path, type: String}
  - {name: config_path, type: String, optional: True, default: "config.yaml"}
outputs:
  - {name: model_output_path, type: {GCSPath: {"openapi_schema_validator": {"type": "string", "pattern": "^gs://.*$"}}}}
implementation:
  container:
    image: <docker-image>
    command: [python3, train.py, --query_path, {inputValue: query_path}, --config_path, {inputValue: config_path}, --model_output_path, {outputPath: model_output_path}]
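
Not shown in the issue, but for context: a hypothetical sketch of how train.py is assumed to consume the {outputPath: model_output_path} placeholder. KFP substitutes the placeholder with a local file path; whatever the program writes there is surfaced as the step's model_output_path output (the argument handling and serialization below are assumptions, not code from the issue):

import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument("--query_path", type=str)
parser.add_argument("--config_path", type=str, default="config.yaml")
# KFP replaces {outputPath: model_output_path} with a local path to write to.
parser.add_argument("--model_output_path", type=str)
args = parser.parse_args()

# ... training logic elided ...

# Write the serialized model to the injected path; the backend uploads it
# and exposes it as the component's GCSPath-typed output.
os.makedirs(os.path.dirname(args.model_output_path), exist_ok=True)
with open(args.model_output_path, "wb") as f:
    f.write(b"<serialized model bytes>")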

I also have a Python function component with the following definition:

from kfp.v2 import dsl

@dsl.component(packages_to_install=["google-cloud-aiplatform==1.15.1"], base_image="python:3.8")
def upload_model(
    model_path: dsl.Input[dsl.Artifact],
    project_id: str,
    region: str,
    model_display_name: str,
    serving_container_uri: str,
    container_port: int,
    upload_new_model: bool = False,
) -> str:
    """Uploads a model input artifact to the Model registry. By default
    this component will upload the model as a new version of an existing
    model with the same display_name it there is one, or create a new
    model if not. Override this behaviour by setting upload_new_model
    to True to upload the model as new version.

    Args:
        model_path (dsl.Input[dsl.Artifact]): Input artifact with a URI attribute
            that points to the model directory in GCS.
        project_id (str): Project to upload the model to.
        region (str): Region to upload the model in.
        model_display_name (str): Name of uploaded model. If uploading the model as
            a new version of an existing model then must have the same display name.
        serving_container_uri (str): URI of model serving container.
        container_port (int): Port to expose in the serving container.
        upload_new_model (bool, optional): Boolean indicating whether to upload
            the model as a new model or a new version of an existing model.
            Defaults to False (i.e. new version of existing model).

    Returns:
        str: Resource ID of uploaded model
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)

    # Check for existing parent model
    models = aiplatform.Model.list(filter=f"display_name={model_display_name}")

    if not models or upload_new_model:
        model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_path.uri,  # pass the artifact's URI string, not the artifact object
            serving_container_image_uri=serving_container_uri,
            serving_container_ports=[container_port],
        )
    else:
        model = aiplatform.Model.upload(
            display_name=model_display_name,
            artifact_uri=model_path.uri,  # pass the artifact's URI string, not the artifact object
            parent_model=models[0].resource_name,
            serving_container_image_uri=serving_container_uri,
            serving_container_ports=[container_port],
        )

    model.wait()

    return model.name

These two components are linked in a pipeline as follows:

@dsl.pipeline(name="<pipeline-name>")
def training_pipeline(
    project_id: str,
    region: str,
    ingestion_query_path: str,
    preprocessing_query_path: str,
    training_query_path: str,
    serving_container_image_uri: str,
    model_display_name: str,
    serving_container_port: int,
) -> None:
    """
    Args:
        project_id (str): ID of project to run pipeline in
        region (str): Region to run pipeline in
        ingestion_query_path (str): Path to query for ingesting source data from
            BigQuery
        preprocessing_query_path (str): Path to query for ingesting initially
            processed data from BigQuery
        training_query_path (str): Path to preprocessed data for training.
        serving_container_image_uri (str): Path to container in artifact registry
            for serving model predictions.
        model_display_name (str): Name to upload model to model registry under. If
            uploading the model as a new version of an existing model then use the
            same name as the existing model.
        serving_container_port (int): Port to expose in the serving container.
    """

    ingest_op = DATA_INGESTION_OP(query_path=ingestion_query_path)

    preprocess_op = PREPROCESS_OP(query_path=preprocessing_query_path).after(ingest_op)

    train_op = TRAIN_OP(query_path=training_query_path).after(preprocess_op)

    _ = UPLOAD_MODEL_OP(
        model_path=train_op.outputs["model_output_path"],
        project_id=project_id,
        region=region,
        model_display_name=model_display_name,
        serving_container_uri=serving_container_image_uri,
        container_port=serving_container_port,
        upload_new_model=False,
    )
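
Not shown above is the compilation step. A minimal sketch of the assumed invocation under KFP 1.8's v2-compatible mode (the output package path is illustrative):

from kfp.v2 import compiler

# Compile the pipeline function into a pipeline spec for Vertex AI Pipelines.
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.json",
)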

Expected result

The above code works fine if I call the Python function component directly; however, if I compile it to a YAML file using create_component_from_func, I get the following error:

TypeError: Input "model_path" with type "Artifact" cannot be paired with InputValuePlaceholder

In place of the above type annotations I have tried:

  • GCSPath (from the dsl v1 types).
  • InputPath[]: trialling every type I can think of (Artifact, GCSPath, Model); see the sketch after this paragraph.

Every combination has failed, and I cannot find a way to pass the training component's GCSPath output to the next component. How can this be done?
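
For concreteness, a sketch of one of the InputPath variants trialled (the "GCSPath" type name is one of several tried; this reproduces the failing attempt, it is not a fix):

from kfp.v2 import dsl

@dsl.component(packages_to_install=["google-cloud-aiplatform==1.15.1"], base_image="python:3.8")
def upload_model(
    model_path: dsl.InputPath("GCSPath"),  # also tried "Artifact" and "Model"
    project_id: str,
    region: str,
):
    # Body as in the Artifact-typed version above.
    ...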

Unfortunately I need this component to be saved as YAML so it can be used in another repo, so I cannot simply import it as a function.

Impacted by this bug? Give it a 👍.

@chensun (Member) commented Mar 16, 2023

Are you calling create_component_from_func on the pipeline function training_pipeline? That method is expected to work on a Python function that is not decorated with any KFP decorator (dsl.component or dsl.pipeline).

Also, how did you get UPLOAD_MODEL_OP? Is it from the upload_model function shown above, and what does the definition/assignment of UPLOAD_MODEL_OP look like?
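
For reference, a sketch of the usage being described: create_component_from_func applied to a plain, undecorated function (the function and file name are illustrative, not from the issue):

import kfp.components

def add(a: float, b: float) -> float:
    """A plain Python function with no KFP decorator."""
    return a + b

add_op = kfp.components.create_component_from_func(
    add,
    base_image="python:3.8",
    output_component_file="add_component.yaml",  # optionally persist the YAML spec
)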

@EdMorrell (Author)
Thanks for your reply. I've tried applying create_component_from_func to both decorated and non-decorated Python functions, but it doesn't make a difference either way.

To get UPLOAD_MODEL_OP from upload_model, I called kfp.components.load_component_from_file on the component YAML spec; UPLOAD_MODEL_OP is the output of that call, as sketched below.
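
For reference, a minimal sketch of that loading step (file paths are hypothetical):

import kfp.components

# Each *_OP is a component factory built from its YAML spec.
TRAIN_OP = kfp.components.load_component_from_file("training/component.yaml")
UPLOAD_MODEL_OP = kfp.components.load_component_from_file("upload_model/component.yaml")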

@github-actions (bot)
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions (bot)

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.
