##### Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Analyzing AI Platform Prediction Request Response Log using TensorFlow Data Validation

In [39]:
import datetime
import json
import os

import numpy as np
import pandas as pd
import seaborn
import tensorflow_data_validation as tfdv
import tensorflow as tf

from datetime import datetime
from google.cloud import bigquery
from jinja2 import Template
from tensorflow_metadata.proto.v0 import statistics_pb2
from tensorflow_data_validation.utils.stats_util import get_feature_stats
from tensorflow_data_validation.utils import slicing_util
from tensorflow_data_validation import FeaturePath

from typing import List, Optional, Text, Union, Dict, Iterable, Mapping

## Configure environment settings

In [40]:
project_id = 'mlops-dev-env'
response_request_log_table = 'data_validation.covertype_classifier_logs_tf'
local_workspace = '/home/jarekk/workspace/analysis'
local_tfrecords_file = '{}/log_records.tfrecords'.format(local_workspace)

In [48]:
if tf.io.gfile.exists(local_workspace):
  print("Removing previous workspace artifacts...")
  tf.io.gfile.rmtree(local_workspace)

print("Creating a new workspace...")
tf.io.gfile.makedirs(local_workspace)


Removing previous workspace artifacts...
Creating a new workspace...


### Retrieve log records from BigQuey and convert them to a TFRecords file

Although TFDV provides a function to calculate statistics on a Pandas dataframe, the function does not support slicing. To mitigate, we will convert the log records to TFRecords.

#### Prepare and run a sampling query

In [50]:
def generate_query(table_name, start_time, end_time):

  sampling_query_template = """
       SELECT *
       FROM 
           `{{ source_table }}`
       WHERE time BETWEEN '{{ start_time }}' AND '{{ end_time }}'
       """
  
  query = Template(sampling_query_template).render(
      source_table=table_name, start_time=start_time, end_time=end_time)

  return query

start_time = '2020-05-14T00:00:00'
end_time = '2020-05-21T00:00:00'

query = generate_query(response_request_log_table, start_time, end_time)

client = bigquery.Client()
query_job = client.query(query)
rows = query_job.result()

#### Convert the results to TFRecords

In [51]:
def serialize_example(feature_dict: Dict) -> tf.train.Example:
  example = tf.train.Example()

  for name, values in feature_dict.items():
    feature = example.features.feature[name]
    if isinstance(values[0], str):
      values = [value.encode() for value in values]
      add = feature.bytes_list.value.extend
    elif isinstance(values[0], float):
      add = feature.float32_list.value.extend
    elif isinstance(values[0], int):
      add = feature.int64_list.value.extend
    else:
      raise AssertionError('Unsupported type: %s' % type(values[0]))
    add(np.array(values))

  return example.SerializeToString()


with tf.io.TFRecordWriter(local_tfrecords_file) as tfrecord_writer:
    for row in rows:
        raw_data = json.loads(row['raw_data'])
        time_stamp = row['time'].date().isoformat()
        for instance in raw_data['instances']:
            feature_dict = ({column_name: value
                for column_name, value in instance.items()})
            feature_dict['time_window'] = [time_stamp]
            tfrecord_writer.write(serialize_example(feature_dict))


In [52]:
!ls -la {local_workspace}

total 10032
drwxr-xr-x 2 jarekk jarekk     4096 May 20 19:27 .
drwxr-xr-x 5 jarekk jarekk     4096 May 20 19:27 ..
-rw-r--r-- 1 jarekk jarekk 10261918 May 20 19:27 log_records.tfrecords


### Configure a slicing function

In [53]:
slice_fn = slicing_util.get_feature_value_slicer(features={'time_window': None})
stats_options = tfdv.StatsOptions(
    slice_functions=[slice_fn]
)

stats = tfdv.generate_statistics_from_tfrecord(
    data_location=local_tfrecords_file,
    stats_options=stats_options
)

Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


In [55]:
for dataset in stats.datasets:
    print(dataset.name)

All Examples
time_window_2020-05-20
time_window_2020-05-19
time_window_2020-05-17
time_window_2020-05-18
time_window_2020-05-16
time_window_2020-05-15


In [56]:
tfdv.visualize_statistics(stats)

ValueError: lhs_statistics proto contains multiple datasets. Only one dataset is currently supported.

In [None]:
raw_data_dict = json.loads(raw_data[0])
feature_dict = raw_data_dict['instances'][0]
columns

In [None]:
for row in df.iterrows():
    

In [None]:
statistics_proto_list = statistics_pb2.DatasetFeatureStatisticsList()
for folder in tf.io.gfile.listdir(drift_reports_path):
    statistics_proto = tfdv.load_statistics(
        '{}/{}{}'.format(drift_reports_path, folder, 'stats.pb'))
    new_stats_proto = statistics_proto_list.datasets.add()
    new_stats_proto.CopyFrom(statistics_proto.datasets[0])
    new_stats_proto.name = folder[:-1]

tfdv.write_stats_text(statistics_proto_list, aggregated_statistics_path)

In [None]:
class StatsSeries():
    def __init__(self, stats_proto_list):
        self._stats_proto_list = stats_proto_list

    def get_feature_means(self, feature_index):
        list_of_means = {
             dataset.name[0:16]: dataset.features[feature_index].num_stats.mean
             for dataset in self._stats_proto_list.datasets}
        
        return list_of_means


In [None]:
aggregated_statistics = tfdv.load_stats_text(aggregated_statistics_path)
stats_series = StatsSeries(aggregated_statistics)

In [None]:
feature_mean_series = stats_series.get_feature_means(0)
feature_mean_series.keys()

In [None]:
seaborn.barplot(x=list(feature_mean_series.keys()), y=list(feature_mean_series.values()), color='blue')

In [None]:
OUTPUT_PATH = 'gs://mlops-dev-workspace/drift_monitor/output/tf'
 
_STATS_FILENAME='stats.pb'
_ANOMALIES_FILENAME='anomalies.pbtxt'

stats_output_path = os.path.join(OUTPUT_PATH, _STATS_FILENAME)
anomalies_output_path = os.path.join(OUTPUT_PATH, _ANOMALIES_FILENAME)

In [None]:
stats = tfdv.load_statistics(stats_output_path)

In [None]:
tfdv.visualize_statistics(stats)

In [None]:
anomalies = tfdv.load_anomalies_text(anomalies_output_path)

In [None]:
tfdv.display_anomalies(anomalies)