# Design Pattern 17 - Batch Serving (Chapter 5)

## Introduction to Design Pattern

The previous pattern (16, Stateless Function) looked at serving a model through a stateless function. The aim was to serve real-time predictions from a machine learning model with low latency. Many applications require a single prediction as quickly as possible, for example:
* credit card fraud detection
* medical diagnosis
* facial recognition for authentication

The stateless function can then use web/cloud infrastructure to scale up to serving millions of customers simultaneously.

There are many applications where we don't want or need to serve one result at a time as quickly as possible. Instead we need to make thousands or millions of predictions in large batches, sometimes according to a schedule. Examples of this include:
* personalised playlists on a music app, which can be created for each user in a batch
* product recommendations, which can be updated once a day/week based on recent transactions and then cached
* weather forecasts?
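
The "predict in a batch, then cache" idea behind the playlist and recommendation examples can be sketched as follows. This is a minimal illustration, not a production recommender: the embedding vectors are random toy data, and `batch_recommend`, `recommendation_cache` and `get_recommendations` are hypothetical names introduced here.

```python
import numpy as np


def batch_recommend(user_vectors: np.ndarray, item_vectors: np.ndarray, k: int) -> np.ndarray:
    """Score every (user, item) pair in one matrix product and return the
    indices of the k best items for each user, best first."""
    scores = user_vectors @ item_vectors.T      # shape: (n_users, n_items)
    return np.argsort(-scores, axis=1)[:, :k]   # highest score first


# Hypothetical toy data: 4 users and 5 items in a 3-dimensional embedding space.
rng = np.random.default_rng(seed=0)
user_vectors = rng.normal(size=(4, 3))
item_vectors = rng.normal(size=(5, 3))

# The scheduled batch job: one pass over all users, results cached by user id.
recommendation_cache = {
    user_id: items.tolist()
    for user_id, items in enumerate(batch_recommend(user_vectors, item_vectors, k=2))
}


def get_recommendations(user_id: int) -> list:
    """Serving path: a cheap dictionary lookup, with no model call."""
    return recommendation_cache[user_id]


print(get_recommendations(0))
```

The expensive step runs once for all users, so the serving path never touches the model. The trade-off is that cached results are only as fresh as the last batch run.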




## Key Features of solution
* Run queries in a distributed compute and data space. Examples could include:
  * Google BigQuery (see notebook example below)
  * RedisAI
  * dask-ml
  * Ray Data batch inference: https://docs.ray.io/en/latest/data/batch_inference.html#batch-inference-home
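
To illustrate the general idea of running predictions over a partitioned data space, here is a minimal sketch using only NumPy and the standard library. It does not show how BigQuery, RedisAI, dask-ml or Ray work internally: `predict_partition` is a hypothetical stand-in for a real model, and a local thread pool stands in for a cluster of workers.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def predict_partition(partition: np.ndarray) -> np.ndarray:
    """Stand-in for a real model: label each row by its largest feature."""
    return np.argmax(partition, axis=1)


def batch_predict(data: np.ndarray, n_partitions: int, n_workers: int) -> np.ndarray:
    """Split the rows into partitions, predict each partition on a worker,
    and stitch the results back together in the original row order."""
    partitions = np.array_split(data, n_partitions)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # `map` returns results in the same order as the input partitions.
        results = list(pool.map(predict_partition, partitions))
    return np.concatenate(results)


# Hypothetical input: 1000 rows with 3 features each.
data = np.random.default_rng(seed=0).random((1000, 3))
labels = batch_predict(data, n_partitions=8, n_workers=4)
print(labels.shape)
```

The distributed frameworks listed above follow the same split, map and combine shape, but they also handle data that does not fit on one machine, scheduling and failure recovery.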

key features

challenges and when not to use



## Example python implementation

Example Google Cloud notebook: https://github.com/GoogleCloudPlatform/ml-design-patterns/blob/master/05_resilience/batch_serving.ipynb

The main problem is that the example is very Google Cloud centric; the cells below use a local Keras model and Ray instead.

## Create sample model

In [1]:
## Create sample model
import numpy as np
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import ModelCheckpoint

# Load the Wine dataset
wine = load_wine()
X = wine.data
y = wine.target

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Standardize the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Build the neural network model
model = Sequential([
    Dense(64, activation='relu', input_shape=(X_train_scaled.shape[1],)),
    Dense(32, activation='relu'),
    Dense(3, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Define a ModelCheckpoint callback to save the best model during training
checkpoint_callback = ModelCheckpoint("wine_model.h5", save_best_only=True, save_weights_only=False)

# Train the model
history = model.fit(X_train_scaled, y_train, epochs=50, batch_size=32, validation_split=0.2, callbacks=[checkpoint_callback])

# Evaluate the model on the test set
loss, accuracy = model.evaluate(X_test_scaled, y_test)
print("Test loss:", loss)
print("Test accuracy:", accuracy)

# Save the trained model to disk
model.save("final_wine_model.h5")

print("Model saved to disk.")




Epoch 1/50
...
Epoch 50/50
Test loss: 0.017857268452644348
Test accuracy: 1.0
Model saved to disk.


In [None]:
import numpy as np
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import load_model

# Load the Wine dataset
wine = load_wine()
X = wine.data
y = wine.target

# Standardize the features with the same statistics used in training:
# re-create the training split (same test_size and random_state) and fit
# the scaler on it, rather than re-fitting on the full dataset.
X_train, _, _, _ = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler().fit(X_train)
X_scaled = scaler.transform(X)

# Load the saved model
loaded_model = load_model("final_wine_model.h5")

# Perform inference on the whole dataset in one batch call
predictions = loaded_model.predict(X_scaled)

# Convert predicted class probabilities to class labels
predicted_classes = np.argmax(predictions, axis=1)

# Compare predicted classes with ground truth labels
# (this includes the training rows, so it is not a held-out score)
accuracy = np.mean(predicted_classes == y) * 100

print(f"Accuracy on the Wine dataset: {accuracy:.2f}%")


In [None]:
from typing import Dict
import numpy as np
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import ray

# Step 1: Create a Ray Dataset from in-memory NumPy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats (for example with `ray.data.read_csv`).
# The features are scaled as in training (scaler fit on the training split).
X, y = load_wine(return_X_y=True)
X_train, _, _, _ = train_test_split(X, y, test_size=0.2, random_state=42)
X_scaled = StandardScaler().fit(X_train).transform(X).astype(np.float32)
ds = ray.data.from_numpy(X_scaled)  # rows are stored in a column named "data"

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TFPredictor:
    def __init__(self):
        from tensorflow.keras.models import load_model

        # Load the pre-trained Keras model saved by the training cell.
        # Assumes the actors run in the directory that contains the file.
        self.model = load_model("final_wine_model.h5")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Get the class probabilities for the input batch.
        return {"output": self.model(batch["data"]).numpy()}

# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(TFPredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)

## Real world examples


Try to include some actual/possible examples of where this DP could be used in a weather and climate context.