# Generate schema on training data

In [1]:
from pathlib import Path
from pprint import pprint

from validation import TrainDataset
from validation import ServeDataset
from validation import Validator
from validation.enums import ConstraintType

from google.cloud import storage

2021-12-07 20:01:28.243379: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-12-07 20:01:28.243395: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


# Load common parameters

In [2]:
# Shared settings for the whole notebook.

# Google Cloud locations.
PROJECT = "mightyhive-data-science-poc"
BUCKET_NAME = "data-drift-detection"

# NOTE(review): presumably a service-account key file; it is not referenced by
# any other visible cell — confirm whether it is still needed.
SERVICE_PATH = Path.home() / ".ssh" / "mightyhive.json"

# File names used when saving the schema/statistics and uploading them.
SCHEMA_FILENAME = "schema.txt"
STATS_FILENAME = "stats.txt"

# Initialize TrainDataset class

In [3]:
# Build the training dataset from four weeks of bike-sharing rows
# (2011-01-01 through 2011-01-28).
# The project part of the table path is taken from PROJECT instead of being
# repeated as a literal, so the f-string now actually interpolates something;
# with the current PROJECT value the query text is unchanged.
train = TrainDataset.from_bigquery(
    f"""
    SELECT
      datetime,
      temp,
      atemp,
      humidity,
      windspeed,
      season,
      holiday,
      workingday,
      EXTRACT(DAYOFWEEK FROM datetime) AS weekday,
      EXTRACT(HOUR FROM datetime) AS hour,
    FROM
      `{PROJECT}.data_drift_demo.bike_sharing`
    WHERE DATE(datetime) BETWEEN "2011-01-01" and "2011-01-28"
    """
)

[DEBUG] Looking for cache in GCS
[DEBUG] Cache not found. Running query...
[INFO] Estimated cost is 0.0006 G
[DEBUG] Exporting result to GCS tmp folder.


# Modify autogenerated schema

In [4]:
# Display the autogenerated schema of the training data so the adjustments
# made in the following cells can be compared against this starting point.
train.show_schema()

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'temp',FLOAT,required,,-
'atemp',FLOAT,required,,-
'humidity',INT,required,,-
'windspeed',FLOAT,required,,-
'season',INT,required,,-
'holiday',INT,required,,-
'workingday',INT,required,,-
'weekday',INT,required,,-
'hour',INT,required,,-


**Schema-related modifications**

In [5]:
# Schema adjustments: mark the discrete features as categorical and attach a
# `max` constraint to temp (50) and humidity (100).
categorical_features = ("season", "holiday", "workingday", "weekday", "hour")
max_values = (("temp", 50), ("humidity", 100))

constraints = [
    {"feature": name, "kind": ConstraintType.is_categorical, "value": True}
    for name in categorical_features
]
constraints += [
    {"feature": name, "kind": ConstraintType.max, "value": limit}
    for name, limit in max_values
]

# Apply the constraints one by one, in the order listed above.
for spec in constraints:
    train.add_schema_constraint(**spec)

**Data-drift-related modifications**

In [6]:
# Drift adjustments: attach a numerical drift threshold to each monitored
# feature (temp: 0.15, windspeed: 0.05).
drift_thresholds = (("temp", 0.15), ("windspeed", 0.05))

constraints = [
    {"feature": name, "kind": ConstraintType.numerical_drift_threshold, "value": threshold}
    for name, threshold in drift_thresholds
]

# Apply the thresholds in the order listed above.
for spec in constraints:
    train.add_schema_constraint(**spec)

# Save modified schema and stats in GCS for future use

In [7]:
# Persist the adjusted schema and the training statistics under the configured
# file names.
# NOTE(review): presumably these are written as local files in the working
# directory, since the upload step passes the same names to
# blob.upload_from_filename — confirm.
train.save_schema(SCHEMA_FILENAME)
train.save_stats(STATS_FILENAME)

