In [None]:
# !pip install google-cloud-aiplatform --user --upgrade

# Sklearn with Pandas - Custom Prediction Routine to get `.predict_proba()`

This is similar to [the other notebook](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage6/get_started_with_cpr.ipynb) except we will be using pandas and bigquery

Topics covered
* Training sklearn locally, deploying to endpoint
* Saving data as CSV and doing batch predict from GCS
* Loading data to BQ, using BQ magics
* Running a batch prediction from BQ to BQ

In [1]:
# !gsutil mb -l us-central1 gs://jsw-model-artifacts

Creating gs://jsw-model-artifacts/...


In [1]:
PROJECT_ID = 'wortz-project-352116' #SET THIS TO YOUR PROJECT ID
BUCKET = "gs://jsw-model-artifacts" #BE SURE TO gsutil mb -l <REGION> <LOG_BUCKET> to create the bucket on GCP
REGION = "us-central1"

In [2]:
# generate synthetic data
import pandas as pd
import numpy as np #for the random integer example

# set seed

np.random.seed(1234)

x = np.random.randint(0.0,100.0,size=(10,3))
y = np.random.binomial(1, .25, size=(10,1))
df = pd.DataFrame(np.append(x, y, axis=1),
              index=range(10,20),
              columns=['col1','col2','col3','label'],
              dtype='float64')

In [3]:
df

Unnamed: 0,col1,col2,col3,label
10,47.0,83.0,38.0,0.0
11,53.0,76.0,24.0,1.0
12,15.0,49.0,23.0,0.0
13,26.0,30.0,43.0,0.0
14,30.0,26.0,58.0,1.0
15,92.0,69.0,80.0,0.0
16,73.0,47.0,50.0,0.0
17,76.0,37.0,34.0,1.0
18,38.0,67.0,11.0,0.0
19,0.0,75.0,80.0,1.0


In [39]:
from sklearn.ensemble import RandomForestClassifier

# Set the model parameters. 
n_estimators = 100
max_depth = 6
max_features = 3

rf = RandomForestClassifier(n_estimators = n_estimators, max_depth = max_depth, max_features = max_features)
rf.fit(df[['col1', 'col2', 'col3']], df['label'])

In [40]:
import os
import pickle
import joblib

artifact_filename = 'model.joblib' #has to be joblib to work with CPR

# Save model artifact to local filesystem (doesn't persist)

joblib.dump(rf, artifact_filename)

['model.joblib']

#### Upload the model pipeline to gcs

In [6]:
! gsutil cp $artifact_filename $BUCKET/model/

Copying file://model.joblib [Content-Type=application/octet-stream]...
/ [1 files][ 83.9 KiB/ 83.9 KiB]                                                
Operation completed over 1 objects/83.9 KiB.                                     


## Create a generic sklearn container that returns `predict_proba`

https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage6/get_started_with_cpr.ipynb

**highly recommend reviewing this notebook first as it breaks down the custom predictor interface**

In [7]:
! mkdir src

mkdir: cannot create directory ‘src’: File exists


In [8]:
%%writefile src/requirements.txt
fastapi
uvicorn
scikit-learn
google-cloud-storage
google-cloud-aiplatform[prediction]

Overwriting src/requirements.txt


In [9]:
%%writefile src/predictor.py
import joblib
import numpy as np
import pickle

from google.cloud import storage
from google.cloud.aiplatform.prediction.predictor import Predictor
from google.cloud.aiplatform.utils import prediction_utils

import json

class CprPredictor(Predictor):
    
    def __init__(self):
        return
    
    def load(self, gcs_artifacts_uri: str):
        """Loads the preprocessor artifacts."""
        prediction_utils.download_model_artifacts(gcs_artifacts_uri)
        # gcs_client = storage.Client()
        # with open("model.joblib", 'wb') as gcs_model:
        #     gcs_client.download_blob_to_file(
        #         gcs_artifacts_uri + "/model.joblib", gcs_model
        #     )

        with open("model.joblib", "rb") as f:
            self._model = joblib.load("model.joblib")

    
    def predict(self, instances):
        outputs = self._model.predict_proba(instances) 
        outputs = [list(output) for output in outputs] #convert array to list
        return {'predictions': outputs}

Overwriting src/predictor.py


#### Build a custom handler

In [10]:
%%writefile src/handler.py

import csv
from io import StringIO
import json

from fastapi import Response

from google.cloud.aiplatform.prediction.handler import PredictionHandler

