## How to deploy a BQML prediction endpoint in Kubeflow (KF Serving)

This is based on:
* https://medium.com/google-cloud/using-bigquery-and-bigquery-ml-from-kubeflow-pipelines-991a2fa4bea8
    * To see how to run BQ from KFP
* https://towardsdatascience.com/how-to-do-online-prediction-with-bigquery-ml-db2248c0ae5
    * To see how to extract model weights and parametes to run the inference
* https://medium.com/@spryd/getting-started-with-bigquery-machine-learning-bqml-564264af0adc
    * Model idea

**New stuff:**
* Component to return a pandas df based on a BQ query - train model in BQ from KFP and export model HPs to files
* Wrapped the online prediction code from Lak in a container by launching kfserving.KFServer
* Deploy an online KFServing using a custom model Spec
* We get a nice model endpoint with a lot of serving features such canary rollouts, autoscaling ...

In [None]:
!pip install --user kfp

In [6]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from typing import NamedTuple
import json
import os
from kfp.components import InputPath, OutputPath

This is for an KFP deployed on GKE, adapt your enviroment

In [7]:
PROJECT_ID='' 
KFPHOST=''
CLIENT_ID = ''
OTHER_CLIENT_ID = ''
OTHER_CLIENT_SECRET = ''
NAMESPACE = 'kubeflow-velascoluis'

In [8]:
!bq show mlpatterns || bq mk mlpatterns

Dataset velascoluis-test:mlpatterns

   Last modified                              ACLs                             Labels  
 ----------------- ---------------------------------------------------------- -------- 
  21 Apr 06:55:06   Owners:                                                            
                      kf10rc4-user@velascoluis-test.iam.gserviceaccount.com,           
                      projectOwners                                                    
                    Writers:                                                           
                      projectWriters                                                   
                    Readers:                                                           
                      projectReaders                                                   



The inference code is located in an idependent container running the following code:
```
file:model.py
import kfserving
from typing import List, Dict
import pandas as pd
class KFServingBQPredict(kfserving.KFModel):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.ready = False
    def predict(self, request: Dict) -> Dict:
        values = request["instances"][0]
        inputs = pd.DataFrame(values).T
        inputs.columns = ["pageviews","timeOnSite","isNewVisit","isMobile","isDesktop","isPaidTraffic"]
        numeric_weights = pd.read_csv("numeric.csv").drop(['Unnamed: 0'], axis=1)
        scaling_df = pd.read_csv("scaling.csv").drop(['Unnamed: 0'], axis=1)
        categorical_weights = pd.read_csv("categorical.csv").drop(['Unnamed: 0'], axis=1)
        pred = 0
        for column_name in numeric_weights['input'].unique():
            print(column_name)
            wt = numeric_weights[numeric_weights['input'] == column_name]['input_weight'].values[0]
            if column_name != '__INTERCEPT__':
                print("in loop")
                meanv = scaling_df[scaling_df['input'] == column_name]['mean'].values[0]
                stddev = scaling_df[scaling_df['input'] == column_name]['stddev'].values[0]
                scaled_value = (inputs[column_name] - meanv) / stddev
            else:
                scaled_value = 1.0
        contrib = wt * scaled_value
        pred = pred + contrib
        # categorical inputs
        for column_name in categorical_weights['input'].unique():
            category_weights = categorical_weights[categorical_weights['input'] == column_name]
            wt = \
            category_weights[category_weights['category_name'] == inputs[column_name]]['category_weight'].values[0]
            pred = pred + wt
        return {"predictions": pred}

if __name__ == "__main__":
    model = KFServingBQPredict("kfserving-custom-model")
    kfserving.KFServer(workers=1).start([model])
```

Once we have the code we build a container with the following spec:

```
FROM python:3.7-slim
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY requirements.txt ./
RUN pip install --no-cache-dir -r ./requirements.txt
COPY src/*   ./
CMD ["python", "model.py"]
```

