In [1]:
from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip

# Set hardware accelerators

In [38]:
TRAIN_GPU, TRAIN_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_K80, 1)

DEPLOY_GPU, DEPLOY_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_K80, 1)

# Set pre-built containers

In [45]:
TRAIN_VERSION = "tf2-cpu.2-7"
DEPLOY_VERSION = "tf2-cpu.2-7"

TRAIN_IMAGE = "us-docker.pkg.dev/vertex-ai/training/{}:latest".format(TRAIN_VERSION)
DEPLOY_IMAGE = "us-docker.pkg.dev/vertex-ai/prediction/{}:latest".format(DEPLOY_VERSION)

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

Training: us-docker.pkg.dev/vertex-ai/training/tf2-cpu.2-7:latest AcceleratorType.NVIDIA_TESLA_K80 1
Deployment: us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest AcceleratorType.NVIDIA_TESLA_K80 1


# Set machine types

In [46]:
MACHINE_TYPE = "n1-standard"

VCPU = "4"
TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Train machine type", TRAIN_COMPUTE)

MACHINE_TYPE = "n1-standard"

VCPU = "4"
DEPLOY_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-4


# Initialize SDK

In [47]:
aiplatform.init(
    # your Google Cloud Project ID or number
    # environment default used is not set
    project='tii-sac-platform-sandbox-alpha',

    # the Vertex AI region you will use
    # defaults to us-central1
    location='europe-west4',

    # Google Cloud Storage bucket in same region as location
    # used to stage artifacts
    staging_bucket='gs://vertexai_staging_bucket',

    # custom google.auth.credentials.Credentials
    # environment default creds used if not set
    # credentials=my_credentials,

    # customer managed encryption key resource name
    # will be applied to all Vertex AI resources if set
    # encryption_spec_key_name=my_encryption_key_name,

    # the name of the experiment to use to track
    # logged metrics and parameters
    experiment='titanic-classifier',

    # description of the experiment above
    experiment_description='VertexAI Demo for IIT'
)

# Create/Register a Vertex AI managed Tabular Dataset

In [48]:
# To create new
#my_dataset = aiplatform.TabularDataset.create(
#    display_name="Kaggle-titanic", gcs_source=['gs://vertex_ai_demos1/datasets/titanic/train.csv', 'gs://vertex_ai_demos1/datasets/titanic/test.csv'])

# To reference an existing one
dataset = aiplatform.TabularDataset(
    dataset_name='1405923528204615680',
    project='tii-sac-platform-sandbox-alpha',
    location='europe-west4')

# Define command args for training script

In [49]:
from datetime import datetime
JOB_PREFIX="xgboost-titanic-classifier-pkg-ar"
JOB_NAME=f"{JOB_PREFIX}-{datetime.now()}"
BUCKET_NAME="gs://vertexai_staging_bucket"

if not TRAIN_NGPU or TRAIN_NGPU < 2:
    TRAIN_STRATEGY = "single"
else:
    TRAIN_STRATEGY = "mirror"

CMDARGS = [
]

# Prepare your custom code to use a VertexAI managed dataset

## You need to rewrite your main function so that it reads the input data paths from environment variables

In [50]:
%%writefile ../trainer/main2.py
import argparse
import logging
import os
import sys
import traceback
from datetime import date

from loguru import logger
from six.moves import cPickle as pickle
from tensorflow.io import gfile

basepath = os.path.dirname(__file__)
trainer_path = os.path.abspath(os.path.join(basepath, ".."))
sys.path.append(trainer_path)

from trainer.model import Model

# This code does just the training of the model
# There is a seperate Vertex AI batch prediction job that handles
# Predicing on the test dataset
# Read environmental variables

