# GX Load Package and Context

In [7]:
import pandas as pd
import great_expectations as gx
context = gx.data_context.DataContext('./gx')

In [8]:
df = pd.read_parquet('./green_tripdata_2023-07.parquet')
df['lpep_pickup_datetime'] = df['lpep_pickup_datetime'].astype('str')
df['lpep_dropoff_datetime'] = df['lpep_dropoff_datetime'].astype('str')

# Create New Great Expectation Project

In [None]:
from  gx.plugins.expectations.expect_column_values_to_be_non_negative import ExpectColumnValuesToBeNonNegative

In [7]:
dataframe_asset = context.sources.add_pandas(
    "green_taxi_validator_checkpoint"
).add_dataframe_asset(
    name="taxi_df", dataframe=df, batch_metadata={"year": "2023", "month": "07"}
)
batch_request = dataframe_asset.build_batch_request()

context.add_or_update_expectation_suite("green_taxi_expectation_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="green_taxi_expectation_suite",
)
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2023-07-01 00:08:06,2023-07-01 00:21:24,N,1.0,75.0,170.0,2.0,3.45,17.0,1.0,0.5,4.45,0.0,,1.0,26.7,1.0,1.0,2.75
1,2.0,2023-07-01 00:56:14,2023-07-01 01:03:28,N,1.0,75.0,262.0,2.0,1.49,10.0,1.0,0.5,3.05,0.0,,1.0,18.3,1.0,1.0,2.75
2,2.0,2023-06-30 23:47:32,2023-06-30 23:47:35,N,5.0,42.0,42.0,2.0,0.0,50.0,0.0,0.0,15.3,0.0,,1.0,66.3,1.0,2.0,0.0
3,2.0,2023-07-01 00:30:12,2023-07-01 00:51:20,N,1.0,33.0,75.0,1.0,9.32,38.0,1.0,0.5,0.0,0.0,,1.0,43.25,2.0,1.0,2.75
4,2.0,2023-07-01 00:46:56,2023-07-01 00:49:02,N,1.0,74.0,74.0,1.0,0.97,5.8,1.0,0.5,0.0,0.0,,1.0,8.3,1.0,1.0,0.0


In [9]:
# validator.expect_column_values_to_not_be_null(column="VendorID")
# validator.expect_column_values_to_match_strftime_format(column="lpep_pickup_datetime", strftime_format="%Y-%m-%d %H:%M:%S")
validator.expect_column_values_to_match_strftime_format(column="lpep_pickup_datetime", strftime_format="%Y-%m-%d %H:%M:%S")
validator.expect_column_values_to_match_strftime_format(column="lpep_dropoff_datetime", strftime_format="%Y-%m-%d %H:%M:%S")
validator.expect_column_values_to_be_non_negative(column="passenger_count")
validator.expect_column_values_to_be_non_negative(column="trip_distance")
validator.expect_column_values_to_be_non_negative(column="tolls_amount")
validator.save_expectation_suite(discard_failed_expectations=False)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

In [10]:
checkpoint = context.add_or_update_checkpoint(
    name="green_taxi_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "green_taxi_expectation_suite",
        },
    ],
)
results = checkpoint.run()
results

Calculating Metrics:   0%|          | 0/38 [00:00<?, ?it/s]

