<a href="https://colab.research.google.com/github/sakagarwal/Appetizers4Days/blob/master/%5BPrivate_Preview%5D_Feature_Transform_Engine_with_XGBoost_%26_SKLearn_Trainer_Pipelines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction

Vertex AI Tabular Feature Transform Engine (or FTE for short) allows users to transform their structured data's input features in a consistent fashion across training and prediction.

In this notebook, we demonstrate the basic functionality of FTE and show how to use it to apply row-level transformations to raw input and integrate with custom trainers (e.g., XGBoost or scikit-learn-based trainers) during training and prediction.

The notebook is organized as follows:
- [Setup environment](#setup): Install dependencies, authenticate, and configure the GCP project used in the notebook.
- [Configure transformations](#configure-transforms): Configure feature transformations using FTE's built-in transformations or your own custom transform functions.
- [Train new model](#train-new-model): Define a training pipeline that uses FTE for feature transformation and a custom trainer (e.g., XGBoost, scikit-learn) to train a new model. Run the training pipeline on Vertex AI Pipelines.
- [Test the trained model](#online-prediction): Deploy the newly trained model using the FTE-supported prediction server on Vertex AI and perform online (or batch) prediction on raw input.

<a name="setup"></a>
## Setup

### Install dependencies

In [None]:
# Depending on the environment, this might throw a
# pip dependency resolver error. Please ignore it.
!pip3 install -U google-cloud-aiplatform -q
!pip3 install -U google-cloud-pipeline-components --force-reinstall

In [None]:
# Restart the kernel after pip installs. This can take a minute. 
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [None]:
# Import required modules
import json
import sys
import os
import tempfile
import uuid
from google.cloud import aiplatform
from google_cloud_pipeline_components.experimental.automl.tabular import utils
from typing import Any, Dict, List, Union

import tensorflow as tf

from google_cloud_pipeline_components.experimental.automl import tabular
from google_cloud_pipeline_components.aiplatform import ModelUploadOp
from google_cloud_pipeline_components.aiplatform import ModelBatchPredictOp
from google_cloud_pipeline_components.types import artifact_types
from kfp.v2.components import importer_node
from kfp import components
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import Dataset
from kfp.v2.dsl import Input
from kfp.v2.dsl import InputPath
from kfp.v2.dsl import Model
from kfp.v2.dsl import Metrics
from kfp.v2.dsl import Output
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import Artifact

### Configure your GCP project

In [None]:
GCP_PROJECT = "cloud-automl-tables" #@param {type:"string"}
GCP_REGION = "us-east1" #@param {type:"string"}

# All training-related files (TF model checkpoint, saved_model, etc.) will
# be saved to this GCS bucket. The pipeline does not clean up these files,
# since some of them might be useful to you. Please clean them up yourself
# if needed.
GCP_BASE_PATH = "gs://pvnguyen-us-central1/mp_notebook" #@param {type:"string"}

### Authenticate your GCP account

In [None]:
if 'google.colab' in sys.modules:
  if 'USE_AUTH_EPHEM' in os.environ:
    # revert to the old colab authentication module to prevent a bug.
    del os.environ['USE_AUTH_EPHEM']
  from google.colab import auth as google_auth
  google_auth.authenticate_user()
  !gcloud config set project {GCP_PROJECT}

aiplatform.init(
    project=GCP_PROJECT,
    location=GCP_REGION
)

### Enable APIs (one time setup)

This is not required if the APIs are already enabled.

The code below enables the following APIs:
- Vertex AI
- Dataflow


In [None]:
!gcloud services enable aiplatform.googleapis.com 
!gcloud services enable dataflow.googleapis.com 

<a name="configure-transforms"></a>
## Feature transformation

Before configuring feature transformation, let's define some helper functions that will be used throughout the notebook and configure the input dataset:

### Define helper functions

In [None]:
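def get_task_detail(task_details: List[Any], task_name: str) -> Any:
  """Returns the pipeline task detail whose task_name matches."""
  for task_detail in task_details:
    if task_detail.task_name == task_name:
      return task_detail
  raise ValueError(f'Task {task_name!r} not found in pipeline task details.')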

def get_model_name(custom_job_id):
  pipeline_task_details = aiplatform.PipelineJob.get(custom_job_id).gca_resource.job_detail.task_details
  upload_task_details = get_task_detail(pipeline_task_details, 'model-upload')
  return upload_task_details.outputs['model'].artifacts[0].metadata['resourceName']

def write_text_to_file(text, filepath):
  with tf.io.gfile.GFile(filepath, 'w') as f:
    f.write(text)

def write_instances_to_jsonl(instances, filepath):
  with tf.io.gfile.GFile(filepath, "w") as f:
    for instance in instances:
      f.write(json.dumps(instance) + "\n")

### Configure dataset

In [None]:
# CSV data source
csv_filenames = 'gs://automl-tables-us-central1-resources/dataset/safe_driver_small.csv'

features = ['id', 'ps_ind_01', 'ps_ind_02_cat', 'ps_ind_03', 'ps_ind_04_cat', 'ps_ind_05_cat', 'ps_ind_06_bin', 'ps_ind_07_bin', 'ps_ind_08_bin', 'ps_ind_09_bin', 'ps_ind_10_bin', 'ps_ind_11_bin', 'ps_ind_12_bin', 'ps_ind_13_bin', 'ps_ind_14', 'ps_ind_15', 'ps_ind_16_bin', 'ps_ind_17_bin', 'ps_ind_18_bin', 'ps_reg_01', 'ps_reg_02', 'ps_reg_03', 'ps_car_01_cat', 'ps_car_02_cat', 'ps_car_03_cat', 'ps_car_04_cat', 'ps_car_05_cat', 'ps_car_06_cat', 'ps_car_07_cat', 'ps_car_08_cat', 'ps_car_09_cat', 'ps_car_10_cat', 'ps_car_11_cat', 'ps_car_11', 'ps_car_12', 'ps_car_13', 'ps_car_14', 'ps_car_15', 'ps_calc_01', 'ps_calc_02', 'ps_calc_03', 'ps_calc_04', 'ps_calc_05', 'ps_calc_06', 'ps_calc_07', 'ps_calc_08', 'ps_calc_09', 'ps_calc_10', 'ps_calc_11', 'ps_calc_12', 'ps_calc_13', 'ps_calc_14', 'ps_calc_15_bin', 'ps_calc_16_bin', 'ps_calc_17_bin', 'ps_calc_18_bin', 'ps_calc_19_bin', 'ps_calc_20_bin']

### Configure feature transformation

Transformations can be specified using FTE specific configurations. In the following, we provide some sample transform configurations to demonstrate FTE's capabilities:
- Full auto transformations (i.e., `auto_transform_config`): FTE automatically configures a set of built-in transformations for each input column based on its data statistics.
- Fully specified transformations (i.e., `no_auto_transform_config`): All transformations on input columns are explicitly specified with FTE's built-in transformations. Chaining of multiple transformations on a single column is also supported.
- Mix of auto and explicit transformations (i.e., `mixed_transform_config`).
- Custom transformations (i.e., `transform_config_with_custom_transform`): A mixture of auto and explicit transformations and custom, bring-your-own transform function, where users can define and import their own transform function and use it with FTE's built-in transformations.
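
To see which raw input columns a given configuration touches, the dictionary layout described above can be walked with plain Python. This is a minimal sketch: the helper name `columns_in_transform_config` is hypothetical (it is not part of FTE), and it assumes only the `auto_transforms`, `transforms`, and `input_column_names` keys used in this notebook.

```python
from typing import Any, Dict, List


def columns_in_transform_config(config: Dict[str, Any]) -> List[str]:
  """Returns the distinct input columns a config refers to, in first-seen order."""
  seen = []
  # Columns handled by automatic transformations.
  for name in config.get('auto_transforms', []):
    if name not in seen:
      seen.append(name)
  # Columns named by explicitly configured transformations. A column that
  # appears in several chained transformations is listed only once.
  for transform in config.get('transforms', []):
    for name in transform.get('input_column_names', []):
      if name not in seen:
        seen.append(name)
  return seen


# Illustrative config that mixes auto and explicit transformations.
sample_config = {
    'auto_transforms': ['ps_reg_02', 'ps_reg_03'],
    'transforms': [
        {'transform': 'CastToFloatTransform',
         'input_column_names': ['ps_reg_01'],
         'output_column_names': ['ps_reg_01']},
        {'transform': 'PlusOneTransform',
         'input_column_names': ['ps_reg_01']},
        {'transform': 'VocabularyTransform',
         'input_column_names': ['ps_ind_10_bin']},
    ],
}

print(columns_in_transform_config(sample_config))
# ['ps_reg_02', 'ps_reg_03', 'ps_reg_01', 'ps_ind_10_bin']
```

The resulting list is a convenient checklist of the columns a prediction request would need to supply for that configuration.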

In [None]:
auto_transform_config = {'auto_transforms': features}

no_auto_transform_config = {
    'transforms': [{
        'transform': 'ZScaleTransform',
        'input_column_names': ['ps_reg_01']
    }, {
        'transform': 'ZScaleTransform',
        'input_column_names': ['ps_reg_02']
    }, {
        'transform': 'ZScaleTransform',
        'input_column_names': ['ps_reg_03']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_10_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_11_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_12_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['target'],
        'output_column_names': ['target']
    }]
}

mixed_transform_config = {
    'auto_transforms': ['ps_reg_01', 'ps_reg_02', 'ps_reg_03'],
    'transforms': [{
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_10_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_11_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_12_bin']
    }]
}

"""
$gsutil cat gs://pvnguyen-us-central1/mp_notebook/custom_transform_fn.py
import tensorflow.compat.v1 as tf


def plus_one_transform(x: tf.SparseTensor) -> tf.SparseTensor:
  return tf.SparseTensor(x.indices, tf.add(x.values, 1), x.dense_shape)
"""
transform_config_with_custom_transform = {
    'auto_transforms': ['ps_reg_02', 'ps_reg_03'],
    'modules': [{
        'transform': 'PlusOneTransform',
        'module_path': 'gs://pvnguyen-us-central1/mp_notebook/custom_transform_fn.py',
        'function_name': 'plus_one_transform'
    }],
    'transforms': [{
        'transform': 'CastToFloatTransform',
        'input_column_names': ['ps_reg_01'],
        'output_column_names': ['ps_reg_01']
    },{
        'transform': 'PlusOneTransform',
        'input_column_names': ['ps_reg_01']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_10_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_11_bin']
    }, {
        'transform': 'VocabularyTransform',
        'input_column_names': ['ps_ind_12_bin']
    }]
}

Or write your own custom transform function:

In [None]:
func_str = """
import tensorflow as tf


def plus_one_transform(x: tf.SparseTensor) -> tf.SparseTensor:
  return tf.SparseTensor(x.indices, tf.add(x.values, 1), x.dense_shape)
"""

write_text_to_file(func_str, os.path.join(GCP_BASE_PATH, 'custom_transform_fn.py'))

Additional transformations to try out and their sample configurations:

* `DatetimeTransform`:
``` python
# Outputs columns with granular datetime information (year, month, day, etc.).
{
    'transform': 'DatetimeTransform',
    'input_column_names': ['feature_1'],
    'time_format': '%Y-%m-%d'  # time format of input column
}
```

* `LogTransform`:
``` python
# Outputs a column of the element-wise, natural logarithm of our input.
{
    'transform': 'LogTransform',
    'input_column_names': ['feature_1']
}
```

* `ZScaleTransform`:
``` python
# Outputs a z-scale normalized input column.
{
    'transform': 'ZScaleTransform',
    'input_column_names': ['feature_1']
}
```

* `NGramTransform`:
``` python
# Outputs a column containing the vocab lookup indices of n-grams in our
# input.
{
    'transform': 'NGramTransform',
    'input_column_names': ['feature_1'],
    'min_ngram_size': 1,  # min number of tokens in our n-gram
    'max_ngram_size': 2,  # max number of tokens in our n-gram
    'separator': ' '  # separator between tokens
}
```
* `ClipTransform`:
``` python
# Outputs a column where all values < min_value are assigned min_value
# and all values > max_value are assigned max_value.
{
    'transform': 'ClipTransform',
    'input_column_names': ['col1'],
    'output_column_names': ['col1_clipped'],
    'min_value': 1.,
    'max_value': 10.,
}
```
* `MaxAbsScaleTransform`:
``` python
# Outputs a column where all input elements are divided by max(abs(input)).
{
    'transform': 'MaxAbsScaleTransform',
    'input_column_names': ['col1'],
    'output_column_names': ['col1_max_abs_scaled']
}
```
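
The arithmetic behind three of these transformations can be sketched in plain Python. This illustrates the standard formulas only and is not FTE's implementation: the function names are hypothetical, and the z-scale sketch assumes the population standard deviation, which may differ from what FTE uses.

```python
import statistics
from typing import List


def z_scale(values: List[float]) -> List[float]:
  """Subtracts the mean and divides by the population standard deviation."""
  mean = statistics.mean(values)
  std = statistics.pstdev(values)  # Assumption: population, not sample, std.
  # Note: a constant column has std == 0 and would need special handling.
  return [(v - mean) / std for v in values]


def clip(values: List[float], min_value: float, max_value: float) -> List[float]:
  """Raises values below min_value and lowers values above max_value."""
  return [min(max(v, min_value), max_value) for v in values]


def max_abs_scale(values: List[float]) -> List[float]:
  """Divides every value by the largest absolute value in the column."""
  largest = max(abs(v) for v in values)
  return [v / largest for v in values]


column = [-4.0, 0.0, 2.0, 10.0]
print(z_scale(column))
print(clip(column, 1.0, 10.0))  # [1.0, 1.0, 2.0, 10.0]
print(max_abs_scale(column))    # [-0.4, 0.0, 0.2, 1.0]
```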

<a name="train-new-model"></a>
## Train new model

### Define training pipelines

In [None]:
@component(
    base_image='us-docker.pkg.dev/vertex-ai/automl-tabular/custom-trainer:20220629_2125_RC00'
)
def custom_train(
    materialized_train_data: InputPath('MaterializedSplit'),
    materialized_test_data: InputPath('MaterializedSplit'),
    training_schema: InputPath('TrainingSchema'),
    transform_output: InputPath('TransformOutput'),
    model_type: str,
    target_column: str, 
    metrics: Output[Metrics],
    model: Output[Model]
) -> str:
    import os
    import pandas as pd
    import pandas_tfrecords
    import tempfile
    import yaml
    import tensorflow as tf
    from google.cloud import storage
    from joblib import dump
    from pathlib import Path
    from sklearn.metrics import accuracy_score
    from sklearn.tree import DecisionTreeClassifier
    from xgboost import XGBClassifier

    def get_bucket_name_and_path(uri):
      no_prefix_uri = uri[len('gs://'):]
      splits = no_prefix_uri.split('/')
      return splits[0], '/'.join(splits[1:])

    def download_gcs_directory(source_uri, dest_dir):
      storage_client = storage.Client()

      source_bucket_name, path = get_bucket_name_and_path(source_uri)
      source_bucket = storage_client.get_bucket(source_bucket_name)

      for blob in source_bucket.list_blobs(prefix=path):
        if blob.name.endswith("/"):
            continue
        directory = "/".join(blob.name.split("/")[0:-1])
        directory = directory.replace(path, dest_dir)
        Path(directory).mkdir(parents=True, exist_ok=True)
        destination_uri = os.path.join(directory, blob.name.split("/")[-1]) 
        blob.download_to_filename(destination_uri)
    
    with open(materialized_train_data, 'r') as f:
      train_file_pattern = f.read()
    train_paths = tf.io.gfile.glob(train_file_pattern)
    copy_paths = []
    for fn in train_paths:
      tmp_path = tempfile.mkstemp()[1]
      tf.io.gfile.copy(fn, tmp_path, overwrite=True)
      copy_paths.append(tmp_path)
    
    df_train = pandas_tfrecords.tfrecords_to_pandas(copy_paths, schema=None, 
                                                    compression_type='GZIP', cast=True)

    y_train = df_train.pop(target_column).tolist()
    df_train = df_train.reindex(sorted(df_train.columns), axis=1)
    x_train = df_train.values.tolist()

    with open(materialized_test_data, 'r') as f:
      test_file_pattern = f.read()
    test_paths = tf.io.gfile.glob(test_file_pattern)
    copy_paths = []
    for fn in test_paths:
      tmp_path = tempfile.mkstemp()[1]
      tf.io.gfile.copy(fn, tmp_path, overwrite=True)
      copy_paths.append(tmp_path)
    
    df_test = pandas_tfrecords.tfrecords_to_pandas(copy_paths, schema=None, 
                                                   compression_type='GZIP', cast=True)

    y_test = df_test.pop(target_column).tolist()
    df_test = df_test.reindex(sorted(df_test.columns), axis=1)
    x_test = df_test.values.tolist()

    model_artifact_path = "/".join(model.path.split("/")[0:-1])
    if model_type == 'xgboost':
      xgbmodel = XGBClassifier(n_jobs=10, n_estimators=30)
      xgbmodel.fit(x_train, y_train)

      y_pred = xgbmodel.predict(x_test)
      predictions = [round(value) for value in y_pred]

      accuracy = accuracy_score(y_test, predictions)
      metrics.log_metric("accuracy",(accuracy * 100.0))
      metrics.log_metric("framework", "XGBoost")
      xgbmodel.save_model(model.path + ".bst")

    elif model_type == 'sklearn':
      skmodel = DecisionTreeClassifier()
      skmodel.fit(x_train,y_train)
      score = skmodel.score(x_test,y_test)
      metrics.log_metric("accuracy",(score * 100.0))
      metrics.log_metric("framework", "Scikit Learn")
      dump(skmodel, model.path + ".joblib")

    with open(transform_output, 'r') as f:
      transform_output_path = f.read()
    local_transform_artifact_dir = os.path.join(model_artifact_path, "transform")
    download_gcs_directory(transform_output_path, local_transform_artifact_dir)

    training_schema_filepath = os.path.join(model_artifact_path, "training_schema.yaml")
    tf.io.gfile.copy(training_schema, training_schema_filepath)
    return "/".join(model.uri.split("/")[0:-1])


@dsl.pipeline(name='training-pipeline-with-fte')
def training_pipeline_with_fte(
    project: str,
    location: str,
    root_dir: str,
    target_column_name: str,
    prediction_type: str,
    transform_config_path: str,
    model_type: str,
    training_fraction: float,
    validation_fraction: float,
    test_fraction: float,
    csv_filenames: str,
    weight_column_name: str = '',
    dataflow_use_public_ips: bool = False,
    dataflow_subnetwork: str = ''):
  """Defines training pipeline with feature transform engine component."""
  import json
  import os

  stats_and_example_gen_task = tabular.StatsAndExampleGenOp(
      project=project,
      location=location,
      target_column_name=target_column_name,
      weight_column_name=weight_column_name,
      prediction_type=prediction_type,
      transformations='[]',
      training_fraction=training_fraction,
      validation_fraction=validation_fraction,
      test_fraction=test_fraction,
      data_source_csv_filenames=csv_filenames,
      request_type='COLUMN_STATS_ONLY_NO_TRANSFORM',
      dataflow_use_public_ips=dataflow_use_public_ips,
      dataflow_subnetwork=dataflow_subnetwork,
      root_dir=root_dir)
  
  #  pylint: disable=no-value-for-parameter
  generate_analyze_and_transform_data_task = tabular.GenerateAnalyzeAndTransformDataOp(
      train_split=stats_and_example_gen_task.outputs['train_split'],
      eval_split=stats_and_example_gen_task.outputs['eval_split'],
      test_split=stats_and_example_gen_task.outputs['test_split'])

  fte_transform_configure_task = tabular.TransformConfigurationPlannerOp(
      project=project,
      location=location,
      root_dir=root_dir,
      analyze_data=generate_analyze_and_transform_data_task
      .outputs['analyze_data'],
      prediction_type=prediction_type,
      target_column=target_column_name,
      weight_column=weight_column_name,
      transform_config=transform_config_path,
      dataset_stats=stats_and_example_gen_task.outputs['dataset_stats'])

  fte_task = tabular.FeatureTransformEngineOp(
      project=project,
      location=location,
      root_dir=root_dir,
      analyze_data=generate_analyze_and_transform_data_task
      .outputs['analyze_data'],
      transform_data=generate_analyze_and_transform_data_task
      .outputs['transform_data'],
      transform_config=fte_transform_configure_task
      .outputs['fte_transform_configuration_path'],
      dataflow_use_public_ips=dataflow_use_public_ips,
      dataflow_subnetwork=dataflow_subnetwork
  ).set_cpu_limit('8').set_memory_limit('30G')
  
  split_materialized_data_task = tabular.SplitMaterializedDataOp(
        fte_task.outputs['materialized_data'])
  
  train_task = custom_train(
      materialized_train_data=split_materialized_data_task.outputs['materialized_train_split'],
      materialized_test_data=split_materialized_data_task.outputs['materialized_test_split'],
      training_schema=fte_transform_configure_task.outputs['training_schema'],
      transform_output=fte_task.outputs['transform_output'],
      target_column=target_column_name, model_type=model_type).set_cpu_limit('16').set_memory_limit('128G')
      
  importer_spec = importer_node.importer(
      artifact_uri=train_task.outputs['output'],
      artifact_class=artifact_types.UnmanagedContainerModel,
      metadata={
          "containerSpec": {
              "imageUri": "gcr.io/cloud-automl-tables/fte-prediction-server:2022_06_27_07_08_41",
              "predictRoute": "/predict",
              "healthRoute": "/health"
          }
      })

  ModelUploadOp(
      project=project,
      location=location,
      display_name='custom-trained-model-with-fte-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}',
      unmanaged_container_model=importer_spec.outputs['artifact']
  )
 

### Setup training configuration

In [None]:
prediction_type = 'classification'
target_column = 'target'
model_type = 'xgboost'  # sklearn or xgboost

# Fraction split
training_fraction = 0.8
validation_fraction = 0.1
test_fraction = 0.1

# Set feature transformation config
transform_config = auto_transform_config

# VPC-SC related config. If your project does not enable VPC-SC, you likely
# don't need to change the values below. If your project enables VPC-SC, please
# make sure:
# 1) The VPC is configured properly for Dataflow.
#    Reference: https://cloud.google.com/dataflow/docs/guides/routes-firewall
# 2) Set dataflow_use_public_ips to False.
# 3) Set the correct fully qualified subnetwork name for dataflow_subnetwork.
#    Reference: https://cloud.google.com/dataflow/docs/guides/specifying-networks#example_network_and_subnetwork_specifications

# Specifies whether Dataflow workers use public IP addresses.
dataflow_use_public_ips = True
# Dataflow's fully qualified subnetwork name (e.g., "regions/us-central1/subnetworks/default"), when empty the default subnetwork will be used.
dataflow_subnetwork = ""

### Create and run training pipeline

By default, the [service account](https://cloud.google.com/iam/docs/service-accounts) used for your pipeline run is your [default Compute Engine service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account). However, you might want to run pipelines with a different set of permissions than those granted to the default service account (for example, a more restricted set).

You can set a custom service account with the `service_account` option when triggering a pipeline run with `custom_job.run()` below.

Please ensure that the service account used to run the pipeline has sufficient permissions to access GCS directory to read/write data, and launch Dataflow jobs.
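
One way to wire this up is to collect the optional run arguments in a dictionary and unpack them into `custom_job.run()`. This is a sketch under stated assumptions: the helper `build_run_kwargs` and the example account address are hypothetical, while `service_account` is the option mentioned above.

```python
from typing import Dict, Optional


def build_run_kwargs(service_account: Optional[str] = None) -> Dict[str, str]:
  """Builds keyword arguments for the pipeline run.

  An empty result means the default service account is used.
  """
  kwargs = {}
  if service_account:
    kwargs['service_account'] = service_account
  return kwargs


# Placeholder address; replace it with a service account from your own project.
run_kwargs = build_run_kwargs('my-pipeline-sa@my-project.iam.gserviceaccount.com')
print(run_kwargs)
# The job would then be started with: custom_job.run(**run_kwargs)
```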

In [None]:
job_id = str(uuid.uuid4())
root_dir = GCP_BASE_PATH + '/' + job_id

transform_config_path = os.path.join(root_dir, "transform_config.json")
write_text_to_file(json.dumps(transform_config), transform_config_path)

pipeline_definition_path = os.path.join(tempfile.mkdtemp(), 'pipeline.json')
compiler.Compiler().compile(training_pipeline_with_fte, 
                            pipeline_definition_path)
parameter_values = {
    'project': GCP_PROJECT,
    'location': GCP_REGION,
    'root_dir': root_dir,
    'target_column_name': target_column,
    'prediction_type': prediction_type,
    'transform_config_path': transform_config_path,
    'model_type': model_type, 
    'training_fraction': training_fraction,
    'validation_fraction': validation_fraction,
    'test_fraction': test_fraction,
    'csv_filenames': csv_filenames,
    'dataflow_use_public_ips': dataflow_use_public_ips,
    'dataflow_subnetwork': dataflow_subnetwork,
}

custom_job_id = 'feature-transform-engine-with-non-tf-trainer-{}'.format(job_id)
custom_job = aiplatform.PipelineJob(
    display_name=custom_job_id,
    template_path=pipeline_definition_path,
    job_id=custom_job_id,
    pipeline_root=root_dir,
    parameter_values=parameter_values,
    enable_caching=False,
)

custom_job.run()

<a name="online-prediction"></a>
## Test the trained model

### Deploy trained model with FTE-supported prediction server

In [None]:
model = aiplatform.Model(get_model_name(custom_job_id))
endpoint = model.deploy(machine_type="n1-standard-8")

### Perform online prediction on raw input

Online prediction can be performed on raw input which is specified by a feature map of input columns. The input columns should cover all the input columns as specified in feature transformation configuration. Missing values can be indicated as `"null"`.
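
The sample requests in this notebook pass every value as a string. A small helper can turn ordinary Python rows into that shape. This is a sketch: `to_raw_instance` is a hypothetical name, and it assumes that stringifying values and writing the literal string `"null"` for missing ones matches what the prediction server expects, as in the samples below.

```python
from typing import Any, Dict, List


def to_raw_instance(row: Dict[str, Any], columns: List[str]) -> Dict[str, str]:
  """Builds one request instance covering every column in `columns`."""
  instance = {}
  for column in columns:
    value = row.get(column)
    # Absent keys and None values are both sent as the string "null".
    instance[column] = 'null' if value is None else str(value)
  return instance


columns = ['ps_reg_01', 'ps_reg_02', 'ps_ind_12_bin']
row = {'ps_reg_01': 0.5, 'ps_reg_02': None}
print(to_raw_instance(row, columns))
# {'ps_reg_01': '0.5', 'ps_reg_02': 'null', 'ps_ind_12_bin': 'null'}
```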

In [None]:
# Sample request instances when `auto_transform_config` and all input columns are used:
# instances = [
#   {"id": "1084", "ps_ind_01": "3", "ps_ind_02_cat": "1", "ps_ind_03": "3", "ps_ind_04_cat": "1", "ps_ind_05_cat": "0", "ps_ind_06_bin": "0", "ps_ind_07_bin": "0", "ps_ind_08_bin": "0", "ps_ind_09_bin": "1", "ps_ind_10_bin": "0", "ps_ind_11_bin": "0", "ps_ind_12_bin": "0", "ps_ind_13_bin": "0", "ps_ind_14": "0", "ps_ind_15": "4", "ps_ind_16_bin": "1", "ps_ind_17_bin": "0", "ps_ind_18_bin": "0", "ps_reg_01": "0.1", "ps_reg_02": "0.2", "ps_reg_03": "-1.0", "ps_car_01_cat": "4", "ps_car_02_cat": "1", "ps_car_03_cat": "-1", "ps_car_04_cat": "0", "ps_car_05_cat": "0", "ps_car_06_cat": "14", "ps_car_07_cat": "1", "ps_car_08_cat": "1", "ps_car_09_cat": "0", "ps_car_10_cat": "1", "ps_car_11_cat": "68", "ps_car_11": "3", "ps_car_12": "0.4", "ps_car_13": "0.936407766", "ps_car_14": "0.4074309757", "ps_car_15": "3.4641016150999997", "ps_calc_01": "0.6", "ps_calc_02": "0.1", "ps_calc_03": "0.3", "ps_calc_04": "1", "ps_calc_05": "1", "ps_calc_06": "9", "ps_calc_07": "3", "ps_calc_08": "10", "ps_calc_09": "3", "ps_calc_10": "8", "ps_calc_11": "4", "ps_calc_12": "2", "ps_calc_13": "2", "ps_calc_14": "4", "ps_calc_15_bin": "1", "ps_calc_16_bin": "0", "ps_calc_17_bin": "1", "ps_calc_18_bin": "0", "ps_calc_19_bin": "1", "ps_calc_20_bin": "0"},
#   {"id": "982", "ps_ind_01": "2", "ps_ind_02_cat": "2", "ps_ind_03": "2", "ps_ind_04_cat": "1", "ps_ind_05_cat": "0", "ps_ind_06_bin": "0", "ps_ind_07_bin": "0", "ps_ind_08_bin": "0", "ps_ind_09_bin": "1", "ps_ind_10_bin": "0", "ps_ind_11_bin": "0", "ps_ind_12_bin": "0", "ps_ind_13_bin": "0", "ps_ind_14": "0", "ps_ind_15": "6", "ps_ind_16_bin": "0", "ps_ind_17_bin": "1", "ps_ind_18_bin": "0", "ps_reg_01": "0.9", "ps_reg_02": "0.7", "ps_reg_03": "0.9013878189", "ps_car_01_cat": "6", "ps_car_02_cat": "1", "ps_car_03_cat": "-1", "ps_car_04_cat": "0", "ps_car_05_cat": "0", "ps_car_06_cat": "15", "ps_car_07_cat": "1", "ps_car_08_cat": "0", "ps_car_09_cat": "0", "ps_car_10_cat": "1", "ps_car_11_cat": "97", "ps_car_11": "2", "ps_car_12": "0.3605551275", "ps_car_13": "0.8865151386", "ps_car_14": "-1.0", "ps_car_15": "3.6055512755000003", "ps_calc_01": "0.4", "ps_calc_02": "0.7", "ps_calc_03": "0.5", "ps_calc_04": "1", "ps_calc_05": "1", "ps_calc_06": "9", "ps_calc_07": "4", "ps_calc_08": "8", "ps_calc_09": "2", "ps_calc_10": "7", "ps_calc_11": "5", "ps_calc_12": "0", "ps_calc_13": "5", "ps_calc_14": "8", "ps_calc_15_bin": "0", "ps_calc_16_bin": "1", "ps_calc_17_bin": "0", "ps_calc_18_bin": "0", "ps_calc_19_bin": "0", "ps_calc_20_bin": "0"},
#   {"id": "753", "ps_ind_01": "1", "ps_ind_02_cat": "1", "ps_ind_03": "5", "ps_ind_04_cat": "0", "ps_ind_05_cat": "0", "ps_ind_06_bin": "1", "ps_ind_07_bin": "0", "ps_ind_08_bin": "0", "ps_ind_09_bin": "0", "ps_ind_10_bin": "0", "ps_ind_11_bin": "0", "ps_ind_12_bin": "0", "ps_ind_13_bin": "0", "ps_ind_14": "0", "ps_ind_15": "5", "ps_ind_16_bin": "1", "ps_ind_17_bin": "0", "ps_ind_18_bin": "0", "ps_reg_01": "0.5", "ps_reg_02": "0.3", "ps_reg_03": "0.7088723439", "ps_car_01_cat": "7", "ps_car_02_cat": "1", "ps_car_03_cat": "-1", "ps_car_04_cat": "0", "ps_car_05_cat": "-1", "ps_car_06_cat": "0", "ps_car_07_cat": "1", "ps_car_08_cat": "1", "ps_car_09_cat": "2", "ps_car_10_cat": "1", "ps_car_11_cat": "32", "ps_car_11": "3", "ps_car_12": "0.316227766", "ps_car_13": "0.6698649179", "ps_car_14": "0.3615245497", "ps_car_15": "3.3166247904", "ps_calc_01": "0.9", "ps_calc_02": "0.6", "ps_calc_03": "0.2", "ps_calc_04": "2", "ps_calc_05": "3", "ps_calc_06": "9", "ps_calc_07": "3", "ps_calc_08": "10", "ps_calc_09": "3", "ps_calc_10": "12", "ps_calc_11": "5", "ps_calc_12": "3", "ps_calc_13": "2", "ps_calc_14": "6", "ps_calc_15_bin": "0", "ps_calc_16_bin": "1", "ps_calc_17_bin": "1", "ps_calc_18_bin": "1", "ps_calc_19_bin": "0", "ps_calc_20_bin": "0"}
# ]

# Sample request instances when other feature transform configurations and a subset of input columns are used:
instances=[
    {"ps_reg_01": "0.5", "ps_reg_02": "0.5", "ps_reg_03": "0.6", "ps_ind_10_bin": "1", "ps_ind_11_bin": "1", "ps_ind_12_bin": "0"},
    {"ps_reg_01": "0.5", "ps_reg_02": "0.5", "ps_reg_03": "0.6", "ps_ind_10_bin": "1", "ps_ind_11_bin": "1", "ps_ind_12_bin": "null"},
    {"ps_reg_01": "null", "ps_reg_02": "0.5", "ps_reg_03": "0.6", "ps_ind_10_bin": "1", "ps_ind_11_bin": "1", "ps_ind_12_bin": "0"}
]

endpoint.predict(instances=instances)

### Perform batch prediction

In [None]:
write_instances_to_jsonl(instances, os.path.join(root_dir, "bp_input.jsonl"))

batch_predict_job = model.batch_predict(
    job_display_name="bp_" + custom_job_id,
    gcs_source=os.path.join(root_dir, "bp_input.jsonl"),
    gcs_destination_prefix=root_dir,
    instances_format="jsonl",
    predictions_format="jsonl",
    model_parameters=None,
    machine_type="n1-standard-8",
    starting_replica_count=1,
    max_replica_count=1,
    sync=False,
)

batch_predict_job.wait()

## Clean-up resources

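Clean up the Google Cloud resources created in this notebook (the endpoint and the uploaded model). Files written to the GCS bucket are not removed by the cell below.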

In [None]:
endpoint.undeploy_all()
endpoint.delete()
model.delete()