training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
validation_data_uri = os.environ["AIP_VALIDATION_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]
output_uri = os.environ["AIP_MODEL_DIR"]

def train_model(config):
    # Our model here is actually a class object
    # The Object holds all of the necessary functionalities to train a
    # XGBoost model eg. loading the training data is handled there
    try:
        model = Model()
        model.train(config)
        logger.info("Training job completed succesfully!")
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        # Printing this causes the exception to be in the training job logs, as well.
        logger.info("Exception during task: " +
                    str(e) + "\n" + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)


if __name__ == "__main__":
    # Path for the local dummy data.
    # Normally input data is read from a cloud storage bucket
    # and output/results are also written to a bucket
    dummy_data_output_path = os.path.abspath(
        os.path.join(basepath, "..", "outputs")
    )
    dummy_data_input_path = os.path.abspath(
        os.path.join(basepath, "..", "inputs")
    )
    # Parse the command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--data_input_path",
        type=str,
        default=training_data_uri,
        help="Data input (bucket) location.",
    )
    parser.add_argument(
        "--trainer_output_path",
        type=str,
        default=output_uri,
        help="Trainer code Output/result (bucket) location.",
    )
    options, args = parser.parse_known_args()
    # Set logging level to ERROR by default, change to DEBUG for more robust logging
    logging.basicConfig(format="%(levelname)s:%(message)s",
                        level=logging.ERROR)
    train_model(options)


Overwriting ../trainer/main2.py


In [50]:
import os

In [51]:
!pwd

/home/jupyter/titanic_classifier/notebooks


# Run custom model training job in VertexAI

In [85]:
TRAIN_IMAGE = "europe-docker.pkg.dev/vertex-ai/training/xgboost-cpu.1-1:latest"
DEPLOY_IMAGE = "europe-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-1:latest"

In [86]:
job = aiplatform.CustomPythonPackageTrainingJob(
    display_name=JOB_NAME,
    python_package_gcs_uri="gs://vertex_ai_demos1/vertexai_pipelines/titanic_classifier/trainer_pkg/trainer2-0.1.tar.gz",
    python_module_name="trainer2.main",
    container_uri=TRAIN_IMAGE,
    model_serving_container_image_uri=DEPLOY_IMAGE
)

In [87]:
MODEL_DISPLAY_NAME = "titanic-classiciation-custom-training-" + str(datetime.now())

In [88]:
model = job.run(
        dataset=dataset,
        model_display_name=MODEL_DISPLAY_NAME,
        #bigquery_destination="bq://tii-sac-platform-sandbox-alpha:vertex_ai_demos",
        #gcs_destination="gs://vertex_ai_demos1/datasets/titanic",
        #args=CMDARGS,
        replica_count=1,
        machine_type=TRAIN_COMPUTE,
        accelerator_count=0,
    )

INFO:google.cloud.aiplatform.training_jobs:Training Output directory:
gs://vertexai_staging_bucket/aiplatform-custom-training-2022-01-05-09:04:32.273 
INFO:google.cloud.aiplatform.training_jobs:No dataset split provided. The service will use a default split.
INFO:google.cloud.aiplatform.training_jobs:View Training:
https://console.cloud.google.com/ai/platform/locations/europe-west4/training/5534785403573239808?project=427665163432
INFO:google.cloud.aiplatform.training_jobs:CustomPythonPackageTrainingJob projects/427665163432/locations/europe-west4/trainingPipelines/5534785403573239808 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:CustomPythonPackageTrainingJob projects/427665163432/locations/europe-west4/trainingPipelines/5534785403573239808 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.training_jobs:CustomPythonPackageTrainingJob projects/427665163432/locations/europe-west4/trainingPipelines/5534785403

In [91]:
model.name

'5811983280051847168'

# Get Batch Predictions with the model

In [92]:
from google.protobuf.json_format import MessageToJson, ParseDict
from google.protobuf.struct_pb2 import Struct, Value

In [93]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
REGION = "europe-west4"
PROJECT_ID = "tii-sac-platform-sandbox-alpha"
# API Endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

# Vertex AI location root path for your dataset, model and endpoint resources
PARENT = "projects/" + PROJECT_ID + "/locations/" + REGION

In [94]:
# client options same for all services
client_options = {"api_endpoint": API_ENDPOINT}


def create_model_client():
    client = aip.ModelServiceClient(client_options=client_options)
    return client


def create_endpoint_client():
    client = aip.EndpointServiceClient(client_options=client_options)
    return client


def create_prediction_client():
    client = aip.PredictionServiceClient(client_options=client_options)
    return client


def create_job_client():
    client = aip.JobServiceClient(client_options=client_options)
    return client

In [95]:
clients = {}
clients["model"] = create_model_client()
clients["endpoint"] = create_endpoint_client()
clients["prediction"] = create_prediction_client()
clients["job"] = create_job_client()

