In [1]:
from google.cloud import aiplatform
from datetime import datetime
# import kfp
import kfp.v2.dsl as dsl
from google_cloud_pipeline_components import aiplatform as gcc_aip

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

from google.cloud import bigquery as bq

In [2]:
from google.oauth2 import service_account

BQ_DATA = "bigquery-public-data.ml_datasets.ulb_fraud_detection"
PROJECT_ID = "vertex-ai-tuto-380714"
REGION = "us-central1"
VAR_TARGET = "Class"
credentials = service_account.Credentials.from_service_account_file("vertex-ai-tuto-380714-811aa46f0cfa.json")


Clients

In [3]:
aiplatform.init(project=PROJECT_ID, location=REGION, credentials=credentials)
bigquery = bq.Client(credentials=credentials)

In [26]:
PROJECT_ID = "vertex-ai-tuto-380714"
REGION = 'us-central1'
DATANAME = 'fraud'
NOTEBOOK = '02c'

# Resources
DEPLOY_COMPUTE = 'n1-standard-2'

# Model Training
VAR_TARGET = 'Class'
# VAR_OMIT = 'transaction_id' # add more variables to the string with space delimiters

In [55]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = PROJECT_ID

URI = f"gs://vertex-tuto/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"

# Pipeline (KFP) Definition

Flow
- Create Vertex AI Dataset from link to BigQuery table
- Create Vertex AI AutoML Tabular Training Job
- Create Endpoint and Depoy trained model

In [81]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# @kfp.dsl.pipeline(
@dsl.pipeline(
    name = f'kfp-{TIMESTAMP}',
    pipeline_root = URI+'/'+str(TIMESTAMP)+'/kfp/'
)
def pipeline(
    project: str,
#     dataname: str,
    display_name: str,
    deploy_machine: str,
    bq_source: str,
    var_target: str,
#     var_omit: str,
    features: dict,
    labels: dict 
):
    
    # dataset
    dataset = gcc_aip.TabularDatasetCreateOp(
        project = project,
        display_name = display_name,
        bq_source = bq_source,
        labels = labels
    )
    
    # training
    model = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project = project,
        display_name = display_name,
        optimization_prediction_type = "classification",
        optimization_objective = "maximize-au-prc",
        budget_milli_node_hours = 1000,
        disable_early_stopping=False,
        column_specs = features,
        dataset = dataset.outputs['dataset'],
        target_column = var_target,
#         predefined_split_column_name = 'splits',
        labels = labels
    )
    
    # Endpoint: Creation
    endpoint = gcc_aip.EndpointCreateOp(
        project = project,
        display_name = display_name,
        labels = labels
    )
    
    # Endpoint: Deployment of Model
    deployment = gcc_aip.ModelDeployOp(
        model = model.outputs["model"],
        endpoint = endpoint.outputs["endpoint"],
        dedicated_resources_min_replica_count = 1,
        dedicated_resources_max_replica_count = 1,
        traffic_split = {"0": 100},
        dedicated_resources_machine_type= deploy_machine
    )

# Compile pipeline

In [64]:
# Create DIR 
!mkdir -p "temp/02c"

In [82]:


kfp.v2.compiler.Compiler().compile(
    pipeline_func = pipeline,
    package_path = f"{DIR}/{NOTEBOOK}.json"
)

# Create Vertex AI Pipeline Job

In [4]:
# get feature names
# query = "SELECT * FROM bigquery-public-data.ml_datasets.ulb_fraud_detection.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'bigquery-public-data.ml_datasets.ulb_fraud_detection_prepped'"
query = f"SELECT * FROM bigquery-public-data.ml_datasets.ulb_fraud_detection"
schema = bigquery.query(query).to_dataframe()
OMIT = [VAR_TARGET] # VAR_OMIT.split() + [VAR_TARGET, 'splits']
features = schema.columns[~schema.columns.isin(OMIT)].tolist()
features = dict.fromkeys(features, 'auto')


