# Customer segmentation using Vertex AI

## Feature engineering

In [None]:
# in case we need to install or update packages
# ! pip3 install --upgrade --quiet google-cloud-aiplatform \
#                                  google-cloud-storage \
#                                  kfp \
#                                  "numpy<2" \
#                                  google-cloud-pipeline-components

In [37]:
# libraries
import time
import os
import pandas as pd
import pandas_gbq
import joblib
from sklearn.cluster import KMeans
from google.cloud import aiplatform, bigquery, storage
from google.cloud.aiplatform import explain
from google.cloud.bigquery import Client, QueryJobConfig
from vertexai.resources.preview import feature_store
from typing import List, Dict, Optional, Sequence, NamedTuple

from google.cloud.aiplatform_v1 import (FeatureOnlineStoreAdminServiceClient,
                                        FeatureOnlineStoreServiceClient,
                                        FeatureRegistryServiceClient)
from google.cloud.aiplatform_v1.types import feature as feature_pb2
from google.cloud.aiplatform_v1.types import feature_group as feature_group_pb2
from google.cloud.aiplatform_v1.types import \
    feature_online_store as feature_online_store_pb2
from google.cloud.aiplatform_v1.types import \
    feature_online_store_admin_service as \
    feature_online_store_admin_service_pb2
from google.cloud.aiplatform_v1.types import \
    feature_online_store_service as feature_online_store_service_pb2
from google.cloud.aiplatform_v1.types import \
    feature_registry_service as feature_registry_service_pb2
from google.cloud.aiplatform_v1.types import feature_view as feature_view_pb2
from google.cloud.aiplatform_v1.types import \
    featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1.types import io as io_pb2


# pipelines
import kfp
from kfp import compiler, dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                     OutputPath, component)

pd.options.mode.copy_on_write = True

In [2]:
# Configs
PROJECT_ID = "prj-d2-data-poc-vivo"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

API_ENDPOINT = f"{LOCATION}-aiplatform.googleapis.com"

BUCKET_URI = f"gs://rgc-aiplatform-{PROJECT_ID}-unique"  # @param {type:"string"}

# initializing the SDK
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

In [3]:
# getting the data from BQ
client = Client()

# creating and executing the query
query = """SELECT msisdn, interest, weight FROM `prj-d2-data-poc-vivo.M2C_data_customer_segmentation.tb_navigation_data`"""
job = client.query(query)
df = job.to_dataframe()

In [4]:
df.head()

Unnamed: 0,msisdn,interest,weight
0,11 935390207,StreamingdeMsica,7741440.0
1,21 916980258,StreamingdeMsica,26369280.0
2,11 952102450,StreamingdeMsica,12015360.0
3,31 994956982,StreamingdeMsica,24030720.0
4,21 977840834,StreamingdeMsica,10321920.0


In [5]:
df.dtypes

Unnamed: 0,0
msisdn,object
interest,object
weight,float64


### Preprocessing and feature engineering

In [6]:
# doing some preprocessings
df_preprocessed = df.astype({'weight':'int'})

In [7]:
# Using One Hot Encoder
df_one_hot = pd.get_dummies(df_preprocessed, columns=['interest'], dtype="int")
df_one_hot.head()

Unnamed: 0,msisdn,weight,interest_ArmazenamentoemNuvem,interest_Compras,interest_Delivery,interest_Ecommerce,interest_Educao,interest_Emprego,interest_Entretenimento,interest_Esportes,...,interest_Mensageiros,interest_Notcias,interest_Produtividade,interest_SadeeBemEstar,interest_ServiosdeTransporte,interest_Streaming,interest_StreamingdeMsica,interest_Utilitrios,interest_Viagem,interest_subprotocolo
0,11 935390207,7741440,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
1,21 916980258,26369279,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
2,11 952102450,12015359,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
3,31 994956982,24030720,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
4,21 977840834,10321920,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0


