In [None]:
!pip install tensorflow_data_validation

In [1]:
import pandas as pd 
# Read the raw marketing-campaign table (tab-separated despite the .csv extension)
df = pd.read_csv(filepath_or_buffer='../data/marketing_campaign.csv', sep='\t')
df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,...,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138.0,0,0,04-09-2012,58,635,...,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,08-03-2014,38,11,...,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,21-08-2013,26,426,...,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,10-02-2014,26,11,...,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,19-01-2014,94,173,...,5,0,0,0,0,0,0,3,11,0


In [2]:
# Columns to explore: birth year, demographics, signup date, recency,
# the Mnt* product amounts and the Num* purchase / web-visit counts
cols = [
    'Year_Birth', 'Education', 'Marital_Status', 'Income', 'Kidhome',
    'Dt_Customer', 'Recency',
    'MntWines', 'MntFruits', 'MntMeatProducts', 'MntFishProducts',
    'MntSweetProducts', 'MntGoldProds',
    'NumDealsPurchases', 'NumWebPurchases', 'NumCatalogPurchases',
    'NumStorePurchases', 'NumWebVisitsMonth',
]
df = df[cols]  # everything else (e.g. ID, AcceptedCmp*, Z_*, Response) is dropped

In [3]:
df.head(n=5)  # preview the reduced column set

Unnamed: 0,Year_Birth,Education,Marital_Status,Income,Kidhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth
0,1957,Graduation,Single,58138.0,0,04-09-2012,58,635,88,546,172,88,88,3,8,10,4,7
1,1954,Graduation,Single,46344.0,1,08-03-2014,38,11,1,6,2,1,6,2,1,1,2,5
2,1965,Graduation,Together,71613.0,0,21-08-2013,26,426,49,127,111,21,42,1,8,2,10,4
3,1984,Graduation,Together,26646.0,1,10-02-2014,26,11,4,20,10,3,5,2,2,0,4,6
4,1981,PhD,Married,58293.0,1,19-01-2014,94,173,43,118,46,27,15,5,5,3,6,5


In [4]:
# Signup dates are day-month-year strings (e.g. 04-09-2012); parse them into datetimes
df['Dt_Customer'] = pd.to_datetime(df['Dt_Customer'], format="%d-%m-%Y")

# Report the span of signup dates covered by the dataset
min_date, max_date = df['Dt_Customer'].min(), df['Dt_Customer'].max()
print(f'Date Range: {min_date} - {max_date}')

Date Range: 2012-07-30 00:00:00 - 2014-06-29 00:00:00


In [5]:

# Index by signup date so pd.Grouper can bin rows by calendar year.
# The guard keeps this cell re-runnable: after the first run Dt_Customer is
# already the index and no longer a column, so set_index would raise KeyError.
if 'Dt_Customer' in df.columns:
    df = df.set_index('Dt_Customer')
# Bin customers by calendar year.
# NOTE(review): pandas >= 2.2 deprecates the 'Y' alias in favour of 'YE'; kept for older pandas.
years = df.groupby(pd.Grouper(freq='Y'))
# One DataFrame per yearly bin, in group order (earliest year first)
datasets = [year_data for _, year_data in years]

In [6]:
# First and last yearly slices (2012 and 2014 for this dataset). Each one drops the
# Dt_Customer index in favour of a fresh 0..n-1 index.
early_data, latest_data = (
    datasets[0].reset_index(drop=True),
    datasets[-1].reset_index(drop=True),
)

In [7]:
import random 
def future_marital_status(v, probability=0.2):
    """
    Randomly re-label a non-married status as 'Married'.

    Simulates a shift towards married customers. A value that is already
    'Married' is returned unchanged. Any other value becomes 'Married' with
    chance `probability` (20% by default) and is kept otherwise. This flips
    roughly that share of the *non-married* customers, which is not the same
    as growing the married count by 20%.

    Args:
        v: a Marital_Status value, e.g. 'Single' or 'Together'.
        probability: chance (in [0, 1]) that a non-married value is flipped.

    Returns:
        'Married', or the original value `v`.
    """
    if v == "Married":
        return v
    # Uses the module-level `random` generator; seed it for reproducible runs.
    if random.uniform(0, 1) > probability:
        return v
    return 'Married'

