
Customizing the Model

Customize score.py

Here is an example of preparing a model artifact for a TensorFlow model trained on the MNIST dataset. The final layer of the model produces 10 values, one for each digit, so the default score.py returns an array of 10 elements for every input vector. Suppose you want to change the default behavior of the predict function in score.py to return the most likely digit instead of the full vector of scores over all the digits. To do so, return the position of the maximum value within the output array. Here are the steps to customize score.py -

Step 1: Train your estimator and then generate the model artifact as shown below -

from ads.catalog.model import ModelCatalog
from ads.model.framework.tensorflow_model import TensorFlowModel
import tempfile
import tensorflow as tf
from ads.common.model_metadata import UseCaseType

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

tf_estimator = tf.keras.models.Sequential(
    [
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10),
    ]
)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
tf_estimator.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])
tf_estimator.fit(x_train, y_train, epochs=1)

tensorflow_model = TensorFlowModel(tf_estimator, artifact_dir=tempfile.mkdtemp())

# Autogenerate score.py, pickled model, runtime.yaml, input_schema.json and output_schema.json
tensorflow_model.prepare(
    inference_conda_env="generalml_p38_cpu_v1",
    use_case_type=UseCaseType.MULTINOMIAL_CLASSIFICATION,
    X_sample=x_train,
    y_sample=y_train,
)

Verify the output produced by the autogenerated score.py by calling verify on the model object.

print(tensorflow_model.verify(x_test[:3])['prediction'])

[[-2.9461750984191895, -5.293642997741699, 0.4030594229698181, 3.0270071029663086, -6.470805644989014, -2.07453989982605, -9.646402359008789, 9.256569862365723, -2.6433541774749756, -0.8167083263397217], [-3.4297854900360107, 2.4863781929016113, 8.968724250793457, 3.162344217300415, -11.153030395507812, 0.15335027873516083, -0.5451826453208923, -7.817524433135986, -1.0585914850234985, -10.736929893493652], [-4.420501232147217, 5.841022491455078, -0.17066864669322968, -1.0071465969085693, -2.261953592300415, -3.0983355045318604, -2.0874621868133545, 1.0745809078216553, -1.2511857748031616, -2.273810625076294]]
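
The goal of the customization is to collapse each of these 10-element logit vectors into a single digit. As a quick sanity check (this snippet is illustrative and not part of the generated artifact), applying tf.argmax to the output above picks the index of the largest logit in each row -

import tensorflow as tf

logits = tensorflow_model.verify(x_test[:3])['prediction']
print(tf.argmax(logits, axis=1).numpy().tolist())
# For the logits printed above, this yields [7, 2, 1].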

The default score.py generated in the tensorflow_model.artifact_dir location is -

import os
import sys
from functools import lru_cache
import pandas as pd
import numpy as np
import json
import tensorflow as tf
from io import BytesIO
import base64

model_name = 'model.h5'


"""
Inference script. This script is used for prediction by scoring server when schema is known.
"""


@lru_cache(maxsize=10)
def load_model(model_file_name=model_name):
    """
    Loads model from the serialized format

    Returns
    -------
    model:  a model instance on which predict API can be invoked
    """
    model_dir = os.path.dirname(os.path.realpath(__file__))
    if model_dir not in sys.path:
        sys.path.insert(0, model_dir)
    contents = os.listdir(model_dir)
    if model_file_name in contents:
        print(f'Start loading {model_file_name} from model directory {model_dir} ...')
        loaded_model = tf.keras.models.load_model(os.path.join(model_dir, model_file_name))

        print("Model is successfully loaded.")
        return loaded_model
    else:
        raise Exception(f'{model_file_name} is not found in model directory {model_dir}')


@lru_cache(maxsize=1)
def fetch_data_type_from_schema(input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns data type information fetch from input_schema.json.

    Parameters
    ----------
    input_schema_path: path of input schema.

    Returns
    -------
    data_type: data type fetch from input_schema.json.

    """
    data_type = {}
    if os.path.exists(input_schema_path):
        schema = json.load(open(input_schema_path))
        for col in schema['schema']:
            data_type[col['name']] = col['dtype']
    else:
        print("input_schema has to be passed in in order to recover the same data type. pass `X_sample` in `ads.model.framework.tensorflow_model.TensorFlowModel.prepare` function to generate the input_schema. Otherwise, the data type might be changed after serialization/deserialization.")
    return data_type


def deserialize(data, input_schema_path):
    """
    Deserialize json-serialized data to data in original type when sent to
predict.

    Parameters
    ----------
    data: serialized input data.
    input_schema_path: path of input schema.

    Returns
    -------
    data: deserialized input data.

    """
    data_type = data.get('data_type', '')
    json_data = data.get('data', data)

    if "numpy.ndarray" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return np.load(load_bytes, allow_pickle=True)
    if "pandas.core.series.Series" in data_type:
        return pd.Series(json_data)
    if "pandas.core.frame.DataFrame" in data_type:
        return pd.read_json(json_data, dtype=fetch_data_type_from_schema(input_schema_path))
    if "tensorflow.python.framework.ops.EagerTensor" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return tf.convert_to_tensor(np.load(load_bytes, allow_pickle=True))

    return json_data

def pre_inference(data, input_schema_path):
    """
    Preprocess json-serialized data to feed into predict function.

    Parameters
    ----------
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    data: Data format after any processing.
    """
    data = deserialize(data, input_schema_path)

    # Add further data preprocessing if needed
    return data

def post_inference(yhat):
    """
    Post-process the model results.

    Parameters
    ----------
    yhat: Data format after calling model.predict.

    Returns
    -------
    yhat: Data format after any processing.

    """

    return yhat.numpy().tolist()

def predict(data, model=load_model(), input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns prediction given the model and data to predict.

    Parameters
    ----------
    model: Model instance returned by load_model API
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    predictions: Output from scoring server
        Format: {'prediction': output from model.predict method}

    """
    inputs = pre_inference(data, input_schema_path)

    yhat = post_inference(
        model(inputs)
    )
    return {'prediction': yhat}
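
Note the payload format that the deserialize function above expects when the scoring server receives a request. The following sketch (illustrative only; the array is a placeholder with the MNIST input shape) shows how a NumPy array is encoded into the {'data_type', 'data'} dictionary that deserialize decodes, which is handy when testing score.py locally or calling the endpoint directly -

import base64
import io

import numpy as np

arr = np.zeros((1, 28, 28))  # placeholder batch with the MNIST input shape
buffer = io.BytesIO()
np.save(buffer, arr, allow_pickle=True)
payload = {
    "data_type": "numpy.ndarray",
    "data": base64.b64encode(buffer.getvalue()).decode("utf-8"),
}
# deserialize(payload, input_schema_path) recovers the original ndarray.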

Step 2: Update the post_inference method in score.py to find and return the index corresponding to the maximum value. You can use the argmax function from TensorFlow to achieve this. Here is the modified code -

import os
import sys
from functools import lru_cache
import pandas as pd
import numpy as np
import json
import tensorflow as tf
from io import BytesIO
import base64

model_name = 'model.h5'


"""
Inference script. This script is used for prediction by scoring server when schema is known.
"""


@lru_cache(maxsize=10)
def load_model(model_file_name=model_name):
    """
    Loads model from the serialized format

    Returns
    -------
    model:  a model instance on which predict API can be invoked
    """
    model_dir = os.path.dirname(os.path.realpath(__file__))
    if model_dir not in sys.path:
        sys.path.insert(0, model_dir)
    contents = os.listdir(model_dir)
    if model_file_name in contents:
        print(f'Start loading {model_file_name} from model directory {model_dir} ...')
        loaded_model = tf.keras.models.load_model(os.path.join(model_dir, model_file_name))

        print("Model is successfully loaded.")
        return loaded_model
    else:
        raise Exception(f'{model_file_name} is not found in model directory {model_dir}')


@lru_cache(maxsize=1)
def fetch_data_type_from_schema(input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns data type information fetch from input_schema.json.

    Parameters
    ----------
    input_schema_path: path of input schema.

    Returns
    -------
    data_type: data type fetch from input_schema.json.

    """
    data_type = {}
    if os.path.exists(input_schema_path):
        schema = json.load(open(input_schema_path))
        for col in schema['schema']:
            data_type[col['name']] = col['dtype']
    else:
        print("input_schema has to be passed in in order to recover the same data type. pass `X_sample` in `ads.model.framework.tensorflow_model.TensorFlowModel.prepare` function to generate the input_schema. Otherwise, the data type might be changed after serialization/deserialization.")
    return data_type


def deserialize(data, input_schema_path):
    """
    Deserialize json-serialized data to data in original type when sent to
predict.

    Parameters
    ----------
    data: serialized input data.
    input_schema_path: path of input schema.

    Returns
    -------
    data: deserialized input data.

    """
    data_type = data.get('data_type', '')
    json_data = data.get('data', data)

    if "numpy.ndarray" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return np.load(load_bytes, allow_pickle=True)
    if "pandas.core.series.Series" in data_type:
        return pd.Series(json_data)
    if "pandas.core.frame.DataFrame" in data_type:
        return pd.read_json(json_data, dtype=fetch_data_type_from_schema(input_schema_path))
    if "tensorflow.python.framework.ops.EagerTensor" in data_type:
        load_bytes = BytesIO(base64.b64decode(json_data.encode('utf-8')))
        return tf.convert_to_tensor(np.load(load_bytes, allow_pickle=True))

    return json_data

def pre_inference(data, input_schema_path):
    """
    Preprocess json-serialized data to feed into predict function.

    Parameters
    ----------
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    data: Data format after any processing.
    """
    data = deserialize(data, input_schema_path)

    # Add further data preprocessing if needed
    return data

def post_inference(yhat):
    """
    Post-process the model results.

    Parameters
    ----------
    yhat: Data format after calling model.predict.

    Returns
    -------
    yhat: Data format after any processing.

    """
    yhat = tf.argmax(yhat, axis=1) # Get the index of the max value
    return yhat.numpy().tolist()

def predict(data, model=load_model(), input_schema_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_schema.json")):
    """
    Returns prediction given the model and data to predict.

    Parameters
    ----------
    model: Model instance returned by load_model API
    data: Data format as expected by the predict API of the core estimator.
    input_schema_path: path of input schema.

    Returns
    -------
    predictions: Output from scoring server
        Format: {'prediction': output from model.predict method}

    """
    inputs = pre_inference(data, input_schema_path)

    yhat = post_inference(
        model(inputs)
    )
    return {'prediction': yhat}

Step 3: Verify the changes

print(tensorflow_model.verify(x_test[:3])['prediction'])

Start loading model.h5 from model directory /tmp/tmppkco6xrt ...
Model is successfully loaded.
[7, 2, 1]

Step 4: Register the model

model_id = tensorflow_model.save()

Step 5: Deploy and generate the endpoint

>>> # Deploy and create an endpoint for the TensorFlow model
>>> tensorflow_model.deploy(
...     display_name="TensorFlow Model For Classification",
...     deployment_log_group_id="ocid1.loggroup.oc1.xxx.xxxxx",
...     deployment_access_log_id="ocid1.log.oc1.xxx.xxxxx",
...     deployment_predict_log_id="ocid1.log.oc1.xxx.xxxxx",
... )
>>> print(f"Endpoint: {tensorflow_model.model_deployment.url}")
https://modeldeployment.{region}.oci.customer-oci.com/ocid1.datasciencemodeldeployment.oc1.xxx.xxxxx

Step 6: Run prediction from the endpoint

print(tensorflow_model.predict(x_test[:3])['prediction'])

[7, 2, 1]
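
The predict call above handles serialization and authentication through ADS. If you want to invoke the endpoint over plain HTTP instead, the following is a minimal sketch, assuming an API key configured in the default OCI config file (~/.oci/config); it reuses the NumPy payload format described earlier, and the /predict suffix on the deployment URL follows the standard OCI Data Science invocation pattern -

import base64
import io

import numpy as np
import oci
import requests
from oci.signer import Signer

# Encode the test images in the format that deserialize() in score.py understands.
buffer = io.BytesIO()
np.save(buffer, x_test[:3], allow_pickle=True)
payload = {
    "data_type": "numpy.ndarray",
    "data": base64.b64encode(buffer.getvalue()).decode("utf-8"),
}

# Sign the request with the API key from the default OCI config profile.
config = oci.config.from_file()
signer = Signer(
    tenancy=config["tenancy"],
    user=config["user"],
    fingerprint=config["fingerprint"],
    private_key_file_location=config["key_file"],
    pass_phrase=config.get("pass_phrase"),
)

endpoint = f"{tensorflow_model.model_deployment.url}/predict"
response = requests.post(endpoint, json=payload, auth=signer)
print(response.json())  # expected format: {'prediction': [...]}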