In [9]:
# grouping by customer based on the number of session by interest
# prompt: group the df_nav_data Dataframes by nr_tlfn and ds_sbpt with the following columns: sum the session duration and get the min for the recency
df_one_hot_grouped = df_one_hot.groupby(by="msisdn", dropna=False).sum()
df_one_hot_grouped.head()

Unnamed: 0_level_0,weight,interest_ArmazenamentoemNuvem,interest_Compras,interest_Delivery,interest_Ecommerce,interest_Educao,interest_Emprego,interest_Entretenimento,interest_Esportes,interest_Finanas,...,interest_Mensageiros,interest_Notcias,interest_Produtividade,interest_SadeeBemEstar,interest_ServiosdeTransporte,interest_Streaming,interest_StreamingdeMsica,interest_Utilitrios,interest_Viagem,interest_subprotocolo
msisdn,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11 900000095,369734387,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
11 900000476,552867824,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
11 900000579,184262388,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
11 900000709,372556786,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
11 900000732,97413111,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1


In [13]:
# creating a pandas df that can be used as a feature view in the nexts steps
df_feature_view =  df_one_hot_grouped.reset_index().rename_axis('entity_id').reset_index()
df_feature_view.head()

Unnamed: 0,entity_id,msisdn,weight,interest_ArmazenamentoemNuvem,interest_Compras,interest_Delivery,interest_Ecommerce,interest_Educao,interest_Emprego,interest_Entretenimento,...,interest_Mensageiros,interest_Notcias,interest_Produtividade,interest_SadeeBemEstar,interest_ServiosdeTransporte,interest_Streaming,interest_StreamingdeMsica,interest_Utilitrios,interest_Viagem,interest_subprotocolo
0,0,11 900000095,369734387,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
1,1,11 900000476,552867824,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
2,2,11 900000579,184262388,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
3,3,11 900000709,372556786,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
4,4,11 900000732,97413111,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1


### "training" the kmeans model

In [14]:
# prompt: train a kmeans model with 20 clusters
kmeans = KMeans(n_clusters=20, random_state=0)
kmeans = kmeans.fit(df_one_hot_grouped)

In [16]:
# prompt: using the kmeans model apply it to the dataframe
df_one_hot_grouped['cluster'] = kmeans.predict(df_one_hot_grouped)
df_one_hot_grouped.head()

Unnamed: 0_level_0,weight,interest_ArmazenamentoemNuvem,interest_Compras,interest_Delivery,interest_Ecommerce,interest_Educao,interest_Emprego,interest_Entretenimento,interest_Esportes,interest_Finanas,...,interest_Notcias,interest_Produtividade,interest_SadeeBemEstar,interest_ServiosdeTransporte,interest_Streaming,interest_StreamingdeMsica,interest_Utilitrios,interest_Viagem,interest_subprotocolo,cluster
msisdn,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11 900000095,369734387,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
11 900000476,552867824,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,17
11 900000579,184262388,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,16
11 900000709,372556786,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,7
11 900000732,97413111,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,8


### registering the model's versions

In [17]:
# exporting the model to joblib file format
artifact_filename = 'model.joblib'

# Save model artifact to local filesystem (doesn't persist)
local_path = artifact_filename
joblib.dump(kmeans, local_path)

# Upload model artifact to Cloud Storage
model_directory = f"{BUCKET_URI}/customer_segmentation"
storage_path = os.path.join(model_directory, artifact_filename)
blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
blob.upload_from_filename(local_path)

In [18]:
# uploading to model registry
model_v1 = aiplatform.Model.upload(
        display_name="customer_segmentation_model_sklearn_01",
        #model_id = "customer_segmentation_sklearn",
        version_aliases=["v1"],
        version_description="This is the INITIAL version of the model",
        artifact_uri=model_directory,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
        is_default_version=True)

model_v1.wait()

print(model_v1.display_name)
print(model_v1.resource_name)