for client in clients.items():
    print(client)

('model', <google.cloud.aiplatform_v1.services.model_service.client.ModelServiceClient object at 0x7f6b947ca950>)
('endpoint', <google.cloud.aiplatform_v1.services.endpoint_service.client.EndpointServiceClient object at 0x7f6b947cac50>)
('prediction', <google.cloud.aiplatform_v1.services.prediction_service.client.PredictionServiceClient object at 0x7f6b947ca9d0>)
('job', <google.cloud.aiplatform_v1.services.job_service.client.JobServiceClient object at 0x7f6b947e6510>)


In [96]:
DEPLOY_IMAGE = "gcr.io/cloud-aiplatform/prediction/xgboost-cpu.1-1:latest"
model_artifact_dir = "gs://vertexai_staging_bucket/aiplatform-custom-training-2022-01-05-09:04:32.273/model"

model = {
    "display_name": "custom_job_XGB" + TIMESTAMP,
    "artifact_uri": model_artifact_dir,
    "container_spec": {"image_uri": DEPLOY_IMAGE, "ports": [{"container_port": 8080}]},
}

print(MessageToJson(aip.UploadModelRequest(parent=PARENT, model=model).__dict__["_pb"]))

{
  "parent": "projects/tii-sac-platform-sandbox-alpha/locations/europe-west4",
  "model": {
    "displayName": "custom_job_XGB20220105091427",
    "containerSpec": {
      "imageUri": "gcr.io/cloud-aiplatform/prediction/xgboost-cpu.1-1:latest",
      "ports": [
        {
          "containerPort": 8080
        }
      ]
    },
    "artifactUri": "gs://vertexai_staging_bucket/aiplatform-custom-training-2022-01-05-09:04:32.273/model"
  }
}


In [97]:
request = clients["model"].upload_model(parent=PARENT, model=model)

In [98]:
result = request.result()

print(MessageToJson(result.__dict__["_pb"]))

{
  "model": "projects/427665163432/locations/europe-west4/models/3506140270838153216"
}


In [99]:
# The full unique ID for the model version
model_id = result.model
print(model_id)

projects/427665163432/locations/europe-west4/models/3506140270838153216


# Do a batch prediction with the deployed model

The dataset used for batch predictions needs to preprocessed.

So it's good to test that your dataset works before submitting a batch prediction job.

In [163]:
import pandas as pd
test = pd.read_csv('../inputs/preprocessed_test.csv')
test.head()

Unnamed: 0.1,Unnamed: 0,Survived,Sex,Age,SibSp,Parch,Fare,Fsize,Single,SmallF,...,T_STONO,T_STONO2,T_STONOQ,T_SWPP,T_WC,T_WEP,T_X,Pc_1,Pc_2,Pc_3
0,881,2,0,39.0,0,0,0.0,1,1,0,...,0,0,0,0,0,0,1,1,0,0
1,882,2,0,49.0,0,0,0.0,1,1,0,...,0,0,0,0,0,0,1,1,0,0
2,883,2,0,46.0,0,0,3.258097,1,1,0,...,0,0,0,0,0,0,1,1,0,0
3,884,2,0,49.0,0,0,3.258097,1,1,0,...,0,0,0,0,0,0,1,1,0,0
4,885,2,0,25.0,0,0,3.258097,1,1,0,...,0,0,0,0,0,0,1,1,0,0


In [164]:
test.drop('Unnamed: 0', axis=1, inplace=True)
y_test = test.Survived.values
test.drop('Survived', axis=1, inplace=True)
test.head(1)

Unnamed: 0,Sex,Age,SibSp,Parch,Fare,Fsize,Single,SmallF,MedF,LargeF,...,T_STONO,T_STONO2,T_STONOQ,T_SWPP,T_WC,T_WEP,T_X,Pc_1,Pc_2,Pc_3
0,0,39.0,0,0,0.0,1,1,0,0,0,...,0,0,0,0,0,0,1,1,0,0


In [165]:
single_vector = test.head(1).values
single_vector

array([[ 0., 39.,  0.,  0.,  0.,  1.,  1.,  0.,  0.,  0.,  0.,  0.,  1.,
         0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
         0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  1.,  1.,  0.,
         0.]])

