[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/jiahaom/MLOps/blob/main/1_Data_Validation.ipynb)


# 1. Data Validation

## 1.1 Data Quality
> Data is the First Class Citizen for ML.

ℹ️ Data quality matters in an MLOps data pipeline because it directly affects the performance and accuracy of the machine learning models that are built and deployed. Checking data quality is therefore an early step in the MLOps process.

❌ Poor-quality data can lead to inaccurate or biased models, which in turn cause poor decisions and poor performance in production. It can also delay development and deployment and add cost.

✅ Generating and visualizing data statistics gives a better understanding of the data and helps identify potential problems early, before they affect the models.

## 1.2 TensorFlow Data Validation 
ℹ️ TensorFlow Data Validation (TFDV) is a TensorFlow library used in MLOps to analyze and validate large datasets before they are used to train machine learning models.

✅ With TFDV, data scientists and engineers can detect and correct data quality issues early, which helps improve model performance and accuracy. It can also detect data drift and data skew by monitoring changes in the data distribution, which helps models continue to perform well in production.

The library provides tools for:

- Schema Generation: It creates a schema for a dataset based on the data types and statistics of the features.
- Schema Comparison: It can compare two schemas and identify differences between them, which can help to detect changes in the data distribution.
- Data Validation: It can check if the data conforms to the schema, including checking for missing values, data types, and feature ranges, and can also detect and notify about data outliers.
- Data Statistics: It can generate statistics for the data, such as mean, standard deviation, and histograms, which can be used to visualize the data and identify potential problems.
- Anomaly Detection: It can detect anomalies in the data by comparing the statistics of the data to the statistics of a previous version of the data.
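
To make the "Data Statistics" item concrete, here is a minimal pure-Python sketch of the kind of per-feature summary such a tool reports. It is not TFDV's implementation; the helper names `summarize` and `histogram` are hypothetical, and the `None` entry is an assumed missing value added for illustration.

```python
import math
from collections import Counter


def summarize(values):
    """Return basic statistics for one numeric feature; None marks a missing value."""
    present = [v for v in values if v is not None]
    n = len(present)
    mean = sum(present) / n
    # Population standard deviation of the non-missing values
    std_dev = math.sqrt(sum((v - mean) ** 2 for v in present) / n)
    return {
        "count": n,
        "missing": len(values) - n,
        "mean": mean,
        "std_dev": std_dev,
        "min": min(present),
        "max": max(present),
    }


def histogram(values, num_buckets=4):
    """Count non-missing values in equal-width buckets between min and max."""
    present = [v for v in values if v is not None]
    low, high = min(present), max(present)
    width = (high - low) / num_buckets or 1  # avoid a zero width for constant features
    counts = Counter(min(int((v - low) / width), num_buckets - 1) for v in present)
    return [counts.get(i, 0) for i in range(num_buckets)]


# Toy values standing in for a numeric column (the None is an assumed missing value)
chapter_count = [13, 9, 11, 14, 1, None]
stats = summarize(chapter_count)
buckets = histogram(chapter_count)
print(stats)
print(buckets)
```

A sudden change in `missing`, `min`/`max`, or the bucket counts between two versions of a dataset is the sort of signal that the validation steps below look for.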

🖥️ To use TFDV in Google Colab, you will need to first install the TensorFlow Data Validation library. We can do this by running the following command in a Google Colab notebook cell:
> Hint: TFDV doesn't support Apple silicon currently (Jan 2023)

In [None]:
!pip install tensorflow_data_validation

Once the library is installed, we can start using it to validate and analyze our data.

In [None]:
import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

print('TFDV Version: {}'.format(tfdv.__version__))
print('Tensorflow Version: {}'.format(tf.__version__))

TFDV Version: 1.12.0
Tensorflow Version: 2.11.0


## 1.3 Load the Dataset


The dataset we use is about MOOC dropout prediction and includes the following features:
- user_log_num
- course_log_num
- take_course_num
- take_user_num
- log_num
- server_nagivate
- server_access
- server_problem
- browser_access
- browser_problem
- browser_page_close
- browser_video
- server_discussion
- server_wiki
- chapter_count
- sequential_count
- video_count
- problem_count
- discussion_count
- taked_course_chapter_amount
- taked_course_video_amount
- taked_course_discussion_amount

In [None]:
import pandas as pd

data = pd.read_csv('Mooc.csv',index_col = 'Unnamed: 0')
data.head()

Unnamed: 0,user_log_num,course_log_num,take_course_num,take_user_num,log_num,server_nagivate,server_access,server_problem,browser_access,browser_problem,...,server_wiki,chapter_count,sequential_count,video_count,problem_count,discussion_count,taked_course_chapter_amount,taked_course_video_amount,taked_course_discussion_amount,Success
0,1029,878119,6,10322,314,25,86,8,21,79,...,0,13,80,29,87,0,13,56,0,0
1,839,148500,3,1481,288,14,45,3,34,135,...,0,9,60,9,138,26,12,113,129,0
2,289,878119,2,10322,99,15,64,1,0,5,...,0,11,41,4,6,0,13,56,0,0
3,633,148500,1,1481,633,30,106,32,120,138,...,0,14,197,82,170,34,12,113,129,0
4,23,334927,1,7775,23,5,12,0,0,2,...,0,1,9,2,2,0,14,57,0,0


## 1.4 Split the Dataset
In order to ensure the models are reliable and accurate, we split our dataset into 3 parts: Training (60%), Validation (20%) and Serving (20%).

In [None]:
from sklearn.model_selection import train_test_split

train_data, val_data = train_test_split(data, test_size=0.2, random_state = 0)
train_data, serve_data = train_test_split(train_data, test_size=0.25, random_state = 0)
train_data = train_data.reset_index(drop=True)
val_data = val_data.reset_index(drop=True)
serve_data = serve_data.drop(['Success'], axis=1).reset_index(drop=True)
print('Training dataset has {} records\nValidation dataset has {} records\nServing dataset has {} records'.format(len(train_data),len(val_data),len(serve_data)))

Training dataset has 57860 records
Validation dataset has 19287 records
Serving dataset has 19287 records
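
The two-step split can be checked with a little arithmetic: taking 20% first and then 25% of the remaining 80% yields a 60/20/20 split. The sketch below reproduces the record counts above, assuming the test share is rounded up to a whole record; `split_sizes` is a hypothetical helper, not part of scikit-learn.

```python
import math


def split_sizes(n_samples, test_size):
    """Return (n_train, n_test), assuming the test share is rounded up."""
    n_test = math.ceil(n_samples * test_size)
    return n_samples - n_test, n_test


total = 57860 + 19287 + 19287                     # records reported above
remaining, n_val = split_sizes(total, 0.2)        # first split: 20% for validation
n_train, n_serve = split_sizes(remaining, 0.25)   # second split: 25% of the remaining 80%

print(n_train, n_val, n_serve)
print(round(n_train / total, 3), round(n_val / total, 3), round(n_serve / total, 3))
```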


## 1.5 Generate Training Data Statistics

There are two ways to generate training data statistics:

### 1.5.1 From Pandas.DataFrame


In [None]:
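# feature_allowlist restricts statistics generation to the listed features.
stats_options = tfdv.StatsOptions(feature_allowlist=list(data.columns))

# Review allowed features
for feature in stats_options.feature_allowlist:
    print(feature)

# Generate statistics for the training split; `train_stats` is used in the following sections
train_stats = tfdv.generate_statistics_from_dataframe(train_data, stats_options)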

user_log_num
course_log_num
take_course_num
take_user_num
log_num
server_nagivate
server_access
server_problem
browser_access
browser_problem
browser_page_close
browser_video
server_discussion
server_wiki
chapter_count
sequential_count
video_count
problem_count
discussion_count
taked_course_chapter_amount
taked_course_video_amount
taked_course_discussion_amount
Success


### 1.5.2 From CSV file

In [None]:
stats_csv = tfdv.generate_statistics_from_csv(data_location='Mooc.csv', delimiter=',')





## 1.6 Visualize Training Data Statistics

In [None]:
tfdv.visualize_statistics(train_stats)
# tfdv.visualize_statistics(stats_csv)

## 1.7 Generate Training Data Schema 

ℹ️ A data schema is a blueprint or a set of rules that defines the structure, organization, and format of a dataset. It describes the types of data that are stored in the dataset, the relationships between different data elements, and the constraints on the data values.

✅ TFDV provides the function `infer_schema()`, which automatically creates a schema from the data types and statistics of the features in a dataset. The schema is represented as a `Schema` protocol buffer object, which can be used to validate and analyze the data. For example, comparing the statistics of different versions of the data against the schema helps detect data drift, and comparing training statistics with serving statistics helps detect data skew.

❌ Keep in mind that schema generation is not a replacement for traditional data cleaning and preprocessing steps, but it is a useful tool for identifying data quality issues and for monitoring the data pipeline in MLOps.
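
As a rough illustration of what "inferring a schema from the data" means, the following pure-Python sketch derives a type and a presence constraint for each feature from a few records. It is a simplification under assumed rules, not how TFDV infers schemas; `infer_simple_schema` and the sample records are hypothetical.

```python
def infer_simple_schema(records):
    """Infer a toy schema: a value type and a presence constraint per feature."""
    schema = {}
    feature_names = {name for record in records for name in record}
    for name in sorted(feature_names):
        values = [record.get(name) for record in records]
        present = [v for v in values if v is not None]
        types = {type(v) for v in present}
        if types == {int}:
            kind = "INT"
        elif types <= {int, float}:
            kind = "FLOAT"
        else:
            kind = "BYTES"
        schema[name] = {
            "type": kind,
            # A feature is "required" only if every record has a value for it
            "presence": "required" if len(present) == len(values) else "optional",
        }
    return schema


# Hypothetical records; the third one lacks `chapter_count`
records = [
    {"user_log_num": 1029, "chapter_count": 13},
    {"user_log_num": 839, "chapter_count": 9},
    {"user_log_num": 289},
]
toy_schema = infer_simple_schema(records)
print(toy_schema)
```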

In [None]:
train_schema = tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(train_schema)

| Feature name | Type | Presence | Valency | Domain |
|---|---|---|---|---|
| 'user_log_num' | INT | required | | - |
| 'course_log_num' | INT | required | | - |
| 'take_course_num' | INT | required | | - |
| 'take_user_num' | INT | required | | - |
| 'log_num' | INT | required | | - |
| 'server_nagivate' | INT | required | | - |
| 'server_access' | INT | required | | - |
| 'server_problem' | INT | required | | - |
| 'browser_access' | INT | required | | - |
| 'browser_problem' | INT | required | | - |


## 1.8 Recognize and Fix Data Problems

### 1.8.1 Compare Datasets

ℹ️ Schema skew refers to a situation where the schema of a dataset changes over time, leading to inconsistencies or inaccuracies in the data. This can happen for various reasons, such as changes in data sources, data processing pipelines, or data storage systems.

❌ Because the schema is no longer consistent across the dataset, schema skew can cause a variety of problems in data processing and analysis, such as:

1. Inconsistencies in the data that can lead to incorrect or incomplete results.
1. Difficulty in understanding the data.
1. Difficulty in integrating and merging data from different sources.
1. Difficulty in implementing data governance and data quality control.

✅ It's important to keep monitoring the data pipeline, validate the data against the schema, and update the schema accordingly. Tools such as TensorFlow Data Validation (TFDV) support this by comparing the statistics and schema of different versions of the data and reporting schema skew.
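
The idea of comparing two schema versions can be sketched in a few lines of plain Python. This is only an illustration with schemas modelled as dictionaries from feature name to type; `compare_schemas` and both example schemas are hypothetical and do not use the TFDV API.

```python
def compare_schemas(old, new):
    """Report features that were added, removed, or changed type between two schemas."""
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "type_changed": sorted(
            name for name in set(old) & set(new) if old[name] != new[name]
        ),
    }


# Hypothetical schema versions: feature name -> type
old_schema = {"user_log_num": "INT", "chapter_count": "INT", "Success": "INT"}
new_schema = {"user_log_num": "INT", "chapter_count": "FLOAT", "video_count": "INT"}

diff = compare_schemas(old_schema, new_schema)
print(diff)
```

Any non-empty entry in the result is a hint that an upstream source or processing step has changed and that the schema, the pipeline, or both need attention.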

In [None]:
train_stats = tfdv.generate_statistics_from_dataframe(train_data)
print(f"First feature of Train: {train_stats.datasets[0].features[0].path.step[0]}")
print(f"Last feature of Train: {train_stats.datasets[0].features[-1].path.step[0]}")
print(f"Number of features used: {len(train_stats.datasets[0].features)}")
print(f"Number of examples used: {train_stats.datasets[0].num_examples}\n")



val_stats = tfdv.generate_statistics_from_dataframe(val_data, stats_options)
print(f"First feature of Val: {val_stats.datasets[0].features[0].path.step[0]}")
print(f"Last feature of Val: {val_stats.datasets[0].features[-1].path.step[0]}")
print(f"Number of features used: {len(val_stats.datasets[0].features)}")
print(f"Number of examples used: {val_stats.datasets[0].num_examples}")

test_stats = tfdv.generate_statistics_from_dataframe(serve_data, stats_options)

First feature of Train: user_log_num
Last feature of Train: Success
Number of features used: 23
Number of examples used: 57860

First feature of Val: user_log_num
Last feature of Val: Success
Number of features used: 23
Number of examples used: 19287


In [None]:
# Compare training with validation statistics (`val_stats` was generated in the previous cell)
tfdv.visualize_statistics(
    lhs_statistics=val_stats, lhs_name='VAL',
    rhs_statistics=train_stats, rhs_name='TRAIN')

### 1.8.2 Validation Anomalies

In [None]:
# Anomalies Detection and Visualization
outlier = tfdv.validate_statistics(statistics=val_stats, schema=train_schema)
tfdv.display_anomalies(outlier)

We'd like to see `No anomalies found.` after anomaly detection. Otherwise, we need to fix the validation anomalies, either in the data or in the schema. For example, on a dataset with a categorical `Skills` feature (not part of the MOOC dataset), TFDV could report:

  > - `Feature name: Skills`
  > - `Anomaly short description: Unexpected string values`
  > - `Unexpected string values: Examples contain values missing from the schema: Accounting (<1%).` 

This indicates that the validation data contains records with the value `Accounting` for the feature `Skills`, and that this value was not in the schema generated from the training data.

If the new value is valid, we can fix this by adding it to the domain of the feature in the schema.

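```Python
skills_domain = tfdv.get_domain(train_schema, 'Skills')
skills_domain.value.append('Accounting')

# Re-validate against the updated schema
updated_anomalies = tfdv.validate_statistics(statistics=val_stats, schema=train_schema)
tfdv.display_anomalies(updated_anomalies)
```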
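
The check behind an "unexpected string values" anomaly can be sketched without TFDV: count how often each value outside the allowed domain occurs. The helper `unexpected_values`, the domain, and the data below are all hypothetical and only illustrate the idea.

```python
from collections import Counter


def unexpected_values(values, domain):
    """Return {value: share of examples} for values that are missing from the domain."""
    counts = Counter(values)
    total = len(values)
    return {v: c / total for v, c in counts.items() if v not in domain}


# Hypothetical domain learned from training data, and hypothetical validation values
skills_domain = ["Engineering", "Marketing"]
val_skills = ["Engineering"] * 120 + ["Marketing"] * 79 + ["Accounting"] * 1

before = unexpected_values(val_skills, skills_domain)
print(before)  # 'Accounting' appears in 0.5% of the examples

# Accept the new value by extending the domain, then check again
skills_domain.append("Accounting")
after = unexpected_values(val_skills, skills_domain)
print(after)
```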

### 1.8.3 Serving Anomalies
Likewise, we'd like to see `No anomalies found.` after anomaly detection on the serving data.


In [None]:
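# Features to include in the statistics (assumed here to be every column of the original dataset)
approved_cols = list(data.columns)

options = tfdv.StatsOptions(schema=train_schema,
                            infer_type_from_schema=True,
                            feature_allowlist=approved_cols)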

serve_stats = tfdv.generate_statistics_from_dataframe(serve_data, stats_options=options)
outlier = tfdv.validate_statistics(statistics=serve_stats, schema=train_schema)
tfdv.display_anomalies(outlier)

| Feature name | Anomaly short description | Anomaly long description |
|---|---|---|
| 'Success' | Column dropped | Column is completely missing |


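As the anomaly description shows, the 'Success' (label) column is missing because we dropped it from the serving dataset. This is expected, so we can tell TFDV to ignore the anomaly by declaring that the feature is not present in the serving environment.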

In [None]:
# All features are by default in both TRAINING and SERVING environments.
train_schema.default_environment.append('TRAINING')
train_schema.default_environment.append('SERVING')

tfdv.get_feature(train_schema, 'Success').not_in_environment.append('SERVING')

serving_anomalies_with_env = tfdv.validate_statistics(serve_stats, train_schema, environment='SERVING')

tfdv.display_anomalies(serving_anomalies_with_env)
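
The effect of schema environments can be sketched in plain Python: a feature that is marked as absent from an environment is simply skipped when data from that environment is checked. This is a simplified illustration, not TFDV's logic; `missing_features` and the dictionary-based schema are hypothetical.

```python
def missing_features(schema, observed_columns, environment=None):
    """List schema features absent from the data, honouring per-environment exclusions."""
    missing = []
    for name, spec in schema.items():
        # Skip features that are declared as not present in this environment
        if environment in spec.get("not_in_environment", []):
            continue
        if name not in observed_columns:
            missing.append(name)
    return missing


# Hypothetical schema: the label is excluded from the SERVING environment
toy_schema = {
    "user_log_num": {},
    "chapter_count": {},
    "Success": {"not_in_environment": ["SERVING"]},
}
serving_columns = ["user_log_num", "chapter_count"]

without_env = missing_features(toy_schema, serving_columns)
with_env = missing_features(toy_schema, serving_columns, environment="SERVING")
print(without_env)
print(with_env)
```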

We might also see an anomaly like the following example:

  > - `Feature name: Skills`
  > - `Anomaly short description: Unexpected string values`
  > - `Unexpected string values: Examples contain values missing from the schema: Accounting (<1%).` 

This happens when the detection constraints are too strict. Usually we can resolve it by relaxing the rule.

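```Python
# Relax the constraint: at least 90% of the values must come from the domain
skills = tfdv.get_feature(train_schema, 'Skills')
skills.distribution_constraints.min_domain_mass = 0.9

# Re-validate the serving statistics against the relaxed schema
relaxed_anomalies = tfdv.validate_statistics(statistics=serve_stats, schema=train_schema)
tfdv.display_anomalies(relaxed_anomalies)
```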
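
To see what relaxing `min_domain_mass` changes, the sketch below computes the fraction of values that fall inside a domain and compares it with the threshold. It is a plain-Python illustration under assumed data; `domain_mass` and `passes_domain_check` are hypothetical helpers, not TFDV functions.

```python
def domain_mass(values, domain):
    """Fraction of values that belong to the domain."""
    return sum(1 for v in values if v in domain) / len(values)


def passes_domain_check(values, domain, min_domain_mass=1.0):
    """True if at least `min_domain_mass` of the values come from the domain."""
    return domain_mass(values, domain) >= min_domain_mass


# Hypothetical serving values: 95 known values and 5 unseen ones
domain = {"Engineering", "Marketing"}
serving_values = ["Engineering"] * 60 + ["Marketing"] * 35 + ["Accounting"] * 5

mass = domain_mass(serving_values, domain)
strict_ok = passes_domain_check(serving_values, domain)                        # requires 100%
relaxed_ok = passes_domain_check(serving_values, domain, min_domain_mass=0.9)  # requires 90%
print(mass, strict_ok, relaxed_ok)
```

Relaxing the threshold trades sensitivity for fewer false alarms, so the chosen value should reflect how many unseen values are acceptable for the feature.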

### 1.8.4 Data Drift and Skew

1. Data Drift

  ℹ️ Data drift occurs when the distribution of the data changes over time, for example between consecutive batches of training data, or between the data a model was trained on and the data it later encounters in production.

  ❌ This can lead to a decrease in model performance and accuracy. 

  🤔 Data drift can be caused by changes in the data generating process or by the accumulation of stale data.

1. Data Skew

  ℹ️ Data skew, in the TFDV sense of training-serving skew, occurs when the feature distributions of the training data differ from those of the serving data. This is distinct from class imbalance, where the training data is not evenly distributed among the classes or categories.
  
  ❌ The model is then evaluated and tuned on data that does not resemble what it sees in production, resulting in poor performance after deployment.
  
  🤔 Data skew can be caused by using different data sources or preprocessing steps for training and serving, or by biased data collection.

In [None]:
# Set the skew threshold for the course_log_num feature
course_log_num = tfdv.get_feature(train_schema, 'course_log_num')
course_log_num.skew_comparator.infinity_norm.threshold = 0.01 # domain knowledge helps to determine this threshold

# Set the drift threshold for the taked_course_video_amount feature
video_amount = tfdv.get_feature(train_schema, 'taked_course_video_amount')
video_amount.drift_comparator.infinity_norm.threshold = 0.01 # domain knowledge helps to determine this threshold

# Note: the TFDV documentation describes the L-infinity comparator for categorical features;
# for numeric features such as these, `jensen_shannon_divergence.threshold` is the documented comparator.

# Calculate anomalies: skew compares against serving statistics, drift against previous statistics
skew_drift_anomalies = tfdv.validate_statistics(train_stats, train_schema,
                                                previous_statistics=val_stats,
                                                serving_statistics=serve_stats)

# Display anomalies
tfdv.display_anomalies(skew_drift_anomalies)

✅ Ideally, we see `No anomalies found.`

❌ Otherwise, the report shows the L-infinity distance between the compared datasets (for example, training and serving), and we can decide whether to accept a new threshold. If the anomaly truly indicates skew or drift, further investigation is necessary, as this could have a direct impact on model performance.
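
The L-infinity distance between two categorical distributions is the largest absolute difference between the relative frequencies of any single value. A minimal pure-Python sketch follows, with hypothetical data and helper names; it illustrates the metric itself and is not TFDV's implementation.

```python
from collections import Counter


def normalized_counts(values):
    """Relative frequency of each distinct value."""
    counts = Counter(values)
    total = len(values)
    return {value: count / total for value, count in counts.items()}


def l_infinity_distance(left, right):
    """Largest absolute difference in relative frequency over all values."""
    p, q = normalized_counts(left), normalized_counts(right)
    return max(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in set(p) | set(q))


# Hypothetical categorical feature in training and serving data
train_values = ["a"] * 50 + ["b"] * 30 + ["c"] * 20
serve_values = ["a"] * 40 + ["b"] * 30 + ["c"] * 30

distance = l_infinity_distance(train_values, serve_values)
threshold = 0.01
is_anomaly = distance > threshold
print(distance, is_anomaly)
```

Here the share of `a` falls from 0.5 to 0.4 and the share of `c` rises from 0.2 to 0.3, so the distance is about 0.1, well above a threshold of 0.01.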

### 1.8.5 Data Slices

In [None]:
from tensorflow_metadata.proto.v0.statistics_pb2 import DatasetFeatureStatisticsList


def split_datasets(dataset_list):
    '''
    split datasets.

            Parameters:
                    dataset_list: List of datasets to split

            Returns:
                    datasets: sliced data
    '''
    datasets = []
    for dataset in dataset_list.datasets:
        proto_list = DatasetFeatureStatisticsList()
        proto_list.datasets.extend([dataset])
        datasets.append(proto_list)
    return datasets


def display_stats_at_index(index, datasets):
    '''
    display statistics at the specified data index

            Parameters:
                    index : index to show the anomalies
                    datasets: split data

            Returns:
                    display of generated sliced data statistics at the specified index
    '''
    if index < len(datasets):
        print(datasets[index].datasets[0].name)
        tfdv.visualize_statistics(datasets[index])

In [None]:
def sliced_stats_for_slice_fn(slice_fn, approved_cols, dataframe, schema):
    '''
    generate statistics for the sliced data.

            Parameters:
                    slice_fn : slicing definition
                    approved_cols: list of features to pass to the statistics options
                    dataframe: pandas dataframe to slice
                    schema: the schema

            Returns:
                    slice_info_datasets: statistics for the sliced dataset
    '''
    # Set the StatsOptions
    slice_stats_options = tfdv.StatsOptions(schema=schema,
                                            slice_functions=[slice_fn],
                                            infer_type_from_schema=True,
                                            feature_allowlist=approved_cols)
    
    # Convert Dataframe to CSV since `slice_functions` works only with `tfdv.generate_statistics_from_csv`
    CSV_PATH = 'slice_sample.csv'
    dataframe.to_csv(CSV_PATH)
    
    # Calculate statistics for the sliced dataset
    sliced_stats = tfdv.generate_statistics_from_csv(CSV_PATH, stats_options=slice_stats_options)
    
    # Split the dataset using the previously defined split_datasets function
    slice_info_datasets = split_datasets(sliced_stats)
    
    return slice_info_datasets

In [None]:

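from tensorflow_data_validation.utils import slicing_util

# Slice on every distinct value of the `chapter_count` feature
slice_fn = slicing_util.get_feature_value_slicer(features={'chapter_count': None})
slice_options = tfdv.StatsOptions(slice_functions=[slice_fn])

# Write the training data to CSV so that sliced statistics can be generated from it
train_data.to_csv('slice_sample.csv')
slice_stats = tfdv.generate_statistics_from_csv(
    data_location='slice_sample.csv', stats_options=slice_options)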

In [None]:
from tensorflow_metadata.proto.v0 import statistics_pb2


def display_slice_keys(stats):
    # Print the name of every slice in the given statistics
    print([dataset.name for dataset in stats.datasets])


def get_sliced_stats(stats, slice_key):
    # Return the statistics of a single slice, or None if the key does not exist
    for sliced_stats in stats.datasets:
        if sliced_stats.name == slice_key:
            result = statistics_pb2.DatasetFeatureStatisticsList()
            result.datasets.add().CopyFrom(sliced_stats)
            return result
    print('Invalid Slice key')
    return None


def compare_slices(stats, slice_key1, slice_key2):
    # Visualize two slices side by side
    lhs_stats = get_sliced_stats(stats, slice_key1)
    rhs_stats = get_sliced_stats(stats, slice_key2)
    tfdv.visualize_statistics(lhs_stats, rhs_stats)

In [None]:

tfdv.visualize_statistics(get_sliced_stats(slice_stats, 'chapter_count_0'))

compare_slices(slice_stats, 'chapter_count_0', 'All Examples')


In [None]:
from tensorflow_data_validation.utils import slicing_util
from tensorflow_metadata.proto.v0.statistics_pb2 import DatasetFeatureStatisticsList, DatasetFeatureStatistics


# Generate slice function for the `chapter_count` feature
slice_fn = slicing_util.get_feature_value_slicer(features={'chapter_count': None})

# Generate stats for the sliced dataset
slice_datasets = sliced_stats_for_slice_fn(slice_fn, approved_cols, dataframe=train_data, schema=train_schema)

# Print name of slices for reference
print(f'Statistics generated for:\n')
print('\n'.join([sliced.datasets[0].name for sliced in slice_datasets]))

Statistics generated for:

All Examples
chapter_count_0
chapter_count_1
chapter_count_24
chapter_count_2
chapter_count_26
chapter_count_12
chapter_count_3
chapter_count_13
chapter_count_16
chapter_count_8
chapter_count_11
chapter_count_7
chapter_count_4
chapter_count_9
chapter_count_18
chapter_count_6
chapter_count_5
chapter_count_10
chapter_count_19
chapter_count_22
chapter_count_36
chapter_count_15
chapter_count_14
chapter_count_27
chapter_count_23
chapter_count_43
chapter_count_30
chapter_count_28
chapter_count_17
chapter_count_47
chapter_count_35
chapter_count_25
chapter_count_21
chapter_count_44
chapter_count_20
chapter_count_37
chapter_count_32
chapter_count_34
chapter_count_58
chapter_count_33
chapter_count_29
chapter_count_104
chapter_count_132
chapter_count_41
chapter_count_38
chapter_count_62
chapter_count_53
chapter_count_42
chapter_count_31
chapter_count_40
chapter_count_54
chapter_count_48
chapter_count_78
chapter_count_52
chapter_count_71
chapter_count_56
chapter_count_49
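
The slice names above follow the pattern `<feature>_<value>`, plus one slice containing all examples. The grouping can be sketched in plain Python as follows; `slice_by_feature_value` and the sample records are hypothetical, and the sketch only mimics the naming, not TFDV's slicing machinery.

```python
from collections import defaultdict


def slice_by_feature_value(records, feature):
    """Group records into one slice per distinct value of `feature`, plus 'All Examples'."""
    slices = defaultdict(list)
    for record in records:
        slices["All Examples"].append(record)
        slices[f"{feature}_{record[feature]}"].append(record)
    return dict(slices)


# Hypothetical records with two distinct `chapter_count` values
records = [
    {"chapter_count": 13, "video_count": 29},
    {"chapter_count": 9, "video_count": 9},
    {"chapter_count": 13, "video_count": 4},
]

slices = slice_by_feature_value(records, "chapter_count")
for name, rows in slices.items():
    print(name, len(rows))
```

Statistics can then be computed per slice, which makes it possible to spot a subgroup whose distribution differs from the dataset as a whole.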

In [None]:
# Display at index 20, which corresponds to the slice named `chapter_count_22`
display_stats_at_index(20, slice_datasets) 

chapter_count_22