INFO:google.cloud.aiplatform.models:Creating Model
INFO:google.cloud.aiplatform.models:Create Model backing LRO: projects/825995534865/locations/us-central1/models/2007977612667781120/operations/8707984379470675968
INFO:google.cloud.aiplatform.models:Model created. Resource name: projects/825995534865/locations/us-central1/models/2007977612667781120@1
INFO:google.cloud.aiplatform.models:To use this Model in another session:
INFO:google.cloud.aiplatform.models:model = aiplatform.Model('projects/825995534865/locations/us-central1/models/2007977612667781120@1')


customer_segmentation_model_sklearn_01
projects/825995534865/locations/us-central1/models/2007977612667781120


In [19]:
# uloading a V2 model version just as an example
# uploading to model registry
model_v2 = aiplatform.Model.upload(
        display_name="customer_segmentation_model_sklearn_01",
        parent_model=model_v1.resource_name,
        version_aliases=["v2"],
        version_description="This is the SECOND version of the model",
        artifact_uri=model_directory,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
        is_default_version=True)

model_v2.wait()

print(model_v2.display_name)
print(model_v2.resource_name)

INFO:google.cloud.aiplatform.models:Creating Model
INFO:google.cloud.aiplatform.models:Create Model backing LRO: projects/825995534865/locations/us-central1/models/2007977612667781120/operations/2066300829006036992
INFO:google.cloud.aiplatform.models:Model created. Resource name: projects/825995534865/locations/us-central1/models/2007977612667781120@2
INFO:google.cloud.aiplatform.models:To use this Model in another session:
INFO:google.cloud.aiplatform.models:model = aiplatform.Model('projects/825995534865/locations/us-central1/models/2007977612667781120@2')


customer_segmentation_model_sklearn_01
projects/825995534865/locations/us-central1/models/2007977612667781120


### registering a feature view

In [22]:
# creating a table for the feature group in BQ
table_id = "M2C_data_customer_segmentation.tb_customer_segmentation_data_v2"   # @param {type:"string"}
pandas_gbq.to_gbq(df_feature_view, table_id, project_id=f"{PROJECT_ID}", if_exists='replace')

100%|██████████| 1/1 [00:00<00:00, 8050.49it/s]


In [23]:
# initialize clients for feature store
admin_client = FeatureOnlineStoreAdminServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)
# registry_client = FeatureRegistryServiceClient(
#     client_options={"api_endpoint": API_ENDPOINT}
# )

In [25]:
# creating the feature view on the feature store
FEATURE_VIEW_ID = "customer_segmentation_features_04"  # @param {type:"string"}
CRON_SCHEDULE = "TZ=Europe/Madrid 56 * * * *"  # @param {type:"string"}
BQ_VIEW_ID = "M2C_data_customer_segmentation.tb_customer_segmentation_data_v2"  # @param {type:"string"}
FEATURE_ONLINE_STORE_ID = "online_store_01"  # @param {type:"string"}
BQ_VIEW_ID_FQN = f"{PROJECT_ID}.{BQ_VIEW_ID}"

# setting the source
big_query_source = feature_view_pb2.FeatureView.BigQuerySource(
    uri=f"bq://{BQ_VIEW_ID_FQN}", entity_id_columns=["entity_id"]
)

# sync method: if no sync > manual
#sync_config = feature_view_pb2.FeatureView.SyncConfig()

create_view_lro = admin_client.create_feature_view(
    feature_online_store_admin_service_pb2.CreateFeatureViewRequest(
        parent=f"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}",
        feature_view_id=FEATURE_VIEW_ID,
        feature_view=feature_view_pb2.FeatureView(
            big_query_source=big_query_source,
            #sync_config=sync_config,
        ),
    )
)

# Wait for LRO to complete and show result
print(create_view_lro.result())

name: "projects/825995534865/locations/us-central1/featureOnlineStores/online_store_01/featureViews/customer_segmentation_features_04"



In [26]:
# MANUALLY syncing the feature view (could be cron as well)
sync_response = admin_client.sync_feature_view(
    feature_view=f"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}"
)

