# Data Validation Workflow
The following diagram illustrates the end-to-end GX data validation workflow that I will implement.<br>
<img src="src/workflow.png" alt="Workflow" width="75%">

## Install GX

In [1]:
# Install GX
# %pip install great_expectations

In [2]:
# Import GX
import great_expectations as gx

## Create a Data Context
### Default Project Structure
``` bash
gx/
├── checkpoints
├── expectations
├── plugins
├── profilers
├── uncommitted
│   ├── data_docs
│   ├── validations
│   └── config_variables.yml
├── .gitignore
└── great_expectations.yml
```
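Once the project has been initialized, a quick check that the layout above is in place can be sketched with the standard library. This is a minimal sketch, not part of GX: the helper name `missing_entries` and the `EXPECTED_ENTRIES` list are assumptions made for illustration, and the list may need adjusting for your GX version (for example, the `init` output in this notebook does not list `profilers`).

```python
from pathlib import Path

# Entries taken from the default project structure listed above.
# Hypothetical constant: adjust it to match your GX version.
EXPECTED_ENTRIES = [
    "checkpoints",
    "expectations",
    "plugins",
    "profilers",
    "uncommitted",
    "great_expectations.yml",
]


def missing_entries(project_root):
    """Return the expected entries that do not exist under project_root."""
    root = Path(project_root)
    return [name for name in EXPECTED_ENTRIES if not (root / name).exists()]


# Usage (assuming the project directory is named "gx", as in this notebook):
# print(missing_entries("gx"))
```

An empty list means every expected entry was found.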

In [3]:
# Initialize GX
# After running this cell, check the project structure that was created
!echo y | great_expectations init

  ___              _     ___                  _        _   _
 / __|_ _ ___ __ _| |_  | __|_ ___ __  ___ __| |_ __ _| |_(_)___ _ _  ___
| (_ | '_/ -_) _` |  _| | _|\ \ / '_ \/ -_) _|  _/ _` |  _| / _ \ ' \(_-<
 \___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
                                |_|
             ~ Always know what to expect from your data ~
Let's create a new Data Context to hold your project configuration.

Great Expectations will create a new directory with the following structure:

    great_expectations
    |-- great_expectations.yml
    |-- expectations
    |-- checkpoints
    |-- plugins
    |-- .gitignore
    |-- uncommitted
        |-- config_variables.yml
        |-- data_docs
        |-- validations

OK to proceed? [Y/n]: 

Congratulations! You are now ready to customize your Great Expectations configuration.

You can customize your configuration in many ways. Here are some examples:

  Use the CLI to:
 

In [4]:
# Instantiate and return a Data Context
context = gx.get_context()

## Connect to the Data

In [5]:
# Provide a name for the datasource and 
# specify the path to the data folder
datasource_name = "nyc_taxy"
path_folder = "data"

# Connect to the data
datasource = context.sources.add_pandas_filesystem(
    name=datasource_name,
    base_directory=path_folder,
)

# Upon executing the code above, you will observe the creation 
# of the fluent data source section in gx/great_expectations.yml.

In [6]:
# Create a Data Asset and 
# specify a regular expression to identify relevant files
asset_name = f"{datasource_name}_11_2013"
batching_regex = r"batch_(?P<batch>\d{1}).parquet"

asset = datasource.add_parquet_asset(name=asset_name,
                                     batching_regex=batching_regex,
                                     order_by=["batch"], )

# You can check the great_expectations.yml file again.
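To see what the batching regex captures, the pattern can be tried on a few file names with Python's `re` module alone. This is a small sketch under assumptions: the file names are hypothetical, the helper `batch_id` is not a GX function, and whole-name matching is assumed here, which may differ from how GX applies the pattern internally.

```python
import re

# Same pattern as in the cell above.
batching_regex = re.compile(r"batch_(?P<batch>\d{1}).parquet")

# Hypothetical file names, used only for illustration.
file_names = ["batch_0.parquet", "batch_4.parquet", "batch_12.parquet", "notes.txt"]


def batch_id(file_name):
    """Return the captured batch id, or None if the whole name does not match."""
    match = batching_regex.fullmatch(file_name)
    return match.group("batch") if match else None


# Map each file name to the value of the named group "batch" (or None).
matched = {name: batch_id(name) for name in file_names}
print(matched)
```

Because the dot before `parquet` is unescaped, it matches any character, so a name such as `batch_3_parquet` would also match. Writing `\.parquet` makes the pattern stricter.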

In [7]:
# Verify the Data Asset works using Batch Request methods.
batch_request = asset.build_batch_request()
batches_list = asset.get_batch_list_from_batch_request(batch_request)
for batch in batches_list:
    print(batch.batch_spec)

{'path': 'data/batch_0.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}
{'path': 'data/batch_1.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}
{'path': 'data/batch_2.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}
{'path': 'data/batch_3.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}
{'path': 'data/batch_4.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}


In [8]:
batches_list[4].batch_spec

{'path': 'data/batch_4.parquet',
 'reader_method': 'read_parquet',
 'reader_options': {}}

## Create Expectations

In [9]:
# You will use the add_expectation_suite() method to create an ExpectationSuite.
expectation_name = "nyc_expectations"
suite = context.add_expectation_suite(expectation_suite_name=expectation_name)

# After running, check the config file in gx/expectations.

In [10]:
# Define functions to add specific expectations to the suite
# For this sample, the chosen expectations were expect_column_values_to_not_be_null 
# and expect_column_values_to_be_between.
# The method to add expectations is append_expectation.

def add_null_expectation(suite, column):
    """
    Add an expectation to the suite to check if the specified column values are not null.
    
    Parameters:
    - suite: The expectation suite to which the expectation will be added.
    - column: The column for which the expectation will be applied.
    """
    suite.append_expectation({
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {"column": column}
    })

def add_between_expectation(suite, column, min_value, max_value):
    """
    Add an expectation to the suite to check if the specified column values fall within a given range.
    
    Parameters:
    - suite: The expectation suite to which the expectation will be added.
    - column: The column for which the expectation will be applied.
    - min_value: The minimum expected value for the column.
    - max_value: The maximum expected value for the column.
    """
    suite.append_expectation({
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {"column": column, "min_value": min_value, "max_value": max_value}
    })

In [11]:
# Add expectations for null values
add_null_expectation(suite, "request_datetime")
add_null_expectation(suite, "on_scene_datetime")
add_null_expectation(suite, "pickup_datetime")
add_null_expectation(suite, "dropoff_datetime")

# Add expectations for value ranges
add_between_expectation(suite, "PULocationID", 1, 265)
add_between_expectation(suite, "DOLocationID", 1, 265)
add_between_expectation(suite, "base_passenger_fare", 5, 500)
add_between_expectation(suite, "driver_pay", 5, 500)
add_between_expectation(suite, "trip_miles", 0.2, 100)
add_between_expectation(suite, "trip_time", 500, 5000)
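To make the meaning of these two expectation types concrete, here is a plain-Python sketch of the checks they describe. It is an illustration, not the GX implementation: the helper names are hypothetical, nulls are represented as `None`, and the range is assumed to be inclusive with nulls ignored. The sample values are copied from the five-row preview shown later in this notebook.

```python
# Plain-Python stand-ins for the two checks; this is not how GX computes them.
def unexpected_nulls(values):
    """Return the positions whose value is None (treated here as null)."""
    return [i for i, v in enumerate(values) if v is None]


def unexpected_outside(values, min_value, max_value):
    """Return non-null values outside the inclusive range [min_value, max_value]."""
    return [v for v in values if v is not None and not (min_value <= v <= max_value)]


# Values copied from the five-row preview shown later in this notebook.
trip_time = [2146, 930, 903, 836, 324]
on_scene_datetime = [
    "2023-11-25 18:38:09",
    None,  # shown as NaT in the preview
    "2023-11-25 18:57:53",
    "2023-11-25 18:59:06",
    "2023-11-25 18:40:58",
]

print(unexpected_outside(trip_time, 500, 5000))  # [324]
print(unexpected_nulls(on_scene_datetime))       # [1]
```

On these five rows, one `trip_time` value falls below the minimum of 500 and one `on_scene_datetime` value is null, so both expectations would flag unexpected values.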

In [12]:
# Save the expectations suite for later use.
context.save_expectation_suite(suite)

# After running, you can inspect the file to view the added expectations.

'/Users/leonelcortez/Desktop/nyc_gx/gx/expectations/nyc_expectations.json'

## Run a Checkpoint

In [13]:
# Before running a Checkpoint, note the following:
# batches_list holds Batch objects, not Batch Requests.
batches_list

[Batch(datasource=PandasFilesystemDatasource(type='pandas_filesystem', name='nyc_taxy', id=None, assets=[ParquetAsset(name='nyc_taxy_11_2013', type='parquet', id=None, order_by=[Sorter(key='batch', reverse=False)], batch_metadata={}, batching_regex=re.compile('batch_(?P<batch>\\d{1}).parquet'), connect_options={}, splitter=None, engine='auto', columns=None, storage_options=None, use_nullable_dtypes=None, dtype_backend=None, kwargs=None)], base_directory=PosixPath('data'), data_context_root_directory=None), data_asset=ParquetAsset(name='nyc_taxy_11_2013', type='parquet', id=None, order_by=[Sorter(key='batch', reverse=False)], batch_metadata={}, batching_regex=re.compile('batch_(?P<batch>\\d{1}).parquet'), connect_options={}, splitter=None, engine='auto', columns=None, storage_options=None, use_nullable_dtypes=None, dtype_backend=None, kwargs=None), batch_request=BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_0.parquet', 'batch': '0'}

In [14]:
# Convert the list of Batches into a list of Batch Requests
batch_request_list = [batch.batch_request for batch in batches_list]

In [15]:
batch_request_list

[BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_0.parquet', 'batch': '0'}),
 BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_1.parquet', 'batch': '1'}),
 BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_2.parquet', 'batch': '2'}),
 BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_3.parquet', 'batch': '3'}),
 BatchRequest(datasource_name='nyc_taxy', data_asset_name='nyc_taxy_11_2013', options={'path': 'batch_4.parquet', 'batch': '4'})]

In [16]:
# Build a validations list
validations = [
    {"batch_request": batch.batch_request, 
     "expectation_suite_name": expectation_name}
    for batch in batches_list
]

In [17]:
# Create a Checkpoint
checkpoint_name = "BatchValidationPoint"
checkpoint = context.add_or_update_checkpoint(
    name=checkpoint_name, 
    validations=validations
)

# Upon execution, a new checkpoint file named "BatchValidationPoint" will be created in gx/checkpoints.
# This file will contain configuration for each data batch.

In [18]:
# Run the Checkpoint
checkpoint_result = checkpoint.run()

Calculating Metrics:   0%|          | 0/65 [00:00<?, ?it/s]


## View Validation Results

In [19]:
run_identifier = next(iter(checkpoint_result['run_results']))
checkpoint_result['run_results'][run_identifier]['validation_result']['statistics']

{'evaluated_expectations': 10,
 'successful_expectations': 5,
 'unsuccessful_expectations': 5,
 'success_percent': 50.0}
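The cell above reads the statistics of the first run only. A sketch for collecting `success_percent` for every run, using the same nested keys, could look like the following. The helper `summarize_runs` and the key `"batch_0"` are hypothetical; the stand-in dictionary only mimics the nesting used above, with the statistics copied from the printed output.

```python
def summarize_runs(run_results):
    """Map each run identifier (as a string) to its success_percent."""
    return {
        str(run_id): result["validation_result"]["statistics"]["success_percent"]
        for run_id, result in run_results.items()
    }


# Stand-in with the same nesting as checkpoint_result["run_results"].
# The key "batch_0" is hypothetical; the statistics are the ones printed above.
sample_run_results = {
    "batch_0": {
        "validation_result": {
            "statistics": {
                "evaluated_expectations": 10,
                "successful_expectations": 5,
                "unsuccessful_expectations": 5,
                "success_percent": 50.0,
            }
        }
    }
}

print(summarize_runs(sample_run_results))  # {'batch_0': 50.0}
```

Assuming `checkpoint_result["run_results"]` behaves like a dictionary, as the indexing above suggests, the call would be `summarize_runs(checkpoint_result["run_results"])`.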

In [20]:
# The statistics above, taken from the first run result, show a 50% success rate.
# Run the following code to examine the validation results in more detail.
# Build and view the Validation Results as Data Docs.
context.build_data_docs()
context.open_data_docs()

<img src="src/gx_doc.png" alt="doc" width="75%">

<img src="src/gx_doc_result.png" alt="result" width="75%">

## Create a Validator
You can use a Validator to interactively create your Expectations.

In [21]:
# The Validator works on the last batch matched by batch_request
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_name, # optional, if you want to update the expectations made above.
)
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

| | request_datetime | on_scene_datetime | pickup_datetime | dropoff_datetime | PULocationID | DOLocationID | trip_miles | trip_time | base_passenger_fare | driver_pay |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-11-25 18:35:17 | 2023-11-25 18:38:09 | 2023-11-25 18:39:34 | 2023-11-25 19:15:20 | 36 | 132 | 10.5 | 2146 | 59.53 | 45.38 |
| 1 | 2023-11-25 18:11:04 | NaT | 2023-11-25 18:13:29 | 2023-11-25 18:28:59 | 69 | 168 | 2.96 | 930 | 16.73 | 12.74 |
| 2 | 2023-11-25 18:53:13 | 2023-11-25 18:57:53 | 2023-11-25 18:59:03 | 2023-11-25 19:14:06 | 166 | 244 | 3.77 | 903 | 19.63 | 13.85 |
| 3 | 2023-11-25 18:57:54 | 2023-11-25 18:59:06 | 2023-11-25 18:59:18 | 2023-11-25 19:13:14 | 112 | 80 | 2.07 | 836 | 15.19 | 10.58 |
| 4 | 2023-11-25 18:40:19 | 2023-11-25 18:40:58 | 2023-11-25 18:42:59 | 2023-11-25 18:48:23 | 249 | 68 | 0.77 | 324 | 8.4 | 5.5 |


In [22]:
# Create and run an Expectation
validator.expect_column_values_to_be_between(
    column="PULocationID", min_value=3, max_value=45
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": false,
  "result": {
    "element_count": 3269250,
    "unexpected_count": 2834877,
    "unexpected_percent": 86.71337462720807,
    "partial_unexpected_list": [
      69,
      166,
      112,
      249,
      246,
      112,
      240,
      174,
      243,
      51,
      254,
      61,
      165,
      78,
      213,
      197,
      138,
      130,
      215,
      89
    ],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 86.71337462720807,
    "unexpected_percent_nonmissing": 86.71337462720807
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
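The percentage fields in this result can be recomputed from the raw counts, which helps when reading it. The sketch below assumes the total percentage is the unexpected count divided by all elements and the non-missing percentage divides by the elements that are not missing. The helper `unexpected_percentages` is hypothetical, not a GX function.

```python
def unexpected_percentages(element_count, unexpected_count, missing_count):
    """Recompute the percentage fields from the raw counts."""
    nonmissing = element_count - missing_count
    return {
        "unexpected_percent_total": 100.0 * unexpected_count / element_count,
        "unexpected_percent_nonmissing": 100.0 * unexpected_count / nonmissing,
    }


# Counts copied from the result printed above.
percentages = unexpected_percentages(
    element_count=3269250, unexpected_count=2834877, missing_count=0
)
print(percentages)
```

Since `missing_count` is 0 here, both percentages coincide at roughly 86.71%, which is why the three `unexpected_percent*` fields in the output are identical.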

In [23]:
# Save or update Expectations for future use 
# validator.save_expectation_suite(discard_failed_expectations=False)