In [8]:
schema[schema.columns[~schema.columns.isin(OMIT)]]

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount
0,8748.0,-1.070416,0.304517,2.777064,2.154061,0.254450,-0.448529,-0.398691,0.144672,1.070900,...,-0.195556,-0.122032,-0.182351,0.019576,0.626023,-0.018518,-0.263291,-0.198600,0.098435,0.00
1,27074.0,1.165628,0.423671,0.887635,2.740163,-0.338578,-0.142846,-0.055628,-0.015325,-0.213621,...,-0.177675,-0.081184,-0.025694,-0.076609,0.414687,0.631032,0.077322,0.010182,0.019912,0.00
2,28292.0,1.050879,0.053408,1.364590,2.666158,-0.378636,1.382032,-0.766202,0.486126,0.152611,...,-0.170425,0.083467,0.624424,-0.157228,-0.240411,0.573061,0.244090,0.063834,0.010981,0.00
3,28488.0,1.070316,0.079499,1.471856,2.863786,-0.637887,0.858159,-0.687478,0.344146,0.459561,...,-0.221677,0.048067,0.534713,-0.098645,0.129272,0.543737,0.242724,0.065070,0.023500,0.00
4,31392.0,-3.680953,-4.183581,2.642743,4.263802,4.643286,-0.225053,-3.733637,1.273037,0.015661,...,1.177573,0.649051,1.054124,0.795528,-0.901314,-0.425524,0.511675,0.125419,0.243671,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
284802,154599.0,0.667714,3.041502,-5.845112,5.967587,0.213863,-1.462923,-2.688761,0.677764,-3.447596,...,0.558425,0.329760,-0.941383,-0.006075,-0.958925,0.239298,-0.067356,0.821048,0.426175,6.74
284803,90676.0,-2.405580,3.738235,-2.317843,1.367442,0.394001,1.919938,-3.106942,-10.764403,3.353525,...,-2.140874,10.005998,-2.454964,1.684957,0.118263,-1.531380,-0.695308,-0.152502,-0.138866,6.99
284804,34634.0,0.333499,1.699873,-2.596561,3.643945,-0.585068,-0.654659,-2.275789,0.675229,-2.042416,...,0.329342,0.469212,-0.144363,-0.317981,-0.769644,0.807855,0.228164,0.551002,0.305473,18.96
284805,96135.0,-1.952933,3.541385,-1.310561,5.955664,-1.003993,0.983049,-4.587235,-4.892184,-2.516752,...,1.965030,-1.998091,1.133706,-0.041461,-0.215379,-0.865599,0.212545,0.532897,0.357892,18.96


In [11]:
# schema[schema.columns.difference([VAR_TARGET])]
schema

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,8748.0,-1.070416,0.304517,2.777064,2.154061,0.254450,-0.448529,-0.398691,0.144672,1.070900,...,-0.122032,-0.182351,0.019576,0.626023,-0.018518,-0.263291,-0.198600,0.098435,0.00,0
1,27074.0,1.165628,0.423671,0.887635,2.740163,-0.338578,-0.142846,-0.055628,-0.015325,-0.213621,...,-0.081184,-0.025694,-0.076609,0.414687,0.631032,0.077322,0.010182,0.019912,0.00,0
2,28292.0,1.050879,0.053408,1.364590,2.666158,-0.378636,1.382032,-0.766202,0.486126,0.152611,...,0.083467,0.624424,-0.157228,-0.240411,0.573061,0.244090,0.063834,0.010981,0.00,0
3,28488.0,1.070316,0.079499,1.471856,2.863786,-0.637887,0.858159,-0.687478,0.344146,0.459561,...,0.048067,0.534713,-0.098645,0.129272,0.543737,0.242724,0.065070,0.023500,0.00,0
4,31392.0,-3.680953,-4.183581,2.642743,4.263802,4.643286,-0.225053,-3.733637,1.273037,0.015661,...,0.649051,1.054124,0.795528,-0.901314,-0.425524,0.511675,0.125419,0.243671,0.00,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
284802,154599.0,0.667714,3.041502,-5.845112,5.967587,0.213863,-1.462923,-2.688761,0.677764,-3.447596,...,0.329760,-0.941383,-0.006075,-0.958925,0.239298,-0.067356,0.821048,0.426175,6.74,1
284803,90676.0,-2.405580,3.738235,-2.317843,1.367442,0.394001,1.919938,-3.106942,-10.764403,3.353525,...,10.005998,-2.454964,1.684957,0.118263,-1.531380,-0.695308,-0.152502,-0.138866,6.99,1
284804,34634.0,0.333499,1.699873,-2.596561,3.643945,-0.585068,-0.654659,-2.275789,0.675229,-2.042416,...,0.469212,-0.144363,-0.317981,-0.769644,0.807855,0.228164,0.551002,0.305473,18.96,1
284805,96135.0,-1.952933,3.541385,-1.310561,5.955664,-1.003993,0.983049,-4.587235,-4.892184,-2.516752,...,-1.998091,1.133706,-0.041461,-0.215379,-0.865599,0.212545,0.532897,0.357892,18.96,1