In [27]:
# checking the sync status
while True:
    feature_view_sync = admin_client.get_feature_view_sync(
        name=sync_response.feature_view_sync
    )
    if feature_view_sync.run_time.end_time.seconds > 0:
        status = "Succeed" if feature_view_sync.final_status.code == 0 else "Failed"
        print(f"Sync {status} for {feature_view_sync.name}.")
        break
    else:
        print("Sync ongoing, waiting for 30 seconds.")
    time.sleep(30)

Sync ongoing, waiting for 30 seconds.
Sync Succeed for projects/prj-d2-data-poc-vivo/locations/us-central1/featureOnlineStores/online_store_01/featureViews/customer_segmentation_features_04/featureViewSyncs/3726991758395965440.


In [28]:
# getting the sync info once it's finished
admin_client.list_feature_view_syncs(
    parent=f"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}"
)

ListFeatureViewSyncsPager<feature_view_syncs {
  name: "projects/prj-d2-data-poc-vivo/locations/us-central1/featureOnlineStores/online_store_01/featureViews/customer_segmentation_features_04/featureViewSyncs/3726991758395965440"
  create_time {
    seconds: 1730186507
    nanos: 582995000
  }
  final_status {
  }
  run_time {
    start_time {
      seconds: 1730186507
      nanos: 582995000
    }
    end_time {
      seconds: 1730186544
      nanos: 141185000
    }
  }
}
>

In [29]:
# getting
data_client = FeatureOnlineStoreServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)

data_client.fetch_feature_values(
    request=feature_online_store_service_pb2.FetchFeatureValuesRequest(
        feature_view=f"projects/{PROJECT_ID}/locations/{LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}",
        data_key=feature_online_store_service_pb2.FeatureViewDataKey(key="28098"),
    )
)