# Seed the global RNG so the simulated 2014 shift, and the drift detected from it, is reproducible.
# NOTE: this cell overwrites latest_data's columns; re-running it compounds the shift,
# so re-run the cell that builds latest_data first.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
# Simulate income growth: scale each 2014 customer's income by a random factor of about 1.2-1.3
# (missing incomes stay missing)
latest_data['Income'] = latest_data['Income'].apply(lambda v: v * (1 + random.uniform(.2, .3)))
# Flip roughly 20% of the non-married 2014 customers to 'Married'
latest_data['Marital_Status'] = latest_data['Marital_Status'].apply(future_marital_status)

In [8]:
import tensorflow as tf
import tensorflow_data_validation as tfdv
import pandas as pd
# TFDV summary statistics for the 2012 baseline and the simulated 2014 data
early_data_stats, latest_data_stats = (
    tfdv.generate_statistics_from_dataframe(frame) for frame in (early_data, latest_data)
)
# Render the baseline statistics
tfdv.visualize_statistics(lhs_statistics=early_data_stats)

In [9]:
# Derive the baseline schema (types, presence, categorical domains) from the 2012 statistics
early_data_schema = tfdv.infer_schema(early_data_stats)
tfdv.display_schema(early_data_schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'Year_Birth',INT,required,,-
'Education',STRING,required,,'Education'
'Marital_Status',STRING,required,,'Marital_Status'
'Income',FLOAT,optional,single,-
'Kidhome',INT,required,,-
'Recency',INT,required,,-
'MntWines',INT,required,,-
'MntFruits',INT,required,,-
'MntMeatProducts',INT,required,,-
'MntFishProducts',INT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'Education',"'2n Cycle', 'Basic', 'Graduation', 'Master', 'PhD'"
'Marital_Status',"'Absurd', 'Divorced', 'Married', 'Single', 'Together', 'Widow', 'YOLO'"


In [10]:
from tensorflow_metadata.proto.v0 import schema_pb2
# Constrain birth years to 1900-2015; values outside that range are reported as anomalies
tfdv.set_domain(
    early_data_schema,
    'Year_Birth',
    schema_pb2.IntDomain(name="Year_Birth", min=1900, max=2015),
)
# Require Income in every example (presence min_fraction of 1, i.e. 100%)
tfdv.get_feature(early_data_schema, "Income").presence.min_fraction = 1
# Show the schema again with both constraints applied
tfdv.display_schema(schema=early_data_schema)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'Year_Birth',INT,required,,min: 1900; max: 2015
'Education',STRING,required,,'Education'
'Marital_Status',STRING,required,,'Marital_Status'
'Income',FLOAT,required,single,-
'Kidhome',INT,required,,-
'Recency',INT,required,,-
'MntWines',INT,required,,-
'MntFruits',INT,required,,-
'MntMeatProducts',INT,required,,-
'MntFishProducts',INT,required,,-


Unnamed: 0_level_0,Values
Domain,Unnamed: 1_level_1
'Education',"'2n Cycle', 'Basic', 'Graduation', 'Master', 'PhD'"
'Marital_Status',"'Absurd', 'Divorced', 'Married', 'Single', 'Together', 'Widow', 'YOLO'"


In [11]:
# Drift thresholds, checked when validating against previous (baseline) statistics.
# Numeric Income: approximate Jensen-Shannon divergence must stay at or below 0.1.
income = tfdv.get_feature(early_data_schema, 'Income')
income.drift_comparator.jensen_shannon_divergence.threshold = 0.1

# Categorical Marital_Status ('martial' spelling kept in case later cells use this name).
martial_status = tfdv.get_feature(early_data_schema, 'Marital_Status')
# Minimum fraction of values expected to come from the feature's domain
martial_status.distribution_constraints.min_domain_mass = 0.9
# L-infinity distance between the value distributions must stay at or below 0.1
martial_status.drift_comparator.infinity_norm.threshold = 0.1

In [12]:
# Overlay the 2012 baseline (lhs) and the simulated 2014 data (rhs) in one view
tfdv.visualize_statistics(
    lhs_statistics=early_data_stats,
    lhs_name='2012',
    rhs_statistics=latest_data_stats,
    rhs_name='2014',
)

In [13]:
# Check the 2014 statistics against the 2012 schema. Passing the baseline as
# previous_statistics also runs the drift comparators configured above.
drift_anomalies = tfdv.validate_statistics(
    latest_data_stats,
    schema=early_data_schema,
    previous_statistics=early_data_stats,
)
tfdv.display_anomalies(drift_anomalies)

Unnamed: 0_level_0,Anomaly short description,Anomaly long description
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1
'Year_Birth',Out-of-range values,Unexpectedly small value: 1893.
'Income',Multiple errors,"The feature was present in fewer examples than expected: minimum fraction = 1.000000, actual = 0.992819 The approximate Jensen-Shannon divergence between current and previous is 0.126524 (up to six significant digits), above the threshold 0.1."
'Marital_Status',High Linfty distance between current and previous,"The Linfty distance between current and previous is 0.13626 (up to six significant digits), above the threshold 0.1. The feature value with maximum difference is: Married"


In [14]:
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format
import os 

# Persist the baseline artifacts so new batches can be checked later without recomputing them
schema_path, stats_path = "./schema.txtpb", "./data_stats.txtpb"
tfdv.write_schema_text(early_data_schema, schema_path)  # schema, including the custom constraints
tfdv.write_stats_text(early_data_stats, stats_path)     # 2012 statistics used as the drift baseline

In [15]:
def detect_data_drift(new_data: pd.DataFrame, schema_path: str, previous_stats_path: str) -> "tuple[bool, anomalies_pb2.Anomalies]":
    """
    Validate a new batch of data against a saved baseline schema and statistics.

    Statistics for `new_data` are generated using the saved schema (feature types
    are taken from it). They are then validated with the baseline statistics passed
    as `previous_statistics`, which enables any drift comparators configured in the
    schema.

    Any anomaly counts, not only drift. Schema violations such as out-of-range
    values or a required feature being missing also set the flag.

    Args:
        new_data: the batch of rows to check.
        schema_path: path to a text-format schema (as written by `tfdv.write_schema_text`).
        previous_stats_path: path to text-format baseline statistics
            (as written by `tfdv.write_stats_text`).

    Returns:
        A `(drift_detected, drift_anomalies)` tuple:
        - `drift_detected` is True when validation reported at least one anomaly.
        - `drift_anomalies` is the result of `tfdv.validate_statistics`, suitable
          for `tfdv.display_anomalies`.
    """
    previous_stats = tfdv.load_stats_text(previous_stats_path)
    schema = tfdv.load_schema_text(schema_path)
    options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
    new_stats = tfdv.generate_statistics_from_dataframe(new_data, stats_options=options)
    drift_anomalies = tfdv.validate_statistics(new_stats, schema=schema, previous_statistics=previous_stats)
    # anomaly_info maps feature name -> anomaly; an empty map means the batch passed
    drift_detected = len(drift_anomalies.anomaly_info) > 0

    return drift_detected, drift_anomalies

In [16]:
# Gate the pipeline on the saved baseline, reusing the paths the artifacts were written to
drift_detected, drift_anomalies = detect_data_drift(latest_data, schema_path, stats_path)
if drift_detected:
    print("stop pipeline")
    tfdv.display_anomalies(drift_anomalies)  # show which features closed the gate
else:
    print("proceed")

stop pipeline
