## Workflow
The pipeline runs in two phases: it first selects the best candidate model, then deploys that model if it is at least as good as the one currently serving traffic:

### 1. Candidate Selection
- **Registry Query:** Fetch models tagged with the `series` label and the `default` version alias from the Vertex AI Model Registry.
- **Performance Evaluation:** Evaluate the candidates in parallel, logging the evaluation metrics (such as accuracy), the confusion matrix, and the ROC curve for each.
- **Model Selection:** Pick the candidate with the best value of the chosen comparison metric (`best_metric`).

### 2. Deployment and Monitoring
- **Endpoint Management:** Look for the endpoint for the series and create it if it does not exist; if no model is deployed on it, deploy the selected candidate.
- **Model Comparison:** If a model is already deployed, evaluate the deployed model that receives the most traffic and compare it with the selected candidate on the chosen metric.
- **Model Update:** If the candidate scores at least as well as the deployed model, deploy it to the endpoint with 100% of the traffic and undeploy any models left without traffic.


---
## Setup

inputs:

In [85]:
import os
GOOGLE_APPLICATION_CREDENTIALS = "/Users/zacharynguyen/Documents/GitHub/2024/End-to-End-Vertex-AI-Pipeline-for-Fraud-Detection/key/e2e-fraud-detection-debf1c9863af.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_APPLICATION_CREDENTIALS

In [86]:
PROJECT_ID = 'e2e-fraud-detection'
REGION = 'us-central1'
EXPERIMENT = 'pipeline-bqml-best-model'
SERIES = 'bqml'

# source data
BQ_PROJECT = PROJECT_ID
BQ_DATASET = 'fraud_dataset'
BQ_TABLE = 'prepped-data'

# Model Training
VAR_TARGET = 'Class'
VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

packages:

In [87]:
from google.cloud import aiplatform
from google.cloud import bigquery
from datetime import datetime

from typing import NamedTuple
from google_cloud_pipeline_components import v1
from kfp import dsl
from kfp.dsl import importer_node
from kfp import compiler
from kfp.dsl import Artifact, Input, Metrics, ClassificationMetrics, HTML, Output, component



clients:

In [88]:
aiplatform.init(project=PROJECT_ID, location=REGION)
bq = bigquery.Client()

parameters:

In [89]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = PROJECT_ID
URI = f"gs://{BUCKET}/{SERIES}/{EXPERIMENT}/pipelines"
DIR = f"temp/{EXPERIMENT}"

In [90]:
SERVICE_ACCOUNT = 'zacharynguyen@e2e-fraud-detection.iam.gserviceaccount.com'
SERVICE_ACCOUNT

'zacharynguyen@e2e-fraud-detection.iam.gserviceaccount.com'

List the service account's current roles:

In [91]:
#!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" --format='table(bindings.role)' --flatten="bindings[].members"

In [92]:
#!gcloud projects get-iam-policy e2e-fraud-detection --format=json | grep -B 1 -A 2 "zacharynguyen@e2e-fraud-detection.iam.gserviceaccount.com"


>Note: If the resulting list is missing [roles/storage.objectAdmin](https://cloud.google.com/storage/docs/access-control/iam-roles) then [revisit the setup notebook](../00%20-%20Setup/00%20-%20Environment%20Setup.ipynb#permissions) and add this permission to the service account with the provided instructions.

environment:

In [93]:
!rm -rf {DIR}
!mkdir -p {DIR}

---

## Pipeline

This section follows the process I take to build a workflow as a Vertex AI Pipeline.  
1. Create an outline of the workflow
2. Prepare components, custom or prebuilt for each step
3. Define the pipeline
4. Compile and run the pipeline

The build process can be iterative: steps 2, 3, and 4 are created and tested repeatedly until the pipeline works.

### 1 - Outline Pipeline

Write down in words the flow you want to achieve along with any conditional elements:

- Candidate selection path:
    - Get list of candidate models: Vertex AI Model Registry where labels.series={SERIES} and version_alias=default
    - Loop (async) over list of candidate models
        - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
    - Pick the best candidate model
- Current model review path:
    - Check for endpoint, create if needed
    - Get the deployed model with most traffic 
    - Condition: if there is a deployed model
        - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
    - Condition: if there is not a deployed model
        - deploy best on endpoint
- Compare And update Path:
    - Condition: if best > deployed
        - deploy best on endpoint
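
The conditional part of the outline can be sketched as a plain Python function, outside any pipeline. This is only an illustration: `should_deploy` and `LOWER_IS_BETTER` are hypothetical names, and the handling of lower-is-better metrics such as `log_loss` is an assumption added here. Ties are treated as a replacement, matching the `>=` used in the pipeline definition later in this notebook.

```python
from typing import Optional

# Metrics where a smaller value means a better model (assumption for this sketch).
LOWER_IS_BETTER = {"log_loss"}


def should_deploy(best_value: float, deployed_value: Optional[float], metric: str) -> bool:
    """Decide whether the best candidate should be deployed.

    deployed_value is None when no model is deployed on the endpoint.
    """
    if deployed_value is None:
        # Nothing is deployed yet: always deploy the best candidate.
        return True
    if metric in LOWER_IS_BETTER:
        return best_value <= deployed_value
    return best_value >= deployed_value
```

For example, `should_deploy(0.97, None, "accuracy")` returns `True` because nothing is deployed, while `should_deploy(0.90, 0.95, "accuracy")` returns `False`.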

### 2 - Prepare Components

#### Component: list_series_models
Get a list of BQML model names that are registered in the Vertex AI Model Registry for the series:

In [94]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
@component(
    base_image = "python:3.9",
    packages_to_install = ["google-cloud-aiplatform"]
)
def list_series_models(
    project: str,
    region: str,
    series: str
) -> NamedTuple('outputs', [('candidates', list)]):

    # setup
    from collections import namedtuple
    result = namedtuple('outputs', ['candidates'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    # get list of candidate models for series
    #candidates = [f"{model.name}" for model in aiplatform.Model.list(filter = f"labels.series={series}")]
    candidates = [f"{model.labels['series']}_{model.labels['experiment']}_{model.labels['timestamp']}" for model in aiplatform.Model.list(filter = f"labels.series={series}")]

    return result(candidates)
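
The candidate name is assembled from three model labels, so a model missing any of them would raise a `KeyError` inside the list comprehension. A minimal sketch of the same naming rule with that case handled, using plain dictionaries in place of `model.labels`; the helper names and the sample label values are hypothetical:

```python
from typing import Dict, Iterable, List

# Labels the component reads to build the BQML model name.
REQUIRED_LABELS = ("series", "experiment", "timestamp")


def candidate_name(labels: Dict[str, str]) -> str:
    """Build '<series>_<experiment>_<timestamp>' from a model's labels."""
    missing = [key for key in REQUIRED_LABELS if key not in labels]
    if missing:
        raise KeyError(f"missing labels: {', '.join(missing)}")
    return "_".join(labels[key] for key in REQUIRED_LABELS)


def candidate_names(all_labels: Iterable[Dict[str, str]]) -> List[str]:
    """Build names for every model, skipping models without the required labels."""
    names = []
    for labels in all_labels:
        try:
            names.append(candidate_name(labels))
        except KeyError:
            continue  # not a model registered by this series' training notebooks
    return names
```

Skipping unlabeled models is a design choice for this sketch; failing loudly may be preferable if every model in the series is expected to carry all three labels.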

#### Component: bqml_evaluate
Gather evaluation metrics for a specified BQML model: metrics, confusion matrix, ROC curve

In [95]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
@component(
    base_image = "python:3.9",
    packages_to_install = ['pandas', 'db-dtypes', 'google-cloud-bigquery', 'google-cloud-storage']
)
def bqml_evaluate(
    project: str,
    region: str,
    bq_project: str,
    bq_dataset: str,
    bq_model: str,
    bq_test_table: str,
    bq_results: str,
    best_metric: str,
    metrics: Output[Metrics],
    class_metrics: Output[ClassificationMetrics]
) -> NamedTuple('outputs', [('best_metric', float)]):

    # setup
    import json
    from google.cloud import bigquery
    bq = bigquery.Client(project = bq_project)
    from google.cloud import storage
    gcs = storage.Client(project = project)
    from collections import namedtuple
    result = namedtuple('outputs', ['best_metric'])
    
    # BQML: ML.EVALUATE
    query = f"""
    SELECT *
    FROM ML.EVALUATE (
        MODEL `{bq_project}.{bq_dataset}.{bq_model}`,
        (SELECT * FROM `{bq_test_table}`)
    )
    """
    bq_eval = bq.query(query = query).to_dataframe()
    bq_eval = bq_eval.to_dict(orient = 'records')[0]
    for key in bq_eval:
        metrics.log_metric(key, bq_eval[key])

    # BQML: ML.CONFUSION_MATRIX
    query = f"""
    SELECT *
    FROM ML.CONFUSION_MATRIX (
        MODEL `{bq_project}.{bq_dataset}.{bq_model}`,
        (SELECT * FROM `{bq_test_table}`)
    )
    """
    bq_cm = bq.query(query = query).to_dataframe()
    classes = ['Not Fraud', 'Fraud']
    # ignore the 'trial_id' column that is included when hyperparameter tuning is used and skip the first=label column
    matrix = bq_cm.loc[:, bq_cm.columns != 'trial_id'].iloc[:, 1:].values.tolist()
    class_metrics.log_confusion_matrix(classes, matrix)

    # BQML: ML.ROC_CURVE
    query = f"""
    SELECT *
    FROM ML.ROC_CURVE (
        MODEL `{bq_project}.{bq_dataset}.{bq_model}`,
        (SELECT * FROM `{bq_test_table}`)
    )
    """
    bq_roc = bq.query(query = query).to_dataframe()
    class_metrics.log_roc_curve(
        bq_roc['false_positive_rate'].tolist(),
        bq_roc['recall'].tolist(),
        bq_roc['threshold'].tolist()
    )
    
    # save bq_eval results to common file {'candidate': bq_eval}
    file = f"bq_eval_{bq_model}.json"
    bq_eval['candidate'] = bq_model
    with open(file, 'w') as fp:
        json.dump(bq_eval, fp)
    
    bucket = gcs.bucket(project)
    path = bq_results.split(f'gs://{project}/')[-1]
    blob = bucket.blob(f"{path}/{file}")
    blob.upload_from_filename(f"{file}")
    
    return result(bq_eval[best_metric])
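
The component derives the bucket and object path from `bq_results` by assuming the bucket is named after the project. A small sketch of a more general split that works for any `gs://` URI; `split_gcs_uri` is a hypothetical helper, not part of `google-cloud-storage`:

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/dir' into ('bucket', 'path/to/dir')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a Cloud Storage URI: {uri}")
    bucket, _, path = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in: {uri}")
    # Drop leading/trailing slashes so the path can be joined with a file name.
    return bucket, path.strip("/")
```

With the values used in this notebook, `split_gcs_uri("gs://e2e-fraud-detection/bqml/pipeline-bqml-best-model/pipelines/bq_results")` gives the bucket `e2e-fraud-detection` and the path `bqml/pipeline-bqml-best-model/pipelines/bq_results`.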

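#### Component: best_candidate
Read the evaluation files written by `bqml_evaluate`, pick the candidate with the best value of `best_metric`, and write an HTML table of all candidate metrics: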

In [96]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
from typing import NamedTuple

@component(
    base_image="python:3.9",
    packages_to_install=['pandas', 'google-cloud-storage', 'pretty_html_table']
)
def best_candidate(
    project: str,
    region: str,
    bq_results: str,
    best_metric: str,
    candidates: list,
    metrics: Output[Artifact]  # Adjusted from HTML to Artifact for generalization
) -> NamedTuple('outputs', [('best_candidate', str), ('best_metric', float)]):

    # setup
    import json
    import pandas as pd
    from pretty_html_table import build_table
    from google.cloud import storage
    from collections import namedtuple
    import os

    gcs = storage.Client(project=project)
    Result = namedtuple('outputs', ['best_candidate', 'best_metric'])

    # retrieve bq_results to a list of dictionaries: bq_evals = [{}, {}, ...]
    path = bq_results.split(f'gs://{project}/')[-1]
    bucket = gcs.bucket(project)
    blobs = list(bucket.list_blobs(prefix=path))

    if not blobs:
        raise ValueError(f"No files found in {bq_results}. Ensure the path is correct and files exist.")

    candidate_evals = []
    for blob in blobs:
        _, filename = os.path.split(blob.name)
        blob.download_to_filename(filename)
        with open(filename, 'r') as fp:
            evals = json.load(fp)
        # Ensure the file contains 'candidate' key and it's in the list of candidates
        if 'candidate' in evals and evals['candidate'] in candidates:
            candidate_evals.append(evals)

    if not candidate_evals:
        raise ValueError("No evaluation data found for the specified candidates.")

    # convert list of dictionaries to pandas dataframe:
    df_candidates = pd.DataFrame(candidate_evals)

    # Ensure best_metric column exists in the dataframe
    if best_metric not in df_candidates.columns:
        raise ValueError(f"The best_metric '{best_metric}' is not available in the evaluation data.")

    # pick best candidate based on best_metric:
    ascending = best_metric in ['log_loss']
    df_candidates['best'] = df_candidates[best_metric].rank(method='dense', ascending=ascending)
    best_row = df_candidates.loc[df_candidates['best'] == 1].iloc[0]

    # output evals to HTML Table in metrics:
    with open(metrics.path, 'w') as fp:
        fp.write(build_table(df_candidates.drop(columns=['best']), 'blue_light'))

    return Result(best_row['candidate'], best_row[best_metric])
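
The selection rule itself does not need pandas. A minimal sketch of the same idea on a list of evaluation dictionaries, where `pick_best` is a hypothetical helper and the candidate names and metric values are made up for illustration:

```python
from typing import Dict, List, Sequence


def pick_best(
    evals: List[Dict],
    metric: str,
    lower_is_better: Sequence[str] = ("log_loss",),
) -> Dict:
    """Return the evaluation record with the best value of `metric`.

    On ties the first record wins, like taking the first rank-1 row.
    """
    scored = [record for record in evals if metric in record]
    if not scored:
        raise ValueError(f"no evaluation record contains '{metric}'")
    chooser = min if metric in lower_is_better else max
    return chooser(scored, key=lambda record: record[metric])


# Illustrative data only, not results from this project.
example_evals = [
    {"candidate": "bqml_model-a_20240101000000", "accuracy": 0.91, "log_loss": 0.30},
    {"candidate": "bqml_model-b_20240102000000", "accuracy": 0.94, "log_loss": 0.35},
]
best_by_accuracy = pick_best(example_evals, "accuracy")["candidate"]
best_by_log_loss = pick_best(example_evals, "log_loss")["candidate"]
```

Here `best_by_accuracy` is the second candidate and `best_by_log_loss` is the first, which shows why the sort direction has to depend on the metric.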


#### Component: get_endpoint

Look for a Vertex AI Endpoint for the series and create it if it is missing:

In [97]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
@component(
    base_image = "python:3.9",
    packages_to_install = ['google-cloud-aiplatform']
)
def get_endpoint(
    project: str,
    region: str,
    series: str,
    bq_dataset: str 
) -> NamedTuple('outputs', [('endpoint_resource_name', str)]):

    # setup
    from collections import namedtuple
    result = namedtuple('outputs', ['endpoint_resource_name'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    # retrieve endpoint
    endpoints = aiplatform.Endpoint.list(filter = f"labels.series={series}")
    if endpoints:
        endpoint = endpoints[0]
        print(f"Endpoint Exists: {endpoints[0].resource_name}")
    else:
        endpoint = aiplatform.Endpoint.create(
            display_name = f"{series}",
            labels = {'series' : f"{series}"}    
        )
        print(f"Endpoint Created: {endpoint.resource_name}")
    
    return result(endpoint.resource_name)

#### Component: get_deployed_model

Get the models deployed on the endpoint and return the one with the most (or all) traffic, or `'none'` if nothing is deployed:

In [98]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
@component(
    base_image = "python:3.9",
    packages_to_install = ['google-cloud-aiplatform']
)
def get_deployed_model(
    project: str,
    region: str,
    series: str,
    endpoint_resource_name: str,
) -> NamedTuple('outputs', [('model', str)]):

    # setup
    from collections import namedtuple
    result = namedtuple('outputs', ['model'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    # retrieve endpoint
    endpoint = aiplatform.Endpoint(endpoint_name = endpoint_resource_name)
    
    # retrieve deployed model with most traffic and get BQML model name
    traffic_split = endpoint.traffic_split
    if traffic_split:
        deployed_model_id = max(traffic_split, key = traffic_split.get)
        if deployed_model_id:
            for model in endpoint.list_models():
                if model.id == deployed_model_id:
                    deployed_model = model.model+f'@{model.model_version_id}'
            deployed_model = aiplatform.Model(model_name = deployed_model)
            bq_model = deployed_model.display_name+f"_{deployed_model.labels['timestamp']}"
        else: bq_model = 'none'
    else: bq_model = 'none'
    
    return result(bq_model)
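
The traffic lookup relies on `endpoint.traffic_split` being a dictionary that maps deployed model IDs to traffic percentages. The selection step can be sketched in isolation as follows; `top_traffic_model_id` is a hypothetical helper and the IDs are placeholders:

```python
from typing import Dict, Optional


def top_traffic_model_id(traffic_split: Dict[str, int]) -> Optional[str]:
    """Return the deployed model ID with the largest traffic share, or None."""
    if not traffic_split:
        # An endpoint with no deployed models has an empty traffic split.
        return None
    return max(traffic_split, key=traffic_split.get)


# Placeholder IDs and percentages for illustration.
example_split = {"1111111111": 20, "2222222222": 80}
top_id = top_traffic_model_id(example_split)
```

Returning `None` for an empty split plays the same role as the `'none'` string used by the component, which needs a string so it can be compared in a pipeline condition.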

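#### Component: deploy_candidate
Deploy the selected model version to the endpoint with 100% of the traffic, then undeploy any models left without traffic: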

In [99]:
# from kfp.v2.dsl import Artifact, Input, Metrics, Output, component
@component(
    base_image = "python:3.9",
    packages_to_install = ['google-cloud-aiplatform']
)
def deploy_candidate(
    project: str,
    region: str,
    series: str,
    endpoint_resource_name: str,
    bq_model: str
) -> NamedTuple('outputs', [('model_version', str)]):

    DEPLOY_COMPUTE = 'n1-standard-4'
    
    # setup
    from collections import namedtuple
    result = namedtuple('outputs', ['model_version'])
    
    from google.cloud import aiplatform
    aiplatform.init(project = project, location = region)
    
    # retrieve endpoint
    endpoint = aiplatform.Endpoint(endpoint_name = endpoint_resource_name)
    
    # retrieve model
    model_display_name = ('_').join(bq_model.split('_')[0:-1])
    model_timestamp = bq_model.split('_')[-1]
    model_experiment = bq_model.split('_')[1]
    model = aiplatform.Model.list(filter = f"labels.series={series} AND labels.experiment={model_experiment}")[0]
    
    # get all versions of the model:
    client_options = {"api_endpoint": f"{region}-aiplatform.googleapis.com"}
    model_client = aiplatform.gapic.ModelServiceClient(client_options = client_options)
    model_versions = list(model_client.list_model_versions(name = model.resource_name))
    for version in model_versions:
        if version.labels['timestamp'] == model_timestamp:
            model = aiplatform.Model(model_name = version.name)
    
    # deploy the candidate model to the endpoint with 100% traffic
    endpoint.deploy(
        model = model,
        deployed_model_display_name = model.display_name,
        traffic_percentage = 100,
        machine_type = DEPLOY_COMPUTE,
        min_replica_count = 1,
        max_replica_count = 1
    )
    
    # remove models without traffic
    for deployed_model in endpoint.list_models():
        if deployed_model.id in endpoint.traffic_split:
            print(f"Model {deployed_model.display_name} with version {deployed_model.model_version_id} has traffic = {endpoint.traffic_split[deployed_model.id]}")
        else:
            endpoint.undeploy(deployed_model_id = deployed_model.id)
            print(f"Undeploying {deployed_model.display_name} with version {deployed_model.model_version_id} because it has no traffic.")

    return result(model.versioned_resource_name)
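
The component recovers the experiment with `bq_model.split('_')[1]`, which would truncate an experiment label that itself contains an underscore. A hedged sketch of a parse that only assumes the series has no underscore and the timestamp is the last segment; `parse_candidate_name` is a hypothetical helper and the sample name is made up:

```python
from typing import Tuple


def parse_candidate_name(name: str) -> Tuple[str, str, str]:
    """Split '<series>_<experiment>_<timestamp>' into its three parts.

    Assumes the series contains no underscore; the experiment may.
    """
    head, last_sep, timestamp = name.rpartition("_")   # timestamp is the last segment
    series, first_sep, experiment = head.partition("_")  # series is the first segment
    if not (last_sep and first_sep and series and experiment and timestamp):
        raise ValueError(f"expected '<series>_<experiment>_<timestamp>', got: {name}")
    return series, experiment, timestamp


series_part, experiment_part, timestamp_part = parse_candidate_name(
    "bqml_logistic_reg_20240319113831"
)
```

In this example `experiment_part` is `logistic_reg`, whereas indexing the split result at position 1 would yield only `logistic`.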

### 3 - Define Pipeline

Recall the outline; notice that it is included as comments in the pipeline definition below:
- Candidate selection path:
    - Get list of candidate models: Vertex AI Model Registry where labels.series={SERIES} and version_alias=default
    - Loop (async) over list of candidate models
        - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
    - Pick the best candidate model
- Current model review path:
    - Check for endpoint, create if needed
    - Get the deployed model with most traffic 
    - Condition: if there is a deployed model
        - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
    - Condition: if there is not a deployed model
        - deploy best on endpoint
- Compare And update Path:
    - Condition: if best > deployed
        - deploy best on endpoint

In [100]:
# from kfp import dsl
@dsl.pipeline(
    name = f'series-{SERIES}-endpoint-update',
    description = 'Update endpoint with best model.'
)
def endpoint_update_pipeline(
    project: str,
    region: str,
    series: str,
    experiment: str,
    bq_project: str,
    bq_dataset: str,
    bq_test_table: str,
    bq_results: str,
    best_metric: str
):
    from google_cloud_pipeline_components.types import artifact_types
    from kfp.dsl import importer_node
    
# - Candidate selection path:
    
    candidate_models = list_series_models(
        project = project,
        region = region,
        series = series
    ).set_display_name('List Models in Series').set_caching_options(True)
    
    # - Loop (async) over list of candidate models
    with dsl.ParallelFor(candidate_models.outputs['candidates']) as candidate:
        
        # - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
        candidate_metrics = bqml_evaluate(
            project = project,
            region = region,
            bq_project = bq_project,
            bq_dataset = bq_dataset,
            bq_model = candidate,
            bq_test_table = bq_test_table,
            bq_results = bq_results,
            best_metric = best_metric
        ).set_display_name('Gather Candidate Metrics').set_caching_options(True)
    
    # - Pick the best candidate model    
    best = best_candidate(
        project = project,
        region = region,
        bq_results = bq_results,
        best_metric = best_metric,
        candidates = candidate_models.outputs['candidates']
    ).set_display_name('Pick The Best Candidate').set_caching_options(True)
    
    

# - #Current model review path:
#
    # - Check for endpoint, create if needed
    endpoint = get_endpoint(
        project = project,
        region = region,
        series = series,
        bq_dataset = bq_dataset 
    ).set_display_name('Get the Endpoint').set_caching_options(True)
    
    # - Get the deployed model with most traffic, if any
    current_model = get_deployed_model(
        project = project,
        region = region,
        series = series,
        endpoint_resource_name = endpoint.outputs['endpoint_resource_name'] 
    ).set_display_name('Get The Deployed Model').set_caching_options(True)
    #
    # - Condition: if there is a deployed model
    with dsl.Condition(
        current_model.outputs['model'] != 'none',
        name = 'compare_models'
    ):
    
        # - Gather Evaluation: log metrics: evaluation, confusion matrix, ROC curve
        current_metrics = bqml_evaluate(
            project = project,
            region = region,
            bq_project = bq_project,
            bq_dataset = bq_dataset,
            bq_model = current_model.outputs['model'],
            bq_test_table = bq_test_table,
            bq_results = bq_results,
            best_metric = best_metric
        ).set_display_name('Gather Current Metrics').set_caching_options(True)
    
# - #Compare And update Path:
#
        # - Condition: if best > deployed
        with dsl.Condition(
            best.outputs['best_metric'] >= current_metrics.outputs['best_metric'],
            name = 'replace_deployed_model'
        ):
            
            # - deploy best on endpoint
            deploy = deploy_candidate(
                project = project,
                region = region,
                series = series,
                endpoint_resource_name = endpoint.outputs['endpoint_resource_name'],
                bq_model = best.outputs['best_candidate']
            ).set_display_name('Deploy The Candidate Model').set_caching_options(True)
            
    ## - Condition: if there is not a deployed model
    with dsl.Condition(
        current_model.outputs['model'] == 'none',
        name = 'deploy_model'
    ):
        
        # - deploy best on endpoint
        deploy = deploy_candidate(
            project = project,
            region = region,
            series = series,
            endpoint_resource_name = endpoint.outputs['endpoint_resource_name'],
            bq_model = best.outputs['best_candidate']
        ).set_display_name('Deploy The Candidate Model').set_caching_options(True)



### 4 - Compile And Run Pipeline

#### Collect Inputs

In [101]:
bq_test_table = f"{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}_TEST"
query = f"""
CREATE OR REPLACE VIEW `{bq_test_table}` AS
    SELECT * EXCEPT({','.join(VAR_OMIT.split())}, splits),
    FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
    WHERE splits = 'TEST'
"""
job = bq.query(query = query)
job.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7f89411c4730>
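
The view definition turns the space-delimited `VAR_OMIT` string into the `EXCEPT(...)` column list. A small sketch of that string construction as a function, so the generated SQL can be inspected before it is sent to BigQuery; `build_test_view_query` and its parameter names are hypothetical:

```python
def build_test_view_query(
    source_table: str,
    view_name: str,
    var_omit: str,
    split_column: str = "splits",
    split_value: str = "TEST",
) -> str:
    """Build the CREATE VIEW statement for the held-out test rows."""
    # Space-delimited omit list plus the split column itself.
    excluded = var_omit.split() + [split_column]
    return (
        f"CREATE OR REPLACE VIEW `{view_name}` AS\n"
        f"SELECT * EXCEPT({', '.join(excluded)})\n"
        f"FROM `{source_table}`\n"
        f"WHERE {split_column} = '{split_value}'"
    )


example_query = build_test_view_query(
    source_table="e2e-fraud-detection.fraud_dataset.prepped-data",
    view_name="e2e-fraud-detection.fraud_dataset.prepped-data_TEST",
    var_omit="transaction_id",
)
```

Printing `example_query` is a quick way to confirm that every omitted column and the split column appear in the `EXCEPT` list.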

In [102]:
parameter_values = {
    "project" : PROJECT_ID,
    "region" : REGION,
    "experiment" : EXPERIMENT,
    "series": SERIES,
    "bq_project": BQ_PROJECT,
    "bq_dataset": BQ_DATASET,
    "bq_test_table": bq_test_table,
    "bq_results": f"{URI}/bq_results",
    "best_metric": 'accuracy' # accuracy, precision, recall, f1_score, log_loss, roc_auc
}

#### Compile Pipeline

In [103]:
# from kfp.v2 import compiler
compiler.Compiler().compile(
    pipeline_func = endpoint_update_pipeline,
    package_path = f"{DIR}/{EXPERIMENT}.json"
)

#### Define Pipeline Job
Using compiled pipeline:

In [104]:
pipeline_job = aiplatform.PipelineJob(
    display_name = f"{EXPERIMENT}_tournament",
    template_path = f"{DIR}/{EXPERIMENT}.json",
    parameter_values = parameter_values,
    pipeline_root = f"{URI}/pipeline_root",
    enable_caching = True, # overrides all component/task settings
    labels = {'experiment': EXPERIMENT, 'series': SERIES},

)

#### Submit Pipeline Job

In [105]:
response = pipeline_job.submit(
    service_account = SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/993073267534/locations/us-central1/pipelineJobs/series-bqml-endpoint-update-20240319113831
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/993073267534/locations/us-central1/pipelineJobs/series-bqml-endpoint-update-20240319113831')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/series-bqml-endpoint-update-20240319113831?project=993073267534


Use the following link to view the job in the GCP console:

In [106]:
print(f'The Dashboard can be viewed here:\n{pipeline_job._dashboard_uri()}')

The Dashboard can be viewed here:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/series-bqml-endpoint-update-20240319113831?project=993073267534


#### Wait On Pipeline Job

In [107]:
pipeline_job.wait()

PipelineJob projects/993073267534/locations/us-central1/pipelineJobs/series-bqml-endpoint-update-20240319113831 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob run completed. Resource name: projects/993073267534/locations/us-central1/pipelineJobs/series-bqml-endpoint-update-20240319113831


---
## Work In Progress

Use Vertex AI ML Metadata Artifact Types:
 - [Reference](https://cloud.google.com/vertex-ai/docs/pipelines/artifact-types)
 - [Consume or produce artifact in your component](https://cloud.google.com/vertex-ai/docs/pipelines/use-components#consume_or_produce_artifacts_in_your_component)

```
    # import prebuilt components
    from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryEvaluateModelJobOp,
        BigqueryMLConfusionMatrixJobOp,
        BigqueryMLRocCurveJobOp
    )
    from google_cloud_pipeline_components.types import artifact_types
    from kfp.dsl import importer_node

...

    with dsl.ParallelFor(candidate_models.outputs['candidates']) as candidate:
        #candidate.set_display_name(str(candidate_models.outputs['candidates']))
        
        bqml_model = importer_node.importer(
            artifact_uri = artifact_uri,
            artifact_class = artifact_types.BQMLModel,
            metadata = {
                'projectId': "statmike-mlops-349915",
                'datasetId': "fraud",
                'modelId': "03f_fraud_20220909135610"
            }
        )
        bq_eval = BigqueryEvaluateModelJobOp(
            project = project,
            location = region,
            model = bqml_model.outputs['artifact'],
            table_name = bq_test_table
        )
        bq_cm = BigqueryMLConfusionMatrixJobOp(
            project = project,
            location = region,
            model = bqml_model.outputs['artifact'],
            table_name = bq_test_table
        )
        bq_roc = BigqueryMLRocCurveJobOp(
            project = project,
            location = region,
            model = bqml_model.outputs['artifact'],
            table_name = bq_test_table
        ) 
```

---
## Remove Resources
See the notebook "99 - Cleanup".