In [39]:
import numpy as np
import pandas as pd
import glob
import os
import matplotlib.pyplot as plt
from data_quality import *
import tensorflow as tf
import tensorflow_data_validation as tfdv


import warnings
from tensorflow_metadata.proto.v0 import schema_pb2
warnings.simplefilter('ignore')

In [40]:
# Split the CSV files of each dataset (FLIGHTS and FBPosts) into two collections.
# NOTE(review): partition_data_files is not defined in this notebook; presumably it
# comes from the `from data_quality import *` star import and returns a
# (clean, dirty) pair of file-path lists matching the glob pattern -- confirm.
clean_flights, dirty_flights = partition_data_files('tmp/FLIGHTS/*.csv')
clean_fb, dirty_fb = partition_data_files('tmp/FBPosts/*.csv')

In [41]:
def infer_schema_from_csv(csv_file, column_names):
    '''
    @description: same helper as in assignment 2; computes statistics for one csv
                  file and infers a schema from those statistics
    @params: csv_file - string csv filepath to read
             column_names - string list, names of the columns along the csv header
    @return: schema - schema inferred by tfdv from the file's statistics

    NOTE(review): tfdv appears to treat an explicitly passed column_names as the
    header itself, in which case the file's own header row would be read as a
    data row -- confirm against the tfdv documentation.
    '''
    data_stats = tfdv.generate_statistics_from_csv(
        data_location=csv_file,
        column_names=column_names,
    )

    # tfdv.visualize_statistics(data_stats)  # uncomment to inspect the statistics
    return tfdv.infer_schema(statistics=data_stats)

def has_anomalies(csv_file, schema):
    '''
    @description: same helper as in assignment 2; determines whether a csv file
                  has anomalies when validated against the given schema
    @params: csv_file - csv filepath to compare against the schema
             schema - schema against which to compare the csv file
    @return: True - if validation reports at least one anomaly
             False - if validation reports no anomalies
    '''
    # The schema's feature names are reused as the column names of the csv file.
    cols = [feature.name for feature in schema.feature]
    data_stats = tfdv.generate_statistics_from_csv(data_location=csv_file, column_names=cols)

    # Validate the file's statistics against the schema.
    anomalies = tfdv.validate_statistics(statistics=data_stats, schema=schema)

    # tfdv.display_anomalies(anomalies)  # uncomment to inspect the anomalies
    return len(anomalies.anomaly_info) > 0

def update_schema(csv_file, schema):
    '''
    @description: takes the current csv file and updates the passed in schema to
                  include its values as "valid"
    @params: csv_file - new csv file to add to the schema
             schema - schema to be updated
    @return: updated_schema - schema returned by tfdv.update_schema for the
                              statistics of csv_file
    '''
    # The schema's feature names are reused as the column names of the csv file.
    cols = [feature.name for feature in schema.feature]
    new_batch_stats = tfdv.generate_statistics_from_csv(data_location=csv_file, column_names=cols)

    # Update the schema using the statistics of the new batch (no validation here).
    updated_schema = tfdv.update_schema(schema, new_batch_stats)
    # tfdv.display_schema(schema=updated_schema)  # uncomment to inspect the result

    return updated_schema

def acceptable_schema(all_filenames, num_files):
    '''
    @description: builds an "acceptable" schema from the first num_files csv files:
                  the schema is inferred from the first file and then updated with
                  each following file
    @params: all_filenames - string list of all csv filenames
             num_files - the number of files we want to include in our "acceptable" schema
    @return: schema - schema covering the processed files; if a file still shows
                      anomalies right after its update, processing stops and the
                      schema as of that file is returned
    @raises: ValueError - if the selection of files is empty
    '''
    files_to_update = all_filenames[0:num_files]
    if len(files_to_update) == 0:
        raise ValueError(
            "acceptable_schema needs at least one file, got {} filename(s) and num_files={}".format(
                len(all_filenames), num_files
            )
        )

    schema = None
    successful = True

    for i, csv_path in enumerate(files_to_update):
        print("Reading file: ", csv_path)

        if i == 0:
            # The first file seeds the schema. Only its header is needed to get
            # the column names, so no data rows are parsed (nrows=0); this also
            # avoids the removed pandas option `error_bad_lines`.
            columns = list(pd.read_csv(csv_path, nrows=0).columns)
            schema = infer_schema_from_csv(csv_path, columns)
        else:
            schema = update_schema(csv_path, schema)
            # Sanity check: the file that was just merged must now validate cleanly.
            if has_anomalies(csv_path, schema):
                print('Unsuccessful schema update on file number {}: {}'.format(i, csv_path))
                successful = False
                break

    if successful:
        # Report the number of files actually processed, which can be smaller
        # than num_files when fewer filenames were supplied.
        print("Schema successfully updated for all {} files".format(len(files_to_update)))

    # Return the locally built schema (not a same-named global left in the kernel).
    return schema

In [38]:
# Build the baseline schema from the first five clean FLIGHTS files.
updated_schema = acceptable_schema(all_filenames=clean_flights, num_files=5)

Reading file:  tmp/FLIGHTS/FLIGHTS_clean_734472.csv
Reading file:  tmp/FLIGHTS/FLIGHTS_clean_734473.csv
Reading file:  tmp/FLIGHTS/FLIGHTS_clean_734474.csv
Reading file:  tmp/FLIGHTS/FLIGHTS_clean_734475.csv
Reading file:  tmp/FLIGHTS/FLIGHTS_clean_734476.csv
Schema successfully updated for all 5 files