key_values {
  features {
    name: "msisdn"
    value {
      string_value: "11 907086385"
    }
  }
  features {
    name: "weight"
    value {
      int64_value: 94429430
    }
  }
  features {
    name: "interest_ArmazenamentoemNuvem"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Compras"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Delivery"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Ecommerce"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Educao"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Emprego"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Entretenimento"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Esportes"
    value {
      int64_value: 1
    }
  }
  features {
    name: "interest_Finanas"
    value {
      int64_value: 1
    }
  }
  features {
    name: "inte

### Vertex AI pipelines

In [42]:
PIPELINE_ROOT = "{}/pipeline_root/shakespeare".format(BUCKET_URI)

In [74]:
# load data from bigquery
@component(
    packages_to_install=[
        "pandas",
        "google-cloud-bigquery",
        "google-cloud-storage",
        "db-dtypes"
    ],
    base_image="python:3.10",
)
def load_data_from_bigquery(query: str, project_id: str, output_data_path: OutputPath(Dataset)):

    from google.cloud import bigquery

    bq_client = bigquery.Client(project=project_id)
    df = bq_client.query(query).to_dataframe()
    df.to_csv(output_data_path, index=False)

In [75]:
# doing preprocessing on data
@component(
    packages_to_install=[
        "pandas",
        "google-cloud-storage"
    ],
    base_image="python:3.10",
)
def preprocessing_data(
    input_data_path: InputPath(Dataset),
    output_data_path: OutputPath(Dataset)
):
    import pandas as pd

    def preprocessing_ops(df):
        df = df.copy()

        # doing some preprocessings
        df_preprocessed = df.astype({'weight':'int'})

        # Using One Hot Encoder
        df_one_hot = pd.get_dummies(df_preprocessed, columns=['interest'], dtype="int")
        #df_one_hot.head()

        # grouping by customer based on the number of session by interest
        # prompt: group the df_nav_data Dataframes by nr_tlfn and ds_sbpt with the following columns: sum the session duration and get the min for the recency
        df_one_hot_grouped = df_one_hot.groupby(by="msisdn", dropna=False).sum()
        #df_one_hot_grouped.head()

        # creating a pandas df that can be used as a feature view in the nexts steps
        # df_feature_view =  df_one_hot_grouped.reset_index().rename_axis('entity_id').reset_index()
        # df_feature_view.head()

        return df_one_hot_grouped

    df = pd.read_csv(input_data_path)
    processed_data = preprocessing_ops(df)
    processed_data.to_csv(output_data_path, index=False)

In [125]:
# "fitting" and exporting the model
@component(
    packages_to_install=[
        "pandas",
        "scikit-learn",
        "google-cloud-storage"
    ],
    base_image="python:3.10",
)
def model_fit_and_export(
    PROJECT_ID: str,
    BUCKET_URI: str,
    input_data_path: InputPath(Dataset),
    model: Output[Model]
):

    import pandas as pd
    from sklearn.cluster import KMeans

    df = pd.read_csv(input_data_path)

    # prompt: train a kmeans model with 20 clusters
    kmeans = KMeans(n_clusters=20, random_state=0)
    kmeans = kmeans.fit(df)
    model = kmeans

In [126]:
# uploading the model
@component(
    packages_to_install=[
        "pandas",
        "google-cloud-storage",
        "google-cloud-aiplatform",
        "joblib"
    ],
    base_image="python:3.10",
)
def upload_model_to_registry(
    BUCKET_URI: str,
    model: Model
):

    from google.cloud import aiplatform, storage
    import os
    import joblib

    # exporting the model to joblib file format
    artifact_filename = 'model.joblib'

    # Save model artifact to local filesystem (doesn't persist)
    local_path = artifact_filename
    joblib.dump(model, artifact_filename)

    # Upload model artifact to Cloud Storage
    model_directory = f"{BUCKET_URI}/customer_segmentation"
    storage_path = os.path.join(model_directory, artifact_filename)
    blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
    blob.upload_from_filename(local_path)

    # uploading to model registry
    model_v1 = aiplatform.Model.upload(
            display_name="customer_segmentation_model_sklearn_pipeline_01",
            #model_id = "customer_segmentation_sklearn",
            version_aliases=["v1"],
            version_description="This is the INITIAL version of the model",
            artifact_uri=model_directory,
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
            is_default_version=True)

    model_v1.wait()

    print(model_v1.display_name)
    print(model_v1.resource_name)

In [130]:
@dsl.pipeline(
    name="customer_segmentation_pipeline",
    pipeline_root=f"{BUCKET_URI}/pipeline-root-rca",
)
def customer_segmentation_pipeline(
    PROJECT_ID: str = "prj-d2-data-poc-vivo",
    LOCATION: str = "us-central1",
    API_ENDPOINT:str = f"{LOCATION}-aiplatform.googleapis.com",
    BUCKET_URI: str = f"gs://rgc-aiplatform-{PROJECT_ID}-unique"
):

    load_data = load_data_from_bigquery(
        project_id=PROJECT_ID,
        query=query
    )

    data_processed = preprocessing_data(
        input_data_path=load_data.outputs["output_data_path"],
    )

    model_fitted = model_fit_and_export(
        PROJECT_ID=PROJECT_ID,
        BUCKET_URI=BUCKET_URI,
        input_data_path=data_processed.outputs["output_data_path"]
    )

    model_uploaded = upload_model_to_registry(
        BUCKET_URI=BUCKET_URI,
        model=model_fitted.outputs["model"]
    )

In [131]:
compiler.Compiler().compile(
    pipeline_func=customer_segmentation_pipeline,
    package_path="customer_segmentation_pipeline.yml",
)

In [132]:
DISPLAY_NAME = "customer_segmentation_pipeline"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=f"{DISPLAY_NAME}.yml",
)

job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/825995534865/locations/us-central1/pipelineJobs/customer-segmentation-pipeline-20241029102427
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/825995534865/locations/us-central1/pipelineJobs/customer-segmentation-pipeline-20241029102427')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/customer-segmentation-pipeline-20241029102427?project=825995534865
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/825995534865/locations/us-central1/pipelineJobs/customer-segmentation-pipeline-20241029102427 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob pr