In [159]:
import xgboost as xgb
bst = xgb.Booster()  # init model
bst.load_model('../notebooks/model.bst')  # load data

In [160]:
import numpy as np
test_single = np.array(single_vector)
test_DMatrix = xgb.DMatrix(np.vstack(test_single))

In [162]:
bst.predict(test_DMatrix)

array([0.49629012], dtype=float32)

In [171]:
test.to_csv('../inputs/dropped_f_test.csv')

In [None]:
# write test data as json to a bucket
import json

import tensorflow as tf

gcs_input_uri = "gs://vertexai_batch_prediction_results/titanic/test.jsonl"
with tf.io.gfile.GFile(gcs_input_uri, "w") as f:
    for i in test.values:
        f.write(str(i) + "\n")

#! gsutil cat $gcs_input_uri

In [168]:
with open('../inputs/test.jsonl', "w") as f:
    for i in test.values:
        f.write(str(i) + "\n")

In [170]:
with open('../inputs/test.jsonl', "rb") as f:
    data = [json.loads(line) for line in f]
print(data)

JSONDecodeError: Expecting ',' delimiter: line 1 column 4 (char 3)

In [113]:
gcs_input_uri = "gs://vertex_ai_demos1/datasets/titanic/test.jsonl"
gcs_input_uri

'gs://vertex_ai_demos1/datasets/titanic/test.jsonl'

In [114]:
model_id = "projects/427665163432/locations/europe-west4/models/5811983280051847168"
model_parameters = Value(
    struct_value=Struct(
        fields={
            "confidence_threshold": Value(number_value=0.5),
            "max_predictions": Value(number_value=10000.0),
        }
    )
)
gcs_input_uri = ""
batch_prediction_job = {
    "display_name": "custom_job_XGB" + TIMESTAMP,
    "model": model_id,
    "input_config": {
        "instances_format": "jsonl",
        "gcs_source": {"uris": ["gs://vertexai_batch_prediction_results/titanic/test.jsonl"]},
    },
    "model_parameters": model_parameters,
    "output_config": {
        "predictions_format": "jsonl",
        "gcs_destination": {
            "output_uri_prefix": "gs://vertexai_batch_prediction_results/titanic"
        },
    },
    "dedicated_resources": {
        "machine_spec": {"machine_type": "n1-standard-2"},
        "starting_replica_count": 1,
        "max_replica_count": 1,
    },
}

print(
    MessageToJson(
        aip.CreateBatchPredictionJobRequest(
            parent=PARENT, batch_prediction_job=batch_prediction_job
        ).__dict__["_pb"]
    )
)

{
  "parent": "projects/tii-sac-platform-sandbox-alpha/locations/europe-west4",
  "batchPredictionJob": {
    "displayName": "custom_job_XGB20220105091427",
    "model": "projects/427665163432/locations/europe-west4/models/5811983280051847168",
    "inputConfig": {
      "instancesFormat": "jsonl",
      "gcsSource": {
        "uris": [
          "gs://vertexai_batch_prediction_results/titanic/test.jsonl"
        ]
      }
    },
    "modelParameters": {
      "max_predictions": 10000.0,
      "confidence_threshold": 0.5
    },
    "outputConfig": {
      "predictionsFormat": "jsonl",
      "gcsDestination": {
        "outputUriPrefix": "gs://vertexai_batch_prediction_results/titanic"
      }
    },
    "dedicatedResources": {
      "machineSpec": {
        "machineType": "n1-standard-2"
      },
      "startingReplicaCount": 1,
      "maxReplicaCount": 1
    }
  }
}


In [115]:
request = clients["job"].create_batch_prediction_job(
    parent=PARENT, batch_prediction_job=batch_prediction_job
)

In [116]:
print(MessageToJson(request.__dict__["_pb"]))

