# Data Enrichment for ML Model Deployments

In the [previous blog post](https://www.tekhnoal.com/ml-model-decorators.html) we introduced the decorator pattern for ML model deployments and then showed how to use the pattern to build extensions for machine learning models. The extensions that we showed in the previous post were added without having to modify the machine learning model code at all, we were able to do it by creating a decorator class that wrapped the model. In this blog post we’ll use decorators to add data enrichment capabilities to an ML model.

## Introduction

Machine learning models need data to make predictions. When deploying a model to a production setting, this data is not necessarily available from the client system that is requesting the prediction. When this happens, some other source is needed for the data fields that are required by the model but not provided by the client system. The process of accessing the data and joining it to the client's prediction request is called data enrichment. In all cases, the model itself should not need to be modified in order to do data enrichment, the process should be transparent to the model.

Data enrichment is often done because the client system does not have access to the data that the model needs to make a prediction. In this case, the client must provide a field that the model can use to find the fields that it needs to make a prediction. Once the data is loaded from a data source, the model can be called to make a prediction using the fields that it expects. 

Other times, the client system is simply not the right place to manage the data that the model needs for predictions because of it's complexity. In this case, we would like to prevent the client system from having to manage data that really does not fall within it's natural responsabilities. In order to still allow the client system to still use the model, we can add data enrichment capabilities to the model deployment.

Data enrichment simplifies the work of the client system because a client system can simply provide a way to find the correct data to the deployed ML model. The model deployment is then responsible for going and fetching the correct record, join it to the data provided by the client system, and make a prediction. Data enrichment also prevents the client system from having to manage the data needed by the model, which keeps the two systems from becoming too coupled. By having the model access the data that it needs to make a prediction, the model can change the fields it needs without having to coordinate with the client system at all. 

In this blog post, we’ll show how to create a simple decorator that is able to access a database in order to do data enrichment for an ML model that is deployed to a production system. We'll also show how to deploy the decorator along with the model to a RESTful service, and how to create the necessary database to hold the data.

## Software Architecture

The decorator that we will be building requires an outside database in order to access data to do data enrichment. The software architecture will be a little more complicated because we’ll have to deploy a service for the model as well as a database for the data.

![Software Architecture]({attach}software_architecture.png){ width=100% }

The client system accesses the model by reaching out to the model service which hosts both the model and the decorator that we will be building in this blog post. The decorator is the software component that does the data enrichment needed by the model. The decorator reaches out to the database to access data needed by the model. 

To keep the data that we want to use for enrichment, we’ll use a PostgreSQL database.

# Installing a Model

To make this blog post a little shorter we won't train a completely new model. Instead we'll install a model that we've built in the past.

To install the model, we can use the pip command and point it at the github repo of the model.

In [1]:
from IPython.display import clear_output

!pip install -e git+https://github.com/schmidtbri/regression-model#egg=insurance_charges_model
  
clear_output()

The model is used to estimate insurance charges and we built it in [a previous blog post](https://www.tekhnoal.com/regression-model.html). The code for the model is in [this github repository](https://github.com/schmidtbri/regression-model).

To make a prediction with the model, we'll import the model's class.

In [2]:
from insurance_charges_model.prediction.model import InsuranceChargesModel

clear_output()

Now we can instantiate the model:

In [3]:
model = InsuranceChargesModel()

To make a prediction, we'll need to use the model's input schema class.

In [4]:
from insurance_charges_model.prediction.schemas import InsuranceChargesModelInput, \
    SexEnum, RegionEnum

model_input = InsuranceChargesModelInput(
    age=42, 
    sex=SexEnum.female,
    bmi=24.0,
    children=2,
    smoker=False,
    region=RegionEnum.northwest)

Now we can make a prediction with the model by calling the predict() method with the input.

In [5]:
prediction = model.predict(model_input)

prediction

InsuranceChargesModelOutput(charges=8640.78)

The model predicts that the charges will be $8640.78.

When deploying the model we’ll pretend that the age, sex, bmi, children, smoker, and region fields are not available from the client system that is calling the model. Because of this, we’ll need to add it to the model input by loading the data from the database.

In [6]:
model.input_schema.schema()

{'title': 'InsuranceChargesModelInput',
 'description': "Schema for input of the model's predict method.",
 'type': 'object',
 'properties': {'age': {'title': 'Age',
   'description': 'Age of primary beneficiary in years.',
   'minimum': 18,
   'maximum': 65,
   'type': 'integer'},
  'sex': {'title': 'Sex',
   'description': 'Gender of beneficiary.',
   'allOf': [{'$ref': '#/definitions/SexEnum'}]},
  'bmi': {'title': 'Body Mass Index',
   'description': 'Body mass index of beneficiary.',
   'minimum': 15.0,
   'maximum': 50.0,
   'type': 'number'},
  'children': {'title': 'Children',
   'description': 'Number of children covered by health insurance.',
   'minimum': 0,
   'maximum': 5,
   'type': 'integer'},
  'smoker': {'title': 'Smoker',
   'description': 'Whether beneficiary is a smoker.',
   'type': 'boolean'},
  'region': {'title': 'Region',
   'description': 'Region where beneficiary lives.',
   'allOf': [{'$ref': '#/definitions/RegionEnum'}]}},
 'required': ['age', 'sex', 'bmi',

## Creating the Data Enrichment Decorator

A decorator needs to inherit from the MLModelDecorator base class, which requires a specific set of methods and properties be implemented. A decorator that can access PostgreSQL looks like this:

In [308]:
from typing import List
from pydantic import BaseModel, create_model
import psycopg2
from ml_base.decorator import MLModelDecorator


class PostgreSQLEnrichmentDecorator(MLModelDecorator):
    """Decorator to do data enrichment using a PostgreSQL database."""

    def __init__(self, host: str, port: str, username: str, password: str, database: str, 
                 table: str, index_field_name: str, index_field_type: str, 
                 enrichment_fields: List[str]) -> None:
        super().__init__(host=host, port=port, username=username, password=password, 
                         database=database, table=table, index_field_name=index_field_name, 
                         index_field_type=index_field_type, enrichment_fields=enrichment_fields)
        self.__dict__["_connection"] = None

    @property
    def input_schema(self) -> BaseModel:
        # converting the index field type from a string to a class
        index_field_type = getattr(__builtins__, 
                                   self._configuration["index_field_type"])
        
        input_schema = self._model.input_schema

        # adding index field to schema because it is required in order to retrieve 
        # the right record in the database
        fields = {
            self._configuration["index_field_name"]: (index_field_type, ...)
        }
        for field_name, schema in input_schema.__fields__.items():
            # remove enrichment_fields from schema because they'll be added from the 
            # database and dont need to be provided by the client
            if field_name not in self._configuration["enrichment_fields"]:
                if schema.required:
                    fields[field_name] = (schema.type_, ...)
                else:
                    fields[field_name] = (schema.type_, schema.default)
        
        new_input_schema = create_model(
            input_schema.__name__,
            **fields
        )
        return new_input_schema

    def predict(self, data):
        # create a connection to the database, if it doesn't exist already
        if self.__dict__["_connection"] is None:
            self.__dict__["_connection"] = psycopg2.connect(
                host=self._configuration["host"],
                port=self._configuration["port"],
                database=self._configuration["database"],
                user=self._configuration["username"],
                password=self._configuration["password"])
        cursor = self.__dict__["_connection"].cursor()

        # build a SELECT statement using the index_field and the enrichment_fields
        enrichment_fields = ", ".join(self._configuration["enrichment_fields"])
        sql_statement = "SELECT {} FROM {} WHERE {} = %s;".format(
            enrichment_fields,
            self._configuration["table"],
            self._configuration["index_field_name"])
        
        # executing the SELECT statement
        cursor.execute(sql_statement, 
                       (getattr(data, self._configuration["index_field_name"]), ))
        records = cursor.fetchall()
        cursor.close()
        
        if len(records) == 0:
            raise ValueError("Could not find a record for data enrichment.")
        elif len(records) == 1:
            record = records[0]
        else:
            raise ValueError("Query returned more than one record.")
            
        # creating an instance of the model's input schema using the fields that 
        # came back from the database and fields that are provided by calling code
        input_schema = self.input_schema
        enriched_data = {}
        for field_name in self._model.input_schema.__fields__.keys():
            if field_name == self._configuration["index_field_name"]:
                pass
            elif field_name in self._configuration["enrichment_fields"]:
                field_index = self._configuration["enrichment_fields"].index(field_name)
                enriched_data[field_name] = record[field_index]
            elif field_name in data.dict().keys():
                enriched_data[field_name] = getattr(data, field_name)            
            else:
                raise ValueError("Could not find value for field '{}'.".format(field_name))
        
        # making a prediction with the model, using the enriched fields
        enriched_data = self._model.input_schema(**enriched_data)
        prediction = self._model.predict(data=enriched_data)

        return prediction
    
    def __del__(self):
        if self._connection is not None:
            self._connection.close()


The \_\_init\_\_() method accepts configuration that is used to customize the way that the decorator finds data in the database. The decorator accepts these parameters:

- host: URL for connecting to the database server
- port: port for connecting to the database server
- username: username for accessing the database
- password: password for accessing the database
- table: name of the table in the database where data used for enrichment is found
- index_field_name: name of the field used for selecting record
- index_field_type: type of the index field
- enrichment_fields: names of the fields that will be added to the prediction request sent to the model

The configuration is saved by passing it up to the super class using the super().\_\_init\_\_() method. The configuration values can then be accessed inside of the decorator instance in the self.\_configuration attribute, which is a dictionary.

When the decorator is applied to a model, it modifies the input_schema of the model. It removes the enrichment_fields from the input schema because these fields are going to be added from the database. This means that the client does not need to provide values for them anymore. It also adds the index_field to the input schema because the decorator needs to use this field to access the correct record in the database table. The index_field is added as a required field in the model’s input_schema because the decorator always needs it.

When a prediction request is made to the decorator, it uses the value in the index_field to access the record in the database table. If the decorator finds the record in the table, it selects the enrichment fields and creates a new input for the model and sends it to the model. If the record is not found, the decorator raises an exception. The index_field is actually not sent to the model at all, it is used purely to access the data needed by the model in the database. If more than one record is returned from the database, an exception is raised.

The SQL statement is built dynamically based on the fields required by the model and the index field selected through configuration. For example, if we wanted to do enrichment with all of the input fields of the InsuranceChargesModel, the SELECT statement would look like this:

```sql
SELECT age, sex, bmi, children, smoker, region
FROM clients
WHERE ssn = '123-45-6789'
```

In this case we would be accessing a client record by using their social security number.

## Decorating the Model

To test out the decorator we’ll first instantiate the model object that we want to use with the decorator.

In [205]:
model = InsuranceChargesModel()

Then, we’ll instantiate the decorator with the parameters.

In [206]:
decorator = PostgreSQLEnrichmentDecorator(
    host="", 
    port="",
    username="", 
    password="", 
    database="", 
    table="",
    index_field_name="ssn", 
    index_field_type="str", 
    enrichment_fields=["age", "sex", "bmi", "children", "smoker", "region"])

We won't fill in the database details because we don't have a database to connect to yet. However, we can still see how the model's input and output schemas change because of the decorator.

We can add the model instance to the decorator after it’s been instantiated like this:

In [207]:
decorated_model = decorator.set_model(model)

We can see the decorator and the model objects by printing the reference to the decorator:

In [208]:
decorated_model

PostgreSQLEnrichmentDecorator(InsuranceChargesModel)

Now we’ll try to use the decorator and the model together by doing a few things. First, we’ll look at the model input schema:

In [209]:
decorated_model.input_schema.schema()

{'title': 'InsuranceChargesModelInput',
 'type': 'object',
 'properties': {'ssn': {'title': 'Ssn', 'type': 'string'}},
 'required': ['ssn']}

As we can see, the input schema is not the same as what the model exposed, all of the model’s input fields are now removed because they are being provided by the decorator by accessing the database. The user of the model is not expected to provide a value for those fields. However, there is a new field in the schema, the “ssn” field. This field is used by the decorator to select the correct record in the database.

We can also use a few fields from the database and require the client to provide the rest. To do this we'll instantiate the decorator with a few, but not all, of the fields required by the model as enrichment fields.

In [210]:
decorator = PostgreSQLEnrichmentDecorator(
    host="", 
    port="",
    username="", 
    password="", 
    database="", 
    table="",
    index_field_name="ssn", 
    index_field_type="str", 
    enrichment_fields=["age", "sex", "smoker", "region"])

decorated_model = decorator.set_model(model)

decorated_model.input_schema.schema()

{'title': 'InsuranceChargesModelInput',
 'type': 'object',
 'properties': {'ssn': {'title': 'Ssn', 'type': 'string'},
  'bmi': {'title': 'Bmi', 'minimum': 15.0, 'maximum': 50.0, 'type': 'number'},
  'children': {'title': 'Children',
   'minimum': 0,
   'maximum': 5,
   'type': 'integer'}},
 'required': ['ssn', 'bmi', 'children']}

The model's input schema now requires the fields that are not listed as enrichment fields to be provided by the client. The "ssn" field is still added because the decorator needs it in order to retrieve the enrichment fields from the database.

Next, we’ll look at the decorated model’s output schema:

In [211]:
output_schema = decorated_model.output_schema.schema()

output_schema

{'title': 'InsuranceChargesModelOutput',
 'description': "Schema for output of the model's predict method.",
 'type': 'object',
 'properties': {'charges': {'title': 'Charges',
   'description': 'Individual medical costs billed by health insurance to customer in US dollars.',
   'type': 'number'}},
 'required': ['charges']}

The output schema has not changed at all, the decorator does not modify the prediction result.

## Creating a Database

Now that we have a model and a decorator that can add data to the input of the model, we need to create a database table to pull data from. To do this we’ll first start a PostgreSQL instance in a local docker image.

In [181]:
!docker run --name postgres \
    -p 5432:5432 \
    -e POSTGRES_USER=data_enrichment_user \
    -e POSTGRES_PASSWORD=data_enrichment_password \
    -e POSTGRES_DB=data_enrichment \
    -d postgres

faab5c57cf4ba3450a10822cf649539503349f769ff199cbefde678593359e4a


We can connect to the database by starting a client within the same container.

In [182]:
!CONNECTION_STRING="postgresql://data_enrichment_user:data_enrichment_password@localhost:5432/data_enrichment"

!docker run -it --rm \
    --network host postgres \
    psql "$CONNECTION_STRING" \
    -c "SELECT current_database();"

 current_database 
------------------
 data_enrichment
(1 row)



The current database within the server is called "data_enrichment" and it was created when the docker image started. We saved the connection string to the CONNECTION_STRING variable so we won't need to keep adding it.

Next we'll execute a SQL statement that creates a table within the database.

In [183]:
!docker run -it --rm \
    --network host postgres \
    psql "$CONNECTION_STRING" \
    -c "CREATE TABLE clients ( \
    ssn         varchar(11) PRIMARY KEY, \
    first_name  varchar(30) NOT NULL, \
    last_name   varchar(30) NOT NULL, \
    age         integer     NOT NULL, \
    sex         varchar(6)  NOT NULL, \
    bmi         integer     NOT NULL, \
    children    integer     NOT NULL, \
    smoker      boolean     NOT NULL, \
    region      varchar(10) NOT NULL \
);"

CREATE TABLE


The table has been created, we can see the table schema looks like this:

In [184]:
!docker run -it --rm \
    --network host postgres \
    psql "$CONNECTION_STRING" \
    -c "\d clients"

                       Table "public.clients"
   Column   |         Type          | Collation | Nullable | Default 
------------+-----------------------+-----------+----------+---------
 ssn        | character varying(11) |           | not null | 
 first_name | character varying(30) |           | not null | 
 last_name  | character varying(30) |           | not null | 
 age        | integer               |           | not null | 
 sex        | character varying(6)  |           | not null | 
 bmi        | integer               |           | not null | 
 children   | integer               |           | not null | 
 smoker     | boolean               |           | not null | 
 region     | character varying(10) |           | not null | 
Indexes:
    "clients_pkey" PRIMARY KEY, btree (ssn)



The table has columns for all of the fields that the model requires to make a prediction plus two columns for the first and last name. It also has an index field called “ssn” because we’ll be referencing each record using a fake Social Security number. The ssn field is the unique identifier for each record and is a good way to correlate data from different systems. 

Then we’ll run a some code that connects to the database and inserts fake data into the table. To do this we'll use the faker package, so we'll need to install it.

In [185]:
!pip install Faker

clear_output()

To add data to the table, we'll just generate some data for each column in the database table.

In [186]:
from faker import Faker

fake = Faker()

records = list()
for _ in range(1000):
    record = {
        "ssn": fake.ssn(),
        "age": fake.random_int(min=18, max=80),
        "sex": fake.random_choices(elements=("male", "female"), length=1)[0],
        "bmi": fake.random_int(min=15, max=60),
        "children": fake.random_int(min=0, max=5),
        "smoker": fake.boolean(),
        "region": fake.random_choices(elements=("southwest", "southeast", "northwest", "northeast"), length=1)[0],
        "first_name": fake.first_name_male() if sex =="male" else fake.first_name_female(),
        "last_name": fake.last_name()
    }
    records.append(record)

Notice that each field is generating data that does not necessarily fit the schema of the model. For example, the maximum value allowed by the model for the "age" field is 65, but the faker data can go up to 80. We'll use records that do not match the model's schema to test the decorator later.

Let's take a look at the first record:

In [187]:
records[0]

{'ssn': '463-29-7900',
 'age': 55,
 'sex': 'male',
 'bmi': 40,
 'children': 1,
 'smoker': True,
 'region': 'southwest',
 'first_name': 'Kathy',
 'last_name': 'Hernandez'}

Now let's find a record that does not fit the model's schema so we can use it later:

In [188]:
next(record for record in records if record["age"] > 65)

{'ssn': '391-98-4846',
 'age': 76,
 'sex': 'male',
 'bmi': 24,
 'children': 2,
 'smoker': True,
 'region': 'southwest',
 'first_name': 'Michelle',
 'last_name': 'Murphy'}

We'll use the ssn later to test out the decorator's error handling.

Now we can put the 1000 fake records generated in the database.

In [192]:
connection = psycopg2.connect(
    host="localhost",
    port="5432",
    database="data_enrichment",
    user="data_enrichment_user",
    password="data_enrichment_password")

cursor = connection.cursor()

for record in records:
    cursor.execute("INSERT INTO clients (ssn, first_name, last_name, age, sex, bmi, children, smoker, region)"
                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);",
                   (record["ssn"], record["first_name"], record["last_name"], record["age"], record["sex"], 
                    record["bmi"], record["children"], record["smoker"], record["region"]))
    connection.commit()

cursor.close()
connection.close()               

The database now has a table that has records that we can use to try out the model using the decorator.

We'll access a some records to see the data:

In [193]:
!docker run -it --rm \
    --network host postgres \
    psql "$CONNECTION_STRING" \
    -c "SELECT ssn, first_name, last_name FROM clients LIMIT 5;"

     ssn     | first_name | last_name 
-------------+------------+-----------
 463-29-7900 | Kathy      | Hernandez
 391-98-4846 | Michelle   | Murphy
 892-98-2455 | Michaela   | Jackson
 485-35-8464 | Carol      | Lane
 691-81-4261 | Sara       | Castro
(5 rows)



## Trying out the Decorator

Now that we have some data in the database, we can try to make predictions with the model.

In [298]:
decorator = PostgreSQLEnrichmentDecorator(
    host="localhost",
    port="5432",
    username="data_enrichment_user", 
    password="data_enrichment_password", 
    database="data_enrichment", 
    table="clients",
    index_field_name="ssn", 
    index_field_type="str", 
    enrichment_fields=["age", "sex", "bmi", "children", "smoker", "region"])

decorated_model = decorator.set_model(model)

In [299]:
model_input = decorated_model.input_schema(ssn="463-29-7900")

prediction = decorated_model.predict(model_input)

prediction

InsuranceChargesModelOutput(charges=46526.58)

We provided a value for the ssn field and the decorator was able to retrieve the value for the other fields for the model to use.

Next, we'll see what happens when we try to do data enrichment with a record that does not exist in the database.

In [300]:
model_input = decorated_model.input_schema(ssn="123-45-6789")

try:
    decorated_model.predict(model_input)
except ValueError as e:
    print(e)

Could not find a record for data enrichment.


The decorator raised a ValueError exception because it could not find the needed record.

We can also leave some fields for the client of the model to provide and pull all other fields from the database. We just need to instantiate the decorator a little differently.

In [309]:
decorator = PostgreSQLEnrichmentDecorator(
    host="localhost",
    port="5432", 
    username="data_enrichment_user", 
    password="data_enrichment_password", 
    database="data_enrichment", 
    table="clients",
    index_field_name="ssn", 
    index_field_type="str", 
    enrichment_fields=["age", "sex", "bmi", "region"])

decorated_model = decorator.set_model(model)

To see which fields are now required by the model, we'll take a look at the input schema of the decorated model.

In [310]:
input_schema = decorated_model.input_schema.schema()

input_schema

{'title': 'InsuranceChargesModelInput',
 'type': 'object',
 'properties': {'ssn': {'title': 'Ssn', 'type': 'string'},
  'children': {'title': 'Children',
   'minimum': 0,
   'maximum': 5,
   'type': 'integer'},
  'smoker': {'title': 'Smoker', 'type': 'boolean'}},
 'required': ['ssn', 'children', 'smoker']}

The decorator has removed the age, sex, bmi, and region fields from the input schema. It has left the smoker and children fields in place, and it has added the ssn field as we expected.

Now we can try the decorator with this new input schema:

In [311]:
model_input = decorated_model.input_schema(ssn="463-29-7900", children=2, smoker=False)

prediction = decorated_model.predict(model_input)

prediction

InsuranceChargesModelOutput(charges=14110.42)

The decorator was able to bring in the values from the database and the client in order to make a prediction. 

We're done with the local database so we'll shut down the docker container.

In [314]:
!docker kill postgres

!docker rm postgres

Error response from daemon: Cannot kill container: postgres: No such container: postgres
Error: No such container: postgres


## Adding a Decorator to a Deployed Model

In which we use the DataEnrichmentDecorator alongside the model…

## Deploying the Service

In which we deploy a PostgreSQL database alongside the model service with some data…

Creating a PostgreSQL service in Kubernetes…

Adding a table to the db instance…

Adding data to the db instance…

Creating a model service in Kubernetes…

Deploying the resources…

Testing the service...


## Closing

In this blog post, we showed how to use decorators to perform data enrichment for machine learning models. Data enrichment is a common requirement across many different ML model deployments. By using a decorator that accesses the database for data, we’re able to reuse common code. 

Upsides 
One of the benefits of using a decorator for the ML model is that we keep the model prediction code and the data access code separate from each other…

Another benefit is that we are able to reuse the decorator to do data enrichment for any ML model that needs to do data enrichment from a PostgreSQL database…

We can do this because we parameterized the decorator object so that it can handle any number of fields...

Downsides

Improvements
The data generation and data insertion that we did for this blog post is not a realistic way to do it. In production systems there would be a dedicated service that maintains the data in the table up to date. We did it this way in order to keep the post short.

The index field and the enrichment fields where required to be named exactly the same in the database table. We can do a simple mapping that would allow variance...

The schema creation does not allow for more complex schemas like default factories, aliases, etc...