In [10]:
def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(
    'DDLOutput', [('created_table', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    print(query_string)
        
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud import bigquery
    from google.cloud.bigquery import retry as bq_retry
    
    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY
    
    while job.running():
        from time import sleep
        sleep(0.1)
        print('Running ...')
        
    tblname = job.ddl_target_table
    print('tblname:{}'.format(tblname))
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))
    
    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)


def run_bigquery_sql_pandas(project_id: str, query_string: str, location: str, df_output_path: OutputPath(str)):
    """
    Runs BigQuery query and returns a pandas dataframe
    """
    print(query_string)
        
    from google.cloud import bigquery
   
    bqclient = bigquery.Client(project=project_id, location=location)
    df = bqclient.query(query_string).to_dataframe()
    df.to_csv (df_output_path)
    


def train_regression_model(ddlop, project_id):
    query = """CREATE OR REPLACE MODEL mlpatterns.buyer_predictor
            OPTIONS(model_type='logistic_reg',
            input_label_cols=['isBuyer'])
            AS
            SELECT
             IF(totals.transactions IS NULL, 0, 1) AS isBuyer,
             IFNULL(totals.pageviews, 0) AS pageviews,
             IFNULL(totals.timeOnSite, 0) AS timeOnSite,
             IFNULL(totals.newVisits, 0) AS isNewVisit,
             IF(device.deviceCategory = 'mobile', 1, 0) AS isMobile,
             IF(device.deviceCategory = 'desktop', 1, 0) AS isDesktop,
             IF(trafficSource.medium in ('affiliate', 'cpc', 'cpm'), 1, 0) AS isPaidTraffic
            FROM
             `bigquery-public-data.google_analytics_sample.ga_sessions_*`
            WHERE
             _TABLE_SUFFIX BETWEEN '20160801' AND '20170630'
 """    
    print(query)
    return ddlop(project_id, query, 'US')




def export_model_weights_numeric(sqlpandasop, project_id,model):
    query = """
        SELECT
          processed_input AS input,
          model.weight AS input_weight
        FROM
          ml.WEIGHTS(MODEL {0}) AS model
    """.format(model)
    print(query)
    return sqlpandasop(project_id, query, 'US')

def export_model_weights_scaling(sqlpandasop, project_id,model):
    query = """
        SELECT
          input, min, max, mean, stddev
        FROM
          ml.FEATURE_INFO(MODEL {0}) AS model
    """.format(model)
    print(query)
    return sqlpandasop(project_id, query, 'US')



def export_model_weights_categorical(sqlpandasop, project_id,model):
    query = """
    SELECT
      processed_input AS input,
      model.weight AS input_weight,
      category.category AS category_name,
      category.weight AS category_weight
    FROM
      ml.WEIGHTS(MODEL {0}) AS model,
  UNNEST(category_weights) AS category
    """.format(model)
    print(query)
    return sqlpandasop(project_id, query, 'US')



def deploy_model_kfserving():
    
    from kubernetes import client
    from kubernetes.client import V1Container
    from kfserving import KFServingClient
    from kfserving import constants
    from kfserving import utils
    from kfserving import V1alpha2EndpointSpec
    from kfserving import V1alpha2PredictorSpec
    from kfserving import V1alpha2InferenceServiceSpec
    from kfserving import V1alpha2InferenceService
    from kfserving import V1alpha2CustomSpec

    GCR_NAME="velascoluis-test"    
    api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION
    inference_service_name = "kfserving-custom-model"
    namespace = utils.get_default_target_namespace()
     
    api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION

    default_endpoint_spec = V1alpha2EndpointSpec(
                          predictor=V1alpha2PredictorSpec(
                              custom=V1alpha2CustomSpec(
                                  #This is the custom model a v1 k8s Container
                                  container=V1Container(
                                      name="kfserving-custom-model",
                                      image="gcr.io/velascoluis-test/kfserving-custom-model:latest"))))

    isvc = V1alpha2InferenceService(api_version=api_version,
                          kind=constants.KFSERVING_KIND,
                          metadata=client.V1ObjectMeta(
                              name=inference_service_name,
                              annotations=
                                            {
                                                'sidecar.istio.io/inject': 'false',
                                                'autoscaling.knative.dev/target': '1'
                                            },
                              namespace=namespace),
                          spec=V1alpha2InferenceServiceSpec(default=default_endpoint_spec))
    KFServing = KFServingClient()
    KFServing.create(isvc)
    
    