{
  "run_id": {
    "run_name": null,
    "run_time": "2023-10-31T15:27:11.456007+01:00"
  },
  "run_results": {
    "ValidationResultIdentifier::green_taxi_expectation_suite/__none__/20231031T142711.456007Z/green_taxi_validator_checkpoint-taxi_df": {
      "validation_result": {
        "success": true,
        "results": [
          {
            "success": true,
            "expectation_config": {
              "expectation_type": "expect_column_values_to_match_strftime_format",
              "kwargs": {
                "column": "lpep_pickup_datetime",
                "strftime_format": "%Y-%m-%d %H:%M:%S",
                "batch_id": "green_taxi_validator_checkpoint-taxi_df"
              },
              "meta": {}
            },
            "result": {
              "element_count": 61343,
              "unexpected_count": 0,
              "unexpected_percent": 0.0,
              "partial_unexpected_list": [],
              "missing_count": 0,
              "missing_percent": 0.

# Validate a pandas dataframe with existing Great Expectation expectations

In [9]:
df = pd.read_parquet('./green_tripdata_2023-07.parquet')
df['lpep_pickup_datetime'] = df['lpep_pickup_datetime'].astype('str')
df['lpep_dropoff_datetime'] = df['lpep_dropoff_datetime'].astype('str')

taxi_asset = context.get_datasource("green_taxi_validator_checkpoint").get_asset("taxi_df")
batch_request = taxi_asset.build_batch_request(dataframe=df)
checkpoint = context.get_checkpoint(name="green_taxi_checkpoint")
results = checkpoint.run_with_runtime_args(
    batch_request=batch_request,
    expectation_suite_name="green_taxi_expectation_suite"
)
results.success

Calculating Metrics:   0%|          | 0/38 [00:00<?, ?it/s]

True

# Validate Pandas Schema with Pandera

In [1]:
from urllib.error import HTTPError
import pandas as pd

df = None
try:
    df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-07.parquet")
except HTTPError:
    print('the requested file is not available')

df.head()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2.0,2023-07-01 00:08:06,2023-07-01 00:21:24,N,1.0,75.0,170.0,2.0,3.45,17.0,1.0,0.5,4.45,0.0,,1.0,26.7,1.0,1.0,2.75
1,2.0,2023-07-01 00:56:14,2023-07-01 01:03:28,N,1.0,75.0,262.0,2.0,1.49,10.0,1.0,0.5,3.05,0.0,,1.0,18.3,1.0,1.0,2.75
2,2.0,2023-06-30 23:47:32,2023-06-30 23:47:35,N,5.0,42.0,42.0,2.0,0.0,50.0,0.0,0.0,15.3,0.0,,1.0,66.3,1.0,2.0,0.0
3,2.0,2023-07-01 00:30:12,2023-07-01 00:51:20,N,1.0,33.0,75.0,1.0,9.32,38.0,1.0,0.5,0.0,0.0,,1.0,43.25,2.0,1.0,2.75
4,2.0,2023-07-01 00:46:56,2023-07-01 00:49:02,N,1.0,74.0,74.0,1.0,0.97,5.8,1.0,0.5,0.0,0.0,,1.0,8.3,1.0,1.0,0.0


In [4]:
df.dtypes

In [11]:
from numpy import datetime64
from pandera.errors import SchemaError
from pandera import Column, DataFrameSchema, Float64, Index

schema = DataFrameSchema(
    {
        "VendorID": Column(Float64),
        "lpep_pickup_datetime": Column(datetime64),
        "lpep_dropoff_datetime": Column(datetime64),
        "store_and_fwd_flag": Column(object, nullable=True),
        "RatecodeID": Column(Float64, nullable=True),
        "PULocationID": Column(Float64),
        "DOLocationID": Column(Float64),
        "passenger_count": Column(Float64, nullable=True),
        "trip_distance": Column(Float64),
        "fare_amount": Column(Float64),
        "extra": Column(Float64),
        "mta_tax": Column(Float64),
        "tip_amount": Column(Float64),
        "tolls_amount": Column(Float64),
        "ehail_fee": Column(Float64, nullable=True),
        "improvement_surcharge": Column(Float64),
        "total_amount": Column(Float64),
        "payment_type": Column(Float64, nullable=True),
        "trip_type": Column(Float64, nullable=True),
        "congestion_surcharge": Column(Float64, nullable=True)
    },
    index=Index(int),
    strict=True,
)

try:
    schema.validate(df)
except SchemaError:
    print('schema error')