# The Wonderful World of Data Quality Tools in Python
Sam Bail, Data Umbrella, March 2021

## Imports and data loading
We've got two CSV files, each a 10,000-row sample of NYC taxi ride data, one from January 2019 and one from February 2019, which I'm loading here.

In [3]:
import numpy as np
import pandas as pd
import warnings
warnings.simplefilter('ignore')

In [9]:
csv1 = 'data/yellow_tripdata_sample_2019-01.csv'
df1 = pd.read_csv(csv1)
df1.head()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,fare_amount
0,2,2019-01-05 06:36:51,2019-01-05 06:50:42,1,14.0
1,1,2019-01-23 15:22:13,2019-01-23 15:32:50,1,12.5
2,2,2019-01-04 10:54:47,2019-01-04 11:18:31,2,17.0
3,1,2019-01-05 12:07:08,2019-01-05 12:14:06,1,6.0
4,2,2019-01-04 18:23:00,2019-01-04 18:25:22,5,3.5


In [None]:
csv2 = 'data/yellow_tripdata_sample_2019-02.csv'
df2 = pd.read_csv(csv2)
df2.head()

## Example 1: Pandas "describe" for DataFrames
A simple "profiler" for dataframes. By default it reports basic statistics (count, mean, standard deviation, min, quartiles, max) for the numeric columns.

In [None]:
df1.describe()
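
To make those basic statistics concrete, here is a minimal standard-library sketch that computes roughly the same summary for a single numeric column. The helper name `describe_column` is my own, the five sample values are the passenger counts from the `df1.head()` output above, and this is not how pandas implements `describe()`. I'm also assuming that the "inclusive" quartile method lines up with pandas' default linear interpolation.

```python
import statistics

def describe_column(values):
    """Return describe()-style summary statistics for one numeric column."""
    # 'inclusive' quartiles interpolate linearly between the sorted data points
    q1, median, q3 = statistics.quantiles(values, n=4, method='inclusive')
    return {
        'count': len(values),
        'mean': statistics.mean(values),
        'std': statistics.stdev(values),  # sample standard deviation
        'min': min(values),
        '25%': q1,
        '50%': median,
        '75%': q3,
        'max': max(values),
    }

# passenger_count values from the first five January rows shown above
passenger_counts = [1, 1, 2, 1, 5]
summary = describe_column(passenger_counts)
for name, value in summary.items():
    print(f'{name:>5}: {value}')
```

Comparing the `min` entry of two such summaries is enough to spot a difference like the one between the January and February passenger counts.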

## Example 2: Pandas Profiling
Like a much more sophisticated version of .describe() for Pandas dataframes: 
it creates a more detailed profile report of the data.

**Note:** The key difference between the two dataframes is the minimum in the *passenger_count* column:
* In the January data (df1), we have passenger counts from 1 through 6. 
* In the February data (df2), we have counts from 0 through 6, which looks like a bug.

In [None]:
# Simple way to profile our dataframe and look at the nicely rendered HTML result

from pandas_profiling import ProfileReport
ProfileReport(df1, title="Pandas Profiling Report for df1").to_notebook_iframe()

In [None]:
ProfileReport(df2, title="Pandas Profiling Report for df2").to_notebook_iframe()

## Example 3: TDDA (Test-Driven Data Analysis)
TDDA allows us to generate "constraints" from a reference data asset and verify whether another data asset matches those constraints.

In [None]:
# Generate the constraints based on the January dataframe

from tdda.constraints import discover_df, verify_df
constraints = discover_df(df1)
constraints_path = 'tdda_refs/example_constraints.tdda'
with open(constraints_path, 'w') as f:
    f.write(constraints.to_json())
    
# Show the generated constraints
print(str(constraints))

In [None]:
# Verify that the January data matches the constraints - this should match of course!

v1 = verify_df(df1, constraints_path, type_checking='strict', epsilon=0)
print(str(v1))

In [None]:
# Verify that the February data matches the constraints - this should fail
# because we have a different min for passenger_count

v2 = verify_df(df2, constraints_path, type_checking='strict', epsilon=0)
print(str(v2))
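
The discover-then-verify idea can be illustrated without TDDA itself. The following is a minimal standard-library sketch that assumes only min/max constraints per field. The functions `discover_constraints` and `verify_constraints` and the sample records are hypothetical, and TDDA's real constraints cover considerably more than this (types, nulls, and so on).

```python
def discover_constraints(records):
    """Derive inclusive min/max constraints per field from reference records."""
    learned = {}
    for record in records:
        for field, value in record.items():
            low, high = learned.get(field, (value, value))
            learned[field] = (min(low, value), max(high, value))
    return learned

def verify_constraints(records, learned):
    """Return (row, field, value) tuples for every value outside its learned range."""
    failures = []
    for row, record in enumerate(records):
        for field, (low, high) in learned.items():
            value = record[field]
            if not low <= value <= high:
                failures.append((row, field, value))
    return failures

# Hypothetical stand-ins for the January (reference) and February data
january = [{'passenger_count': 1}, {'passenger_count': 6}, {'passenger_count': 2}]
february = [{'passenger_count': 0}, {'passenger_count': 3}]

learned = discover_constraints(january)
print(learned)
print(verify_constraints(january, learned))   # reference data passes
print(verify_constraints(february, learned))  # the 0 falls below the learned min
```

Because the constraints are learned from the reference data, verifying that same data always passes. The interesting case is a later batch that drifts outside the learned range.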

## Example 4: Bulwark
Data testing framework that lets you add tests on methods that return Pandas dataframes. 
Has some built-in tests and allows custom methods for tests. List of all built-in tests ("checks"): https://bulwark.readthedocs.io/en/stable/bulwark.checks.html

In [None]:
import bulwark.decorators as dc

# Option 1: Add checks/assertions as decorators on methods that generate dataframes
# This will return the df if all tests pass, and raise errors if any of the tests fail
@dc.HasNoNans()
@dc.IsShape((10000, 5)) # 10000 rows, 5 columns
@dc.HasValsWithinRange(items={"passenger_count": (1,6)}) # min and max are inclusive here
def load_and_test_csv(csv_file_path):
    df = pd.read_csv(csv_file_path)
    return df

# Test this out with the January data
load_and_test_csv(csv1).head()

In [None]:
# Now let's test the February data - this should fail the "has vals within range" test
load_and_test_csv(csv2).head()

In [None]:
import bulwark.checks as ck

# Option 2: You can also use the built-in tests ("checks") directly on a dataframe
ck.is_shape(df2, (10000, 5)).head() # 10000 rows, 5 columns
ck.has_vals_within_range(df2, items={"passenger_count": (1,6)}).head()
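
For anyone curious how decorator-based checks like these can work, here is a minimal standard-library sketch of the pattern. It is not Bulwark's implementation: the decorator `has_vals_within_range`, the loader `load_rows`, and the list-of-dicts data are all hypothetical stand-ins for a function that returns a dataframe.

```python
import functools

def has_vals_within_range(items):
    """Decorator factory: check the wrapped function's result against inclusive ranges."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            rows = func(*args, **kwargs)
            for field, (low, high) in items.items():
                for row in rows:
                    if not low <= row[field] <= high:
                        raise AssertionError(
                            f'{field}={row[field]} outside range [{low}, {high}]'
                        )
            return rows  # hand the data back unchanged if every check passed
        return wrapper
    return decorator

@has_vals_within_range(items={'passenger_count': (1, 6)})
def load_rows(rows):
    # Stand-in for a function that loads data, e.g. from a CSV file
    return rows

print(load_rows([{'passenger_count': 1}, {'passenger_count': 6}]))

try:
    load_rows([{'passenger_count': 0}])
except AssertionError as err:
    print('check failed:', err)
```

The appeal of this design is that the checks live next to the loading function, so every caller gets validated data without having to remember to run the tests.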

## Example 5: Voluptuous/Opulent Pandas
Voluptuous is a data validation library that allows you to specify a "schema" to validate JSON/YAML. 
Opulent Pandas is a df-focused "version" of Voluptuous. The syntax to define and validate a schema
is very similar. 

**Note:** Opulent Pandas doesn't look like it's being actively maintained, so I'm
only showing Voluptuous here.

In [None]:
from voluptuous import Schema, All, Range, ALLOW_EXTRA

# I had to fiddle a little to pass the right dict-type data structure into the schema
# This returns the df if the tests pass, or throws an error if a test fails 
def validate_df(df):
    schema = Schema(
        {
            'vendor_id': All(int),
            'passenger_count': All(int, Range(min=1, max=6)),
        }, 
        extra=ALLOW_EXTRA
    )
    for r in df.to_dict('records'):
        schema(r)
    return df

# Test this out with df1, which should pass
validate_df(df1).head()

In [None]:
validate_df(df2).head()
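
One limitation of the loop in validate_df is that it stops at the first failing record. The sketch below shows the same row-by-row idea in plain Python while collecting every failure along with its row number. It does not use Voluptuous: `validate_records`, the rule format, and the sample records are hypothetical, chosen only for illustration.

```python
def validate_records(records, rules):
    """Check each record against (type, bounds) rules and collect all failures."""
    errors = []
    for row, record in enumerate(records):
        for field, (expected_type, bounds) in rules.items():
            value = record.get(field)
            if not isinstance(value, expected_type):
                errors.append((row, field, f'expected {expected_type.__name__}'))
            elif bounds is not None and not bounds[0] <= value <= bounds[1]:
                errors.append((row, field, f'{value} not in {bounds}'))
    return errors

# Hypothetical rules mirroring the schema above: field -> (type, inclusive bounds or None)
rules = {
    'vendor_id': (int, None),
    'passenger_count': (int, (1, 6)),
}
records = [
    {'vendor_id': 2, 'passenger_count': 1},
    {'vendor_id': 1, 'passenger_count': 0},
    {'vendor_id': 2, 'passenger_count': 7},
]

errors = validate_records(records, rules)
for error in errors:
    print(error)
```

Collecting the failures instead of raising on the first one makes it easier to judge whether a dataset has one bad row or a systematic problem.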

## Example 6: mobydq

Data validation web app that allows you to check for "indicators" such as completeness, freshness, latency, validity.
Only showing a screenshot here because the setup is pretty heavyweight.

![mobydq](img/mobydq_screenshot.png)

## Example 7: dvc (data version control)
Command-line tool, showing screenshots.


![dvc init](img/dvc_init.png)
![dvc add](img/dvc_add.png)

You can then modify data files, commit new versions, and check out previous versions, 
all controlled via the .dvc file. 
dvc also lets you configure remotes such as S3 to sync data to, instead of managing the actual data via GitHub.

## Example 8: dedupe
Uses fuzzy matching to perform de-duplication and entity resolution in data. This isn't super useful with my sample data as the taxi ride records are unlikely to be "fuzzy" dupes. We could perhaps consider records to be duplicates if they match in all other fields and have timestamps that are within a certain threshold of each other.
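
That timestamp rule is simple enough to sketch without dedupe. The following is a minimal standard-library illustration, assuming records are dicts with a `pickup_datetime` string plus other fields. The function `find_near_duplicates`, the 60-second threshold, and the sample rides are hypothetical, and the pairwise comparison is quadratic, so it only suits small samples.

```python
from datetime import datetime, timedelta
from itertools import combinations

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

def find_near_duplicates(records, time_field, threshold):
    """Return index pairs whose non-time fields match exactly and whose
    timestamps differ by at most `threshold`."""
    pairs = []
    for (i, a), (j, b) in combinations(enumerate(records), 2):
        same_fields = all(a[k] == b[k] for k in a if k != time_field)
        t_a = datetime.strptime(a[time_field], TIME_FORMAT)
        t_b = datetime.strptime(b[time_field], TIME_FORMAT)
        if same_fields and abs(t_a - t_b) <= threshold:
            pairs.append((i, j))
    return pairs

# Hypothetical rides: the first two differ only by 11 seconds
rides = [
    {'pickup_datetime': '2019-01-26 22:57:27', 'fare_amount': 21.0},
    {'pickup_datetime': '2019-01-26 22:57:38', 'fare_amount': 21.0},
    {'pickup_datetime': '2019-01-26 22:57:38', 'fare_amount': 11.5},
]
print(find_near_duplicates(rides, 'pickup_datetime', timedelta(seconds=60)))
```
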

In [37]:
import dedupe
import logging

logging.getLogger().setLevel(logging.WARNING)

input_file = csv1
output_file = 'dedupe_files/csv_example_output.csv'
settings_file = 'dedupe_files/csv_example_learned_settings'
training_file = 'dedupe_files/csv_example_training.json'

data = pd.read_csv(input_file)

# Define which fields we want deduper to consider
fields = [
    {'field': 'pickup_datetime', 'type': 'DateTime'},
    {'field': 'fare_amount', 'type': 'Price'},
]
deduper = dedupe.Dedupe(fields)

# Input to prepare_training needs to be a dict of index: dict elements
deduper.prepare_training(data.to_dict('index'))

# Interactive labeler allows us to confirm/reject whether records are dupes
dedupe.console_label(deduper)

pickup_datetime : 2019-01-26 22:57:27
fare_amount : 21.0

pickup_datetime : 2019-01-26 22:57:38
fare_amount : 11.5

0/10 positive, 0/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished


y


pickup_datetime : 2019-01-18 10:43:40
fare_amount : 10.0

pickup_datetime : 2019-01-18 10:44:12
fare_amount : 41.5

1/10 positive, 0/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished / (p)revious


n


pickup_datetime : 2019-01-30 18:50:23
fare_amount : 7.0

pickup_datetime : 2019-01-30 19:08:31
fare_amount : 9.5

1/10 positive, 1/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished / (p)revious


n


pickup_datetime : 2019-01-22 09:09:42
fare_amount : 7.5

pickup_datetime : 2019-01-22 09:09:05
fare_amount : 6.0

1/10 positive, 2/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished / (p)revious


y


pickup_datetime : 2019-01-04 06:47:43
fare_amount : 11.5

pickup_datetime : 2019-01-04 06:48:30
fare_amount : 33.0

2/10 positive, 2/10 negative
Do these records refer to the same thing?
(y)es / (n)o / (u)nsure / (f)inished / (p)revious


f


Finished labeling


In [38]:
# Train the model!
deduper.train()

In [40]:
# Save our weights and predicates to disk. If the settings file exists, 
# we will skip all the training and learning next time we run this file.

with open(training_file, 'w') as tf:
    deduper.write_training(tf)
with open(settings_file, 'wb') as sf:
    deduper.write_settings(sf)

In [44]:
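# Partition returns sets of records that dedupe believes all refer to the same entity.

# When I ran this it threw an error, apparently because we don't have a cluster in our sample data :)
try:
    # Like prepare_training, partition expects a dict of index: dict elements
    clustered_dupes = deduper.partition(data.to_dict('index'), threshold=0.5) 
    print('# duplicate sets:', len(clustered_dupes))
except Exception as e:
    # Print the error instead of silently swallowing it
    print('partition failed:', e)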

# clustered_dupes will contain a "cluster ID" for each record which we can use
# to combine or remove duplicate records from the same cluster
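
As a rough illustration of that last step, the sketch below keeps one record per cluster. It assumes the clustering result is a list of `(record_ids, scores)` tuples, which is my understanding of what partition returns but should be checked against the dedupe documentation. The helper `keep_one_per_cluster` and the sample values are hypothetical.

```python
def keep_one_per_cluster(records, clusters):
    """Keep the first record of every cluster and drop the other members."""
    to_drop = set()
    for record_ids, _scores in clusters:
        to_drop.update(record_ids[1:])  # everything after the first is a duplicate
    return {rid: rec for rid, rec in records.items() if rid not in to_drop}

# Hypothetical records keyed by index, in the shape of data.to_dict('index')
records = {
    0: {'fare_amount': 21.0},
    1: {'fare_amount': 21.0},
    2: {'fare_amount': 6.0},
}
# Hypothetical clustering: records 0 and 1 are duplicates, record 2 stands alone
clusters = [((0, 1), (0.9, 0.9)), ((2,), (1.0,))]

deduplicated = keep_one_per_cluster(records, clusters)
print(sorted(deduplicated))
```

Keeping the first member is the simplest policy. Merging fields across the cluster would be the alternative when the duplicates carry complementary information.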