# Using Great Expectations for dbt Model Development

As SQL models are developed, you can encode assumptions about input and output datasets as **expectations**.

This has the following benefits:

1. These are machine verifiable and can be used to monitor data flowing through your pipelines.
2. These eliminate poisonous implicit assumptions that force data engineers to redo work and waste time, such as "How do we define visits?"
3. These **will eventually** be easy to edit.
4. These **will eventually** be easy to reason about visually.

In [56]:
import json
import os

import great_expectations as ge
import pandas as pd
import sqlalchemy
import random
import yaml

## Set Up DBT Tools

This is a helper class that makes it easy to use Great Expectations with dbt.

You'll need:
- your dbt `profile name`
- your dbt `target`

In [31]:
profile = "superconductive"
target = "rds"

dbt_tools = ge.DBTTools(profile, target)

## Initialize a DataContext

A Great Expectations `DataContext` represents the collection of data asset specifications in this project.

You'll need:
- the directory where you ran `great_expectations init` (where the .great_expectations.yml file is).

In [32]:
pipeline_data_context = ge.data_context.DataContext('./')

### Create data asset configs for the input and the output datasets.
### Data asset configs represent a type of data asset that is specified by a collection of expectations.
### In a dbt pipeline these map to models, so we give each data asset exactly the same name as its model.

### If the data asset config for this name exists in the project, the method will load it from the file. The files are stored in the great_expectations/data_asset_configurations folder in the current project.

In [33]:
base_scheduleappointment_dataset_config = pipeline_data_context.get_data_asset_config('base/base_schedule_appointment')
base_scheduleappointment_dataset_config

{'data_asset_name': 'base/base_schedule_appointment',
 'meta': {'great_expectations.__version__': '0.5.1__develop__sch_internal'},
 'expectations': []}

In [34]:
every_visit_per_day_dataset_config = pipeline_data_context.get_data_asset_config('schedule_appointments')
every_visit_per_day_dataset_config

{'data_asset_name': 'schedule_appointments',
 'meta': {'great_expectations.__version__': '0.5.1__develop__sch_internal'},
 'expectations': []}
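The load-or-create behavior described above can be pictured with a small sketch. This is not the `DataContext` implementation: `load_or_create_config` is a hypothetical helper, the file layout (`<config_dir>/<name>.json`) is an assumption, and the `meta` block shown in the real output is left out for brevity.

```python
import json
import os
import tempfile


def load_or_create_config(config_dir, data_asset_name):
    """Return the stored config for data_asset_name, or a new empty one.

    Hypothetical helper that mirrors the behavior described in the text;
    it is not part of Great Expectations.
    """
    path = os.path.join(config_dir, data_asset_name + ".json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    # Nothing saved yet: start with an empty expectation collection.
    return {"data_asset_name": data_asset_name, "expectations": []}


with tempfile.TemporaryDirectory() as config_dir:
    fresh_config = load_or_create_config(config_dir, "schedule_appointments")
    print(fresh_config)
```

With no file on disk, the helper returns a config whose `expectations` list is empty, which is the state both configs are in at this point in the notebook.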

### The data asset configs (expectation collections) for the input and output datasets are empty. We will load result sets for these models and create some expectations for them.


In [35]:
engine = dbt_tools.get_sqlalchemy_engine()

In [36]:
# This loads the result set of the base/base_schedule_appointment model into a dataset
# that is initialized with the base/base_schedule_appointment data asset config.

# We can call GE's expect_* methods on this dataset. Each call both tests whether the dataset
# conforms to the expectation and, if it does, adds the expectation to the config.

query_str = dbt_tools.get_model_compiled_sql('base/base_schedule_appointment')

df_base_scheduleappointment = ge.dataset.SqlAlchemyDataset(engine=engine, table_name="tmp{0:d}".format(random.randint(1,100000)), custom_sql=query_str)

df_base_scheduleappointment._initialize_expectations(config=base_scheduleappointment_dataset_config)

#### Since we extract the appointment's date from the start_date column, can we assume that a non-empty value is always present?

In [37]:
df_base_scheduleappointment.expect_column_values_to_not_be_null('start_date')

{'success': False,
 'result': {'element_count': 4464508,
  'missing_count': 0,
  'missing_percent': 0.0,
  'unexpected_count': 1971231,
  'unexpected_percent': 0.44153375915106435,
  'partial_unexpected_list': []}}

#### No! Good to know. We will add a where clause to every_visit_per_day's SQL to filter out the empty values. Also, let's record the fact that we expect close to half of the scheduleappointment records to have no value. If more than half of the values are missing, validation will fail.

In [42]:
df_base_scheduleappointment.expect_column_values_to_not_be_null('start_date', mostly=0.5)

{'success': True,
 'result': {'element_count': 4464508,
  'missing_count': 0,
  'missing_percent': 0.0,
  'unexpected_count': 1971231,
  'unexpected_percent': 0.44153375915106435,
  'partial_unexpected_list': []}}
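To make the effect of `mostly` concrete, here is a minimal sketch of the pass/fail arithmetic. It assumes `mostly` is the minimum share of evaluated values that must conform; `passes_mostly` is a hypothetical helper, not a Great Expectations function.

```python
def passes_mostly(unexpected_count, evaluated_count, mostly=1.0):
    """Succeed when the share of conforming values is at least `mostly`."""
    if evaluated_count == 0:
        return True  # nothing to evaluate (a choice made for this sketch)
    conforming_share = 1 - unexpected_count / evaluated_count
    return conforming_share >= mostly


# Counts taken from the start_date null check above.
element_count = 4464508
unexpected_count = 1971231

strict_result = passes_mostly(unexpected_count, element_count)        # no tolerance
lenient_result = passes_mostly(unexpected_count, element_count, 0.5)  # mostly=0.5
print(strict_result, lenient_result)  # False True
```

About 56% of the values conform, so the check fails without a tolerance and passes once `mostly=0.5` is supplied.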

#### Can we assume that the non-empty start_date values fall in a reasonable range (the last few years)?

In [43]:
df_base_scheduleappointment.expect_column_values_to_be_between('start_date', min_value='2010-01-01')

{'success': False,
 'result': {'element_count': 4464508,
  'missing_count': 1971231,
  'missing_percent': 0.44153375915106435,
  'unexpected_count': 13,
  'unexpected_percent': 2.911855012915197e-06,
  'unexpected_percent_nonmissing': 5.214021546743502e-06,
  'partial_unexpected_list': ['1970-03-16',
   '1950-10-12',
   '1973-03-21',
   '1992-05-10',
   '1969-12-31',
   '1992-10-22',
   '1969-12-31',
   '1969-12-31',
   '1969-12-31',
   '2002-05-07',
   '1943-11-16',
   '1969-12-31',
   '2002-10-22']}}

#### No! A small percentage of values are earlier than would make sense. Just like in the "null" case earlier, let's add a where clause to every_visit_per_day's SQL and add an expectation on the input dataset saying that we do expect some nonsensical dates, but only a very small percentage (up to 1%).

In [44]:
df_base_scheduleappointment.expect_column_values_to_be_between('start_date', min_value='2010-01-01', mostly=0.99)

{'success': True,
 'result': {'element_count': 4464508,
  'missing_count': 1971231,
  'missing_percent': 0.44153375915106435,
  'unexpected_count': 13,
  'unexpected_percent': 2.911855012915197e-06,
  'unexpected_percent_nonmissing': 5.214021546743502e-06,
  'partial_unexpected_list': ['1970-03-16',
   '1950-10-12',
   '1973-03-21',
   '1992-05-10',
   '1969-12-31',
   '1992-10-22',
   '1969-12-31',
   '1969-12-31',
   '1969-12-31',
   '2002-05-07',
   '1943-11-16',
   '1969-12-31',
   '2002-10-22']}}
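The where clause itself is not shown in this notebook. As a hedged illustration, the sketch below applies one plausible version of it to a tiny in-memory SQLite table; the table name, the `id` column, and the sample rows are made up, and the real model runs against a different database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE base_schedule_appointment (id INTEGER, start_date TEXT)")
conn.executemany(
    "INSERT INTO base_schedule_appointment VALUES (?, ?)",
    [
        (1, "2017-03-02"),
        (2, None),          # empty start_date
        (3, "1969-12-31"),  # implausibly old date
        (4, "2018-11-20"),
    ],
)

# Hypothetical filter for the downstream model: keep rows that have a
# start_date on or after 2010-01-01 (ISO dates compare correctly as text).
filtered_rows = conn.execute(
    """
    SELECT id, start_date
    FROM base_schedule_appointment
    WHERE start_date IS NOT NULL
      AND start_date >= '2010-01-01'
    ORDER BY id
    """
).fetchall()
print(filtered_rows)  # [(1, '2017-03-02'), (4, '2018-11-20')]
conn.close()
```

The input expectations with `mostly` tolerate the bad rows upstream, while a filter like this keeps them out of the model's own output.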

#### We assume that records in the scheduleappointments table represent one-day appointments, not hospitalizations or any other multi-day treatments. Actually, can we assume that? Let's check...

#### In this case the easiest way to add this expectation is to add a computed column to the input model's SQL and add an expectation on the range of this column's values. This is a pattern worth noticing: it is often easier to apply an expectation from the standard library to a computed column than to write a custom expectation.
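The computed-column pattern can be sketched as follows. The model's actual SQL is not shown here, so the `start_ts` and `end_ts` columns, the table name, and the SQLite date arithmetic are all assumptions made for illustration; only the column name `scheduled_duration_sec` and the 10 to 86400 second range come from the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE appointments (id INTEGER, start_ts TEXT, end_ts TEXT)")
conn.executemany(
    "INSERT INTO appointments VALUES (?, ?, ?)",
    [
        (1, "2018-05-01 09:00:00", "2018-05-01 09:30:00"),  # 30 minutes
        (2, "2018-05-01 10:00:00", "2018-05-04 10:00:00"),  # 3 days
    ],
)

# Derive the duration in SQL so that a plain range check can be applied to it.
durations = conn.execute(
    """
    SELECT id,
           CAST(strftime('%s', end_ts) AS INTEGER)
             - CAST(strftime('%s', start_ts) AS INTEGER) AS scheduled_duration_sec
    FROM appointments
    ORDER BY id
    """
).fetchall()

# Rows outside the 10 second to 1 day range used in the expectation.
out_of_range = [row for row in durations if not 10 <= row[1] <= 86400]
print(durations)     # [(1, 1800), (2, 259200)]
print(out_of_range)  # [(2, 259200)]
conn.close()
```

Once the duration exists as a column, a standard range expectation is enough and no custom expectation has to be written.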

In [45]:
df_base_scheduleappointment.expect_column_values_to_be_between('scheduled_duration_sec', min_value=10, max_value=86400)

ProgrammingError: (psycopg2.ProgrammingError) column "scheduled_duration_sec" does not exist
LINE 1: SELECT count(*) AS element_count, sum(CASE WHEN (scheduled_d...
                                                         ^

[SQL: SELECT count(*) AS element_count, sum(CASE WHEN (scheduled_duration_sec IN (NULL) OR scheduled_duration_sec IS NULL) THEN %(param_1)s ELSE %(param_2)s END) AS null_count, sum(CASE WHEN (NOT (scheduled_duration_sec >= %(scheduled_duration_sec_1)s AND scheduled_duration_sec <= %(scheduled_duration_sec_2)s) AND CASE WHEN (scheduled_duration_sec IS NULL) THEN %(param_3)s ELSE %(param_4)s END) THEN %(param_5)s ELSE %(param_6)s END) AS unexpected_count 
FROM tmp96565]
[parameters: {'param_1': 1, 'param_2': 0, 'scheduled_duration_sec_1': 10, 'scheduled_duration_sec_2': 86400, 'param_3': False, 'param_4': True, 'param_5': 1, 'param_6': 0}]
(Background on this error at: http://sqlalche.me/e/f405)

#### Our assumption is *almost* correct - there is a very small number of exceptions. In this case let's not even modify every_visit_per_day's SQL. Let's create an expectation on the input dataset that the rate of exceptions is under 1%. If it ever changes, validation will fail and we will be notified.

In [34]:
df_base_scheduleappointment.expect_column_values_to_be_between('scheduled_duration_sec', min_value=10, max_value=86400, mostly=0.99)

{'success': True,
 'result': {'element_count': 4464508,
  'missing_count': 1971231,
  'missing_percent': 0.44153375915106435,
  'unexpected_count': 31,
  'unexpected_percent': 6.943654261567008e-06,
  'unexpected_percent_nonmissing': 1.243343599608066e-05,
  'partial_unexpected_list': [349200.0,
   3112200.0,
   2622600.0,
   157080600.0,
   -127059062400.0,
   693000.0,
   4669200.0,
   778500.0,
   -1988061300.0,
   349200.0,
   2421000.0,
   781200.0,
   89100.0,
   -723771000.0,
   1350518700.0,
   520200.0,
   90000.0,
   -867799800.0,
   21169800.0,
   1455818400.001]}}

#### We could keep adding expectations to the input dataset, but for the purposes of this demo let's stop here and save the expectations.

In [17]:
df_base_scheduleappointment.get_expectations_config()

	0 failing expectations
	2 result_format kwargs
	0 include_configs kwargs
	0 catch_exceptions kwargs
If you wish to change this behavior, please set discard_failed_expectations, discard_result_format_kwargs, discard_include_configs_kwargs, and discard_catch_exceptions_kwargs appropirately.


{'data_asset_name': 'base/base_schedule_appointment',
 'meta': {'great_expectations.__version__': '0.5.1__develop__sch_internal'},
 'expectations': [{'expectation_type': 'expect_column_values_to_not_be_null',
   'kwargs': {'column': 'start_date', 'mostly': 0.5}},
  {'expectation_type': 'expect_column_values_to_be_between',
   'kwargs': {'column': 'start_date',
    'min_value': '2010-01-01',
    'mostly': 0.99}}],
 'data_asset_type': 'Dataset'}

# TODO strip out `/` from json names

In [46]:
pipeline_data_context.save_data_asset_config(df_base_scheduleappointment.get_expectations_config())

	0 failing expectations
	2 result_format kwargs
	0 include_configs kwargs
	0 catch_exceptions kwargs
If you wish to change this behavior, please set discard_failed_expectations, discard_result_format_kwargs, discard_include_configs_kwargs, and discard_catch_exceptions_kwargs appropirately.


FileNotFoundError: [Errno 2] No such file or directory: '/Users/taylor/repos/forum-edw/great_expectations/data_asset_configurations/base/base_schedule_appointment.json'
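The failing path ends in `base/base_schedule_appointment.json`, so the error is most likely caused by the `/` in the data asset name pointing at a `base` subdirectory that does not exist. Besides stripping the `/` as the TODO suggests, one workaround is to create the parent directory before writing. The sketch below shows that idea with a hypothetical `save_config` helper; it is not the `DataContext.save_data_asset_config` method.

```python
import json
import os
import tempfile


def save_config(config_dir, config):
    """Write config to <config_dir>/<data_asset_name>.json.

    Hypothetical helper. A name such as 'base/base_schedule_appointment'
    maps to a subdirectory, so the parent directory is created first.
    """
    path = os.path.join(config_dir, config["data_asset_name"] + ".json")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return path


with tempfile.TemporaryDirectory() as config_dir:
    example_config = {"data_asset_name": "base/base_schedule_appointment", "expectations": []}
    saved_path = save_config(config_dir, example_config)
    saved_ok = os.path.exists(saved_path)
    print(saved_ok)  # True
```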

### Previously we encoded our assumptions about the input dataset as expectations - this protects us from risk coming from upstream. Now let's be nice to the downstream models that consume the output of the every_visit_per_day model. We will encode our assumptions about our model's result set. This advertises to the downstream consumers what they can expect from us - a data contract of sorts.

In [47]:
# Load the result set of every_visit_per_day model into GE 

query_str = dbt_tools.get_model_compiled_sql('schedule_appointments')

df_every_visit_per_day = ge.dataset.SqlAlchemyDataset(engine=engine, table_name="tmp{0:d}".format(random.randint(1,100000)), custom_sql=query_str)
df_every_visit_per_day._initialize_expectations(config=every_visit_per_day_dataset_config)

#### Since we filtered out empty start_date values, we can confidently advertise this fact:

In [48]:
df_every_visit_per_day.expect_column_values_to_not_be_null('start_date')

{'success': True,
 'result': {'element_count': 65006,
  'missing_count': 0,
  'missing_percent': 0.0,
  'unexpected_count': 0,
  'unexpected_percent': 0.0,
  'partial_unexpected_list': []}}

#### "success: True" means that the result set conforms to our expectations - good!

#### The same logic applies to the range of start_date values, since we filtered out the unreasonably old ones in our SQL

In [49]:
df_every_visit_per_day.expect_column_values_to_be_between('start_date', min_value='2010-01-01')

{'success': True,
 'result': {'element_count': 65006,
  'missing_count': 0,
  'missing_percent': 0.0,
  'unexpected_count': 0,
  'unexpected_percent': 0.0,
  'unexpected_percent_nonmissing': 0.0,
  'partial_unexpected_list': []}}

#### We carry over the expectation of no multi-day appointments (with no more than 1% exceptions) from the input dataset, since we are not filtering them out in our SQL

In [50]:
df_every_visit_per_day.expect_column_values_to_be_between('scheduled_duration_sec', min_value=10, max_value=86400, mostly=0.99)

ProgrammingError: (psycopg2.ProgrammingError) column "scheduled_duration_sec" does not exist
LINE 1: SELECT count(*) AS element_count, sum(CASE WHEN (scheduled_d...
                                                         ^

[SQL: SELECT count(*) AS element_count, sum(CASE WHEN (scheduled_duration_sec IN (NULL) OR scheduled_duration_sec IS NULL) THEN %(param_1)s ELSE %(param_2)s END) AS null_count, sum(CASE WHEN (NOT (scheduled_duration_sec >= %(scheduled_duration_sec_1)s AND scheduled_duration_sec <= %(scheduled_duration_sec_2)s) AND CASE WHEN (scheduled_duration_sec IS NULL) THEN %(param_3)s ELSE %(param_4)s END) THEN %(param_5)s ELSE %(param_6)s END) AS unexpected_count 
FROM tmp42693]
[parameters: {'param_1': 1, 'param_2': 0, 'scheduled_duration_sec_1': 10, 'scheduled_duration_sec_2': 86400, 'param_3': False, 'param_4': True, 'param_5': 1, 'param_6': 0}]
(Background on this error at: http://sqlalche.me/e/f405)

#### Are there any duplicates in the input dataset and, if so, should we dedup them in every_visit_per_day's SQL?

#### This requires us to hypothesize about what might be viewed as a unique key in scheduleappointment. Let's say that the combination of start_date, office id, patient id and provider id is a good candidate. Let's add this as a column to our model: `concat(sa.start_date, '__', office_id, '__', user_id_patient, '__', user_id_to_see) as appointment_key`.
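As a rough illustration of what this composite key measures, the sketch below builds the same `__`-joined key in plain Python and computes the share of rows whose key is unique. The sample rows are made up, and counting every occurrence of a repeated key as unexpected is an assumption of this sketch.

```python
from collections import Counter

# Made-up rows: (start_date, office_id, user_id_patient, user_id_to_see).
rows = [
    ("2018-05-01", 7, 101, 55),
    ("2018-05-01", 7, 102, 55),
    ("2018-05-01", 7, 101, 55),  # same key as the first row
    ("2018-05-02", 7, 101, 55),
]

# Mirror the concat(...) expression: join the parts with '__'.
appointment_keys = ["__".join(str(part) for part in row) for row in rows]
key_counts = Counter(appointment_keys)

# Every occurrence of a repeated key is counted as unexpected.
unexpected = [key for key in appointment_keys if key_counts[key] > 1]
unique_share = 1 - len(unexpected) / len(appointment_keys)
print(unique_share)  # 0.5
```

Comparing a share like `unique_share` with a threshold is how an almost-unique key can still be recorded as an expectation.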

#### Yet again, this is an example of adding a computed column so that we can reason on it using expectations. Check if we can assume this value to be unique in our result set: 

In [None]:
df_every_visit_per_day.expect_column_values_to_be_unique('appointment_key')

#### It is mostly unique, with less than 2% exceptions. We may have to deal with deduplication before deployment, but for now let's just encode this assumption as an expectation so that we don't forget it and so that other stakeholders can see it:

In [None]:
df_every_visit_per_day.expect_column_values_to_be_unique('appointment_key', mostly=0.98)

#### Here is another example of encoding an assumption that would not be visible in the SQL source itself. scheduleappointment has an `active` column that takes True and False values. We are not sure what it means: should "inactive" appointments be filtered out? Let's defer this decision. For now the value of active does not matter, because every row in our output has it set to true, and we encode that as an expectation on our output dataset:

In [53]:
df_every_visit_per_day.expect_column_values_to_be_in_set('active', ['t'])

{'success': True,
 'result': {'element_count': 65006,
  'missing_count': 0,
  'missing_percent': 0.0,
  'unexpected_count': 0,
  'unexpected_percent': 0.0,
  'unexpected_percent_nonmissing': 0.0,
  'partial_unexpected_list': []}}

#### Let's stop here for now and save the expectations on the output set:

In [54]:
df_every_visit_per_day.get_expectations_config()

	0 failing expectations
	3 result_format kwargs
	0 include_configs kwargs
	0 catch_exceptions kwargs
If you wish to change this behavior, please set discard_failed_expectations, discard_result_format_kwargs, discard_include_configs_kwargs, and discard_catch_exceptions_kwargs appropirately.


{'data_asset_name': 'schedule_appointments',
 'meta': {'great_expectations.__version__': '0.5.1__develop__sch_internal'},
 'expectations': [{'expectation_type': 'expect_column_values_to_not_be_null',
   'kwargs': {'column': 'start_date'}},
  {'expectation_type': 'expect_column_values_to_be_between',
   'kwargs': {'column': 'start_date', 'min_value': '2010-01-01'}},
  {'expectation_type': 'expect_column_values_to_be_in_set',
   'kwargs': {'column': 'active', 'value_set': ['t']}}],
 'data_asset_type': 'Dataset'}

In [55]:
pipeline_data_context.save_data_asset_config(df_every_visit_per_day.get_expectations_config())

	0 failing expectations
	3 result_format kwargs
	0 include_configs kwargs
	0 catch_exceptions kwargs
If you wish to change this behavior, please set discard_failed_expectations, discard_result_format_kwargs, discard_include_configs_kwargs, and discard_catch_exceptions_kwargs appropirately.


### The expectation collections for the two datasets are saved as JSON files in the great_expectations/data_asset_configurations folder in the current project - let's commit them.
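To show how a saved collection can later be used to check new data, here is a minimal pure-Python sketch of a validator that understands the three expectation types saved for the output dataset. It is an illustration under simplifying assumptions (rows are dicts, dates are ISO strings compared as text, `mostly` is a minimum conforming share), not how Great Expectations validates data; `validate` and `is_unexpected` are hypothetical names.

```python
def is_unexpected(expectation_type, kwargs, value):
    """Return True when a non-missing value violates the expectation."""
    if expectation_type == "expect_column_values_to_be_between":
        low, high = kwargs.get("min_value"), kwargs.get("max_value")
        return (low is not None and value < low) or (high is not None and value > high)
    if expectation_type == "expect_column_values_to_be_in_set":
        return value not in kwargs["value_set"]
    raise ValueError("unsupported expectation: " + expectation_type)


def validate(rows, config):
    """Check rows (a list of dicts) against each expectation in config."""
    results = []
    for expectation in config["expectations"]:
        kind, kwargs = expectation["expectation_type"], expectation["kwargs"]
        values = [row[kwargs["column"]] for row in rows]
        if kind == "expect_column_values_to_not_be_null":
            evaluated = values
            unexpected = [v for v in values if v is None]
        else:
            # Other checks skip missing values.
            evaluated = [v for v in values if v is not None]
            unexpected = [v for v in evaluated if is_unexpected(kind, kwargs, v)]
        share_ok = 1 - len(unexpected) / len(evaluated) if evaluated else 1.0
        results.append({"expectation_type": kind, "success": share_ok >= kwargs.get("mostly", 1.0)})
    return {"success": all(r["success"] for r in results), "results": results}


# The output config saved above, applied to two made-up batches.
config = {
    "data_asset_name": "schedule_appointments",
    "expectations": [
        {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "start_date"}},
        {"expectation_type": "expect_column_values_to_be_between",
         "kwargs": {"column": "start_date", "min_value": "2010-01-01"}},
        {"expectation_type": "expect_column_values_to_be_in_set",
         "kwargs": {"column": "active", "value_set": ["t"]}},
    ],
}
good_batch = [{"start_date": "2018-05-01", "active": "t"}]
bad_batch = [{"start_date": "1969-12-31", "active": "t"}]
good_result = validate(good_batch, config)
bad_result = validate(bad_batch, config)
print(good_result["success"], bad_result["success"])  # True False
```

The second batch fails only the range check on `start_date`, which is the kind of signal a downstream consumer would want when the contract is broken.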