{
  "name": "projects/427665163432/locations/europe-west4/batchPredictionJobs/4440621800354742272",
  "displayName": "custom_job_XGB20220105091427",
  "model": "projects/427665163432/locations/europe-west4/models/5811983280051847168",
  "inputConfig": {
    "instancesFormat": "jsonl",
    "gcsSource": {
      "uris": [
        "gs://vertexai_batch_prediction_results/titanic/test.jsonl"
      ]
    }
  },
  "modelParameters": {
    "max_predictions": 10000.0,
    "confidence_threshold": 0.5
  },
  "outputConfig": {
    "predictionsFormat": "jsonl",
    "gcsDestination": {
      "outputUriPrefix": "gs://vertexai_batch_prediction_results/titanic"
    }
  },
  "dedicatedResources": {
    "machineSpec": {
      "machineType": "n1-standard-2"
    },
    "startingReplicaCount": 1,
    "maxReplicaCount": 1
  },
  "manualBatchTuningParameters": {},
  "state": "JOB_STATE_PENDING",
  "createTime": "2022-01-05T12:33:17.872385Z",
  "updateTime": "2022-01-05T12:33:17.872385Z"
}


In [117]:
# The fully qualified ID for the batch job
batch_job_id = request.name
# The short numeric ID for the batch job
batch_job_short_id = batch_job_id.split("/")[-1]

print(batch_job_id)

projects/427665163432/locations/europe-west4/batchPredictionJobs/4440621800354742272


In [118]:
request = clients["job"].get_batch_prediction_job(name=batch_job_id)
print(MessageToJson(request.__dict__["_pb"]))

{
  "name": "projects/427665163432/locations/europe-west4/batchPredictionJobs/4440621800354742272",
  "displayName": "custom_job_XGB20220105091427",
  "model": "projects/427665163432/locations/europe-west4/models/5811983280051847168",
  "inputConfig": {
    "instancesFormat": "jsonl",
    "gcsSource": {
      "uris": [
        "gs://vertexai_batch_prediction_results/titanic/test.jsonl"
      ]
    }
  },
  "modelParameters": {
    "max_predictions": 10000.0,
    "confidence_threshold": 0.5
  },
  "outputConfig": {
    "predictionsFormat": "jsonl",
    "gcsDestination": {
      "outputUriPrefix": "gs://vertexai_batch_prediction_results/titanic"
    }
  },
  "dedicatedResources": {
    "machineSpec": {
      "machineType": "n1-standard-2"
    },
    "startingReplicaCount": 1,
    "maxReplicaCount": 1
  },
  "manualBatchTuningParameters": {},
  "state": "JOB_STATE_RUNNING",
  "createTime": "2022-01-05T12:33:17.872385Z",
  "startTime": "2022-01-05T12:33:17.899468Z",
  "updateTime": "2022-0

In [119]:
import time
def get_latest_predictions(gcs_out_dir):
    """ Get the latest prediction subfolder using the timestamp in the subfolder name"""
    folders = !gsutil ls $gcs_out_dir
    latest = ""
    for folder in folders:
        subfolder = folder.split("/")[-2]
        if subfolder.startswith("prediction-"):
            if subfolder > latest:
                latest = folder[:-1]
    return latest


while True:
    response = clients["job"].get_batch_prediction_job(name=batch_job_id)
    if response.state != aip.JobState.JOB_STATE_SUCCEEDED:
        print("The job has not completed:", response.state)
        if response.state == aip.JobState.JOB_STATE_FAILED:
            break
    else:
        folder = get_latest_predictions(
            response.output_config.gcs_destination.output_uri_prefix
        )
        ! gsutil ls $folder/prediction*

        ! gsutil cat -h $folder/prediction*
        break
    time.sleep(60)

The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: JobState.JOB_STATE_RUNNING
The job has not completed: J

KeyboardInterrupt: 

In [None]:
DEPLOYED_NAME = "titanic_classifier_deployed-" + TIMESTAMP

TRAFFIC_SPLIT = {"0": 100}

MIN_NODES = 1
MAX_NODES = 1

if not DEPLOY_GPU:
    endpoint = model.deploy(
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type=DEPLOY_COMPUTE,
        #accelerator_type=DEPLOY_GPU.name,
        #accelerator_count=DEPLOY_NGPU,
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES,
    )
else:
    endpoint = model.deploy(
        deployed_model_display_name=DEPLOYED_NAME,
        traffic_split=TRAFFIC_SPLIT,
        machine_type=DEPLOY_COMPUTE,
        #accelerator_type=DEPLOY_COMPUTE.name,
        #accelerator_count=0,
        min_replica_count=MIN_NODES,
        max_replica_count=MAX_NODES,
    )