# Running Data Quality tests for DataFrames

In the following notebook we will work with the `Tutorial Postgres.raw.public.taxi_yellow` table from the previous tutorial ([test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb)) and perform some transformations on it before loading it into our staging database.

## Purpose
We want to show how you can hook OpenMetadata's data quality mechanisms directly into your ETLs, so that problems are caught before your data reaches its destination. To do so, we build an ETL that transforms the data from the previous tutorial and loads it into a table for which we have set up data quality tests following the [given instructions](/lab/tree/README.md).

## Description of the ETL
For context, please refer to [the test workflow notebook](/lab/tree/notebooks/test_workflow.ipynb).

In this case we will run some simple data cleaning and transformations on the dataframe. Then we will use the `DataFrameValidator` interface to validate the data, load the validated chunks into the destination and, finally, report the results to OpenMetadata.

We will use the [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) library to run the Data Quality tests we have defined in [OpenMetadata](http://localhost:8585/table/Tutorial%20Postgres.raw.public.taxi_yellow/profiler/data-quality).

## Dependencies
For our ETL we will be using SQLAlchemy to load the table, Pandas DataFrames to perform transformations, [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) to run data quality tests and the OpenMetadata [Postgres Connector](https://docs.open-metadata.org/latest/connectors/database/postgres).

We can install all these dependencies by specifying the right extras. The full list of extras can be found in the project's [`setup.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/setup.py); check it out if your installation differs from the example below.

## Requirements
If you haven't already, please follow the [setup](/lab/tree/README.md#setup) steps in the README.

For this example you will need:

- To have run the [`test_workflow`](/lab/tree/notebooks/test_workflow.ipynb) notebook
- An OpenMetadata instance running (achieved by following the setup instructions above)
- A bot JWT token. You can use the [Ingestion Bot's](http://localhost:8585/bots/ingestion-bot) token from your OpenMetadata instance
- [`openmetadata-ingestion`](https://pypi.org/project/openmetadata-ingestion/) version 1.11.0.0 or above (installed in this Notebook)
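Once the install cell has run, it can be worth confirming that the installed package actually meets the 1.11.0.0 requirement, since an older or locally built version may already be present in the environment. The sketch below is a hypothetical helper (`release_tuple`, `meets_minimum` and `check_installed` are our own names) built on the standard library's `importlib.metadata`. It only compares the numeric release components and ignores pre-release or dev suffixes, so `1.11.0.0.dev0` counts as meeting `1.11.0.0` here; for a strict PEP 440 comparison, `packaging.version.Version` is the better tool.

```python
from importlib.metadata import PackageNotFoundError, version

MINIMUM = "1.11.0.0"


def release_tuple(ver: str) -> tuple:
    """Leading numeric release components, e.g. '1.10.0.0.dev0' -> (1, 10, 0, 0).

    Pre-release and dev suffixes are ignored, so this is only a coarse check.
    """
    parts = []
    for piece in ver.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def meets_minimum(installed: str, minimum: str = MINIMUM) -> bool:
    """True if the installed release is at least `minimum` (suffixes ignored)."""
    return release_tuple(installed) >= release_tuple(minimum)


def check_installed(package: str = "openmetadata-ingestion") -> bool:
    """False if the package is missing or older than the required release."""
    try:
        return meets_minimum(version(package))
    except PackageNotFoundError:
        return False
```

Calling `check_installed()` right after the install cell returns `False` if pip kept an older build around.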

In [1]:
!pip install "openmetadata-ingestion[pandas,postgres]>=1.11.0.0"


## Initial SDK setup
In this step we make sure our Python code is ready to work against OpenMetadata.

You will be prompted for the JWT token mentioned in the [requirements](#requirements) section.

In [2]:
from getpass import getpass

from metadata.sdk import configure

jwt_token = getpass("Please introduce a JWT token for authentication with OM")

configure(
    host="http://openmetadata_server:8585/api",
    jwt_token=jwt_token,
)

Please introduce a JWT token for authentication with OM ········


<metadata.sdk.client.OpenMetadata at 0xffff8c176910>
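An interactive prompt works well in a notebook, but a scheduled ETL usually has nobody around to type the token. A common alternative is to read it from an environment variable and only fall back to `getpass` when it is missing. The sketch below assumes a variable named `OPENMETADATA_JWT_TOKEN`, which is a hypothetical name of our choosing; use whatever your scheduler or secret store exposes.

```python
import os
from getpass import getpass

# Hypothetical variable name: adapt it to your deployment
TOKEN_ENV_VAR = "OPENMETADATA_JWT_TOKEN"


def get_jwt_token(env_var: str = TOKEN_ENV_VAR) -> str:
    """Return the JWT token from the environment, prompting only if it is unset."""
    token = os.environ.get(env_var, "").strip()
    if token:
        return token
    return getpass("Please enter a JWT token to authenticate with OpenMetadata: ")
```

The result can then be passed to the same call used above, e.g. `configure(host="http://openmetadata_server:8585/api", jwt_token=get_jwt_token())`.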

## Implementation of the ETL

In [3]:
# Define the transformation function to run on dataframes
def transform(df):
    # --- 1. Cleaning ---
    # Keep only relevant columns
    cols_to_keep = [
        "vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "passenger_count", "trip_distance",
        "pulocationid", "dolocationid",
        "payment_type", "fare_amount", "tip_amount",
        "total_amount", "congestion_surcharge"
    ]
    df_stg = df[cols_to_keep]

    # Remove invalid or zero values.
    # `.copy()` gives us an independent frame, so adding a column below does not
    # trigger pandas' SettingWithCopyWarning
    df_stg = df_stg[
        (df_stg["fare_amount"] > 0) &
        (df_stg["total_amount"] > 0) &
        (df_stg["trip_distance"] > 0) &
        (df_stg["passenger_count"] > 0)
    ].copy()

    # --- 2. Feature engineering ---
    # Trip duration in minutes (both columns are datetimes read from Postgres)
    df_stg["trip_duration_min"] = (
        (df_stg["tpep_dropoff_datetime"] - df_stg["tpep_pickup_datetime"]).dt.total_seconds() / 60
    )

    # Filter unrealistic durations and distances
    df_stg = df_stg[
        (df_stg["trip_duration_min"] >= 1) &
        (df_stg["trip_duration_min"] <= 180) &
        (df_stg["trip_distance"] <= 100)
    ]

    return df_stg
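Since `transform` boils down to a handful of row-level rules, it can help to restate them for a single trip when writing unit tests or spot-checking a record. The sketch below is a hypothetical plain-Python equivalent (`keep_trip` is our own name, not part of any library). It assumes the timestamps are `datetime` objects and that no values are missing: in Pandas a comparison against `NaN` simply evaluates to `False` and drops the row, whereas a `None` here would raise a `TypeError`.

```python
from datetime import datetime


def keep_trip(trip: dict) -> bool:
    """Plain-Python restatement of the filters in `transform` for one trip."""
    # Rule 1: amounts, distance and passenger count must be strictly positive
    amounts_positive = all(
        trip[col] > 0
        for col in ("fare_amount", "total_amount", "trip_distance", "passenger_count")
    )
    if not amounts_positive:
        return False

    # Rule 2: duration between 1 and 180 minutes, distance at most 100
    duration_min = (
        trip["tpep_dropoff_datetime"] - trip["tpep_pickup_datetime"]
    ).total_seconds() / 60
    return 1 <= duration_min <= 180 and trip["trip_distance"] <= 100
```

For example, a 20-minute, 3.2-mile trip with a positive fare is kept, while the same trip lasting 30 seconds or covering 150 miles is dropped.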

## Run Data Quality tests

In [4]:
# Define the validator we will use
from metadata.sdk.data_quality.dataframes.dataframe_validator import DataFrameValidator

validator = DataFrameValidator()

# Load the tests defined in OpenMetadata for the table `Tutorial Postgres.stg.public.dw_taxi_trips`
validator.add_openmetadata_table_tests("Tutorial Postgres.stg.public.dw_taxi_trips")

# Alternatively, one could define the same tests as code with:
# from metadata.sdk.data_quality import ColumnValuesToBeBetween
# validator.add_tests(
#     ColumnValuesToBeBetween(
#         name="amount_is_greater_than_0",
#         min_value=0,
#     ),
#     ColumnValuesToBeBetween(
#         name="trip_duration_to_be_between_1_and_180_minutes",
#         min_value=1,
#         max_value=180,
#     ),
#     ColumnValuesToBeBetween(
#         name="trip_distance_to_be_at_most_100",
#         max_value=100,
#     ),
# )
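Each `ColumnValuesToBeBetween` test compares the observed minimum and maximum of a column against the expected bounds. The sketch below is a simplified, hypothetical model of that comparison, not the SDK's implementation (`values_between` is our own name, and it ignores details such as null handling). A bound that is not set is treated as unbounded, which is consistent with the `min=-inf` and `max=inf` values printed in the results at the end of this notebook, and the returned message mimics their format.

```python
import math


def values_between(values, min_value=None, max_value=None):
    """Simplified model of a "column values to be between" check.

    Returns (success, message); a missing bound counts as -inf / inf.
    """
    lower = -math.inf if min_value is None else float(min_value)
    upper = math.inf if max_value is None else float(max_value)

    # Only the extremes matter: every value is in range iff min and max are
    found_min, found_max = min(values), max(values)
    success = lower <= found_min and found_max <= upper

    message = (
        f"Found min={found_min}, max={found_max} "
        f"vs. the expected min={lower}, max={upper}."
    )
    return success, message
```

For instance, `values_between([0.02, 7.45, 3.1], max_value=100)` succeeds with the message `Found min=0.02, max=7.45 vs. the expected min=-inf, max=100.0.`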

### Run mechanisms

The `DataFrameValidator` is designed to support a variety of use cases, including Pandas-based ETLs where memory is a concern. For those cases we have created a shortcut that makes your code shorter. But let's look at the trivial use case first: all of your data fits in memory.

#### Strategy: data fits in memory

The following ETL reads from the source, applies the transformations, validates the resulting dataframe and loads it only if every test passes:

In [6]:
# Read, transform, validate and load
import pandas as pd
from sqlalchemy import MetaData, Table, create_engine, delete, insert

SOURCE = "postgresql://user:pass@dwh:5432/raw"
DESTINATION = "postgresql://user:pass@dwh:5432/stg"

source = create_engine(SOURCE)

# Read
with source.connect() as conn:
    df = pd.read_sql("SELECT * FROM taxi_yellow", conn)

# Transform
df = transform(df)

# Validate
results = validator.validate(df)

# Load only if every test passed
if results.success:
    destination = create_engine(DESTINATION, future=True)

    # `begin()` opens a transaction that is committed when the block exits
    # normally and rolled back if anything raises, so a failed insert never
    # leaves the table truncated
    with destination.begin() as conn:
        table = Table("dw_taxi_trips", MetaData(), autoload_with=conn)

        # Truncate and insert
        conn.execute(delete(table))
        conn.execute(insert(table), df.to_dict(orient="records"))

# Optional: publish results to OpenMetadata
results.publish("Tutorial Postgres.stg.public.dw_taxi_trips")
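Truncating and inserting inside one transaction matters because a failure halfway through the insert should not leave the destination empty. The sketch below demonstrates that all-or-nothing behaviour with the standard library's `sqlite3` standing in for Postgres; the `trips` table and the `replace_table_contents` helper are hypothetical and only serve to show the pattern.

```python
import sqlite3


def replace_table_contents(conn: sqlite3.Connection, rows) -> None:
    """Delete the current rows and insert new ones as a single transaction."""
    # Used as a context manager, a sqlite3 connection commits when the block
    # succeeds and rolls back if an exception is raised inside it
    with conn:
        conn.execute("DELETE FROM trips")
        conn.executemany("INSERT INTO trips (id, fare) VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (id INTEGER PRIMARY KEY, fare REAL NOT NULL)")

# A first, successful load
replace_table_contents(conn, [(1, 10.0), (2, 12.5)])

# A second load fails halfway: the NULL fare violates the NOT NULL constraint
try:
    replace_table_contents(conn, [(3, 9.0), (4, None)])
except sqlite3.IntegrityError:
    pass

# The DELETE from the failed load was rolled back too, so the old rows survive
remaining = conn.execute("SELECT id FROM trips ORDER BY id").fetchall()
```

After the failed load, `remaining` still holds the rows from the first load, which is exactly what we want for the staging table.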

#### Strategy: loading data in chunks
This use case has two variants. The first is very similar to the previous example and requires your code to follow the validator's `FailureMode`, which defaults to short-circuiting. The second only requires you to define three functions: one that yields chunks of (optionally transformed) dataframes, one that loads a chunk and one that handles failures.

The validator's default (and, for now, only) failure mode short-circuits: as soon as a test fails, it skips the remaining test cases and stops iterating over the chunks of data. We want our code to behave the same way, so after short-circuiting we roll back our changes in the destination database.
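The short-circuit behaviour described above boils down to a simple loop. The sketch below is a hypothetical, library-agnostic version of that control flow: `run_short_circuit` and its boolean `validate` callback are our own names (the SDK's `validate` returns result objects instead). It loads each passing chunk, calls the failure handler on the first failing one and stops without reading any further chunks.

```python
from typing import Callable, Iterable, List, Tuple


def run_short_circuit(
    chunks: Iterable,
    validate: Callable[[object], bool],
    on_success: Callable[[object], None],
    on_failure: Callable[[object], None],
) -> Tuple[bool, List[bool]]:
    """Validate chunks in order, stopping at the first failing chunk."""
    outcomes = []
    for chunk in chunks:
        passed = validate(chunk)
        outcomes.append(passed)
        if passed:
            on_success(chunk)
        else:
            # The failure handler is responsible for undoing earlier loads
            on_failure(chunk)
            break  # short circuit: later chunks are never read
    return all(outcomes), outcomes
```

Because the loop breaks on the first failure, a lazy source such as a generator is never asked for the remaining chunks.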

In [None]:
# First: define a mechanism to load and transform chunks of data
## Credentials for the database user are set up in `docker-compose.yml`

import pandas as pd
from sqlalchemy import create_engine

def load_and_transform():
    engine = create_engine(SOURCE)

    with engine.connect() as conn:
        # With `chunksize`, `read_sql` returns a lazy iterator, so the chunks
        # must be consumed while the connection is still open
        for df in pd.read_sql("SELECT * FROM taxi_yellow", conn, chunksize=1_000):
            yield transform(df)
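The reason the loop sits inside the `with` block is that chunked reads are lazy: each chunk is fetched from the open cursor only when it is requested. The standard-library sketch below shows the same pattern with `sqlite3` and `cursor.fetchmany`; the `taxi` table and `iter_chunks` helper are hypothetical. Note that laziness on the Python side does not guarantee low memory use on its own: with psycopg2, the driver may still buffer the whole result set unless server-side cursors are enabled (for example through SQLAlchemy's `stream_results=True` execution option), so check your driver's documentation.

```python
import sqlite3


def iter_chunks(conn: sqlite3.Connection, query: str, chunk_size: int):
    """Yield lists of rows lazily; `conn` must stay open while iterating."""
    cursor = conn.execute(query)
    try:
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taxi (id INTEGER)")
conn.executemany("INSERT INTO taxi VALUES (?)", [(i,) for i in range(5)])

# Consume every chunk before closing the connection
chunk_sizes = [len(chunk) for chunk in iter_chunks(conn, "SELECT id FROM taxi", 2)]
conn.close()
```

With five rows and a chunk size of two, the chunks have sizes 2, 2 and 1.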

We want the success and failure callbacks to share the same SQL connection so that everything happens within a single transaction. Thus, we create a small helper:

In [8]:
# A little helper to manage the connection and its transaction
from sqlalchemy import MetaData, Table, create_engine, insert

class SQLAlchemyValidationSession:
    def __init__(self, connection_string, table_name):
        self.engine = create_engine(connection_string, future=True)
        self.table = Table(table_name, MetaData(), autoload_with=self.engine)
        self._conn = None

    def with_conn(self, connection):
        self._conn = connection
        return self

    def load_df_to_destination(self, df, _result):
        """Loads a validated chunk into the destination (uncommitted until exit)."""
        self._conn.execute(insert(self.table), df.to_dict(orient="records"))

    def rollback(self, _df, _result):
        """Discards every chunk loaded so far in the current transaction."""
        self._conn.rollback()

    def __enter__(self):
        conn = self.engine.connect()
        return self.with_conn(conn)

    def __exit__(self, exc_type, exc_value, traceback):
        # Commit what was loaded unless an exception escaped the `with` block.
        # After `rollback()` nothing is pending, so `commit()` has no effect.
        if exc_type is None:
            self._conn.commit()
        else:
            self._conn.rollback()
        self._conn.close()
        self._conn = None

**Example 1: loading data in chunks with the `DataFrameValidator.validate` method**

In [11]:
# Example 1: loading data in chunks with the `DataFrameValidator.validate` method
from metadata.sdk.data_quality.dataframes.validation_results import ValidationResult

validation_session = SQLAlchemyValidationSession(
    connection_string=DESTINATION,
    table_name="dw_taxi_trips"
)

results = []
with validation_session as session:
    for transformed_df in load_and_transform():
        result = validator.validate(transformed_df)

        results.append(result)
        
        if result.success:
            session.load_df_to_destination(transformed_df, result)
        else:
            session.rollback(transformed_df, result)
            break

# Aggregate results for each test case for every chunk
results = ValidationResult.merge(*results)

# Publish to the server
results.publish("Tutorial Postgres.stg.public.dw_taxi_trips")

**Example 2: loading data in chunks with the `DataFrameValidator.run` method**

This method is a shortcut for the loop above and returns the results already merged.
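The results printed at the end of this notebook join one message per chunk with `; `, which suggests that merging combines each test's per-chunk outcomes. The sketch below is a hypothetical model of such a merge, not the SDK's `ValidationResult.merge`: `ChunkResult` and `merge` are our own names, and we assume a test passes overall only if it passed in every chunk that ran it.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class ChunkResult:
    """Hypothetical stand-in for a per-chunk validation result."""

    outcomes: Dict[str, bool]  # test name -> passed in this chunk?
    messages: Dict[str, str]  # test name -> human-readable result


def merge(*chunk_results: ChunkResult) -> Dict[str, Tuple[bool, str]]:
    """Combine per-chunk results: a test passes only if it passed in every chunk."""
    merged_ok: Dict[str, bool] = {}
    merged_msgs: Dict[str, List[str]] = {}
    for result in chunk_results:
        for name, passed in result.outcomes.items():
            merged_ok[name] = merged_ok.get(name, True) and passed
            merged_msgs.setdefault(name, []).append(result.messages[name])
    return {name: (merged_ok[name], "; ".join(merged_msgs[name])) for name in merged_ok}
```

Merging two chunks where test `b` failed only in the second one marks `b` as failed overall while keeping both messages.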

> ⚠ **NOTE:** there is one caveat, which applies to any chunked validation. Some of our tests require the whole dataframe to be in memory to work. For example, tests that count the total number of rows would return incorrect results because they would be running on subsets of the data. Future versions of the SDK will solve this issue. For the time being, if your data does not fit in memory, you should resort to the example in [test_workflow.ipynb](/lab/tree/notebooks/test_workflow.ipynb).
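To see why table-level tests break on chunks, consider a row-count check. The plain-Python sketch below uses hypothetical helpers (`row_count_between`, `chunked`) and 2,500 dummy rows split into chunks of 1,000. Depending on the bounds, per-chunk evaluation can report failures that do not exist on the full dataset, or pass when the full dataset should fail.

```python
from typing import Iterator, List


def row_count_between(rows: List[int], min_count: int, max_count: int) -> bool:
    """A table-level check: only meaningful when given the complete dataset."""
    return min_count <= len(rows) <= max_count


def chunked(rows: List[int], size: int) -> Iterator[List[int]]:
    """Split `rows` into consecutive chunks of at most `size` items."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


rows = list(range(2_500))

# Expecting 2,000 to 3,000 rows: the full dataset passes, every chunk fails
whole_passes = row_count_between(rows, 2_000, 3_000)
chunks_pass = [row_count_between(c, 2_000, 3_000) for c in chunked(rows, 1_000)]

# Expecting at most 1,500 rows: the full dataset fails, every chunk passes
whole_passes_max = row_count_between(rows, 0, 1_500)
chunks_pass_max = [row_count_between(c, 0, 1_500) for c in chunked(rows, 1_000)]
```

With chunks of 1,000, 1,000 and 500 rows, neither bound can be judged correctly from any single chunk.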

In [12]:
with validation_session as session:
    results = validator.run(
        load_and_transform(),
        on_success=session.load_df_to_destination,
        on_failure=session.rollback,
    )

# Results are already merged and ready to publish in OpenMetadata
results.publish("Tutorial Postgres.stg.public.dw_taxi_trips")

In [13]:
# In both cases, results should be the same
for test_case, test_result in results.test_cases_and_results:

    print(f"\nTest: {test_case.name.root}")
    print(f"Status: {test_result.testCaseStatus}")
    print(f"Result: {test_result.result}")


Test: amount_is_greater_than_0
Status: TestCaseStatus.Success
Result: Found min=9.45, max=54.65 vs. the expected min=1.0, max=inf.; Found min=8.3, max=89.7 vs. the expected min=1.0, max=inf.; Found min=9.4, max=81.15 vs. the expected min=1.0, max=inf.; Found min=5.9, max=51.66 vs. the expected min=1.0, max=inf.; Found min=6.9, max=157.71 vs. the expected min=1.0, max=inf.; Found min=5.2, max=69.24 vs. the expected min=1.0, max=inf.; Found min=7.92, max=89.25 vs. the expected min=1.0, max=inf.; Found min=7.7, max=66.5 vs. the expected min=1.0, max=inf.; Found min=5.2, max=62.7 vs. the expected min=1.0, max=inf.; Found min=7.7, max=100.41 vs. the expected min=1.0, max=inf.

Test: trip_distance_to_be_at_most_100
Status: TestCaseStatus.Success
Result: Found min=0.02, max=7.45 vs. the expected min=-inf, max=100.0.; Found min=0.03, max=9.23 vs. the expected min=-inf, max=100.0.; Found min=0.06, max=8.79 vs. the expected min=-inf, max=100.0.; Found min=0.05, max=9.39 vs. the expected min=-in