Notes from great_expectations workflow experiments: replace now-obsolete CLI workflow #132

Open
MattTriano opened this issue May 4, 2023 · 3 comments

@MattTriano
Owner

Three days ago, the Great Expectations team published a blog post titled A fond farewell to the CLI, announcing that they've recently introduced new functionality (Fluent Datasources) that makes their CLI-to-Jupyter-notebook workflow obsolete. I already built that CLI-to-notebook workflow into this system's developer workflow, but I agree that it's rather cumbersome, so I'm keen to explore the new functionality and hopefully improve on the Jupyter notebook workflow that the gx CLI tools generated.

I'll use this issue largely to document experiments, useful bits, and usage notes, and I'll close it when I've merged in documentation reflecting the new workflow.

@MattTriano
Owner Author

Connecting to a data source

The prior CLI workflow guided you through some prompts, offering options for the kind of data to connect to (filesystem or relational database) and then options for either the execution engine (for filesystem datasources) or the database backend (for SQL datasources), and then gx formatted those inputs into a yaml config string in a Jupyter notebook. For the SQL branch, you would also enter the details needed to format a connection string so gx could connect to the database, and after gx tested the connection, it would save that information to the great_expectations.yml file as a "data_connectors" entry (as shown in my great_expectations.yml, which I can safely include in version control because the "secret" credentials in that file just reference the env-vars defined in the dot-env files created by the setup process). This enables users to just invoke that data_connector when they need to connect to the data. Now, gx calls this a SQL database block-configuration and documents it as an advanced datasource configuration.

The new mode for connecting to a (postgres) datasource is described here, and points to this other documentation page for guidance on "securely" storing connection credentials.
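
As a quick sketch of that new mode (this is just my reading of the fluent-datasource docs, not something I've tested here yet; the datasource name and the GX_DWH_DB_CONN env-var are placeholders, following the credential-storage guidance that substitutes "${...}" references at runtime):

import great_expectations as gx

context = gx.get_context()

# the "${GX_DWH_DB_CONN}" reference is substituted from config_variables.yml
# or an environment variable at runtime, so no credentials end up in the repo
datasource = context.sources.add_postgres(
    name="fluent_dwh_source",
    connection_string="${GX_DWH_DB_CONN}",
)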

I'll see how cumbersome it is to reference the cached data_connector using the new functionality. Based on preliminary experiments, it looks like I can access one of my data_connectors with a few lines of code like this

import great_expectations as gx

datasource_name = "where_house_source"
data_connector_name = "data_raw_inferred_data_connector_name"

context = gx.get_context()
datasource = context.get_datasource(datasource_name)
data_connector = datasource.data_connectors[data_connector_name]

but the new documentation seems largely focused on a Datasource -> DataAsset flow, and after playing with/reading the source code of the relevantly named methods of my data_connector object, I don't yet see a clean way to get from a DataConnector to a DataAsset without going through a pattern like

from great_expectations.core.batch import BatchRequest

expectation_suite_name = "data_raw.cook_county_parcel_sales.warning"
data_asset_name = "data_raw.cook_county_parcel_sales"

batch_request = {
    "datasource_name": datasource_name,
    "data_connector_name": data_connector_name,
    "data_asset_name": data_asset_name,
    "limit": 1000,
}
validator = context.get_validator(
    batch_request=BatchRequest(**batch_request),
    expectation_suite_name=expectation_suite_name,
)

I'll experiment more to see if it's worth switching to the new flow or if I should maintain the block-config + DataConnector -> Validator flow.

@MattTriano
Owner Author

Fluent Datasource definition

I added an env-var to the airflow dot-env file (it was the same as AIRFLOW_CONN_DWH_DB_CONN, just with the postgres:// prefix replaced with postgresql+psycopg2://; I used the AIRFLOW_CONN_DWH_DB_CONN version because it already escaped the special characters in my dwh_db password), and then it was straightforward to create a new fluent.sql_datasource.Datasource via

import great_expectations as gx
from great_expectations.exceptions import DataContextError

datasource_name = "fluent_dwh_source"
context = gx.get_context()

try:
    datasource = context.sources.add_sql(name=datasource_name, connection_string="${GX_DWH_DB_CONN}")
except DataContextError:
    datasource = context.get_datasource(datasource_name)

and that fluent-style Datasource instance has methods that make it easy to programmatically register database tables or queries as data assets. The latter (query assets) will make it very easy to adapt data to work with existing expectations.

table_asset = datasource.add_table_asset(
    name="data_raw.temp_chicago_food_inspections",
    schema_name="data_raw",
    table_name="temp_chicago_food_inspections",
)

query_asset = datasource.add_query_asset(
    name="food_inspection_results_by_zip_code",
    query="""
        SELECT count(*), results, zip
        FROM data_raw.temp_chicago_food_inspections
        GROUP BY results, zip
        ORDER BY count DESC
    """
)

And after the table or query is registered, you can just reference it by name.
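
For example, a quick follow-up sketch (using the table asset registered above):

table_asset = datasource.get_asset("data_raw.temp_chicago_food_inspections")
batch_request = table_asset.build_batch_request()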

Sidenote: I should add tasks to the general pipeline that register tables as assets if they don't already exist. I could even set up a task to run every time but just do nothing if the asset is already registered (datasource.add_table_asset throws a ValueError if a DataAsset with that name has already been registered; it would have been nice if they made a more specific error to catch this, like DataContextError above).

table_name = "temp_chicago_food_inspections"
schema_name = "data_raw"

if f"{schema_name}.{table_name}" not in datasource.get_asset_names():
    table_asset = datasource.add_table_asset(
        name=f"{schema_name}.{table_name}",
        schema_name=schema_name,
        table_name=table_name,
    )
else:
    print(f"A DataAsset named {schema_name}.{table_name} already exists.")

@MattTriano
Owner Author

Setting Expectations with a fluent-style DataAsset

In the GX framework, a BatchRequest is the way to specify the data GX should validate, and the fluent-style DataAsset class provides methods and tools that help configure a BatchRequest.

import great_expectations as gx

datasource_name = "fluent_dwh_source"
table_name = "temp_chicago_food_inspections"
schema_name = "data_raw"

context = gx.get_context()
data_asset = context.get_datasource(datasource_name).get_asset(f"{schema_name}.{table_name}")
batch_request = data_asset.build_batch_request()

Before you can validate a DataAsset, you have to create an ExpectationSuite and then add Expectations to your suite

expectation_suite_name = f"{schema_name}.{table_name}_suite"

expectation_suite = context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name
)

To set expectations, you can either define them manually (review the Expectation Gallery for options; a sketch of the manual route is below), or you can use a profiler to profile your data and generate expectations.
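
A minimal sketch of the manual route (adding to the suite created above; "inspection_id" is a hypothetical column name here):

from great_expectations.core.expectation_configuration import ExpectationConfiguration

# add a single hand-written expectation to the suite
expectation_suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "inspection_id"},
    )
)

The profiler route, via the onboarding data assistant, looks like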

exclude_column_names = []  # If there are columns that shouldn't be profiled, add their names to this list
data_assistant_result = context.assistants.onboarding.run(
    batch_request=batch_request,
    exclude_column_names=exclude_column_names,
)

then extract the generated expectations from the profiler's run

expectation_suite = data_assistant_result.get_expectation_suite(
    expectation_suite_name=expectation_suite_name
)

The expectations set by the profiler should be reviewed and edited before saving them (by design, the profiler generates expectations that fit the profiled batches exactly).

These methods return Expectations organized/ordered by grouping

expectation_suite.get_grouped_and_ordered_expectations_by_expectation_type()
expectation_suite.get_grouped_and_ordered_expectations_by_column()
expectation_suite.get_grouped_and_ordered_expectations_by_domain_type()  # not too useful

and these methods are helpful for removing or replacing expectations

expectation_suite.remove_all_expectations_of_type(expectation_types=[
    "expect_column_values_to_match_regex", "expect_column_proportion_of_unique_values_to_be_between", ...
])
expectation_suite.replace_expectation(
    new_expectation_configuration=ExpectColumnMedianToBeBetween(...),
    existing_expectation_configuration=ExpectationConfiguration_returned_in_last_codeblock
)

When you're content with your suite of Expectations, save it to your DataContext via

context.add_or_update_expectation_suite(expectation_suite=expectation_suite)

Then you can create a Checkpoint to run all of the table's data against your expectations

from great_expectations.checkpoint import SimpleCheckpoint

checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}
checkpoint = SimpleCheckpoint(
    f"{schema_name}.{table_name}_checkpoint",
    context,
    **checkpoint_config,
)
checkpoint_result = checkpoint.run()

assert checkpoint_result["success"] is True

If any expectations fail, replace/edit them until you pass your checkpoint. Then you can generate data_docs and/or save your checkpoint to your DataContext

context.build_data_docs()
context.add_checkpoint(checkpoint=checkpoint)
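
Once the checkpoint is saved to the DataContext, later runs (e.g., from a pipeline task) should be able to reference it by name; a small sketch, assuming the checkpoint name used above:

import great_expectations as gx

context = gx.get_context()
checkpoint_result = context.run_checkpoint(
    checkpoint_name="data_raw.temp_chicago_food_inspections_checkpoint"
)
assert checkpoint_result["success"] is True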