class CprHandler(PredictionHandler):
    """Default prediction handler for the prediction requests sent to the application."""

    async def handle(self, request):
        """Handles a prediction request."""
        request_body = await request.body()
        request_body_dict = json.loads(request_body)
        instances=request_body_dict["instances"]
        prediction_results = self._predictor.predict(instances)

        return Response(content=json.dumps(prediction_results))
     

Overwriting src/handler.py


### Build and push container to Artifact Registry
#### Build your container
To build a custom container, we also need to write an entrypoint of the image that starts the model server. However, with the Custom Prediction Routine feature, you don't need to write the entrypoint anymore. Vertex AI SDK will populate the entrypoint with the custom predictor you provide.

In [35]:
from google.cloud.aiplatform.prediction import LocalModel
from src.predictor import CprPredictor
from src.handler import CprHandler
# {import your predictor and handler}
REPOSITORY = "sklearn-preprocess"  # @param {type:"string"}
SERVER_IMAGE = "sklearn-cpr-preprocess"  # @param {type:"string"}
USER_SRC_DIR = 'src'

local_model = LocalModel.build_cpr_model(
    USER_SRC_DIR,
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{SERVER_IMAGE}",
    predictor=CprPredictor,
    handler=CprHandler,
    requirements_path=os.path.join(USER_SRC_DIR, "requirements.txt"),
)

  self.stdin = io.open(p2cwrite, 'wb', bufsize)
  self.stdout = io.open(c2pread, 'rb', bufsize)


### Test it out with a locally deployed endpoint
Need to generate credentials to test

In [13]:
local_model.get_serving_container_spec()

image_uri: "us-central1-docker.pkg.dev/wortz-project/sklearn-preprocess/sklearn-cpr-preprocess"
predict_route: "/predict"
health_route: "/health"

In [14]:
# ! gcloud services enable iam.googleapis.com

#### Only run once to generate creds

In [22]:
# ! gcloud iam service-accounts keys create credentials.json --iam-account=633325234048-compute@developer.gserviceaccount.com
CREDENTIALS_FILE = "/home/jupyter/.config/gcloud/application_default_credentials.json"

## Create example instances

In [23]:
INPUT_FILE = "instances.json"

In [24]:
%%writefile $INPUT_FILE
{
    "instances": [
        [61.7, 11.1, 41.7],
        [41.6, 31.1, 11.5]
    ]
}

Overwriting instances.json


In [25]:
with local_model.deploy_to_local_endpoint(
    artifact_uri=f".",
    credential_path=CREDENTIALS_FILE,  # Update this to the path to your credentials.
) as local_endpoint:
    predict_response = local_endpoint.predict(
        request_file=INPUT_FILE,
        headers={"Content-Type": "application/json"},
    )
    
    health_check_response = local_endpoint.run_health_check()

## Local results should show a n x 2 shaped return for binomial classification

In [26]:
predict_response.content

b'{"predictions": [[0.37, 0.63], [0.67, 0.33]]}'

### Create a repository to house your artifacts / images

In [44]:
# !gcloud services enable artifactregistry.googleapis.com

In [42]:
! gcloud artifacts repositories create {REPOSITORY} \
    --repository-format=docker \
    --location=$REGION

