In [1]:
#!pip install great_expectations

# ML Engineering


Here we discuss ML engineering topics and how to get models into production.



## Data Engineering

Data engineering is the cornerstone of model building, deployment and serving.
Beyond the traditional ETL pipeline, however, ML engineering needs to address several other issues before serving data to a model for prediction.

This can be thought of as "data unit testing" or, in a sense, a form of anomaly detection on the data.


### Checks on input


Every ML algorithm comes with certain assumptions on the form and the underlying distribution in the data it has been trained on. Sometimes, due to several reasons, data is broken or invalid. 

From an ML engineering team's perspective, we would like to have checkpoints in place to detect problems in the data **before** we serve it to the algorithm.

This process essentially acts as an anomaly detection filter on top of the data pipelines and it is typically a good practice in every ETL/Analytics pipeline, not just for ML problems.

There are several checks one may want to apply to the data before it is served to the algorithm or the transformation pipelines:


- Data type consistency:
    - Real numbers are real numbers
    - Integers
    - Strings
    - Sets/Dictionaries
    - Categorical values are fixed and identical to those the model was trained on
    


For numerical data, and likewise for strings and categorical data, there is a full set of potential application-specific checks that one can apply before feeding the dataset to the algorithm:

- Data type validation and consistency
    - Are reals, reals?
    - Are integers, integers?
    
- Statistical Checks
    - Mean
    - Median
    - Standard deviation
    - Percentiles 
    - Maximum values/minimum values
    - Entropy
    
- Strings:
    - Sanity checks on the input strings
    - Vocabulary consistency 
    - Entropy 
- Categorical data
    - Consistency checks on the input values 
    - Missing data 
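
A few of the checks listed above can be sketched in plain Python. This is only an illustration under stated assumptions: the helper names, the sample values, and the accepted ranges are all hypothetical, and a real pipeline would more likely rely on one of the libraries discussed in the following sections.

```python
import math
import statistics
from collections import Counter


def check_types(values, expected_type):
    """True if every non-missing value is an instance of expected_type."""
    return all(isinstance(v, expected_type) for v in values if v is not None)


def in_range(value, low, high):
    """True if low <= value <= high."""
    return low <= value <= high


def missing_fraction(values):
    """Fraction of entries that are None."""
    return sum(v is None for v in values) / len(values)


def unexpected_categories(values, allowed):
    """Set of non-missing values that the model was not trained on."""
    return {v for v in values if v is not None and v not in allowed}


def shannon_entropy(values):
    """Shannon entropy (in bits) of the empirical distribution of values."""
    counts = Counter(values)
    total = len(values)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


# Hypothetical input batch: one numerical and one categorical column.
views = [0, 0, 5, 10, 12]
priority = ["high", None, "low", "low", "high"]

report = {
    "views_are_ints": check_types(views, int),
    "views_median_ok": in_range(statistics.median(views), 0, 10),
    "views_max_ok": in_range(max(views), 0, 1000),
    "priority_missing_fraction": missing_fraction(priority),
    "priority_unexpected": unexpected_categories(priority, {"high", "low"}),
    "priority_entropy_bits": shannon_entropy([p for p in priority if p is not None]),
}
print(report)
```

Each entry in `report` corresponds to one check; a batch would typically be rejected or quarantined when any of them falls outside what the model was trained on.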


There are a few libraries one can use in this process, namely `Deequ` from Amazon (Scala, on top of Apache Spark) and `Great Expectations` (Python).


### Deequ

Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets. 


Deequ's purpose is to "unit-test" data to find errors early, before the data gets fed to consuming systems or machine learning algorithms.


Let's define a class of data

```Scala
case class Item(
  id: Long,
  productName: String,
  description: String,
  priority: String,
  numViews: Long
)
 
val rdd = spark.sparkContext.parallelize(Seq(
  Item(1, "Thingy A", "awesome thing.", "high", 0),
  Item(2, "Thingy B", "available at http://thingb.com", null, 0),
  Item(3, null, null, "low", 5),
  Item(4, "Thingy D", "checkout https://thingd.ca", "low", 10),
  Item(5, "Thingy E", null, "high", 12)))

val data = spark.createDataFrame(rdd)

```


Most applications that work with data have implicit assumptions about that data, e.g., that attributes have certain types, do not contain NULL values, and so on. If these assumptions are violated, your application might crash or produce wrong outputs. The idea behind Deequ is to explicitly state these assumptions in the form of a "unit-test" for data, which can be verified on a piece of data at hand. If the data has errors, we can "quarantine" and fix it before we feed it to an application.

The main entry point for defining how you expect your data to look is the VerificationSuite from which you can add Checks that define constraints on attributes of the data. In this example, we test for the following properties of our data:

- there are 5 rows in total
- values of the id attribute are never NULL and unique
- values of the productName attribute are never NULL
- the priority attribute can only contain "high" or "low" as value
- numViews should not contain negative values
- at least half of the values in description should contain a url
- the median of numViews should be less than or equal to 10


In code this looks as follows:


```Scala
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}


val verificationResult = VerificationSuite()
  .onData(data)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ == 5) // we expect 5 rows
      .isComplete("id") // should never be NULL
      .isUnique("id") // should not contain duplicates
      .isComplete("productName") // should never be NULL
      // should only contain the values "high" and "low"
      .isContainedIn("priority", Array("high", "low"))
      .isNonNegative("numViews") // should not contain negative values
      // at least half of the descriptions should contain a url
      .containsURL("description", _ >= 0.5)
      // half of the items should have less than 10 views
      .hasApproxQuantile("numViews", 0.5, _ <= 10))
    .run()
    
``` 


After calling run, deequ translates your test to a series of Spark jobs, which it executes to compute metrics on the data. Afterwards it invokes your assertion functions (e.g., _ == 5 for the size check) on these metrics to see if the constraints hold on the data. We can inspect the VerificationResult to see if the test found errors:

```Scala
import com.amazon.deequ.constraints.ConstraintStatus


if (verificationResult.status == CheckStatus.Success) {
  println("The data passed the test, everything is fine!")
} else {
  println("We found errors in the data:\n")

  val resultsForAllConstraints = verificationResult.checkResults
    .flatMap { case (_, checkResult) => checkResult.constraintResults }

  resultsForAllConstraints
    .filter { _.status != ConstraintStatus.Success }
    .foreach { result => println(s"${result.constraint}: ${result.message.get}") }
}
```


Which will produce the following output:


```
We found errors in the data:

CompletenessConstraint(Completeness(productName)): Value: 0.8 does not meet the requirement!
PatternConstraint(containsURL(description)): Value: 0.4 does not meet the requirement!
```

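Deequ is also available for PySpark through the `pydeequ` package. In the example below, `spark` is an active Spark session and `df` is a Spark DataFrame with columns `a`, `b` and `c`: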

```python
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c")  \
        .isUnique("a")  \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
```


### Great Expectations


Great Expectations [...] by offering a unique approach to automated testing: pipeline tests. Pipeline tests are applied to data (instead of code) and at batch time (instead of compile or deploy time). Pipeline tests are like unit tests for datasets: they help you guard against upstream data changes and monitor data quality.


In [42]:
import great_expectations as ge
import pandas as pd

data = [
    ("1", "male", 1, 1334),
    ("2", "female", 2, 643),
    ("3", "male", 2, 645),
    ("4", "male", 1, 234),
    ("5", "female", 2, 23),
    ("6", "male", 1, 454),
    ("6", "male", 1, 454),

]

df = pd.DataFrame( data = data , columns = ["id", "gender", "col1", "col2"])

ge_df = ge.from_pandas( df )



ge_df.expect_column_distinct_values_to_be_in_set( "gender", ["male", "female"])
ge_df.expect_column_mean_to_be_between( column="col1", min_value=1,  max_value=3)
ge_df.expect_column_max_to_be_between( column="col2", min_value=100, max_value=2000)
ge_df.expect_column_mean_to_be_between( column="col2", min_value=100, max_value=1000 )

{
  "meta": {},
  "result": {
    "observed_value": 541.0,
    "element_count": 7,
    "missing_count": null,
    "missing_percent": null
  },
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [43]:
ge_df.get_expectation_suite( discard_failed_expectations  = True)

{
  "data_asset_type": "Dataset",
  "expectation_suite_name": "default",
  "meta": {
    "great_expectations_version": "0.13.2"
  },
  "expectations": [
    {
      "meta": {},
      "kwargs": {
        "column": "gender",
        "value_set": [
          "male",
          "female"
        ]
      },
      "expectation_type": "expect_column_distinct_values_to_be_in_set"
    },
    {
      "meta": {},
      "kwargs": {
        "column": "col1",
        "min_value": 1,
        "max_value": 3
      },
      "expectation_type": "expect_column_mean_to_be_between"
    },
    {
      "meta": {},
      "kwargs": {
        "column": "col2",
        "min_value": 100,
        "max_value": 2000
      },
      "expectation_type": "expect_column_max_to_be_between"
    },
    {
      "meta": {},
      "kwargs": {
        "column": "col2",
        "min_value": 100,
        "max_value": 1000
      },
      "expectation_type": "expect_column_mean_to_be_between"
    }
  ]
}

## Model Persistence and Versioning


### sklearn 
In the sklearn world, the most common approach is to use joblib to store the model as a binary file after training and evaluation, and then load it at serving time.


### pytorch 
In pytorch we have the following options:


- torch.save: Saves a serialized object to disk. This function uses Python’s pickle utility for serialization. Models, tensors, and dictionaries of all kinds of objects can be saved using this function.

- torch.load: Uses pickle’s unpickling facilities to deserialize pickled object files to memory. This function also facilitates the device to load the data into (see Saving & Loading Model Across Devices).

- torch.nn.Module.load_state_dict: Loads a model’s parameter dictionary using a deserialized state_dict.


In PyTorch, the learnable parameters (i.e. weights and biases) of a `torch.nn.Module` model are contained in the model’s parameters (accessed with `model.parameters()`). A `state_dict` is simply a Python dictionary object that maps each layer to its parameter tensor. Note that only layers with learnable parameters (convolutional layers, linear layers, etc.) and registered buffers (batchnorm’s `running_mean`) have entries in the model’s `state_dict`. Optimizer objects (`torch.optim`) also have a `state_dict`, which contains information about the optimizer’s state, as well as the hyperparameters used.



### ONNX
Open Neural Network Exchange (ONNX) is an open standard for machine learning interoperability. PyTorch has ONNX support, and there are also libraries that offer ONNX support for sklearn, such as `sklearn-onnx`.


It is always a good idea to version models and their code together, using an internal versioning scheme. 
This comes at a cost in disk space; however, storage is relatively cheap, so this is rarely an issue.
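
A minimal sketch of such a versioning scheme, using only the standard library, is shown below. The directory layout (`root/name/version/`), the manifest fields, and the `save_model`/`load_model` helpers are assumptions made for illustration. `pickle` stands in for joblib here, and the "model" is a plain dictionary rather than a trained estimator.

```python
import hashlib
import json
import pickle
import tempfile
from pathlib import Path


def save_model(model, root, name, version, metadata=None):
    """Store a pickled model plus a JSON manifest under root/name/version/."""
    target = Path(root) / name / str(version)
    target.mkdir(parents=True, exist_ok=False)  # never overwrite an existing version
    payload = pickle.dumps(model)
    (target / "model.pkl").write_bytes(payload)
    manifest = {
        "name": name,
        "version": str(version),
        "sha256": hashlib.sha256(payload).hexdigest(),
        "metadata": metadata or {},
    }
    (target / "manifest.json").write_text(json.dumps(manifest, indent=2))
    return target


def load_model(root, name, version):
    """Load a model version and verify it against the checksum in its manifest."""
    target = Path(root) / name / str(version)
    manifest = json.loads((target / "manifest.json").read_text())
    payload = (target / "model.pkl").read_bytes()
    if hashlib.sha256(payload).hexdigest() != manifest["sha256"]:
        raise ValueError("model artifact does not match its manifest")
    return pickle.loads(payload), manifest


# Hypothetical usage with a toy "model" (a dict of coefficients).
root = tempfile.mkdtemp()
toy_model = {"weights": [0.5, -1.2], "bias": 0.1}
save_model(toy_model, root, "demand_forecast", "1", {"code_version": "<commit hash>"})
restored, manifest = load_model(root, "demand_forecast", "1")
print(manifest["version"], restored)
```

Recording the code version in the manifest is one way to keep a model and the code that produced it tied together. Only unpickle artifacts from a trusted location, since loading a pickle can execute arbitrary code.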


## Serving models

There are two major patterns for serving models: batch mode and real time mode.
These patterns typically obey business related rules and depend on the nature of the problem the ML model is solving.


### Batch mode


In the batch mode, the model runs as a one-off job at fixed time intervals. For example, a demand forecasting model could run every weekend to predict demand for the next week. Similarly, a recommender system could update weekly or daily, and a price optimisation system could run 2 or 3 times daily to update prices, depending on the business context. 


In this scenario, the model produces the entire output required for the business to operate, e.g. recommendations for all the products, forecasts for all the products, etc.


The primary steps are:
- Code runs at a fixed time/interval
- Code loads the model from the saved location
- Code reads a batch of input data
- Input data is new and unlabelled data that we want predictions for
- Input data might have data for multiple users/entities grouped together
- Code runs model prediction over the input batch and produces a prediction batch
- Prediction batch contains the predicted labels for each record in the input data
- Predicted data is then saved in some new location
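
The steps above can be sketched as a single function that a scheduler would call. Everything here is a toy assumption: the "model" is a pickled dictionary holding a threshold, the input is a CSV file with hypothetical `entity_id` and `feature` columns, and the file names are invented for the example.

```python
import csv
import pickle
import tempfile
from pathlib import Path


def predict(model, rows):
    """Toy scoring rule standing in for a real model: 1 if feature > threshold."""
    return [int(float(row["feature"]) > model["threshold"]) for row in rows]


def run_batch(model_path, input_path, output_path):
    """Load the model, score every input record, and save the predictions."""
    with open(model_path, "rb") as fh:
        model = pickle.load(fh)
    with open(input_path, newline="") as fh:
        rows = list(csv.DictReader(fh))
    predictions = predict(model, rows)
    with open(output_path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(["entity_id", "prediction"])
        for row, prediction in zip(rows, predictions):
            writer.writerow([row["entity_id"], prediction])
    return len(rows)


# Hypothetical files standing in for the model store and the input batch.
workdir = Path(tempfile.mkdtemp())
model_path = workdir / "model.pkl"
input_path = workdir / "input.csv"
output_path = workdir / "predictions.csv"
model_path.write_bytes(pickle.dumps({"threshold": 0.5}))
input_path.write_text("entity_id,feature\na,0.2\nb,0.9\nc,0.7\n")

n_scored = run_batch(model_path, input_path, output_path)
print(n_scored, "records scored")
```

In practice the call to `run_batch` would be triggered by a scheduler at the chosen interval rather than run by hand.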



### Real time mode

In this scenario, the machine learning pipeline is attached to an on-demand or streaming workflow, where the model essentially acts as a transformation or augmentation of the input data, for example image labelling and object detection in real time applications.


The primary steps are:
- A web service wraps our code
- The web service exposes REST endpoints for getting predictions
- The consumer application calls the web service and sends input data in JSON format
- Input data contains all the features required for prediction. It typically has only one record instead of a batch
- Code loads the model from the saved location
- Code gets input data when the web service endpoint is called
- Code runs model prediction over the input data and produces prediction data
- Prediction data is returned to the consumer application
- The consumer application can decide how to use the prediction data for a better user experience
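
The request flow above can be sketched with Python's built-in `http.server`. This is an illustration only: the `/predict` path, the `feature` field, the port, and the threshold "model" are all hypothetical, and a production service would normally use a proper web framework and application server.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Toy stand-in for a model that is loaded once when the service starts.
MODEL = {"threshold": 0.5}


def handle_predict(raw_body):
    """Turn a JSON request body into an (HTTP status, JSON response body) pair."""
    try:
        features = json.loads(raw_body)
        value = float(features["feature"])
    except (ValueError, KeyError, TypeError):
        error = {"error": "body must be JSON with a numeric 'feature' field"}
        return 400, json.dumps(error).encode("utf-8")
    label = int(value > MODEL["threshold"])
    return 200, json.dumps({"label": label}).encode("utf-8")


class PredictHandler(BaseHTTPRequestHandler):
    """Thin HTTP wrapper around handle_predict."""

    def do_POST(self):
        if self.path != "/predict":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        status, body = handle_predict(self.rfile.read(length))
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port=5000):
    """Block forever, answering POST /predict requests on localhost."""
    HTTPServer(("127.0.0.1", port), PredictHandler).serve_forever()


# The prediction logic can be exercised without starting the server.
status, body = handle_predict(b'{"feature": 0.9}')
print(status, body.decode("utf-8"))
```

Keeping the prediction logic in a plain function, separate from the HTTP handler, makes it easy to unit test and to reuse in batch jobs. Calling `serve()` starts the actual web service.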



### Predictions via SQL

This is a trend that has recently caught on in the industry. This approach exposes the ML model as a SQL function, which is applied as a transformation to the input data.

This approach treats new input data as tables and allows ad-hoc analysis on the data by running our ML model as a function. The output is also viewed as a table and can be saved for future use if required.


- ML model is wrapped in a SQL UDF
- There is a SQL execution engine (like Spark or Google BigQuery) that understands the UDF
- SQL execution engine loads the code in a UDF
- The user issues a SQL query to the execution engine, selecting the feature table in the SQL query
- The execution engine runs the input features through the UDF to compute prediction
- The prediction data is returned to the user
- User might save the predicted data as a new table
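
The same pattern can be tried locally with SQLite, which lets Python functions be registered as SQL functions through `create_function`. SQLite is only a stand-in for an engine such as Spark here, and the linear "model", its weights, and the table and column names are all hypothetical.

```python
import sqlite3

# Hypothetical linear model; the weights are made up for the example.
WEIGHTS = {"bias": 1.0, "price": 0.5, "num_views": 2.0}


def predict_score(price, num_views):
    """Score one row of features; this is the function exposed to SQL."""
    return WEIGHTS["bias"] + WEIGHTS["price"] * price + WEIGHTS["num_views"] * num_views


conn = sqlite3.connect(":memory:")
# Register the Python function as a two-argument SQL function (the UDF).
conn.create_function("predict_score", 2, predict_score)

conn.execute("CREATE TABLE features (id INTEGER, price REAL, num_views INTEGER)")
conn.executemany(
    "INSERT INTO features VALUES (?, ?, ?)",
    [(1, 50.0, 0), (2, 100.0, 10)],
)

# The user selects from the feature table and saves the output as a new table.
conn.execute(
    "CREATE TABLE predictions AS "
    "SELECT id, predict_score(price, num_views) AS score FROM features"
)
rows = conn.execute("SELECT id, score FROM predictions ORDER BY id").fetchall()
print(rows)
```

The query treats the model as just another column transformation, which is what makes ad-hoc analysis with plain SQL possible.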



## Simple Model Serving


In the simplest case, we can set up a REST API to serve our models and also keep track of model versioning.

This can be done by setting up a REST server that listens on a certain port and is structured to serve models in the form:

```javascript
/model_name/version/endpoint
```

for example 

```bash
curl -X POST -F "data=@my_image.jpg" http://localhost:5000/pet_recognition/1/endpoint
```

This is very practical for a small number of models, even at large request volumes, as we can scale horizontally with Docker images and load balancers. 
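
A rough sketch of how the `/model_name/version/endpoint` scheme could be routed is shown below. The registry contents are placeholders: the two callables return fixed answers instead of running a real model, and the `resolve` helper is an assumption made for this example.

```python
# Hypothetical registry: (model name, version) -> callable that returns a prediction.
# The callables are placeholders with fixed outputs, not real models.
REGISTRY = {
    ("pet_recognition", "1"): lambda payload: {"label": "cat"},
    ("pet_recognition", "2"): lambda payload: {"label": "cat", "model_version": "2"},
}


def resolve(path):
    """Map a path of the form /model_name/version/endpoint to a registered model."""
    parts = [part for part in path.split("/") if part]
    if len(parts) != 3 or parts[2] != "endpoint":
        raise LookupError(f"unsupported path: {path}")
    name, version = parts[0], parts[1]
    if (name, version) not in REGISTRY:
        raise LookupError(f"unknown model {name!r} version {version!r}")
    return REGISTRY[(name, version)]


model_v1 = resolve("/pet_recognition/1/endpoint")
print(model_v1(b"<image bytes>"))
```

Because the version is part of the path, several versions of a model can be served side by side, and consumers can move to a new version explicitly.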


### Airflow jobs

For the batch mode, Airflow scheduled jobs (or something similar) are a very good option. Airflow allows us to set up a DAG of tasks with the prediction phase included. It can combine ETL and feature engineering with the data validation/testing step above and finally the prediction step. 
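
To illustrate the idea of a DAG of tasks, here is a small sketch using `graphlib` from the Python standard library (Python 3.9+). It is not Airflow code and does not use Airflow's API: it only shows tasks running in dependency order, whereas a scheduler such as Airflow adds scheduling, retries and monitoring on top. The task names are hypothetical and the task bodies are placeholders.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dag = {
    "extract": set(),
    "feature_engineering": {"extract"},
    "validate_data": {"feature_engineering"},
    "predict": {"validate_data"},
    "save_predictions": {"predict"},
}


def run_pipeline(dag, tasks):
    """Run every task once, in an order that respects the dependencies."""
    executed = []
    for name in TopologicalSorter(dag).static_order():
        tasks[name]()
        executed.append(name)
    return executed


# Placeholder task bodies that only record that they ran.
log = []
tasks = {name: (lambda name=name: log.append(name)) for name in dag}

executed = run_pipeline(dag, tasks)
print(executed)
```

Placing the validation task before the prediction task means a broken input batch stops the pipeline before any predictions are produced.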


### Docker Containers 

Docker containers can be used both for training and model serving jobs. Depending on the nature of the task, docker containers can be useful both in batch mode predictions as well as in real time/streaming scenarios, with the extra benefit of scaling.


### MLflow

MLflow is open source and provides capabilities for model tracking and serving. MLflow is also great for the training lifecycle because it provides a web interface that shows model performance and accuracy over multiple runs. MLflow also lets us group multiple runs as part of a single experiment.


### SageMaker 

Amazon's SageMaker is a widely adopted solution for model serving in AWS. 

- Pros:
    - Managed by Amazon, no in-house infrastructure required
- Cons:
    - It can be expensive
    - You may or may not want to lock yourself in with Amazon
      


### Google BigQuery ML

Google is also offering machine learning capabilities on top of BigQuery. It lets us use ML models as SQL functions on top of existing BigQuery tables.

- Pros:
    - Managed by Google, no in-house infrastructure required
- Cons:
    - It can be expensive
    - You may or may not want to lock yourself in with Google