Run The pipeline:

In [83]:
pipeline = aiplatform.PipelineJob(
    display_name = f'{NOTEBOOK}_{TIMESTAMP}',
    template_path = f"{URI}/{NOTEBOOK}.json",
    parameter_values = {
        "project" : PROJECT_ID,
        "display_name" : f'{NOTEBOOK}_{TIMESTAMP}',
        "deploy_machine" : DEPLOY_COMPUTE,
        "bq_source" : f'bq://{BQ_DATA}',
        "var_target" : VAR_TARGET,
#         "var_omit" : VAR_OMIT,
        "features" : features,
        "labels" : {'notebook': NOTEBOOK}       
    },
    labels = {'notebook': NOTEBOOK},
    enable_caching=False
)

In [53]:
f"{URI}/{NOTEBOOK}.json"

'gs://vertex-ai-tuto-380714/02c/02c.json'

In [84]:
SERVICE_ACCOUNT = "856301062069-compute@developer.gserviceaccount.com"

response = pipeline.run(
    service_account = SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/856301062069/locations/us-central1/pipelineJobs/kfp-20230321180327-20230321180443
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/856301062069/locations/us-central1/pipelineJobs/kfp-20230321180327-20230321180443')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-20230321180327-20230321180443?project=856301062069
PipelineJob projects/856301062069/locations/us-central1/pipelineJobs/kfp-20230321180327-20230321180443 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/856301062069/locations/us-central1/pipelineJobs/kfp-20230321180327-20230321180443 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/856301062069/locations/us-central1/pipelineJobs/kfp-20230321180327-20230321180443 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/856301062069/locations/us-centra

In [95]:

from typing import Dict

def predict_tabular_classification_sample(
    project: str,
    endpoint_id: str,
    instance_dict: Dict,
    credentials : str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.PredictionServiceClient(client_options=client_options, credentials=credentials)
    # for more info on the instance schema, please use get_model_sample.py
    # and look at the yaml found in instance_schema_uri
    instance = json_format.ParseDict(instance_dict, Value())
    instances = [instance]
    parameters_dict = {}
    parameters = json_format.ParseDict(parameters_dict, Value())
    endpoint = client.endpoint_path(
        project=project, location=location, endpoint=endpoint_id
    )
    response = client.predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    # See gs://google-cloud-aiplatform/schema/predict/prediction/tabular_classification_1.0.0.yaml for the format of the predictions.
    predictions = response.predictions
    for prediction in predictions:
        print(" prediction:", dict(prediction))


# [END aiplatform_predict_tabular_classification_sample]

In [110]:
# # Prepare instances 
 
schema['Time'] =schema['Time'].astype(float)
newobs = schema.to_dict(orient='records')
instances = newobs[:3]

In [31]:
# type(newobs[0])

In [111]:
predict_tabular_classification_sample(
    project="856301062069",
    endpoint_id="8997712668416540672",
    location="us-central1",
    credentials= credentials,
    instance_dict=newobs[0]
)

response
 deployed_model_id: 2643608583219970048
 prediction: {'classes': ['0', '1'], 'scores': [0.9996541738510132, 0.0003458340652287006]}


# Batch Predictions: BigQuery Source to BigQuery Destination, with Explanations

## Method 1 

## method 1 >> reading from Bigquery

In [116]:
endpoint = aiplatform.Endpoint.list(filter=f'labels.notebook={NOTEBOOK}')[0]
endpoint.list_models()[0].model

'projects/856301062069/locations/us-central1/models/8211031889952112640'

In [119]:
batch = aiplatform.BatchPredictionJob.create(
    credentials=credentials,
    job_display_name = f'{NOTEBOOK}_{TIMESTAMP}',
    model_name = endpoint.list_models()[0].model,
    instances_format = "bigquery",
    predictions_format = "bigquery",
    bigquery_source = f'bq://{BQ_DATA}',
    bigquery_destination_prefix = f"{PROJECT_ID}",
    generate_explanation=True,
    labels = {'notebook':f'{NOTEBOOK}'}
)

Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/856301062069/locations/us-central1/batchPredictionJobs/4090178444963348480
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/856301062069/locations/us-central1/batchPredictionJobs/4090178444963348480')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4090178444963348480?project=856301062069
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4090178444963348480 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4090178444963348480 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4090178444963348480 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/

## method 1 >> reading from jsonl

In [124]:
schema.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,282.0,-0.356466,0.725418,1.971749,0.831343,0.369681,-0.107776,0.75161,-0.120166,-0.420675,...,0.020804,0.424312,-0.015989,0.466754,-0.809962,0.657334,-0.04315,-0.046401,0.0,0
1,14332.0,1.07195,0.340678,1.784068,2.846396,-0.751538,0.403028,-0.73492,0.205807,1.092726,...,-0.169632,-0.113604,0.067643,0.468669,0.223541,-0.112355,0.014015,0.021504,0.0,0
2,32799.0,1.153477,-0.047859,1.358363,1.48062,-1.222598,-0.48169,-0.654461,0.128115,0.907095,...,0.125514,0.480049,-0.025964,0.701843,0.417245,-0.257691,0.060115,0.035332,0.0,0
3,35799.0,-0.769798,0.622325,0.242491,-0.586652,0.527819,-0.104512,0.209909,0.669861,-0.304509,...,0.152738,0.255654,-0.130237,-0.660934,-0.493374,0.331855,-0.011101,0.049089,0.0,0
4,36419.0,1.04796,0.145048,1.624573,2.932652,-0.726574,0.690451,-0.627288,0.278709,0.318434,...,0.078499,0.658942,-0.06781,0.476882,0.52683,0.219902,0.070627,0.028488,0.0,0


In [127]:
# dataframe to json 

schema.to_json("data/fraud_detection_json.json")

In [129]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

batchWithGs = aiplatform.BatchPredictionJob.create(
    credentials=credentials,
    job_display_name = f'{NOTEBOOK}_{TIMESTAMP}',
    model_name = endpoint.list_models()[0].model,
#     instances_format = "jsonl",
#     predictions_format = "jsonl",
    gcs_source = f'gs://vertex-tuto/data/fraud_detection_json.json',
    gcs_destination_prefix = f"gs://vertex-tuto/output",
    generate_explanation=True,
    labels = {'notebook':f'{NOTEBOOK}'}
)

Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/856301062069/locations/us-central1/batchPredictionJobs/4354883670327164928
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/856301062069/locations/us-central1/batchPredictionJobs/4354883670327164928')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4354883670327164928?project=856301062069
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4354883670327164928 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4354883670327164928 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/4354883670327164928 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/856301062069/locations/us-central1/batchPredictionJobs/

## Method 2

In [None]:
create_batch_prediction_job_sample(
    project= PROJECT_ID,
    display_name = f'{NOTEBOOK}_{TIMESTAMP}',
)

In [114]:
PROJECT_ID

'vertex-ai-tuto-380714'

In [113]:
def create_batch_prediction_job_sample(
    project: str,
    display_name: str,
    model_name: str,
    instances_format: str,
    gcs_source_uri: str,
    predictions_format: str,
    gcs_destination_output_uri_prefix: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    model_parameters_dict = {}
    model_parameters = json_format.ParseDict(model_parameters_dict, Value())

    batch_prediction_job = {
        "display_name": display_name,
        # Format: 'projects/{project}/locations/{location}/models/{model_id}'
        "model": model_name,
        "model_parameters": model_parameters,
        "input_config": {
            "instances_format": instances_format,
            "gcs_source": {"uris": [gcs_source_uri]},
        },
        "output_config": {
            "predictions_format": predictions_format,
            "gcs_destination": {"output_uri_prefix": gcs_destination_output_uri_prefix},
        },
        "dedicated_resources": {
            "machine_spec": {
                "machine_type": "n1-standard-2",
                "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                "accelerator_count": 1,
            },
            "starting_replica_count": 1,
            "max_replica_count": 1,
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_batch_prediction_job(
        parent=parent, batch_prediction_job=batch_prediction_job
    )
    print("response:", response)

In [12]:
schema.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,8748.0,-1.070416,0.304517,2.777064,2.154061,0.25445,-0.448529,-0.398691,0.144672,1.0709,...,-0.122032,-0.182351,0.019576,0.626023,-0.018518,-0.263291,-0.1986,0.098435,0.0,0
1,27074.0,1.165628,0.423671,0.887635,2.740163,-0.338578,-0.142846,-0.055628,-0.015325,-0.213621,...,-0.081184,-0.025694,-0.076609,0.414687,0.631032,0.077322,0.010182,0.019912,0.0,0
2,28292.0,1.050879,0.053408,1.36459,2.666158,-0.378636,1.382032,-0.766202,0.486126,0.152611,...,0.083467,0.624424,-0.157228,-0.240411,0.573061,0.24409,0.063834,0.010981,0.0,0
3,28488.0,1.070316,0.079499,1.471856,2.863786,-0.637887,0.858159,-0.687478,0.344146,0.459561,...,0.048067,0.534713,-0.098645,0.129272,0.543737,0.242724,0.06507,0.0235,0.0,0
4,31392.0,-3.680953,-4.183581,2.642743,4.263802,4.643286,-0.225053,-3.733637,1.273037,0.015661,...,0.649051,1.054124,0.795528,-0.901314,-0.425524,0.511675,0.125419,0.243671,0.0,0


In [19]:
# from google.cloud import storage
# schema.to_csv('data/iris.csv', index = False)

from google.cloud import bigquery
client = bigquery.Client(credentials=credentials)
bucket_name = 'vertex-tuto'
project = "bigquery-public-data"
dataset_id = "ml_datasets" 
table_id = "ulb_fraud_detection"

destination_uri = "gs://{}/{}/{}".format(bucket_name, "data", "iris.csv")
dataset_ref = bq.DatasetReference(project, dataset_id)
table_ref = dataset_ref.table(table_id)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    # Location must match that of the source table.
#     location="US",
)  # API request
extract_job.result()  # Waits for job to complete.

ExtractJob<project=vertex-ai-tuto-380714, location=US, id=9b609dba-73d0-4d59-9688-2cdf2e44f4f2>

In [29]:
df = pd.read_csv('gs://vertex-tuto/data/iris.csv')

_request non-retriable exception: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401
Traceback (most recent call last):
  File "C:\Users\adnan\.conda\envs\vertexenv\lib\site-packages\gcsfs\retry.py", line 114, in retry_request
    return await func(*args, **kwargs)
  File "C:\Users\adnan\.conda\envs\vertexenv\lib\site-packages\gcsfs\core.py", line 411, in _request
    validate_response(status, contents, path, args)
  File "C:\Users\adnan\.conda\envs\vertexenv\lib\site-packages\gcsfs\retry.py", line 101, in validate_response
    raise HttpError(error)
gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401


HttpError: Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist)., 401

In [28]:
# !pip install fsspec
# !pip install gcsfs