In [1]:
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline

In [2]:
from google.cloud import aiplatform as vertex
from google_cloud_pipeline_components import aiplatform as vc

In [3]:
# --- Google Cloud project settings ------------------------------------------
PROJECT_ID = 'mitochondrion-project-344303'
REGION = 'asia-southeast1'

# --- Pipeline settings ------------------------------------------------------
# PIPELINE_NAME is used both as the pipeline name and as the display name of
# the resources created further down in this notebook.
PIPELINE_NAME = 'feature-store-automl'

# Cloud Storage bucket; pipeline artifacts are rooted under "<bucket>/pipelines".
BUCKET_NAME = 'gs://mitochondrion-bucket-sg'
PIPELINE_ROOT = BUCKET_NAME + '/pipelines'

In [4]:
@component(base_image='google/cloud-sdk:latest',
            packages_to_install=['pandas==1.3.4',
                                'pandas-gbq==0.17.4',
                                'google-cloud-bigquery==2.34.2',
                                'google-cloud-aiplatform==1.11.0',
                                'fsspec==2022.3.0',
                                'gcsfs==2022.3.0'
            ]
)
def get_dataset_from_fs(current_date: str = '2022-01-01',
                        project_id: str ='mitochondrion-project-344303',
                        region: str ='asia-southeast1') -> str:
    """Build the training CSV from BigQuery labels and Feature Store features.

    Reads (user_id, created_date, conversion) rows from the marketing table
    for the 28 days ending at ``current_date`` (both bounds inclusive),
    batch-serves the ``channel``, ``history`` and ``zip_code`` features of
    the ``users`` entity type for each (user, date) pair, and writes the
    result, with ``conversion`` as the last column, to Cloud Storage.

    Args:
        current_date: Last day of the window, formatted as ``YYYY-MM-DD``.
        project_id: Project used for the BigQuery query and for Vertex AI.
        region: Vertex AI location of the feature store.

    Returns:
        The ``gs://`` URI of the CSV file that was written.

    Raises:
        ValueError: If ``current_date`` is not ``YYYY-MM-DD`` or the query
            returns no rows for the window.
    """
    # Imports live inside the function because the component body runs on its
    # own inside the container image.
    from datetime import datetime, timedelta

    import pandas as pd
    from google.cloud import aiplatform as vertex

    date_format = '%Y-%m-%d'

    # strptime rejects anything that is not a plain YYYY-MM-DD date, so only
    # validated text is interpolated into the SQL below.
    start_date = (datetime.strptime(current_date, date_format) - timedelta(days=7*4)).strftime(date_format)

    query = """
        SELECT
            user_id, created_date, conversion
        FROM
            `mitochondrion-project-344303.dataset.marketing`
        WHERE
            created_date >= '{start_date}'
            AND created_date <= '{current_date}'
    """.format(start_date=start_date, current_date=current_date)

    df = pd.read_gbq(query, project_id=project_id, dialect='standard')
    if df.empty:
        # Fail here with a clear message instead of serving/training on nothing.
        raise ValueError(
            f'No marketing rows found between {start_date} and {current_date}'
        )

    vertex.init(project=project_id, location=region)

    # One read instance per labelled row: entity id + point-in-time timestamp.
    read_instance_df = pd.DataFrame()
    read_instance_df['users'] = df['user_id'].astype(str)
    read_instance_df['timestamp'] = pd.to_datetime(df['created_date'], format=date_format)
    # The label travels with its read instance (see pass_through_fields below).
    # Re-attaching it afterwards by row position would silently mislabel rows
    # whenever the served frame comes back in a different order.
    read_instance_df['conversion'] = df['conversion']

    fs = vertex.featurestore.Featurestore(featurestore_name='marketing_featurestore')
    training_feature = {
        'users': ['channel', 'history', 'zip_code']
    }

    training_df = fs.batch_serve_to_df(
        serving_feature_ids=training_feature,
        read_instances_df=read_instance_df,
        pass_through_fields=['conversion'],
    )
    training_df = training_df.drop(columns=['entity_type_users', 'timestamp'])

    # Keep the label as the last column, matching the previous CSV layout.
    feature_columns = [col for col in training_df.columns if col != 'conversion']
    training_df = training_df[feature_columns + ['conversion']]

    filepath = 'gs://mitochondrion-bucket-sg/datasets/from_fs.csv'
    training_df.to_csv(filepath, index=False)

    return filepath


In [5]:
@pipeline(
    pipeline_root=PIPELINE_ROOT,
    name=PIPELINE_NAME,
)
def custom_pipeline(
    project: str = PROJECT_ID,
    current_date: str = '2022-01-01',
    display_name: str = PIPELINE_NAME,
):
    """Export a training set, train an AutoML tabular model and deploy it.

    Steps: export the dataset via ``get_dataset_from_fs`` -> create a Vertex AI
    tabular dataset from the CSV -> run an AutoML classification training job
    on the ``conversion`` column -> create an endpoint -> deploy the model.

    Every step that takes a region is given ``REGION`` explicitly so the
    dataset, model and endpoint are all created in the same location; a step
    left without one falls back to its own default instead.

    Args:
        project: Google Cloud project passed to every step.
        current_date: ``YYYY-MM-DD`` date forwarded to ``get_dataset_from_fs``.
        display_name: Display name given to the dataset, training job and
            endpoint.
    """
    data = get_dataset_from_fs(current_date=current_date,
                                project_id=project,
                                region=REGION)

    dataset = vc.TabularDatasetCreateOp(
        project=project,
        location=REGION,
        display_name=display_name,
        gcs_source=data.output
    )

    model = vc.AutoMLTabularTrainingJobRunOp(
        project=project,
        location=REGION,
        display_name=display_name,
        dataset=dataset.outputs['dataset'],
        target_column='conversion',
        budget_milli_node_hours=1000,
        optimization_prediction_type='classification',
    )

    endpoint = vc.EndpointCreateOp(
        project=project,
        location=REGION,
        display_name=display_name,
    )

    # Single fixed replica: min and max replica counts are both 1.
    vc.ModelDeployOp(
        model=model.outputs['model'],
        endpoint=endpoint.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1
    )
    

In [6]:
# Path of the compiled pipeline definition; the PipelineJob cell reads it back
# through template_path.
JSON_FILE = 'feature_store_automl.json'

# Compile custom_pipeline into JSON_FILE.
compiler.Compiler().compile(pipeline_func=custom_pipeline, package_path=JSON_FILE)



In [7]:
from google.cloud import aiplatform as vertex

# Create a run of the compiled pipeline; current_date overrides the pipeline's
# default value for this run.
job = vertex.PipelineJob(
    project=PROJECT_ID,
    location=REGION,
    display_name=PIPELINE_NAME,
    template_path=JSON_FILE,
    parameter_values={'current_date': '2022-03-31'},
    enable_caching=True,
)

# Submit the run under an explicit service account.
job.submit(
    service_account='default@mitochondrion-project-344303.iam.gserviceaccount.com',
)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/832137092875/locations/asia-southeast1/pipelineJobs/feature-store-automl-20220422110912
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/832137092875/locations/asia-southeast1/pipelineJobs/feature-store-automl-20220422110912')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/feature-store-automl-20220422110912?project=832137092875
