In [1]:
import great_expectations as gx
import pandas as pd

from great_expectations.data_context import FileDataContext

# Initialise a file-backed GX Data Context rooted at the current directory.
context = FileDataContext.create(project_root_dir=".")

# Printing the context dumps its whole configuration (stores, data docs sites,
# datasources, ...), which is a quick way to inspect how GX is set up.
print(context)

{
  "anonymous_usage_statistics": {
    "enabled": true,
    "explicit_url": false,
    "usage_statistics_url": "https://stats.greatexpectations.io/great_expectations/v1/usage_statistics",
    "data_context_id": "0cec2e07-9643-4782-9ba1-c4f622b7ea8f",
    "explicit_id": true
  },
  "checkpoint_store_name": "checkpoint_store",
  "config_variables_file_path": "uncommitted/config_variables.yml",
  "config_version": 3.0,
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "uncommitted/data_docs/local_site/"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "datasources": {},
  "evaluation_parameter_store_name": "evaluation_parameter_store",
  "expectations_store_name": "expectations_store",
  "fluent_datasources": {
    "my_ds": {
      "type": "postgres",
      "assets"

In [3]:
# So far we have worked with a single file. How do we handle a folder of files,
# or a database with several tables? GX introduces a few more terms for this:
# a *data source* contains one or more *data assets*, and each asset is divided
# into *batches*.

# First, register a folder as a data source.
# `add_or_update_...` (rather than plain `add_...`) keeps this cell re-runnable:
# the plain variant raises when a datasource named "my_ds_kafka" already exists
# in the context, which is the case on every run after the first one.
context.sources.add_or_update_pandas_filesystem(
    name="my_ds_kafka", base_directory="../data/kafka"
)

PandasFilesystemDatasource(type='pandas_filesystem', name='my_ds_kafka', id=None, assets=[], base_directory=PosixPath('../data/kafka'), data_context_root_directory=None)

In [4]:
# Fetch the datasource back from the context by the name it was registered under.
my_ds = context.datasources["my_ds_kafka"]

In [8]:
# Regular expression selecting the file(s) that belong to the asset.
# The dot is escaped so that it only matches a literal "." -- an unescaped "."
# matches any character and would also accept names such as "streaming_csv".
my_batching_regex = r"streaming\.csv"

# Create the data asset (one or more files from our data source)
my_asset = my_ds.add_csv_asset(
    name="data_asset", batching_regex=my_batching_regex
)

# Build a batch request without any filtering options, so that it covers every
# batch available in the asset, then resolve it into the list of batches.
my_batch_request = my_asset.build_batch_request()
batches = my_asset.get_batch_list_from_batch_request(my_batch_request)

In [9]:
# Show the spec of every batch (file path, reader method and reader options) to
# confirm which files the asset actually picked up.
for batch in batches:
    print(batch.batch_spec)

{'path': '../data/kafka/streaming.csv', 'reader_method': 'read_csv', 'reader_options': {}}


In [10]:
# The suite name is needed twice (to register the suite and to bind the
# validator to it), so keep it in one place.
suite_name = "my_asset_expectation_suite"
context.add_or_update_expectation_suite(suite_name)

# A validator ties the batch request to the suite; expectations run through it
# are evaluated against the data and recorded in the suite.
asset_validator = context.get_validator(
    expectation_suite_name=suite_name,
    batch_request=my_batch_request,
)

# Preview the first rows of the data behind the validator.
asset_validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,id,labels,input_ids,attention_mask
0,2c1f86cc-a95d-4b06-8e40-148f535396a9,0,"[101, 2043, 2017, 2360, 20868, 3089, 10440, 35...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
1,dec242b5-d4f4-42fe-adf5-4850074c8e39,0,"[101, 1061, 1054, 2057, 2725, 2023, 1024, 6986...","[1, 1, 1, 1, 1, 1, 1, 1, 1]"
2,5c57e24e-bd48-40bd-a872-f48b834f3ddd,0,"[101, 1000, 28144, 12043, 1010, 5925, 3985, 27...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
3,ddab93d1-19cc-466c-9c33-1ec88a0359b6,1,"[101, 4931, 1012, 1012, 1012, 2054, 2003, 2009...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."
4,f0a11d68-4423-42e9-bb9c-a579e68162f9,0,"[101, 1000, 1027, 1027, 2047, 1027, 1027, 1063...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..."


In [11]:
# Add the same expectations as in the single-file example: none of these columns
# may contain null values.
for column_name in ("id", "labels", "input_ids", "attention_mask"):
    not_null_result = asset_validator.expect_column_values_to_not_be_null(column_name)

# Bare last expression, so the cell still displays the result of the final
# expectation (the one on "attention_mask").
not_null_result

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 1137,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [12]:
# Canonical lowercase UUID, version 4 only:
#   - the third group starts with "4" (the version digit),
#   - the fourth group starts with 8, 9, a or b (the RFC 4122 variant).
# The previous "[1-5]" version class accepted UUID versions 1-5, which
# contradicted the name of this pattern.
uuid_v4_regex = r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"

# Every id must be a v4 UUID (mostly=1.0 -> no exceptions tolerated) ...
asset_validator.expect_column_values_to_match_regex("id", uuid_v4_regex, mostly=1.0)
# ... and no id may appear more than once.
asset_validator.expect_column_values_to_be_unique("id")

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 1137,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [13]:
# Only the two values 0 and 1 are accepted in the "labels" column.
asset_validator.expect_column_values_to_be_in_set(column="labels", value_set=[0, 1])

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 1137,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [14]:
# Format check: every value must be the text form of a non-empty list of
# non-negative integers, e.g. "[101, 2043, 2017]".
json_array_of_ints = r"^\s*\[\s*\d+(?:\s*,\s*\d+)*\s*\]\s*$"
asset_validator.expect_column_values_to_match_regex("input_ids", json_array_of_ints)

# Prefix check: at least 99% of the values must start with the integer 101.
# This deliberately uses the *regex_list* expectation: a suite keeps a single
# expectation per (expectation type, column), so a second
# `expect_column_values_to_match_regex` on "input_ids" would replace the format
# check above instead of being stored next to it.
input_ids_prefix = r"^\s*\[\s*101\b"
asset_validator.expect_column_values_to_match_regex_list(
    "input_ids", [input_ids_prefix], match_on="any", mostly=0.99
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 1137,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [15]:
# Format check: every value must be the text form of a non-empty list made only
# of 0s and 1s, e.g. "[1, 1, 1, 0]".
mask_pattern = r"^\s*\[\s*[01](?:\s*,\s*[01])*\s*\]\s*$"
asset_validator.expect_column_values_to_match_regex("attention_mask", mask_pattern)

# Prefix check: every mask must start with a 1 (mostly=1.0 -> no exceptions).
# This deliberately uses the *regex_list* expectation: a suite keeps a single
# expectation per (expectation type, column), so a second
# `expect_column_values_to_match_regex` on "attention_mask" would replace the
# format check above instead of being stored next to it.
mask_prefix = r"^\s*\[\s*1\b"
asset_validator.expect_column_values_to_match_regex_list(
    "attention_mask", [mask_prefix], match_on="any", mostly=1.0
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 1137,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [16]:
# Save the suite built up through the validator. discard_failed_expectations=False
# keeps every expectation in the saved suite, including any that did not pass on
# the current data.
asset_validator.save_expectation_suite(discard_failed_expectations=False)

In [17]:
# As in the single-file example, use a checkpoint to run the validation.
# Define (or update, if it already exists) the checkpoint from the validator.
checkpoint = context.add_or_update_checkpoint(
    name="data_asset_checkpoint",
    validator=asset_validator
)

# Run the checkpoint and keep the validation result.
checkpoint_result = checkpoint.run()

# Open the validation result for a quick look (this launches a browser view).
context.view_validation_result(checkpoint_result)

Calculating Metrics:   0%|          | 0/48 [00:00<?, ?it/s]

Opening in existing browser session.
