# Data Validation

### Tutorial notebook on how to implement and configure GE validation to run with the pipeline
##### In this tutorial, we will:
- get an overview of the kedro-ge integration
- create and edit an expectation suite for a dataset
- add two types of expectations (table level and column level)
- save expectations to our validation suite
- review the results in HTML

This module helps identify anomalies and outliers in a dataset. It is built on the kedro-great-expectations (kedro GE) framework and adds custom expectations on top of those that GE provides.
## How to get started?
The following steps will help get started with setting up data validation.
### Installing Kedro GE
#### Step 1: Install kedro GE
You can install kedro-great-expectations using pip. 
```commandline
pip install optimus/packages/kedro_great_expectations-0.3.0-py3-none-any.whl
```
> The `.whl` file for kedro_great_expectations is also available on [box](https://mckinsey.box.com/v/kedro-great-expectations).

##### Default Rules
Please refer to this [link](https://docs.greatexpectations.io/en/v0.4.4/glossary.html) for all the available rules.

#### Step 2: Using kedro GE in pipeline
Modify `src/optimus_pkg/ins_data_assets/run.py` as follows to enable validation on `kedro run`:
```python
from kedro.context import KedroContext
from kedro_great_expectations import GreatExpectationsMixin
 
class ProjectContext(GreatExpectationsMixin, KedroContext):
    # refer to sample config in optimus/pipeline/conf/base/kedro_ge.yml
    ge_config_key = "kedro_ge.yml"   # optional, defaults to this value
    ### ...
```
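In Python, base classes listed first take precedence in the method resolution order (MRO), which is why a mixin is conventionally placed before the main base class. The sketch below illustrates that pattern with toy classes. `BaseContext`, `ValidationMixin`, `load` and `validated` are hypothetical stand-ins chosen for illustration; they are not the kedro or kedro-great-expectations API.

```python
class BaseContext:
    """Hypothetical stand-in for a project context that loads datasets."""

    def load(self, name):
        return f"data:{name}"


class ValidationMixin:
    """Hypothetical mixin that adds a bookkeeping step around `load`."""

    def load(self, name):
        data = super().load(name)  # delegate to the next class in the MRO
        # Record which datasets passed through the mixin.
        self.validated = getattr(self, "validated", []) + [name]
        return data


class ProjectContext(ValidationMixin, BaseContext):
    """Mixin listed first, so ValidationMixin.load runs before BaseContext.load."""


ctx = ProjectContext()
result = ctx.load("in_out_recent")
mro_names = [cls.__name__ for cls in ProjectContext.__mro__]
```

If the two base classes were swapped, `BaseContext.load` would be found first and the mixin's `load` would never run.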

#### Step 3: Setting up and configuring great expectations
Initializing kedro GE generates a `kedro_ge.yml` configuration file in the `conf/base` folder. To initialize it, run the following command:
```commandline
kedro ge init
```
This file can be configured to suit the project needs. The class path for custom expectations developed for OptimusAI is included in this file.
For more information on how to configure, please refer to [GE documentation](https://one.quantumblack.com/docs/alchemy/kedro_great_expectations/03_user_guide/01_configuration.html).


#### Step 4: Create a GE suite.
The following commands will help create an empty GE suite for each dataset. Make sure you are in the pipeline folder before executing the commands.
 
```commandline
cd pipeline
kedro ge generate <dataset_name> --empty
kedro ge edit <dataset_name>
``` 
This opens a Jupyter notebook `dataset_name.ipynb` for editing.

#### **Step 5: Build your Expectation Suite**
OptimusAI has built some custom expectations that can be used in addition to those provided by GE. These can be found in the `great_expectations_utils.py` file. The custom expectation class and its methods are detailed in the `adtk_custom_expectation.py` file.
Copy the desired function call into the notebook. The example below applies quantile-based anomaly detection.

```python
from optimus_pkg.data_validation.great_expectations_utils import *
params = context.params

# Custom Expectation - Quantile anomaly detection
validate_column_quantile_anomaly(batch, params)
```

The parameter file for the data validation module is located at `<my_project>/pipeline/conf/base/pipelines/validate/parameters.yml`:

```yaml
dataset_1:
  column_list: ["status_time", "outp_quantity", "inp_quantity", "cu_content", "inp_avg_hardness"]

  data_length:
    min_value: 0
    max_value: 26

  schema:
    "cu_content": "float64"
    "inp_avg_hardness": "float64"
    "inp_quantity": "float64"
    "outp_quantity": "float64"
    "status_time": "object"

  time:
    column: "status_time"
    format: "%Y-%m-%d %H:%M:%S"

  process_window: 8       #  amount of time to complete ops process

  sensor_pair_1:
    first_sensor: "inp_quantity"
    second_sensor: "outp_quantity"

  quantile_anomaly:
    low: 0.01             # Quantile of historical data below which a value is regarded as an anomaly
    high: 0.99            # Quantile of historical data above which a value is regarded as an anomaly
```
Parameters are grouped by dataset: each dataset has its own top-level key, so that its configuration stays separate from that of other datasets.
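As a rough illustration of the per-dataset layout, the sketch below looks up the parameter block for one dataset from a dictionary that mirrors a subset of the YAML above. The helper `get_dataset_params` is hypothetical; how the OptimusAI expectations read their parameters internally is not shown in this document.

```python
# Parameters as they might look once the YAML file has been loaded (subset only).
params = {
    "dataset_1": {
        "data_length": {"min_value": 0, "max_value": 26},
        "time": {"column": "status_time", "format": "%Y-%m-%d %H:%M:%S"},
        "quantile_anomaly": {"low": 0.01, "high": 0.99},
    },
}


def get_dataset_params(all_params, dataset_name):
    """Hypothetical helper: return one dataset's block, failing loudly if absent."""
    try:
        return all_params[dataset_name]
    except KeyError:
        raise KeyError(
            f"No validation parameters defined for dataset '{dataset_name}'"
        ) from None


quantile_cfg = get_dataset_params(params, "dataset_1")["quantile_anomaly"]
```

Failing with an explicit message when a dataset key is missing makes a misnamed top-level key easier to spot than a silent default would.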

## Custom Expectations
Currently, OptimusAI supports two types of anomaly detection. 
- Rule based anomaly detection
- Model based advanced anomaly detection

All of the methods below are implemented using the Anomaly Detection ToolKit ([ADTK](https://adtk.readthedocs.io/en/stable/index.html)) package.


#### Rule based anomaly detection
The following methods detect anomalies using predefined rules.
1. Level Shift Anomaly Detection: `create_level_shift_expectation`
   Detects level shifts in the dataset by comparing values from two time windows.
2. Quantile Anomaly Detection: `validate_column_quantile_anomaly`
   Detects anomalies based on quantiles of historical data.
3. Persist Anomaly Detection: `validate_column_persist_anomaly`
   Detects anomalies based on values in a preceding time period.
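To make the rule-based ideas concrete, here is a minimal pure-Python sketch of quantile and level-shift detection. It is a simplified illustration only and is not how ADTK or the OptimusAI expectations implement them. The function names, the fixed `threshold` and the sample values are assumptions made for this example.

```python
from statistics import median


def quantile(values, q):
    """Linearly interpolated quantile of a non-empty list, with 0 <= q <= 1."""
    ordered = sorted(values)
    pos = q * (len(ordered) - 1)
    lower = int(pos)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


def quantile_anomalies(history, new_values, low=0.01, high=0.99):
    """Flag new values that fall outside the [low, high] quantiles of history."""
    lo, hi = quantile(history, low), quantile(history, high)
    return [v < lo or v > hi for v in new_values]


def level_shift_anomalies(values, window=3, threshold=10.0):
    """Flag index i when the medians of the windows before and after i
    differ by more than `threshold`."""
    flags = [False] * len(values)
    for i in range(window, len(values) - window + 1):
        before = median(values[i - window:i])
        after = median(values[i:i + window])
        flags[i] = abs(after - before) > threshold
    return flags


history = list(range(1, 101))  # illustrative historical values 1..100
quantile_flags = quantile_anomalies(history, [0, 50, 150])
shift_flags = level_shift_anomalies([10, 10, 10, 10, 50, 50, 50, 50])
```

The `low` and `high` defaults mirror the `quantile_anomaly` block in the parameter file. The level-shift sketch compares the two windows against a fixed threshold, whereas a production detector would typically scale that threshold from historical variation.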

#### Advanced Anomaly detection 
Sometimes it is difficult to detect anomalies with simple rules. Model-based anomaly detection can help in these cases. The following methods are currently available.
1. Isolation Forest: `validate_multi_dimension_isolationforest_anomaly`
   Identifies time points as anomalous using the Isolation Forest technique, a tree-based method that is highly effective on high-dimensional data.
2. KMeans Clustering: `validate_multi_dimension_cluster_anomaly`
   Identifies anomalies by clustering historical data.
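The clustering idea can be sketched without any external library: group the points with k-means and treat the members of the least populated cluster as anomalous. The code below is a toy, deterministic version written for illustration; the function names and sample readings are assumptions, and it is not the implementation behind `validate_multi_dimension_cluster_anomaly`.

```python
def kmeans(points, k, iterations=20):
    """Tiny deterministic k-means: the first k points seed the centroids."""
    centroids = [list(p) for p in points[:k]]
    labels = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: attach each point to its nearest centroid.
        for idx, p in enumerate(points):
            dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centroids]
            labels[idx] = dists.index(min(dists))
        # Update step: move each centroid to the mean of its members.
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                centroids[j] = [sum(dim) / len(members) for dim in zip(*members)]
    return labels


def smallest_cluster_anomalies(points, k=2):
    """Flag the points that end up in the least populated cluster."""
    labels = kmeans(points, k)
    counts = [labels.count(j) for j in range(k)]
    rare = counts.index(min(counts))
    return [lab == rare for lab in labels]


# Illustrative two-dimensional readings with one point far from the rest.
readings = [(1.0, 1.0), (1.1, 0.9), (0.9, 1.2), (1.0, 1.1), (9.0, 9.5), (1.2, 1.0)]
cluster_flags = smallest_cluster_anomalies(readings)
```

Seeding the centroids from the first `k` points keeps the example reproducible. A real clustering library would normally use a randomized or smarter initialization.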

## FAQ
### 1. Can I add my own expectation?
Yes, you can create your own expectation. 
1. Go to `./pipeline/src/optimus_pkg/data_validation/adtk_custom_expectation.py`.
2. Add your function to the class `CustomADTKExpectations`.
3. Include your function in the GE utils file, i.e., `great_expectations_utils.py`. 
4. Call this function when creating your GE suite through the jupyter notebook generated for your dataset.

### 2. What are decorators? How are they used here?
Decorators are callables that add new functionality to an existing function or class without modifying its structure. GE provides high-level decorators that help convert our custom functions into fully fledged expectations.
We use the `column_aggregate_expectation` decorator from the class `MetaPandasDataset`. For more information on it and the other available decorators, refer to the [GE documentation](https://docs.greatexpectations.io/en/latest/autoapi/great_expectations/dataset/index.html#great_expectations.dataset.MetaPandasDataset).
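For readers new to decorators, the sketch below shows the general mechanism with a hypothetical `as_expectation` decorator that turns a plain boolean check into a function returning a result dictionary. It only mimics the idea; it is not how Great Expectations implements `column_aggregate_expectation`, and the result keys are assumptions made for this example.

```python
import functools


def as_expectation(check):
    """Hypothetical decorator: wrap a boolean check so it returns a result dict."""

    @functools.wraps(check)  # keep the wrapped function's name and docstring
    def wrapper(column, **kwargs):
        observed = check(column, **kwargs)
        return {
            "expectation": check.__name__,
            "success": bool(observed),
            "kwargs": kwargs,
        }

    return wrapper


@as_expectation
def expect_column_max_to_be_below(column, limit):
    """Plain check: True when every value stays under `limit`."""
    return max(column) < limit


result = expect_column_max_to_be_below([147.0, 230.0, 251.0], limit=300)
```

The check itself stays a simple function, while the decorator supplies the shared reporting logic. That separation is what lets one decorator serve many custom checks.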

### 3. How should I configure my expectation file?
You can configure how validation works for your datasets using the config schema described [here](https://one.quantumblack.com/docs/alchemy/kedro_great_expectations/03_user_guide/01_configuration.html).

### 4. Something is not right. How can I get in touch?
Please raise an issue [here](https://git.mckinsey-solutions.com/opm/optimus/issues/new/choose). Alternatively, get in touch via Slack at [#optimus](https://mckinsey-client-cap.slack.com/archives/C9S1RM6SX).

# Example of a GE validation notebook
Use this notebook to recreate and modify your expectation suite:

**Expectation Suite Name**: `dataset_name` <br>
*In this tutorial we use `in_out_recent` as our dataset.*

In [1]:
%reload_kedro

2020-08-17 13:35:14,111 - root - INFO - ** Kedro project optimus_pkg
2020-08-17 13:35:14,112 - root - INFO - Defined global variable `context` and `catalog`
2020-08-17 13:35:14,117 - root - INFO - Registered line magic `run_viz`


### Read dataset and assign batch

In [2]:
from datetime import datetime

import great_expectations.jupyter_ux
from great_expectations.data_context.types.resource_identifiers import (
    ValidationResultIdentifier,
)
from kedro_great_expectations.config import KedroGEConfig
from kedro_great_expectations import ge_context as ge

kedro_ge_config = KedroGEConfig.for_interactive_mode(context)

data_context = ge.get_ge_context()

expectation_suite_name = "in_out_recent"
dataset_name = "in_out_recent"
suite = data_context.get_expectation_suite(expectation_suite_name)
suite.expectations = []

# Use kedro to load the dataset:
batch_kwargs = ge.get_batch_kwargs(
    data=catalog.load(dataset_name), ds_name=dataset_name, ge_context=data_context
)
batch = data_context.get_batch(batch_kwargs, suite.expectation_suite_name)
batch.head(5)

2020-08-17T13:35:16-0500 - INFO - Great Expectations logging enabled at 20 level by JupyterUX module.
2020-08-17 13:35:16,153 - great_expectations - INFO - Great Expectations logging enabled at 20 level by JupyterUX module.
2020-08-17 13:35:16,461 - kedro.io.data_catalog - INFO - Loading data from `in_out_recent` (CSVDataSet)...



| Unnamed: 0 | status_time | inp_quantity | cu_content | outp_quantity | inp_avg_hardness |
|---|---|---|---|---|---|
| 0 | 2020-07-27 03:59:58 | 147.0 | 0.079816 | 159 | 0.478026 |
| 1 | 2020-07-27 04:14:59 | 230.0 | 0.079816 | 238 | 0.503276 |
| 2 | 2020-07-27 04:29:56 | 251.0 | 0.079816 | 246 | 0.506612 |
| 3 | 2020-07-27 04:45:04 | 250.0 | 0.079816 | 251 | 0.518337 |
| 4 | 2020-07-27 05:00:00 | 240.0 | 0.079683 | 242 | 0.489977 |


## Clear all expectations

If this is the first time you're editing this expectation suite and you've autogenerated the expectations, you may wish to clear all and add the expectations selectively.

In that case, run the code cell below and then execute only the cells containing the expectations you wish to keep before saving the suite. You can also delete the cells of those you don't wish to keep, but they will be automatically removed the next time you run `kedro ge edit in_out_recent` anyway.


In [3]:
batch._expectation_suite.expectations = []
from optimus_pkg.core.data_validation.great_expectations_utils import *
params = context.params

### Table Expectation(s)

#### Validate if sensors are part of the dataframe

In [4]:
create_sensor_exist_expectation(batch, params)

#### Validate that the data length is within the configured range

In [5]:
create_data_length_expectation(batch, params)

### Column Expectation(s)

#### Validate a dataset has no null values in column

In [6]:
create_not_null_expectations_from_tagdict(batch)

#### Validate the schema of a dataframe against predefined key-value pairs

In [7]:
create_data_schema_expectation(batch, params)

#### Validate the timestamp column of the dataframe and ensure it conforms to the format provided

In [8]:
create_time_format_expectation(batch, params)
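To show what a time-format check involves, the sketch below tests whether each timestamp string parses with the format from the parameter file. `matches_time_format` is a hypothetical helper written for illustration and is not the code behind `create_time_format_expectation`. The third sample timestamp is made up to show a failing case.

```python
from datetime import datetime


def matches_time_format(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Return True when `value` can be parsed with `fmt`."""
    try:
        datetime.strptime(value, fmt)
        return True
    except (ValueError, TypeError):
        # ValueError: wrong layout; TypeError: value is not a string.
        return False


timestamps = ["2020-07-27 03:59:58", "2020-07-27 04:14:59", "27/07/2020 04:29"]
format_flags = [matches_time_format(ts) for ts in timestamps]
```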

#### Validate the value range of a dataset based on expected values defined in the TagDict

In [9]:
# load tag dictionary
td = catalog.load('td')
create_range_expectations_from_tagdict(batch, td)

2020-08-17 13:35:32,157 - kedro.io.data_catalog - INFO - Loading data from `td` (TagDictCSVLocalDataSet)...


#### Validate that the sensor pairs have the same values

In [10]:
create_sensor_pair_equals_expectation(batch, params)

#### Validate sensor values are not violating flatline rules, i.e. no change in the data within a process period

In [11]:
create_flatline_expectation(batch, params)
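A flatline rule can be pictured as follows: a reading is flagged when it and the readings just before it, spanning one window, are all identical. The sketch below is an assumption-laden illustration. `flatline_flags` is a hypothetical helper, the window is counted in readings rather than time, and the values are the `cu_content` column from the sample rows shown earlier.

```python
def flatline_flags(values, window):
    """Flag index i when the last `window` readings ending at i are identical."""
    flags = [False] * len(values)
    for i in range(window - 1, len(values)):
        recent = values[i - window + 1:i + 1]
        flags[i] = len(set(recent)) == 1
    return flags


cu_content = [0.079816, 0.079816, 0.079816, 0.079816, 0.079683]
flat = flatline_flags(cu_content, window=4)
```

Only the fourth reading is flagged here, because it completes a run of four identical values and the fifth reading breaks the run.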

#### Validate sensor values are not violating quantile anomaly detection

In [12]:
validate_column_quantile_anomaly(batch, params)

#### Validate sensor values are not violating level shift anomaly detection

In [13]:
create_level_shift_expectation(batch, params)

2020-08-17 13:35:37,340 - numexpr.utils - INFO - Note: NumExpr detected 12 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.
2020-08-17 13:35:37,341 - numexpr.utils - INFO - NumExpr defaulting to 8 threads.


In [14]:
validate_column_persist_anomaly(batch, params)

In [15]:
validate_multi_dimension_cluster_anomaly(batch, params)



## Save Your Expectations

Let's save the expectation suite as a JSON file in the `great_expectations/expectations` directory of your project.
If you decide not to save some expectations that you created, use the [remove_expectation method](https://docs.greatexpectations.io/en/latest/module_docs/data_asset_module.html?highlight=remove_expectation&utm_source=notebook&utm_medium=edit_expectations#great_expectations.data_asset.data_asset.DataAsset.remove_expectation).


In [16]:
batch.save_expectation_suite(discard_failed_expectations=False)

2020-08-17T13:35:41-0500 - INFO - 	37 expectation(s) included in expectation_suite. result_format settings filtered.
2020-08-17 13:35:41,525 - great_expectations.data_asset.data_asset - INFO - 	37 expectation(s) included in expectation_suite. result_format settings filtered.



## Review your Expectations (optional)

Let's now run the validation operators against your expectation suite and rebuild your Data Docs, which helps you communicate about your data with both machines and humans.


In [17]:
run_id = datetime.utcnow().strftime("%Y%m%dT%H%M%S.%fZ-kedro-ge-edit")

results = data_context.run_validation_operator("action_list_operator", assets_to_validate=[batch], run_id=run_id)
expectation_suite_identifier = list(results["details"].keys())[0]
validation_result_identifier = ValidationResultIdentifier(
    expectation_suite_identifier=expectation_suite_identifier,
    batch_identifier=batch.batch_kwargs.to_id(),
    run_id=run_id
)
data_context.build_data_docs()
data_context.open_data_docs(validation_result_identifier)


2020-08-17T13:35:43-0500 - INFO - 	37 expectation(s) included in expectation_suite.
2020-08-17 13:35:43,861 - great_expectations.data_asset.data_asset - INFO - 	37 expectation(s) included in expectation_suite.

