This example shows how `pseudo_tuple_component.py` works: it lets a single KFP component take the outputs of several upstream tasks as its input.

In [4]:
import json
from datetime import datetime
from typing import NamedTuple

from google.cloud import aiplatform
from google.oauth2 import service_account
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Artifact, Input, Output, pipeline

from pseudo_tuple_component import PseudoTuple, pseudo_tuple_component


# All GCP / Vertex AI specifics (key-file path, project, region, ...) are kept
# in a local JSON file so nothing environment-specific lives in the notebook.
with open("vertex_config.json", "r") as f:
    gcp_cfg = json.loads(f.read())

# Service-account credentials are built from the key file named in the config.
credentials = service_account.Credentials.from_service_account_file(
    gcp_cfg["credentials_path"]
)

### Does not work: passing a list of task outputs to a single component

```python
@dsl.component
def my_transformer_op(item: str) -> str:
    return item + "_transformed"
 
@dsl.component
def my_aggregator_op(args: list) -> str:
    return " ".join(args)

@dsl.pipeline("aggtest", "agg test")
def dynamic_pipeline():
    transformed_vals = []
    for x in ["a", "b", "c"]:
        transformed_vals.append(my_transformer_op(x))
    my_aggregator_op([x.output for x in transformed_vals])

compiler.Compiler().compile(pipeline_func=dynamic_pipeline, package_path="my_pipeline.yaml")
```

Output:

```
TypeError: Object of type PipelineParam is not JSON serializable
```

### The `pseudo_tuple_component` workaround does work:

In [5]:
# Items the pipeline fans out over: one transformer task is created per entry.
# Its length also sizes the aggregator's PseudoTuple input, so it has to be
# defined before the components below are declared.
MY_LIST = ["a", "b", "c"]
# Base name shared by the compiled pipeline file, the job display name and the
# job id.
PIPELINE_NAME = "pseudo-tuple-example"

@dsl.component
def my_transformer_op(item: str) -> str:
    """Return ``item`` with the suffix ``_transformed`` appended."""
    suffix = "_transformed"
    return item + suffix
 
# Joins all of its inputs into one space-separated string. The pipeline calls
# this component with len(MY_LIST) positional arguments, while the body treats
# `args` as a single iterable of strings.
# NOTE(review): presumably `pseudo_tuple_component` expands the PseudoTuple
# annotation into that many separate `str` inputs -- confirm in
# pseudo_tuple_component.py.
@pseudo_tuple_component(globals_=globals(), locals_=locals())
def my_aggregator_op(args: PseudoTuple(len(MY_LIST), str)) -> str:
    return " ".join(args)

@dsl.pipeline("aggtest", "agg test")
def dynamic_pipeline():
    """Run one transformer task per ``MY_LIST`` entry and aggregate the results."""
    # Collect the `.output` of each transformer task, in MY_LIST order ...
    transformer_outputs = [my_transformer_op(entry).output for entry in MY_LIST]
    # ... and hand them to the aggregator as individual positional arguments.
    my_aggregator_op(*transformer_outputs)

# Compile the pipeline function into the JSON file that the PipelineJob below
# uses as its template.
compiler.Compiler().compile(
    pipeline_func=dynamic_pipeline,
    package_path=f"{PIPELINE_NAME}.json",
)

In [23]:
# Second-resolution local timestamp; used as the suffix of the job id.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [None]:
# Describe the pipeline run for the compiled template.
job = aiplatform.PipelineJob(
    # What to run, and under which names.
    template_path=f"{PIPELINE_NAME}.json",
    display_name=f"{PIPELINE_NAME}_job",
    job_id=f"{PIPELINE_NAME}-{TIMESTAMP}",
    # Where to run it, and with which credentials.
    project=gcp_cfg["project_id"],
    location=gcp_cfg["region"],
    credentials=credentials,
    pipeline_root=gcp_cfg["pipeline_root"],
    enable_caching=True,
)

# Submit the run with the service account and experiment from the config.
job.submit(
    service_account=gcp_cfg["service_account"],
    experiment=gcp_cfg["experiment_name"],
)

<p align="center">
  <img src="./pseudo_tuple_example.png" width="800" alt="Pseudo-tuple example pipeline">
</p>
