# Model Serving with KFServing - Mnist
---

*INPUT --> TRANSFORMER --> ENRICHED INPUT --> MODEL --> PREDICTION*

<font color='red'><h3>This notebook requires KFServing</h3></font>

> **NOTE:** It is assumed that a model called *mnist* is already available in Hopsworks. An example of training a model for the *MNIST handwritten digit classification problem* is available in `Jupyter/experiment/Tensorflow/mnist.ipynb`

## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)

![hops.png](../../../images/hops.png)

### The `hops` python library

`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on <a href="https://github.com/logicalclocks/hops-util-py">GitHub</a>.

## Check Model Repository for best model based on accuracy

![Image7-Monitor.png](../../../images/models.gif)

## Query Model Repository for best mnist Model

In [1]:
from hops import model
from hops.model import Metric
MODEL_NAME="mnist"
EVALUATION_METRIC="accuracy"

In [2]:
best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

In [3]:
print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])

Model name: mnist
Model version: 1
{'accuracy': '0.65625'}
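The metric values come back as strings (`'0.65625'` above), so any client-side comparison of model versions has to convert them to numbers first. The sketch below is a hypothetical helper, not how `model.get_best_model` is implemented; it assumes each model record is a dict with `name`, `version` and a `metrics` dict of string values, shaped like `best_model` above. The second record in the usage example is made up for illustration.

```python
def pick_best_model(models, metric, maximize=True):
    """Return the model record with the best value for `metric`.

    Hypothetical helper: assumes each record looks like
    {'name': str, 'version': int, 'metrics': {metric_name: str}}.
    Records that do not report the metric are ignored.
    """
    candidates = [m for m in models if metric in m.get("metrics", {})]
    if not candidates:
        raise ValueError("no model reports metric {!r}".format(metric))

    def score(record):
        # Metric values are stored as strings, so convert before comparing
        return float(record["metrics"][metric])

    return max(candidates, key=score) if maximize else min(candidates, key=score)


# Version 1 mirrors the output above; version 2 is a made-up record
models = [
    {"name": "mnist", "version": 1, "metrics": {"accuracy": "0.65625"}},
    {"name": "mnist", "version": 2, "metrics": {"accuracy": "0.6"}},
]
print(pick_best_model(models, "accuracy")["version"])  # 1
```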


## Serve the Trained Model with a Transformer

To serve a model with a transformer, write a Python script that implements the `Transformer` class with the methods `preprocess` and `postprocess`, like this:

```python
class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")
        # Initialization code goes here

    def preprocess(self, inputs):
        print("[Transformer] Preprocessing...")
        # Transform the request inputs here. The object returned by this method is used as the model input.
        return inputs

    def postprocess(self, outputs):
        print("[Transformer] Postprocessing...")
        # Transform the predictions computed by the model before returning a response.
        return outputs
```
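As a slightly more concrete sketch, the transformer below checks the input size, clips each input value into the [0, 1] range before inference, and adds the index of the highest-scoring class to the response. It assumes that `preprocess` receives the request body as a dict with an `instances` list and that `postprocess` receives a dict with a `predictions` list, matching the request and response shapes used later in this notebook; the clipping and the `predicted_digits` key are illustrative choices, not part of the Hopsworks API. The hypothetical `fake_model` function stands in for the served model so that the INPUT --> TRANSFORMER --> MODEL --> PREDICTION chain can be run locally.

```python
class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")
        # Expected input size: a 28x28 MNIST image, flattened
        self.num_features = 784

    def preprocess(self, inputs):
        # Assumption: `inputs` is the request body, e.g. {"instances": [[784 floats], ...]}
        for row in inputs["instances"]:
            if len(row) != self.num_features:
                raise ValueError(
                    "expected {} features, got {}".format(self.num_features, len(row)))
        # Clip every value into [0, 1]; other request keys are passed through unchanged
        clipped = [[min(max(x, 0.0), 1.0) for x in row] for row in inputs["instances"]]
        return dict(inputs, instances=clipped)

    def postprocess(self, outputs):
        # Assumption: `outputs` is the model response, e.g. {"predictions": [[10 scores], ...]}
        # Add the index of the highest score (the predicted digit) for each instance
        digits = [max(range(len(s)), key=s.__getitem__) for s in outputs["predictions"]]
        return dict(outputs, predicted_digits=digits)


def fake_model(request):
    # Hypothetical stand-in for the served model: always gives digit 3 the highest score
    return {"predictions": [[0.0] * 3 + [1.0] + [0.0] * 6 for _ in request["instances"]]}


transformer = Transformer()
enriched = transformer.preprocess({"signature_name": "serving_default",
                                   "instances": [[1.5] + [0.2] * 783]})
response = transformer.postprocess(fake_model(enriched))
print(enriched["instances"][0][:2], response["predicted_digits"])  # [1.0, 0.2] [3]
```

Returning new dicts instead of mutating the incoming ones keeps the original request intact, which can matter if the same object is also logged.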


In [4]:
from hops import serving
from hops import hdfs

In [5]:
# Create serving instance
SERVING_NAME = MODEL_NAME
MODEL_PATH="/Models/" + best_model['name']
TRANSFORMER_PATH=hdfs.project_path() + "/Jupyter/serving/kfserving/tensorflow/transformer.py"

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    MODEL_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed
                                    kfserving=True, # whether to serve the model using KFServing or the default tool in the current Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    transformer=TRANSFORMER_PATH, # path to the Python script implementing the Transformer class
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    transformer_instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )

Inferring model server from artifact files: TENSORFLOW_SERVING
Creating serving mnist for artifact /Projects/demo_ml_meb10000//Models/mnist ...
Serving mnist successfully created
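The `inference_logging` option controls which side of each inference ends up in the Kafka topic. The sketch below is a hypothetical helper that encodes one reading of the three values named in the comment above: `MODEL_INPUTS` logs only `request` messages, `PREDICTIONS` only `response` messages, and `ALL` both. These are the two `messageType` values handled when reading the topic later in this notebook, but the exact mapping is an assumption, so check the Hopsworks documentation for your version.

```python
# Hypothetical mapping, assumed from the option names: which messageType
# values each inference_logging mode would write to the Kafka topic
LOGGED_MESSAGE_TYPES = {
    "MODEL_INPUTS": frozenset({"request"}),
    "PREDICTIONS": frozenset({"response"}),
    "ALL": frozenset({"request", "response"}),
}


def logged_message_types(inference_logging):
    """Return the messageType values expected in the topic for a logging mode."""
    if inference_logging not in LOGGED_MESSAGE_TYPES:
        raise ValueError("inference_logging must be one of {}".format(
            sorted(LOGGED_MESSAGE_TYPES)))
    return LOGGED_MESSAGE_TYPES[inference_logging]


print(sorted(logged_message_types("ALL")))  # ['request', 'response']
```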


In [6]:
# List all available servings in the project
for s in serving.get_all():
    print(s.name)

mnist


In [7]:
# Get serving status
serving.get_status(SERVING_NAME)

'Stopped'

## Check Model Serving for active servings

![Image7-Monitor.png](../../../images/servings.gif)

## Start Model Serving Server

In [8]:
if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)

Starting serving with name: mnist...
Serving with name: mnist successfully started


In [9]:
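import time
# Poll the serving status until it reports "Running"
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5)
# Wait a few extra seconds so the serving can finish starting up before requests are sent
time.sleep(5)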
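The loop above waits indefinitely if the serving never reaches `Running` (for example, if the model server fails to start). A minimal sketch of a bounded wait is shown below. `wait_for_status` is a hypothetical helper that takes any zero-argument status function, so in this notebook it could be called as `wait_for_status(lambda: serving.get_status(SERVING_NAME), "Running")`; the default timeout and interval are arbitrary choices.

```python
import time


def wait_for_status(get_status, target, timeout=300.0, interval=5.0):
    """Poll get_status() until it returns `target`; give up after `timeout` seconds.

    Returns True if the target status was observed, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if get_status() == target:
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline
        time.sleep(min(interval, remaining))


# Usage with a fake status source that reports "Running" on the third poll
statuses = iter(["Starting", "Starting", "Running"])
print(wait_for_status(lambda: next(statuses), "Running", timeout=1.0, interval=0.01))  # True
```

Using `time.monotonic()` rather than `time.time()` keeps the deadline correct even if the system clock is adjusted while waiting.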

## Send Prediction Requests to the Served Model using Hopsworks REST API

In [10]:
import numpy as np
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)
NUM_FEATURES=784
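`NUM_FEATURES` is 784 because each MNIST image is 28 x 28 pixels, flattened row by row into one vector. The requests below send random noise of that length; to send a real image instead, it has to be flattened the same way. A stdlib-only sketch, assuming the image is given as a list of 28 rows of grayscale values in 0-255 and that the model expects values scaled to [0, 1] (check how your *mnist* model was trained); `flatten_image` is a hypothetical helper:

```python
def flatten_image(image, height=28, width=28):
    """Flatten a height x width grayscale image (0-255) into a list of floats in [0, 1]."""
    if len(image) != height or any(len(row) != width for row in image):
        raise ValueError("expected a {}x{} image".format(height, width))
    # Row-major order: all pixels of row 0, then row 1, and so on
    return [pixel / 255.0 for row in image for pixel in row]


# A blank image with a single white pixel at row 1, column 2
image = [[0] * 28 for _ in range(28)]
image[1][2] = 255
vector = flatten_image(image)
print(len(vector), vector[1 * 28 + 2])  # 784 1.0

# The vector can then be sent in the same request format used below
data = {"signature_name": "serving_default", "instances": [vector]}
```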

In [11]:
import json
for i in range(10):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.rand(NUM_FEATURES).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)

{'predictions': [[0.0147801526, 0.00866303, 0.16441761, 0.0283872709, 0.316972435, 0.0116910841, 0.272938251, 0.0568961799, 0.0464549698, 0.0787989348]]}
{'predictions': [[0.0147954272, 0.0219566841, 0.121858038, 0.0260763094, 0.37078473, 0.0191048179, 0.187556162, 0.0905136541, 0.0653297752, 0.0820243508]]}
{'predictions': [[0.041775696, 0.0125995874, 0.151431233, 0.0301594064, 0.231847495, 0.0199446511, 0.218778029, 0.034365721, 0.0357867628, 0.22331138]]}
{'predictions': [[0.0118000535, 0.00634807, 0.0942876, 0.0280166529, 0.345347196, 0.0147242304, 0.290315151, 0.0584374703, 0.0406196378, 0.110103831]]}
{'predictions': [[0.0168230962, 0.0196851324, 0.136483476, 0.0223759525, 0.314222127, 0.0126352217, 0.195060492, 0.0946132, 0.101502478, 0.0865987837]]}
{'predictions': [[0.012531314, 0.00801518187, 0.0621500127, 0.0274804309, 0.409992367, 0.0117388759, 0.214463159, 0.10666319, 0.0539339446, 0.093031548]]}
{'predictions': [[0.0227185059, 0.00945187267, 0.103138261, 0.0240464956, 0.4
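Each response holds one list of 10 scores per instance, one score per digit 0-9. In the first response above the scores sum to 1 (presumably a softmax output), and the predicted digit is the index of the largest score. A small sketch using that first response verbatim; since the inputs were random noise, the predicted digit carries no real meaning here:

```python
def predicted_digits(response):
    """Return the arg-max class index for every instance in a prediction response."""
    return [max(range(len(scores)), key=scores.__getitem__)
            for scores in response["predictions"]]


# First response printed above
response = {'predictions': [[0.0147801526, 0.00866303, 0.16441761, 0.0283872709, 0.316972435,
                             0.0116910841, 0.272938251, 0.0568961799, 0.0464549698, 0.0787989348]]}
scores = response["predictions"][0]
print(predicted_digits(response), round(sum(scores), 4))  # [4] 1.0
```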

## Monitor Prediction Requests and Responses using Kafka

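All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner. With `inference_logging="ALL"`, as set when creating the serving above, both the requests and the responses are logged.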
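As an example of what the logged data enables, the sketch below counts how often each digit was predicted across logged `response` events, a simple way to watch for shifts in the prediction distribution. It assumes each event is a dict with a `messageType` field and a JSON-encoded `payload` holding `predictions`, which is the shape the parsed Avro messages have further down in this notebook; `prediction_histogram` itself is a hypothetical helper and the two events are hand-made.

```python
import json
from collections import Counter


def prediction_histogram(events):
    """Count the arg-max digit of every prediction found in logged response events."""
    counts = Counter()
    for event in events:
        if event["messageType"] != "response":
            continue  # request events carry model inputs, not predictions
        payload = json.loads(event["payload"])
        for scores in payload["predictions"]:
            counts[max(range(len(scores)), key=scores.__getitem__)] += 1
    return counts


# Usage with two hand-made events (shortened to 3 class scores for readability)
events = [
    {"messageType": "request", "payload": json.dumps({"instances": [[0.1, 0.2, 0.3]]})},
    {"messageType": "response", "payload": json.dumps({"predictions": [[0.1, 0.7, 0.2]]})},
]
print(prediction_histogram(events))  # Counter({1: 1})
```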

In [12]:
from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the inference logs

In [13]:
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
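Setting `auto.offset.reset` to `earliest` means that a consumer group with no committed offsets starts reading from the beginning of the topic, so it also sees the inference logs written before it subscribed. In recent `confluent-kafka` releases, topic-level properties such as `auto.offset.reset` can be set directly on the top-level configuration, and the nested `default.topic.config` form used above is deprecated. The sketch below builds such a config from a base dict; `with_earliest_offset` is a hypothetical helper, and `base` is a made-up stand-in for the output of `kafka.get_kafka_default_config()`.

```python
def with_earliest_offset(base_config):
    """Return a copy of a Kafka consumer config that starts from the earliest offset.

    Any nested 'default.topic.config' entries are lifted to the top level
    before 'auto.offset.reset' is set, and the nested key is dropped.
    """
    config = dict(base_config)
    nested = config.pop("default.topic.config", {})
    config.update(nested)  # lift nested topic properties to the top level
    config["auto.offset.reset"] = "earliest"
    return config


# Made-up stand-in for kafka.get_kafka_default_config()
base = {"bootstrap.servers": "broker:9091", "group.id": "inference-monitor"}
print(with_earliest_offset(base))
```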

Read the topic's schema from Hopsworks and convert it into an Avro schema for parsing the messages

In [14]:
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
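`kafka.parse_avro_msg` takes care of decoding, but it helps to know what the bytes look like. In Avro's binary encoding, `int` and `long` values are written as zig-zag encoded variable-length integers, and `string` values as a `long` byte length followed by UTF-8 bytes; a record is simply its fields in schema order, with no field names or separators. The sketch below decodes a flat record under the assumption, for illustration only, that every field is either a `string` or a `long`. The field list is hypothetical and not taken from the actual inference-log schema, which you can inspect via `json_schema` above.

```python
def read_long(buf, pos):
    """Decode one zig-zag varint starting at buf[pos]; return (value, new_pos)."""
    shift = 0
    result = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
        shift += 7
    # Undo zig-zag encoding: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (result >> 1) ^ -(result & 1), pos


def read_string(buf, pos):
    """Decode an Avro string: a long length followed by that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_record(buf, fields):
    """Decode a flat record; `fields` is a list of (name, 'string' | 'long') pairs."""
    readers = {"string": read_string, "long": read_long}
    record, pos = {}, 0
    for name, avro_type in fields:
        record[name], pos = readers[avro_type](buf, pos)
    return record


def write_long(n):
    """Encode an int as a zig-zag varint (used here only to build test input)."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while n & ~0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


# Hypothetical field list; the real inference-log schema may differ
fields = [("modelName", "string"), ("modelVersion", "long"), ("messageType", "string")]
msg = b"".join([write_long(5), b"mnist", write_long(1), write_long(8), b"response"])
print(decode_record(msg, fields))
# {'modelName': 'mnist', 'modelVersion': 1, 'messageType': 'response'}
```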

Poll the Kafka topic up to 10 times, parse each message with the Avro schema and print the results

In [15]:
print_instances=False
print_predictions=True

for i in range(0, 10):
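    msg = consumer.poll(timeout=1)
    if msg is not None and msg.error():
        # Skip Kafka-level errors (e.g. partition EOF events) instead of trying to parse them
        print("Kafka error: {}".format(msg.error()))
        continue
    if msg is not None:
        value = msg.value()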
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not print_instances) or \
                (event_dict['messageType'] == "response" and not print_predictions):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")

timeout.. no more messages to read from topic
timeout.. no more messages to read from topic
INFO -> servingId: 8, modelName: mnist, modelVersion: 1,requestTimestamp: 1623685825, inferenceId:cdb85932-d69f-41a8-a34e-83ce7ec732b4, messageType:response
Predictions -> [[0.0147801526, 0.00866303, 0.16441761, 0.0283872709, 0.316972435, 0.0116910841, 0.272938251, 0.0568961799, 0.0464549698, 0.0787989348]]

INFO -> servingId: 8, modelName: mnist, modelVersion: 1,requestTimestamp: 1623685826, inferenceId:ec5c74d4-c1f6-42c2-a5c7-7dfcb55d3e64, messageType:response
Predictions -> [[0.0147954272, 0.0219566841, 0.121858038, 0.0260763094, 0.37078473, 0.0191048179, 0.187556162, 0.0905136541, 0.0653297752, 0.0820243508]]

INFO -> servingId: 8, modelName: mnist, modelVersion: 1,requestTimestamp: 1623685826, inferenceId:de054dd8-5518-4235-8d78-30718e5d9ad8, messageType:response
Predictions -> [[0.041775696, 0.0125995874, 0.151431233, 0.0301594064, 0.231847495, 0.0199446511, 0.218778029, 0.034365721, 0.035