@dsl.pipeline(
    name='Cascade pipeline',
    description='Cascade pipeline'
)
def cascade_pipeline(
    project_id = PROJECT_ID
):
    #operations
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])
    sqlpandasop =   comp.func_to_container_op(run_bigquery_sql_pandas, packages_to_install=['google-cloud-bigquery', 'pandas']) 
    deployop = comp.func_to_container_op(deploy_model_kfserving, packages_to_install=['kfserving', 'kubernetes']) 
    
    #pipeline
    c1 = train_regression_model(ddlop, PROJECT_ID)
    c1_model_name = c1.outputs['created_table']
    export_wn_op = export_model_weights_numeric(sqlpandasop, PROJECT_ID, c1_model_name)
    export_ws_op = export_model_weights_scaling(sqlpandasop, PROJECT_ID, c1_model_name)
    export_wc_op = export_model_weights_categorical(sqlpandasop, PROJECT_ID, c1_model_name)
    deployer_kfserving_op = deployop().after(export_wc_op)
    
    

Then this pipelimne, build the model, export the metadata, then it deploys the pre-built image

In [11]:
pipeline_func = cascade_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
client = kfp.Client(host=KFPHOST, client_id=CLIENT_ID, namespace=NAMESPACE, other_client_id=OTHER_CLIENT_ID,
                        other_client_secret=OTHER_CLIENT_SECRET)
experiment = client.create_experiment('cascade_experiment')

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

CREATE OR REPLACE MODEL mlpatterns.buyer_predictor
            OPTIONS(model_type='logistic_reg',
            input_label_cols=['isBuyer'])
            AS
            SELECT
             IF(totals.transactions IS NULL, 0, 1) AS isBuyer,
             IFNULL(totals.pageviews, 0) AS pageviews,
             IFNULL(totals.timeOnSite, 0) AS timeOnSite,
             IFNULL(totals.newVisits, 0) AS isNewVisit,
             IF(device.deviceCategory = 'mobile', 1, 0) AS isMobile,
             IF(device.deviceCategory = 'desktop', 1, 0) AS isDesktop,
             IF(trafficSource.medium in ('affiliate', 'cpc', 'cpm'), 1, 0) AS isPaidTraffic
            FROM
             `bigquery-public-data.google_analytics_sample.ga_sessions_*`
            WHERE
             _TABLE_SUFFIX BETWEEN '20160801' AND '20170630'
 

        SELECT
          processed_input AS input,
          model.weight AS input_weight
        FROM
          ml.WEIGHTS(MODEL {{pipelineparam:op=Run bigquery ddl;name=created_table}}) AS

Lets run some predictions now!

In [3]:
KFServing.get(inference_service_name, namespace=namespace, watch=True, timeout_seconds=120)

NAME                 READY      DEFAULT_TRAFFIC CANARY_TRAFFIC  URL                                               
kfserving-custom-... False                                                                                        
kfserving-custom-... False                                                                                        
kfserving-custom-... True       100                             http://kfserving-custom-model.kubeflow-velascol...


In [4]:
import json
import pandas as pd
CLUSTER_IP="34.76.151.35"
SERVICE_HOSTNAME="kfserving-custom-model.kubeflow-velascoluis.example.com"
data = {}
data = "{\"instances\":[[700, 1000,0,0,1,0]]}"
MODEL_NAME="kfserving-custom-model"
import requests
url = f"http://{CLUSTER_IP.strip()}/v1/models/{MODEL_NAME}:predict"
headers = {"Host": SERVICE_HOSTNAME.strip()}
result = requests.post(url, data=data, headers=headers)
print(result.content)

b'{"predictions": -4.328171424606712}'
