# Model Deployment on Ray Serve and Feast

![Ray Serve and Feast](images/joint_logo.png)

Welcome to this tutorial on model deployment on Ray Serve and Feast! This tutorial will show you how to:

1. Deploy your ML model on Ray Serve.
2. Connect to your Feast feature store through a Ray Serve deployment.
3. Separate your I/O-heavy business logic from your compute-heavy model inference for maximum scalability.

If you have any questions about this tutorial, or any follow-up questions, please feel free to ask them in the [Ray discussion forum](https://discuss.ray.io/c/ray-serve/6) or the [Ray Slack](https://forms.gle/9TSdDYUgxYs8SA9e8).

# 1. Landscape of ML Serving Tools

Where does Ray Serve fit in the landscape of machine learning serving tools? 

Generally speaking, there's a spectrum of ML serving tools:
- People typically start with either framework-specific servers (e.g. TFServing, TorchServe) or generic web-serving frameworks (e.g. Flask, FastAPI) as an easy way to deploy a single model.
- For more "production-readiness", people add custom tooling (e.g. Docker, K8s, Golang-based microservices).
- But it's tough to maintain an ad-hoc patchwork of systems that are glued together. People are starting to look for special-purpose deployment tools (e.g. KubeFlow, KServe, Triton) to manage and deploy many models in production.

In this spectrum of tools, users face tradeoffs between ease-of-development and production scalability. However, **Ray Serve lets you easily develop locally and then transparently scale to production**, giving you the best of both worlds!

![Serve aims at both ease of development and ready for production.](images/serve_position.svg)


# 2. Feature Stores

Feature stores are systems that store your machine learning models' input data, also known as "features." Managing data pipelines for these features can be challenging. Machine learning engineers and data scientists can integrate feature stores like Feast into their pipelines to aggregate, organize, store, and serve features. This is especially helpful because features may need to be collected in real time and stored in a way that makes them useful for both online inference and offline training.
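
To make the online-serving side of that description concrete, here is a minimal sketch of what an online feature lookup amounts to: a key-value read keyed by an entity ID. The `ToyOnlineStore` class, its `write()` and `read()` methods, and the sample values are all hypothetical and are not part of Feast's API; Feast itself is used later in this tutorial.

```python
from collections import defaultdict


class ToyOnlineStore:
    """Hypothetical in-memory stand-in for an online feature store (not Feast's API)."""

    def __init__(self):
        # Maps feature view name -> entity key -> {feature name: value}
        self._views = defaultdict(dict)

    def write(self, view, entity_key, features):
        # Keep only the latest value of each feature for the entity
        self._views[view].setdefault(entity_key, {}).update(features)

    def read(self, view, entity_key, feature_names):
        # Features that were never written come back as None
        row = self._views[view].get(entity_key, {})
        return {name: row.get(name) for name in feature_names}


store = ToyOnlineStore()
# Made-up values, for illustration only
store.write("zipcode_features", 76104, {"city": "FORT WORTH", "state": "TX"})
store.write("zipcode_features", 76104, {"population": 1000})

features = store.read("zipcode_features", 76104, ["city", "population", "total_wages"])
print(features)
```

A real feature store adds what this sketch leaves out, such as ingestion pipelines, an offline store for training data, and consistency between the two.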

# 3. Example Model

In this tutorial you will deploy a machine learning model locally via Ray Serve. The model will also access features from a Feast feature store.

Let’s first take a look at how the model works, without using Ray Serve. Here's the model and its supporting code:

In [None]:
# First, we specify metadata like the model checkpoint's filename
# and the features we'll use in the model.

MODEL_FILENAME = "model.pkl"
ENCODER_FILENAME = "encoder.bin"

categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]

feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]

In [None]:
import os
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

import feast

from ray.ml.constants import MODEL_KEY as RAY_MODEL_KEY
from ray.ml.checkpoint import Checkpoint as RayCheckpoint

class CreditScoringModel:

    def __init__(self):
        # Load model
        print("Loading saved model...\n")
        self.checkpoint = RayCheckpoint(MODEL_FILENAME)

        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

        # Set up Feast feature store
        self.fs = feast.FeatureStore(repo_path="feature_repo")

    def predict(self, model_request):
        # Preprocess the request
        features_array = self.preprocess(model_request)
        
        # Make prediction
        xgb_model = self._load_from_checkpoint(self.checkpoint)
        prediction = xgb_model.inplace_predict(features_array)

        # Return result of credit scoring
        return np.round(prediction)
    
    def preprocess(self, model_request):
        # Get online features from Feast
        feature_vector = self._get_online_features_from_feast(model_request)

        # Join features to request features
        features = model_request.copy()
        features.update(feature_vector)
        features_df = pd.DataFrame.from_dict(features)

        # Apply ordinal encoding to categorical features
        self._apply_ordinal_encoding(features_df)

        # Sort columns
        features_df = features_df.reindex(sorted(features_df.columns), axis=1)

        # Drop unnecessary columns
        features_df = features_df[features_df.columns.drop("zipcode").drop("dob_ssn")]
        print("================================")
        print(f"Inference request:")
        print("--------------------------------")
        print(features_df.T)
        print("================================\n")
        return np.array(features_df)
    
    def _apply_ordinal_encoding(self, requests):
        requests[categorical_features] = self.encoder.transform(
            requests[categorical_features]
        )
    
    def _get_online_features_from_feast(self, request):
        zipcode = request["zipcode"][0]
        dob_ssn = request["dob_ssn"][0]

        return self.fs.get_online_features(
            entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
            features=feast_features,
        ).to_dict()
    
    def _load_from_checkpoint(self, checkpoint: RayCheckpoint):
        checkpoint_path = checkpoint.to_directory()
        xgb_model = xgb.Booster()
        xgb_model.load_model(os.path.join(checkpoint_path, RAY_MODEL_KEY))
        return xgb_model

model = CreditScoringModel()

The `CreditScoringModel` recommends loan approval or rejection using features that include info such as income and interest rates. During initialization, it loads a pre-trained model using the Ray checkpoint stored at `MODEL_FILENAME`. When it receives an inference request, it combines the request data with online features that it retrieves from a Feast feature store. Then, it runs its `predict()` method, which preprocesses the request and then runs inference on it. Based on the result, the code prints whether a loan should be approved or rejected. We can run a sample request to verify that the model works correctly locally. This request should print `Loan rejected!`:

In [None]:
loan_request = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

result = model.predict(loan_request)[0]
print(f"Inference result: {result}\n")

if result == 0:
    print("Loan approved!")
elif result == 1:
    print("Loan rejected!")
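
The join performed in `preprocess()` can be hard to picture from the pandas calls alone. The sketch below reproduces the same three steps (merge, sort columns, drop entity keys) on plain dictionaries. It assumes the feature lookup returns a dict of lists that also contains the entity keys; the `join_and_order()` helper and the sample values are hypothetical and are not part of the tutorial's model.

```python
def join_and_order(request, online_features, drop=("zipcode", "dob_ssn")):
    """Merge request fields with looked-up features, then sort and drop keys."""
    merged = dict(request)          # copy so the caller's dict is untouched
    merged.update(online_features)  # looked-up values win on key collisions
    # Sorted column order keeps the feature layout stable between requests
    return {key: merged[key] for key in sorted(merged) if key not in drop}


# Hypothetical values in the column-oriented shape used by the tutorial
request = {"zipcode": [76104], "dob_ssn": ["19630621_4278"], "loan_amnt": [35000]}
online = {"zipcode": [76104], "dob_ssn": ["19630621_4278"], "city": ["FORT WORTH"]}

row = join_and_order(request, online)
print(list(row))
```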

# 4. Converting to a Ray Serve Deployment 


This tutorial’s goal is to deploy this model using Ray Serve, so it can be scaled up and queried over HTTP. We’ll start by converting the `CreditScoringModel` Python class into a Ray Serve deployment that can be launched locally on a laptop:

In [None]:
import os
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

import feast

import ray
from ray import serve

from ray.ml.constants import MODEL_KEY as RAY_MODEL_KEY
from ray.ml.checkpoint import Checkpoint as RayCheckpoint


@serve.deployment
class CreditScoringModel:

    def __init__(self):
        # Load model
        print("Loading saved model...\n")
        self.checkpoint = RayCheckpoint(MODEL_FILENAME)

        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

        # Set up Feast feature store
        self.fs = feast.FeatureStore(repo_path="feature_repo")
    
    async def __call__(self, http_request):
        body = await http_request.json()
        return self.predict(body)

    def predict(self, model_request):
        # Preprocess the request
        features_array = self.preprocess(model_request)
        
        # Make prediction
        xgb_model = self._load_from_checkpoint(self.checkpoint)
        prediction = xgb_model.inplace_predict(features_array)

        # Return result of credit scoring
        return np.round(prediction)
    
    def preprocess(self, model_request):
        # Get online features from Feast
        feature_vector = self._get_online_features_from_feast(model_request)

        # Join features to request features
        features = model_request.copy()
        features.update(feature_vector)
        features_df = pd.DataFrame.from_dict(features)

        # Apply ordinal encoding to categorical features
        self._apply_ordinal_encoding(features_df)

        # Sort columns
        features_df = features_df.reindex(sorted(features_df.columns), axis=1)

        # Drop unnecessary columns
        features_df = features_df[features_df.columns.drop("zipcode").drop("dob_ssn")]
        print("================================")
        print(f"Inference request:")
        print("--------------------------------")
        print(features_df.T)
        print("================================\n")
        return np.array(features_df)
    
    def _apply_ordinal_encoding(self, requests):
        requests[categorical_features] = self.encoder.transform(
            requests[categorical_features]
        )
    
    def _get_online_features_from_feast(self, request):
        zipcode = request["zipcode"][0]
        dob_ssn = request["dob_ssn"][0]

        return self.fs.get_online_features(
            entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
            features=feast_features,
        ).to_dict()
    
    def _load_from_checkpoint(self, checkpoint: RayCheckpoint):
        checkpoint_path = checkpoint.to_directory()
        xgb_model = xgb.Booster()
        xgb_model.load_model(os.path.join(checkpoint_path, RAY_MODEL_KEY))
        return xgb_model

Notice that there were minimal code changes to convert this `CreditScoringModel` into a Ray Serve deployment. In fact, only two changes were made:

1. We added the `@serve.deployment` decorator, which converts the class into a Ray Serve `Deployment` object. This object can later be deployed using the `deploy()` method.
2. Then, we added a `__call__()` method. This is the method that will receive HTTP requests to our model. It unpacks the request's JSON body, and it calls the `CreditScoringModel`'s `predict()` method, which has no code changes.
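
Because `__call__()` only unpacks the request and delegates to `predict()`, the two entry points can be exercised side by side without starting Serve. The following is a minimal sketch of that idea using plain `asyncio`. `FakeRequest` and `EchoModel` are hypothetical stand-ins and are not Ray Serve classes; the only assumption taken from the deployment above is that the request object exposes an awaitable `json()` method.

```python
import asyncio


class FakeRequest:
    """Hypothetical stand-in for the HTTP request object passed to __call__."""

    def __init__(self, payload):
        self._payload = payload

    async def json(self):
        return self._payload


class EchoModel:
    """Toy class with the same __call__/predict split as the deployment."""

    async def __call__(self, http_request):
        body = await http_request.json()
        return self.predict(body)

    def predict(self, model_request):
        # Placeholder logic: report which fields arrived
        return sorted(model_request)


payload = {"loan_amnt": [35000], "zipcode": [76104]}
model = EchoModel()

# predict() can still be called directly, as in the local test
direct = model.predict(payload)
# __call__ takes the HTTP-style path and ends up in the same method
via_call = asyncio.run(model(FakeRequest(payload)))
print(direct, via_call)
```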

Next, we can start a local Ray cluster (with `ray.init()`) and a Serve application (with `serve.start()`) on top of it. After that, we can deploy `CreditScoringModel`. Before doing any of this, we make sure to shut down any lingering Ray cluster that may already exist in the background from previous runs.

In [None]:
# Shut down any Ray cluster left over from a previous run
ray.shutdown()

ray.init(runtime_env={"working_dir": "."})
serve.start()

CreditScoringModel.deploy()

**Note:** the `runtime_env={"working_dir": "."}` parameter that's passed into `ray.init()` packages this notebook's directory and uploads it to the Ray cluster. This allows `CreditScoringModel` to access the checkpoint files stored in this directory.

# 5. Testing the Ray Serve Deployment


Now that the `CreditScoringModel` is running in the Ray cluster as a Ray Serve deployment, we can query it over HTTP. We can use the Python `requests` library to send a POST request to the deployment, containing the same `loan_request` from our local test. We should again expect this program to print `Loan rejected!`.

In [None]:
import requests

loan_request = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

response = requests.post("http://localhost:8000/CreditScoringModel", json=loan_request)
result = response.json()[0]
print(f"Inference result: {result}\n")

if result == 0:
    print("Loan approved!\n")
elif result == 1:
    print("Loan rejected!\n")

# 6. Increasing Scalability: Splitting Business Logic and ML Inference


Ray Serve allows you to easily scale your deployments using the `num_replicas` parameter. This parameter can be set in the `@serve.deployment` decorator, and it controls how many copies of the deployment process exist in the cluster. A higher `num_replicas` value allows your deployment to serve more requests.

However, note that our model has two distinct steps: (1) preprocessing and (2) ML inference. Preprocessing is an I/O-heavy task, requiring communication with the feature store. ML inference is a compute-heavy task, requiring more CPU/GPU time compared to preprocessing. If our model becomes bottlenecked because of (1), then when we scale it, we waste CPU/GPU resources that get reserved by the extra replicas. On the other hand, if it's bottlenecked because of (2), then scaling it wastes memory.

<img src="images/resource_bounds.png" alt="Disparate resource consumption" width="500"/>
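
A back-of-the-envelope calculation shows why the imbalance matters. The sketch below assumes each replica handles one request at a time and ignores queueing and network overhead; the throughput target and per-step latencies are hypothetical numbers chosen only for illustration, not measurements of this model.

```python
import math


def replicas_needed(target_qps, millis_per_request):
    """Replicas required if each replica serves one request at a time."""
    return math.ceil(target_qps * millis_per_request / 1000)


# Hypothetical numbers, chosen only to illustrate the imbalance
target_qps = 100
preprocess_ms = 50  # mostly waiting on the feature store
inference_ms = 5    # mostly CPU/GPU work

# One combined deployment: every replica also reserves inference resources
combined = replicas_needed(target_qps, preprocess_ms + inference_ms)

# Two deployments: each step is scaled on its own
split_preprocess = replicas_needed(target_qps, preprocess_ms)
split_inference = replicas_needed(target_qps, inference_ms)

print(combined, split_preprocess, split_inference)
```

Under these assumptions the combined deployment needs 6 replicas that each reserve CPU/GPU for inference, while the split setup needs 5 lightweight preprocessing replicas and a single inference replica.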

Ray Serve's flexible API makes it easy to split business logic (i.e. anything that's not the ML inference itself), such as preprocessing, from the ML inference step. By splitting these steps into different deployments, we gain finer-grained control over our deployment's scaling and resource consumption. We can split steps (1) and (2), so when we scale one deployment, we don't waste resources scaling the other. Here's an example:

In [None]:
import os
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

import feast

import ray
from ray import serve

from ray.ml.constants import MODEL_KEY as RAY_MODEL_KEY
from ray.ml.checkpoint import Checkpoint as RayCheckpoint


@serve.deployment
class CreditScoringPreprocessor:

    def __init__(self):
        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

        # Set up Feast feature store
        self.fs = feast.FeatureStore(repo_path="feature_repo")
    
    async def __call__(self, http_request):
        model_request = await http_request.json()
        features_array = self.preprocess(model_request)
        model_handle = serve.get_deployment("CreditScoringModel").get_handle()
        return ray.get(model_handle.predict.remote(features_array))
    
    def preprocess(self, model_request):
        # Get online features from Feast
        feature_vector = self._get_online_features_from_feast(model_request)

        # Join features to request features
        features = model_request.copy()
        features.update(feature_vector)
        features_df = pd.DataFrame.from_dict(features)

        # Apply ordinal encoding to categorical features
        self._apply_ordinal_encoding(features_df)

        # Sort columns
        features_df = features_df.reindex(sorted(features_df.columns), axis=1)

        # Drop unnecessary columns
        features_df = features_df[features_df.columns.drop("zipcode").drop("dob_ssn")]
        print("================================")
        print(f"Inference request:")
        print("--------------------------------")
        print(features_df.T)
        print("================================\n")
        return np.array(features_df)
    
    def _apply_ordinal_encoding(self, requests):
        requests[categorical_features] = self.encoder.transform(
            requests[categorical_features]
        )
    
    def _get_online_features_from_feast(self, request):
        zipcode = request["zipcode"][0]
        dob_ssn = request["dob_ssn"][0]

        return self.fs.get_online_features(
            entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
            features=feast_features,
        ).to_dict()


@serve.deployment
class CreditScoringModel:
    MODEL_FILENAME = "model.pkl"
    ENCODER_FILENAME = "encoder.bin"

    def __init__(self):
        # Load model
        print("Loading saved model...")
        self.checkpoint = ray.ml.checkpoint.Checkpoint(MODEL_FILENAME)

        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

    def predict(self, features_array):
        # Make prediction
        print("Running inference...")
        xgb_model = self._load_from_checkpoint(self.checkpoint)
        prediction = xgb_model.inplace_predict(features_array)

        # Return result of credit scoring
        return np.round(prediction)
    
    def _load_from_checkpoint(self, checkpoint: RayCheckpoint):
        checkpoint_path = checkpoint.to_directory()
        xgb_model = xgb.Booster()
        xgb_model.load_model(os.path.join(checkpoint_path, RAY_MODEL_KEY))
        return xgb_model


CreditScoringModel.deploy()
CreditScoringPreprocessor.deploy()

In this setup, `CreditScoringPreprocessor` receives the HTTP queries. It preprocesses the request and then forwards it to `CreditScoringModel`, which runs inference. `CreditScoringPreprocessor` accesses `CreditScoringModel` using a **Ray Serve Handle**:

```python
model_handle = serve.get_deployment("CreditScoringModel").get_handle()
return ray.get(model_handle.predict.remote(features_array))
```

This handle lets you send requests between deployments Pythonically, as though you were passing requests directly between functions or classes. This makes for a more intuitive development and debugging process. This handle also enables straightforward **model composition**, a workload well-suited for Ray Serve. Model composition involves requests being routed between multiple different ML models. This is a common pattern when developing aggregation pipelines or ensemble models. Instead of packaging each model in a separate microservice, Ray Serve allows you to develop and deploy them through a single codebase. This makes it easier for you to reason about your multi-model workloads and better understand how the different models interact.

Here are a few common multi-model patterns found in industry:

![Model composition in industry](images/model_composition.png)
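
As a rough illustration of the ensemble pattern, the sketch below fans one request out to two toy scorers concurrently and averages their outputs. It uses plain `asyncio` coroutines as stand-ins; in Ray Serve each scorer would be its own deployment called through a handle. The functions `model_a()`, `model_b()`, and `ensemble()` and their scoring rules are hypothetical.

```python
import asyncio
from statistics import mean


async def model_a(features):
    # Toy scorer: fraction of positive inputs
    return sum(x > 0 for x in features) / len(features)


async def model_b(features):
    # Toy scorer: 1.0 if the inputs sum to a positive number, else 0.0
    return 1.0 if sum(features) > 0 else 0.0


async def ensemble(features):
    # Fan the same request out to every model concurrently, then aggregate
    scores = await asyncio.gather(model_a(features), model_b(features))
    return mean(scores)


score = asyncio.run(ensemble([1.0, -2.0, 3.0, 4.0]))
print(score)
```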

We can test our new deployment setup using the same request. However, this time we'll direct our request to the `CreditScoringPreprocessor` endpoint since that handles preprocessing and calling the model's `predict()` function.

In [None]:
import requests

loan_request = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

response = requests.post("http://localhost:8000/CreditScoringPreprocessor", json=loan_request)
result = response.json()[0]
print(f"Inference result: {result}\n")

if result == 0:
    print("Loan approved!\n")
elif result == 1:
    print("Loan rejected!\n")

# 7. FastAPI Integration

Ray Serve provides native FastAPI integration that allows you to scale your pre-existing FastAPI applications. The integration also allows you to extend your Ray Serve application with FastAPI, which provides some nice benefits, including automatic parameter validation, some HTTP preprocessing, and a UI to query your deployments. Let's adapt our Ray Serve deployments to use FastAPI:

In [None]:
import os
import joblib
import pydantic
import numpy as np
import pandas as pd
import xgboost as xgb
from fastapi import FastAPI
from typing import List

import feast

import ray
from ray import serve

from ray.ml.constants import MODEL_KEY as RAY_MODEL_KEY
from ray.ml.checkpoint import Checkpoint as RayCheckpoint

MODEL_FILENAME = "model.pkl"
ENCODER_FILENAME = "encoder.bin"

categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]

feast_features = [
    "zipcode_features:city",
    "zipcode_features:state",
    "zipcode_features:location_type",
    "zipcode_features:tax_returns_filed",
    "zipcode_features:population",
    "zipcode_features:total_wages",
    "credit_history:credit_card_due",
    "credit_history:mortgage_due",
    "credit_history:student_loan_due",
    "credit_history:vehicle_loan_due",
    "credit_history:hard_pulls",
    "credit_history:missed_payments_2y",
    "credit_history:missed_payments_1y",
    "credit_history:missed_payments_6m",
    "credit_history:bankruptcies",
]

class LoanRequest(pydantic.BaseModel):
    zipcode: List[int]
    dob_ssn: List[str]
    person_age: List[int]
    person_income: List[int]
    person_home_ownership: List[str]
    person_emp_length: List[float]
    loan_intent: List[str]
    loan_amnt: List[int]
    loan_int_rate: List[float]

class LoanDecision(pydantic.BaseModel):
    decision: float

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class CreditScoringPreprocessor:

    def __init__(self):
        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

        # Set up Feast feature store
        self.fs = feast.FeatureStore(repo_path="feature_repo")
    
    @app.post("/", response_model=LoanDecision)
    async def route(self, loan_request: LoanRequest):
        features_array = self.preprocess(loan_request.dict())
        model_handle = serve.get_deployment("CreditScoringModel").get_handle()
        prediction = ray.get(model_handle.predict.remote(features_array))
        return {"decision": prediction}
    
    def preprocess(self, model_request):
        # Get online features from Feast
        feature_vector = self._get_online_features_from_feast(model_request)

        # Join features to request features
        features = model_request.copy()
        features.update(feature_vector)
        features_df = pd.DataFrame.from_dict(features)

        # Apply ordinal encoding to categorical features
        self._apply_ordinal_encoding(features_df)

        # Sort columns
        features_df = features_df.reindex(sorted(features_df.columns), axis=1)

        # Drop unnecessary columns
        features_df = features_df[features_df.columns.drop("zipcode").drop("dob_ssn")]
        print("================================")
        print(f"Inference request:")
        print("--------------------------------")
        print(features_df.T)
        print("================================\n")
        return np.array(features_df)
    
    def _apply_ordinal_encoding(self, requests):
        requests[categorical_features] = self.encoder.transform(
            requests[categorical_features]
        )
    
    def _get_online_features_from_feast(self, request):
        zipcode = request["zipcode"][0]
        dob_ssn = request["dob_ssn"][0]

        return self.fs.get_online_features(
            entity_rows=[{"zipcode": zipcode, "dob_ssn": dob_ssn}],
            features=feast_features,
        ).to_dict()


@serve.deployment
class CreditScoringModel:
    MODEL_FILENAME = "model.pkl"
    ENCODER_FILENAME = "encoder.bin"

    def __init__(self):
        # Load model
        print("Loading saved model...")
        self.checkpoint = ray.ml.checkpoint.Checkpoint(MODEL_FILENAME)

        # Load ordinal encoder
        self.encoder = joblib.load(ENCODER_FILENAME)

    def predict(self, features_array):
        # Make prediction
        print("Running inference...")
        xgb_model = self._load_from_checkpoint(self.checkpoint)
        prediction = xgb_model.inplace_predict(features_array)

        # Return result of credit scoring
        return np.round(prediction)
    
    def _load_from_checkpoint(self, checkpoint: RayCheckpoint):
        checkpoint_path = checkpoint.to_directory()
        xgb_model = xgb.Booster()
        xgb_model.load_model(os.path.join(checkpoint_path, RAY_MODEL_KEY))
        return xgb_model

ray.init(runtime_env={"working_dir": "."})
serve.start()

CreditScoringModel.deploy()
CreditScoringPreprocessor.deploy()


# *************************************************************************************


# Request that gets loan rejected:

# loan_request = {
#     "zipcode": [76104],
#     "dob_ssn": ["19630621_4278"],
#     "person_age": [133],
#     "person_income": [59000],
#     "person_home_ownership": ["RENT"],
#     "person_emp_length": [123.0],
#     "loan_intent": ["PERSONAL"],
#     "loan_amnt": [35000],
#     "loan_int_rate": [16.02],
# }

# Interactive docs: http://localhost:8000/CreditScoringPreprocessor/docs
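
Besides the interactive docs page, the FastAPI route can be queried from Python. The sketch below builds the POST request with the standard library and only sends it when you call `send()` yourself. The endpoint path is inferred from the docs URL in the cell above, and the response shape (a JSON object with a `decision` field) is assumed from the `LoanDecision` model; `build_request()` and `send()` are hypothetical helper names.

```python
import json
import urllib.request

# Route inferred from the docs URL above; adjust if your prefix differs
ENDPOINT = "http://localhost:8000/CreditScoringPreprocessor/"


def build_request(payload, url=ENDPOINT):
    """Build (but do not send) a JSON POST request for the FastAPI route."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request; requires the deployments above to be running."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


loan_request = {
    "zipcode": [76104],
    "dob_ssn": ["19630621_4278"],
    "person_age": [133],
    "person_income": [59000],
    "person_home_ownership": ["RENT"],
    "person_emp_length": [123.0],
    "loan_intent": ["PERSONAL"],
    "loan_amnt": [35000],
    "loan_int_rate": [16.02],
}

request = build_request(loan_request)
print(request.get_method(), request.full_url)

# With the deployments running, uncomment to query the model:
# print(send(request))
```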


# 8. Additional Resources


Congratulations on finishing this notebook! If you're interested in learning more, check out these additional resources:

* [Ray Serve Documentation](https://rayserve.org)
* [Ray Homepage](https://ray.io)
* [Ray Documentation](https://docs.ray.io/en/latest/)

Also, join the Ray community at the [Ray discussion forum](https://discuss.ray.io/c/ray-serve/6) and the [Ray Slack](https://forms.gle/9TSdDYUgxYs8SA9e8)!