In [None]:
import os

import great_expectations as gx
import pandas as pd

# Obtain a Data Context for this notebook.
context = gx.get_context()

# Persist the context to disk so suites, checkpoints and data docs survive the
# kernel. The conversion is only attempted when the returned context offers it:
# on a re-run the project may already be file-backed, and calling the method
# unconditionally would then break "Restart Kernel -> Run All".
# NOTE(review): assumes file-backed contexts do not expose
# `convert_to_file_context` -- confirm against the installed GX version.
if hasattr(context, "convert_to_file_context"):
    context = context.convert_to_file_context()
print(context)

In [None]:
# Load the 2023-01 yellow trip data file into pandas and preview the first rows.
sample_data = pd.read_parquet(
    path="../data/yellow_tripdata/yellow_tripdata_2023-01.parquet"
)
sample_data.head(n=5)

In [None]:

# Get some first insights about the data: display the summary produced by
# `describe()` for the loaded frame before any expectations are written.
sample_data.describe()

In [None]:
# Preview the last rows of the loaded frame as well.
sample_data.tail(n=5)

In [None]:

# Connect to a data source: read the parquet file through the context's default
# pandas datasource; the returned object is used as the validator below.
pandas_source = context.sources.pandas_default
validator = pandas_source.read_parquet(
    "../data/yellow_tripdata/yellow_tripdata_2023-01.parquet"
)

# NaN is tolerated in every column except `passenger_count`.
# The notebook author reports 71743 null values in that column for this file.
# To double-check with pandas:
#   null count:    sample_data["passenger_count"].isnull().sum()
#   null fraction: sample_data["passenger_count"].isnull().sum() / sample_data.shape[0]
validator.expect_column_values_to_not_be_null(column="passenger_count")

In [None]:
# Trip distances are expected to lie within the closed range given below.
validator.expect_column_values_to_be_between(
    column="trip_distance",
    min_value=0,
    max_value=100,
)

In [None]:

# Preview the first rows of the batch behind the validator. A bare last
# expression lets the notebook render the result with its rich display instead
# of the plain-text dump produced by print(), matching how
# `asset_validator.head()` is shown later in this notebook.
validator.head()

In [None]:
# Persist the suite built above; expectations that failed on this batch are
# kept rather than discarded.
validator.save_expectation_suite(discard_failed_expectations=False)

# Register (or refresh) the checkpoint bound to this validator.
checkpoint = context.add_or_update_checkpoint(
    validator=validator,
    name="yellow_tripdata_checkpoint",
)

# Execute the checkpoint and keep its result.
checkpoint_result = checkpoint.run()

# Take a quick look at the validation result.
context.view_validation_result(checkpoint_result)

In [None]:
# Build the data docs for this context first, then open them for inspection.
# The order matters: the docs are built before they are opened.
context.build_data_docs()
context.open_data_docs()

A folder as a data source


In [None]:
# Register the folder holding the parquet files as a pandas filesystem
# datasource. `add_or_update_*` is used (as for the checkpoints and suites in
# this notebook) so that re-running the cell against an already persisted
# context does not fail because the name "my_ds" is taken.
context.sources.add_or_update_pandas_filesystem(
    name="my_ds", base_directory="../data/yellow_tripdata/"
)

In [None]:
# Look up the datasource registered under the name "my_ds".
my_ds = context.datasources["my_ds"]

# Regex selecting the 2023 files. It is a raw string with the dot before the
# extension escaped, so only a literal ".parquet" suffix matches (an unescaped
# dot would match any character).
my_batching_regex = r"yellow_tripdata_2023-.*\.parquet"

# Create the data asset (as one or more files from our data source)
my_asset = my_ds.add_parquet_asset(
    name="my_tripdata_data_asset", batching_regex=my_batching_regex
)

# Define a Batch Request to include all batches in the available data set
my_batch_request = my_asset.build_batch_request()
batches = my_asset.get_batch_list_from_batch_request(my_batch_request)

In [None]:
# Print the batch spec of every batch returned for the request, one per line.
for spec in map(lambda b: b.batch_spec, batches):
    print(spec)

In [None]:
# Create the suite for the multi-file asset, or update it if it already exists.
context.add_or_update_expectation_suite(
    expectation_suite_name="my_asset_expectation_suite"
)

# Bind that suite to the batch request and preview the data behind it.
asset_validator = context.get_validator(
    expectation_suite_name="my_asset_expectation_suite",
    batch_request=my_batch_request,
)
asset_validator.head()

In [None]:
# Mirror the single-file expectations on the asset validator, then persist the
# suite without discarding expectations that failed.
asset_validator.expect_column_values_to_not_be_null(column="passenger_count")
asset_validator.expect_column_values_to_be_between(
    column="trip_distance",
    min_value=0,
    max_value=100,
)
asset_validator.save_expectation_suite(discard_failed_expectations=False)

In [None]:
# Register (or refresh) the checkpoint bound to the asset validator.
checkpoint = context.add_or_update_checkpoint(
    validator=asset_validator,
    name="yellow_tripdata_asset_checkpoint",
)

# Execute the checkpoint and keep its result.
checkpoint_result = checkpoint.run()

# Take a quick look at the validation result.
context.view_validation_result(checkpoint_result)

In [None]:
# Database as the datasource
datasource_name = "my_postgresql_ds"

# Take the connection string from the POSTGRES_CONNECTION_STRING environment
# variable when it is set, so real credentials never have to be committed with
# the notebook. The literal below is only a local-development fallback that
# preserves the original behaviour.
# NOTE(review): the fallback still embeds a username/password -- remove it once
# every environment provides the variable.
my_connection_string = os.environ.get(
    "POSTGRES_CONNECTION_STRING",
    "postgresql+psycopg2://helen:helen@localhost:5432/helen",
)

# `add_or_update_*` keeps the cell re-runnable when a datasource with this
# name has already been persisted in the context.
datasource = context.sources.add_or_update_postgres(
    name=datasource_name, connection_string=my_connection_string
)