[1;31mERROR:[0m (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists


In [27]:
! gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet


{
  "credHelpers": {
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud",
    "us-central1-docker.pkg.dev": "gcloud"
  }
}
Adding credentials for: us-central1-docker.pkg.dev
gcloud credential helpers already registered correctly.


## Upload the model to Vertex using new Prediction Route Serving Container

In [36]:
local_model.push_image() #push to container registry

  self.stdin = io.open(p2cwrite, 'wb', bufsize)
  self.stdout = io.open(c2pread, 'rb', bufsize)


In [38]:
from google.cloud import aiplatform
aiplatform.init(project=PROJECT_ID, location=REGION)

MODEL_DISPLAY_NAME = 'pandas test CLASSIFICATION'

model = aiplatform.Model.upload(
    local_model=local_model,
    display_name=MODEL_DISPLAY_NAME,
    artifact_uri=f"{BUCKET}/model",
)

Creating Model
Create Model backing LRO: projects/679926387543/locations/us-central1/models/8285204944361881600/operations/6779270319531098112
Model created. Resource name: projects/679926387543/locations/us-central1/models/8285204944361881600@1
To use this Model in another session:
model = aiplatform.Model('projects/679926387543/locations/us-central1/models/8285204944361881600@1')


## Batch predictions with GCS / CSV
### Now we will create a different dataframe to make predictions on for batch predictions

In [42]:
df2 = pd.DataFrame(np.random.randint(0.0,100.0,size=(10,3)), # we will do batch predictions based on this
              index=range(10,20),
              columns=['col1','col2','col3'],
              dtype='float64')
rf.predict_proba(df2[['col1','col2','col3']])

array([[0.77, 0.23],
       [0.38, 0.62],
       [0.72, 0.28],
       [0.76, 0.24],
       [0.7 , 0.3 ],
       [0.23, 0.77],
       [0.77, 0.23],
       [0.76, 0.24],
       [0.38, 0.62],
       [0.77, 0.23]])

### Expected output
From documentation:
```
array([[0.8 , 0.2 ],
       [0.38, 0.62],
       [0.61, 0.39],
       [0.65, 0.35],
       [0.56, 0.44],
       [0.63, 0.37],
       [0.55, 0.45],
       [0.43, 0.57],
       [0.43, 0.57],
       [0.38, 0.62]])
```

#### Regular predictions

In [46]:
endpoint = model.deploy(machine_type="n1-standard-4")

Creating Endpoint
Create Endpoint backing LRO: projects/679926387543/locations/us-central1/endpoints/8664558446177157120/operations/6869342312078508032
Endpoint created. Resource name: projects/679926387543/locations/us-central1/endpoints/8664558446177157120
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/679926387543/locations/us-central1/endpoints/8664558446177157120')
Deploying model to Endpoint : projects/679926387543/locations/us-central1/endpoints/8664558446177157120
Deploy Endpoint model backing LRO: projects/679926387543/locations/us-central1/endpoints/8664558446177157120/operations/6382953552322494464
BatchPredictionJob projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480 current state:
JobState.JOB_STATE_RUNNING
Endpoint model deployed. Resource name: projects/679926387

In [49]:
df2.values

array([[13., 56., 48.],
       [58., 94., 35.],
       [57., 60., 83.],
       [ 9., 60., 50.],
       [51., 49., 71.],
       [81.,  4.,  3.],
       [88., 46., 94.],
       [47., 63., 84.],
       [ 5., 83., 72.],
       [72., 63., 56.]])

In [52]:
endpoint.predict(df2.values.tolist())

Prediction(predictions=[[0.79, 0.21], [0.49, 0.51], [0.62, 0.38], [0.79, 0.21], [0.61, 0.39], [0.31, 0.69], [0.73, 0.27], [0.62, 0.38], [0.46, 0.54], [0.67, 0.33]], deployed_model_id='3675493648817979392', model_version_id='1', model_resource_name='projects/679926387543/locations/us-central1/models/8285204944361881600', explanations=None)

#### Batch predictions

In [43]:
from google.cloud import storage
import csv

# save the csv with the header, no index
df2.to_csv('df2.csv', index=False)

data_directory = BUCKET + "/data"
storage_path = os.path.join(data_directory, 'df2.csv')
blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
blob.upload_from_filename("df2.csv")

In [44]:
batch_prediction_job = model.batch_predict(
        job_display_name='pandas batch predict job sklearn - VALUES JSON',
        gcs_source=storage_path,
        gcs_destination_prefix=BUCKET+"/predictions",
        machine_type='n1-standard-2',
        instances_format='csv', #This is key to parsing CSV input
        # accelerator_count=accelerator_count,
        # accelerator_type=accelerator_type, #if you want gpus
        starting_replica_count=1,
        max_replica_count=2,
        sync=False,
    )

Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/5729541692042772480?project=679926387543
BatchPredictionJob projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480 current state:
JobState.JOB_STATE_PENDING
BatchPredictionJob projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480 current state:
JobState.JOB_STATE_PENDING
BatchPredictionJob projects/679926387543/locations/us-central1/batchPredictionJobs/5729541692042772480 current state:
JobState.JOB_STATE_RUNNING


### When successful you should see this
```
{"instance": [16.0, 64.0, 61.0], "prediction": [0.63, 0.37]}
{"instance": [83.0, 27.0, 87.0], "prediction": [0.35, 0.65]}
{"instance": [96.0, 83.0, 57.0], "prediction": [0.68, 0.32]}
{"instance": [11.0, 62.0, 17.0], "prediction": [0.89, 0.11]}
{"instance": [61.0, 28.0, 1.0], "prediction": [0.36, 0.64]}
```

## Batch Prediction with BQ

In [None]:
#!pip install pandas_gbq --user

## Create an empty dataset to house the tables

In [43]:
!bq --location=location mk \
--dataset \
--description "test dataset" \
--location "US" \
$PROJECT_ID:TEST

Dataset 'wortz-project:TEST' successfully created.


In [None]:
# Load the table to BQ and make Batch predictions
from pandas_gbq import to_gbq

df2.to_gbq(destination_table=f"{PROJECT_ID}.TEST.df2", project_id=PROJECT_ID)

## Bigquery magic comes available by default

In [131]:
%%bigquery
select * from TEST.df2

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 865.70query/s] 
Downloading: 100%|██████████| 10/10 [00:01<00:00,  7.21rows/s]


Unnamed: 0,col1,col2,col3
0,20.0,12.0,62.0
1,14.0,18.0,79.0
2,56.0,19.0,81.0
3,58.0,91.0,74.0
4,77.0,49.0,99.0
5,50.0,26.0,38.0
6,49.0,23.0,69.0
7,78.0,23.0,96.0
8,98.0,36.0,15.0
9,79.0,82.0,33.0


## Now run batch predicitons on this bq table

Note you have to have write permissions on the dataset - you may see a error if you don't

In [24]:
batch_prediction_job = model.batch_predict(
        job_display_name='bigquery batch predict job sklearn',
        bigquery_source=f"bq://{PROJECT_ID}.TEST.df2",
        bigquery_destination_prefix=f'bq://{PROJECT_ID}', #this will create a seperate dataset with predictions
        machine_type='n1-standard-2',
        # accelerator_count=accelerator_count,
        # accelerator_type=accelerator_type, #if you want gpus
        starting_replica_count=1,
        max_replica_count=2,
        sync=False,
    ) 

# Output table will look something like this:  wortz-project.prediction_pandas_test_2022_04_22T11_32_14_834Z.predictions_2022_04_22T11_32_14_834Z 

INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob created. Resource name: projects/633325234048/locations/us-central1/batchPredictionJobs/8652859092701806592
INFO:google.cloud.aiplatform.jobs:To use this BatchPredictionJob in another session:
INFO:google.cloud.aiplatform.jobs:bpj = aiplatform.BatchPredictionJob('projects/633325234048/locations/us-central1/batchPredictionJobs/8652859092701806592')
INFO:google.cloud.aiplatform.jobs:View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/8652859092701806592?project=633325234048
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/633325234048/locations/us-central1/batchPredictionJobs/8652859092701806592 current state:
JobState.JOB_STATE_RUNNING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/633325234048/locations/us-central1/batchPredictionJobs/8652859092701806592 current state:
JobState.JOB_STAT

### Final section - deploy to endpoint

In [101]:
endpoint = model.deploy(machine_type="n1-standard-4")

INFO:google.cloud.aiplatform.models:Creating Endpoint
INFO:google.cloud.aiplatform.models:Create Endpoint backing LRO: projects/633325234048/locations/us-central1/endpoints/7875643460085088256/operations/7350682251878727680
INFO:google.cloud.aiplatform.models:Endpoint created. Resource name: projects/633325234048/locations/us-central1/endpoints/7875643460085088256
INFO:google.cloud.aiplatform.models:To use this Endpoint in another session:
INFO:google.cloud.aiplatform.models:endpoint = aiplatform.Endpoint('projects/633325234048/locations/us-central1/endpoints/7875643460085088256')
INFO:google.cloud.aiplatform.models:Deploying model to Endpoint : projects/633325234048/locations/us-central1/endpoints/7875643460085088256
INFO:google.cloud.aiplatform.models:Deploy Endpoint model backing LRO: projects/633325234048/locations/us-central1/endpoints/7875643460085088256/operations/4151437666585411584
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/633325234048/locations/us-central1

In [102]:
endpoint.predict(instances=[[6.7, 3.1, 4.7], [4.6, 3.1, 1.5]])

Prediction(predictions=[[0.65, 0.35], [0.65, 0.35]], deployed_model_id='6711621286084214784', explanations=None)

INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/633325234048/locations/us-central1/batchPredictionJobs/8388483720826322944 current state:
JobState.JOB_STATE_SUCCEEDED
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob run completed. Resource name: projects/633325234048/locations/us-central1/batchPredictionJobs/8388483720826322944
