# Processing and Data Validation

In this notebook, I will join the trip and weather datasets and create a schema. The schema is useful for later batches of data because it allows us to detect anomalies in new input data.

Below is a picture of a typical ML pipeline. In this notebook, I will implement the data validation step using the open-source library `TensorFlow Data Validation` (TFDV).

![Typical ML pipeline](ml_pipeline.png)

## 1. Setup and Imports

In [1]:
# Import packages
import tensorflow as tf
import pandas as pd
import tensorflow_data_validation as tfdv

# Set TF's logger to only display errors to avoid internal warnings being shown
tf.get_logger().setLevel('ERROR')

## 2. Load Intermediate Results

Load the intermediate results from previous notebooks.

In [2]:
trip_data_2020_10 = pd.read_parquet('../intermediate_results/202010-citibike-tripdata-clean.parquet', engine='fastparquet')
weather_data_2020_10 = pd.read_parquet('../intermediate_results/202010-nyc-weather-data-clean.parquet', engine='fastparquet')

In [3]:
trip_data_2020_10.head(5)

,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
0,1403,2020-10-01 00:00:10.787,2020-10-01 00:23:34.540,224,Spruce St & Nassau St,40.711464,-74.005524,3347,Van Brunt St & Wolcott St,40.677343,-74.012751,39079,Subscriber,1995,1
1,462,2020-10-01 00:00:18.239,2020-10-01 00:08:00.571,3161,W 76 St & Columbus Ave,40.780184,-73.977285,3158,W 63 St & Broadway,40.771639,-73.982614,41267,Subscriber,1998,2
2,770,2020-10-01 00:00:21.307,2020-10-01 00:13:12.084,354,Emerson Pl & Myrtle Ave,40.693631,-73.962236,3761,Cedar St & Myrtle Ave,40.697842,-73.926241,47398,Customer,1996,1
3,233,2020-10-01 00:00:25.008,2020-10-01 00:04:18.641,3141,1 Ave & E 68 St,40.765005,-73.958185,464,E 56 St & 3 Ave,40.759345,-73.967597,36482,Subscriber,1993,1
4,126,2020-10-01 00:00:25.310,2020-10-01 00:02:32.133,335,Washington Pl & Broadway,40.729039,-73.994046,229,Great Jones St,40.727434,-73.99379,45319,Subscriber,1989,1


In [4]:
weather_data_2020_10.head(5)

,datetime,tempmax,tempmin,temp,humidity,precip,snow,windgust,windspeed,visibility
0,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
1,2020-10-02,65.6,55.9,60.6,63.46,0.03,0.0,19.7,9.2,9.7
2,2020-10-03,66.9,53.1,59.4,54.86,0.0,0.0,49.4,8.1,9.9
3,2020-10-04,66.0,53.3,59.4,61.91,0.0,0.0,28.141667,8.6,9.9
4,2020-10-05,68.0,54.2,60.8,67.12,0.0,0.0,28.141667,6.1,9.9


## 3. Join the two datasets

I will use `starttime` from the trip data and `datetime` from the weather data to create a new column, `date`, in each dataset. I will then use the `date` column to combine the datasets.

In [5]:
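# Derive a calendar-date join key from each timestamp column.
# No explicit format is passed: '%Y-%m-%d' does not describe timestamps that include a time of day.
trip_data_2020_10["date"] = pd.to_datetime(trip_data_2020_10['starttime']).dt.date
weather_data_2020_10["date"] = pd.to_datetime(weather_data_2020_10['datetime']).dt.date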

trip_weather_data_2020_10 = pd.merge(trip_data_2020_10, weather_data_2020_10, on='date')
trip_weather_data_2020_10.head(5)

,tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,...,datetime,tempmax,tempmin,temp,humidity,precip,snow,windgust,windspeed,visibility
0,1403,2020-10-01 00:00:10.787,2020-10-01 00:23:34.540,224,Spruce St & Nassau St,40.711464,-74.005524,3347,Van Brunt St & Wolcott St,40.677343,...,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
1,462,2020-10-01 00:00:18.239,2020-10-01 00:08:00.571,3161,W 76 St & Columbus Ave,40.780184,-73.977285,3158,W 63 St & Broadway,40.771639,...,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
2,770,2020-10-01 00:00:21.307,2020-10-01 00:13:12.084,354,Emerson Pl & Myrtle Ave,40.693631,-73.962236,3761,Cedar St & Myrtle Ave,40.697842,...,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
3,233,2020-10-01 00:00:25.008,2020-10-01 00:04:18.641,3141,1 Ave & E 68 St,40.765005,-73.958185,464,E 56 St & 3 Ave,40.759345,...,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
4,126,2020-10-01 00:00:25.310,2020-10-01 00:02:32.133,335,Washington Pl & Broadway,40.729039,-73.994046,229,Great Jones St,40.727434,...,2020-10-01,70.9,60.7,65.2,60.69,0.0,0.0,21.9,10.0,9.9
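
Note that `pd.merge` defaults to an inner join, so a trip whose `date` has no matching weather row would be dropped silently. The following is a minimal sketch of how the join could be checked, using small made-up frames. The values and the `check_join` helper are illustrative assumptions, not part of the notebook's pipeline.

```python
import pandas as pd

# Toy stand-ins for the trip and weather frames (values are made up).
trips = pd.DataFrame({
    "starttime": pd.to_datetime([
        "2020-10-01 00:00:10", "2020-10-01 08:15:00", "2020-10-02 09:30:00",
    ]),
    "tripduration": [1403, 462, 770],
})
weather = pd.DataFrame({
    "datetime": pd.to_datetime(["2020-10-01", "2020-10-02"]),
    "temp": [65.2, 60.6],
})

# Same key derivation as in the notebook: one calendar date per row.
trips["date"] = trips["starttime"].dt.date
weather["date"] = weather["datetime"].dt.date


def check_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Left-join on `date` and fail loudly if the join is not clean."""
    merged = pd.merge(
        left, right, on="date",
        how="left",              # keep every trip, even without weather
        validate="many_to_one",  # raise if a date appears twice in `right`
        indicator=True,          # adds a `_merge` column used below
    )
    unmatched = int((merged["_merge"] == "left_only").sum())
    if unmatched:
        raise ValueError(f"{unmatched} trips have no weather row")
    return merged.drop(columns="_merge")


joined = check_join(trips, weather)
```

With a check like this, a missing weather day would surface as an error instead of a smaller row count.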


In [6]:
trip_weather_data_2020_10.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 2248869 entries, 0 to 2248868
Data columns (total 26 columns):
 #   Column                   Dtype         
---  ------                   -----         
 0   tripduration             int64         
 1   starttime                datetime64[ns]
 2   stoptime                 datetime64[ns]
 3   start station id         int64         
 4   start station name       category      
 5   start station latitude   float64       
 6   start station longitude  float64       
 7   end station id           int64         
 8   end station name         category      
 9   end station latitude     float64       
 10  end station longitude    float64       
 11  bikeid                   category      
 12  usertype                 category      
 13  birth year               int64         
 14  gender                   category      
 15  date                     object        
 16  datetime                 datetime64[ns]
 17  tempmax                  fl

## 4. Generate and Visualize DataFrame Statistics

In [7]:
# Define features to remove
features_to_remove = {'date'}

# Collect the features to allow while computing the statistics
approved_cols = [col for col in trip_weather_data_2020_10.columns if col not in features_to_remove]

# Create a StatsOptions instance and set its feature_allowlist property
stats_options = tfdv.StatsOptions(feature_allowlist=approved_cols)

# Review the features that will be used to generate the statistics
print(stats_options.feature_allowlist)

['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender', 'datetime', 'tempmax', 'tempmin', 'temp', 'humidity', 'precip', 'snow', 'windgust', 'windspeed', 'visibility']


### 4.1 Generate DataFrame Statistics
TFDV can generate statistics from different data formats, such as CSV files or a Pandas DataFrame.

Since I already have the data stored in a DataFrame, I can use the function `tfdv.generate_statistics_from_dataframe()`, which takes a DataFrame and `stats_options` and returns an object of type `DatasetFeatureStatisticsList`. This object contains the computed statistics of the given dataset.

In [8]:
input_data_stats = tfdv.generate_statistics_from_dataframe(dataframe=trip_weather_data_2020_10, stats_options=stats_options)

### 4.2 Visualize DataFrame Statistics

Now that I have the computed statistics in the `DatasetFeatureStatisticsList` instance, I need a way to visualize them to get actual insights. TFDV provides this through the function `tfdv.visualize_statistics()`.

Calling this function in an interactive Python environment such as this notebook renders an interactive view of the descriptive statistics generated above.

In [9]:
tfdv.visualize_statistics(input_data_stats)

## 5. Infer a Data Schema

A schema defines the properties of the data and can thus be used to detect errors. Some of these properties include:

- which features are expected to be present
- feature type
- the number of values for a feature in each example
- the presence of each feature across all examples
- the expected domains of features
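
To make these properties concrete, here is a hand-rolled, pandas-only sketch of what checking a batch against such a description could look like. It only illustrates the idea: `EXPECTED` and `check_batch` are hypothetical names, and TFDV's own schema and validation are considerably richer.

```python
import pandas as pd

# Hypothetical, simplified "schema": expected dtype kind and optional domain.
EXPECTED = {
    "tripduration": {"kind": "i", "domain": None},
    "usertype": {"kind": "O", "domain": {"Customer", "Subscriber"}},
}


def check_batch(df: pd.DataFrame, expected: dict) -> dict:
    """Return {feature: problem description} for one batch of data."""
    problems = {}
    for name, spec in expected.items():
        if name not in df.columns:
            problems[name] = "feature is missing"
        elif df[name].isna().any():
            problems[name] = "feature has missing values"
        elif df[name].dtype.kind != spec["kind"]:
            problems[name] = f"unexpected dtype {df[name].dtype}"
        elif spec["domain"] is not None:
            unexpected = set(df[name].unique()) - spec["domain"]
            if unexpected:
                problems[name] = f"values outside domain: {sorted(unexpected)}"
    return problems


# Two made-up batches: one conforming, one with an unseen category.
good = pd.DataFrame({"tripduration": [1403, 462], "usertype": ["Subscriber", "Customer"]})
bad = pd.DataFrame({"tripduration": [770, 233], "usertype": ["Subscriber", "Unknown"]})

print(check_batch(good, EXPECTED))
print(check_batch(bad, EXPECTED))
```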

The schema is expected to be fairly static, whereas statistics can vary per data batch (per month in this case). So, I will infer the data schema from this month's dataset. Later, I could generate statistics for a different month and compare them against the schema to detect anomalies.

### 5.1 Infer the dataset schema

Schema inference is straightforward using `tfdv.infer_schema()`. This function needs only the statistics of the data (an instance of `DatasetFeatureStatisticsList`) as input. The output is a `Schema` protocol buffer containing the results.

A complementary function, `tfdv.display_schema()`, displays the schema in a table. It accepts a `Schema` protocol buffer as input.

In [10]:
# Infer the data schema from the statistics generated above
schema = tfdv.infer_schema(statistics=input_data_stats)

# Display the data schema
tfdv.display_schema(schema)

Feature name,Type,Presence,Valency,Domain
'tripduration',INT,required,,-
'start station id',INT,required,,-
'start station name',BYTES,required,,-
'start station latitude',FLOAT,required,,-
'start station longitude',FLOAT,required,,-
'end station id',INT,required,,-
'end station name',BYTES,required,,-
'end station latitude',FLOAT,required,,-
'end station longitude',FLOAT,required,,-
'bikeid',BYTES,required,,-



Domain,Values
'usertype',"'Customer', 'Subscriber'"
'gender',"'', '', ''"


## 6. Save intermediate results

Save the joined dataset as an intermediate result in Parquet format.

In [11]:
trip_weather_data_2020_10.to_parquet('../intermediate_results/202010-trip-and-weather-data-clean.parquet')