In [7]:
def upload_blob(bucket_name, source_file_name, destination_blob_name, storage_client=None):
    """Upload a local file to a Google Cloud Storage bucket.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to create inside the bucket.
        storage_client: Optional client object to use for the upload. Passing
            one lets several uploads share a single client (and lets a stub be
            injected in tests). When omitted, a new ``storage.Client()`` is
            created, exactly as before.

    Returns:
        None. A confirmation line is printed once the upload call returns.
    """
    # Only build a client when the caller did not supply one.
    if storage_client is None:
        storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")
    
# Upload the statistics first, then the schema; each object keeps its file name.
for artifact_name in (STATS_FILENAME, SCHEMA_FILENAME):
    upload_blob(BUCKET_NAME, artifact_name, artifact_name)

File stats.txt uploaded to stats.txt.
File schema.txt uploaded to schema.txt.


# Initialize serving dataset (for testing)

In [8]:
# Build the serving dataset from the following four weeks of the same table
# (2011-01-29 through 2011-02-25). Unlike the training query, this one also
# selects `count`.
# The project part of the table path is taken from PROJECT instead of being
# repeated as a literal, so the f-string now actually interpolates something;
# with the current PROJECT value the query text is unchanged.
serve = ServeDataset.from_bigquery(
    f"""
    SELECT
      count,
      datetime,
      temp,
      atemp,
      humidity,
      windspeed,
      season,
      holiday,
      workingday,
      EXTRACT(DAYOFWEEK FROM datetime) AS weekday,
      EXTRACT(HOUR FROM datetime) AS hour,
    FROM
      `{PROJECT}.data_drift_demo.bike_sharing`
    WHERE DATE(datetime) BETWEEN "2011-01-29" and "2011-02-25"
    """
)

[DEBUG] Looking for cache in GCS
[DEBUG] Cache not found. Running query...
[INFO] Estimated cost is 0.0007 G
[DEBUG] Exporting result to GCS tmp folder.


# Initialize validator class

In [9]:
# Pair the training dataset with the serving dataset; the schema-validation
# and drift-detection cells below all go through this validator.
validator = Validator(train, serve)

In [10]:
# Show the validator's statistics view.
# NOTE(review): presumably a train-vs-serve comparison; no output was captured
# for this cell in the export — confirm.
validator.show_stats()

In [11]:
# Validate the schema; with visual=True the call appears to render the
# anomalies table shown in the cell output.
# The return value is bound to `_` so the notebook does not also echo it as
# the cell result.
_ = validator.validate_schema(visual=True)

Unnamed: 0_level_0,Anomaly short description,Anomaly long description
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1
'count',New column,New column (column in data but not in schema)


In [12]:
# Run drift detection and keep the result; the following cells read its
# "driftSkewInfo" entries.
result = validator.detect_drift()

In [15]:
# Pretty-print the raw drift/skew entry reported for each feature.
drift_entries = result["driftSkewInfo"]
for drift_entry in drift_entries:
    pprint(drift_entry)

{'path': {'step': ['temp']},
 'skewMeasurements': [{'threshold': 0.15,
                       'type': 'JENSEN_SHANNON_DIVERGENCE',
                       'value': 0.13301297312630075}]}
{'path': {'step': ['windspeed']},
 'skewMeasurements': [{'threshold': 0.05,
                       'type': 'JENSEN_SHANNON_DIVERGENCE',
                       'value': 0.03758136452220929}]}


In [14]:
 # Summarise drift per feature: a feature counts as drifted when its measured
 # value exceeds the configured threshold. Only the first skew measurement of
 # each entry is inspected, as before.
 for info in result["driftSkewInfo"]:
     feature_name = info["path"]["step"][0]
     measurement = info["skewMeasurements"][0]
     is_drifted = measurement["threshold"] < measurement["value"]
     print(f"feature {feature_name} drifted: {is_drifted}")

feature temp drfited: False
feature windspeed drfited: False
