In [20]:
import great_expectations as gx

In [21]:
def dd_site_config(context, site_name, base_directory="uncommitted/data_docs/local_site/"):
    """Register a filesystem-backed Data Docs site on ``context`` if it is missing.

    Args:
        context: Data Context object exposing ``get_site_names()`` and
            ``add_data_docs_site()``.
        site_name: Name of the Data Docs site that must exist after the call.
        base_directory: Directory the site is written to. The default is the
            path the notebook has always used; per the original author's note
            it is interpreted relative to the root folder of the Data Context.

    Returns:
        The same ``context`` object that was passed in.
    """
    # A site that is already registered is left untouched, so re-running the
    # notebook does not try to add the same site twice.
    if site_name in context.get_site_names():
        return context

    site_config = {
        "class_name": "SiteBuilder",
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
        "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": base_directory,
        },
    }
    context.add_data_docs_site(site_name=site_name, site_config=site_config)
    return context

In [22]:
def get_data_source(context, data_source_name, connection_string=None):
    """Create or update a SQL data source on ``context`` and return it.

    Args:
        context: Data Context whose ``data_sources.add_or_update_sql`` is called.
        data_source_name: Name under which the data source is registered.
        connection_string: SQLAlchemy-style connection string. When omitted,
            the notebook's original BigQuery project/dataset is used, so
            existing two-argument calls behave exactly as before.

    Returns:
        Whatever ``context.data_sources.add_or_update_sql`` returns (the data
        source object).
    """
    if connection_string is None:
        # Default target used throughout this notebook.
        connection_string = "bigquery://brazilian-e-commerce-team-3/BET_Team3"

    return context.data_sources.add_or_update_sql(
        name=data_source_name,
        connection_string=connection_string,
    )

In [23]:
def get_data_asset(data_source, tgt, asset_name):
    """Add a table asset for ``tgt`` to ``data_source`` and fetch it back by name.

    Args:
        data_source: Object exposing ``add_table_asset()`` and ``get_asset()``.
        tgt: Name of the database table the asset points at.
        asset_name: Name to register the asset under.

    Returns:
        The object returned by ``data_source.get_asset(asset_name)``.
    """
    # The value returned by add_table_asset is not used; the asset is looked
    # up again by name right after it is registered.
    data_source.add_table_asset(name=asset_name, table_name=tgt)
    return data_source.get_asset(asset_name)

In [24]:
def get_full_table_batch(data_asset, batch_definition_name):
    """Register a whole-table batch definition on ``data_asset`` and load its batch.

    Args:
        data_asset: Object exposing ``add_batch_definition_whole_table()``.
        batch_definition_name: Name given to the new batch definition.

    Returns:
        The result of calling ``get_batch()`` on the new batch definition.
    """
    return data_asset.add_batch_definition_whole_table(
        name=batch_definition_name
    ).get_batch()

In [25]:
import importlib

def get_exp_suite(context, exp_suite_name, mod_name, func_name):
    """Register an expectation suite and let a dynamically imported builder fill it.

    Args:
        context: Data Context whose ``suites.add_or_update`` is called.
        exp_suite_name: Name of the expectation suite to create or update.
        mod_name: Importable module name that holds the builder function.
        func_name: Name of the builder inside ``mod_name``; it is called with
            the registered suite as its only argument.

    Returns:
        Whatever the builder function returns.
    """
    # The suite is registered first, so it exists on the context before the
    # builder module is imported.
    registered_suite = context.suites.add_or_update(
        gx.ExpectationSuite(name=exp_suite_name)
    )

    build_expectations = getattr(importlib.import_module(mod_name), func_name)
    return build_expectations(registered_suite)

In [26]:
def get_batch_defintion(context, data_source_name, asset_name, batch_definition_name):
    """Look up an existing batch definition by data source, asset and name.

    Args:
        context: Data Context whose ``data_sources.get`` is called.
        data_source_name: Name of the data source to fetch.
        asset_name: Name of the asset inside that data source.
        batch_definition_name: Name of the batch definition on that asset.

    Returns:
        The object returned by the asset's ``get_batch_definition``.
    """
    source = context.data_sources.get(data_source_name)
    asset = source.get_asset(asset_name)
    return asset.get_batch_definition(batch_definition_name)

In [27]:
def get_validation_definitions(context, validation_definition_name, batch_definition, exp_suite):
    """Create or update one validation definition and return it in a list.

    Args:
        context: Data Context whose ``validation_definitions`` collection is used.
        validation_definition_name: Name of the validation definition.
        batch_definition: Batch definition passed as the ``data`` argument.
        exp_suite: Expectation suite passed as the ``suite`` argument.

    Returns:
        A one-element list holding the definition fetched back from the
        context by name.
    """
    store = context.validation_definitions
    store.add_or_update(
        gx.ValidationDefinition(
            name=validation_definition_name,
            data=batch_definition,
            suite=exp_suite,
        )
    )
    # The definition is read back from the context rather than reusing the
    # object returned by add_or_update.
    return [store.get(validation_definition_name)]

In [28]:
def get_checkpoint(context, checkpoint_name, site_name, validation_definitions):
    """Create or update a checkpoint that refreshes one Data Docs site.

    Args:
        context: Data Context whose ``checkpoints.add_or_update`` is called.
        checkpoint_name: Name of the checkpoint.
        site_name: Data Docs site listed in the update action's ``site_names``.
        validation_definitions: Validation definitions attached to the checkpoint.

    Returns:
        The object returned by ``context.checkpoints.add_or_update``.
    """
    docs_action = gx.checkpoint.actions.UpdateDataDocsAction(
        name="update_olist_site", site_names=[site_name]
    )
    new_checkpoint = gx.Checkpoint(
        name=checkpoint_name,
        validation_definitions=validation_definitions,
        actions=[docs_action],
    )
    return context.checkpoints.add_or_update(new_checkpoint)

In [29]:
def run_checkpoint(tgt, mod_name, func_name, open_docs=True):
    """Validate one table end to end and return the checkpoint result.

    Builds (or updates) the Data Docs site, data source, table asset, batch
    definition, expectation suite, validation definition and checkpoint for
    ``tgt``, then runs the checkpoint.

    Args:
        tgt: Table name; also embedded in every generated object name.
        mod_name: Module that holds the expectation-builder function.
        func_name: Name of the builder function inside ``mod_name``.
        open_docs: When true (the default, matching the original behaviour),
            call ``context.open_data_docs()`` after the run. Pass ``False``
            when validating several tables in a row.

    Returns:
        The value returned by ``checkpoint.run()``. It used to be discarded,
        which left callers no way to inspect the validation outcome. In a
        notebook, end the calling line with ``;`` to keep it out of the cell
        output.
    """
    context = gx.get_context(mode="file")

    # Add the Data Docs configuration to the Data Context
    site_name = "olist_data_docs_site"
    context = dd_site_config(context, site_name)

    data_source_name = f"BQ_olist_{tgt}_datasource"
    data_source = get_data_source(context, data_source_name)

    asset_name = f"olist_{tgt}_asset"
    data_asset = get_data_asset(data_source, tgt, asset_name)

    batch_definition_name = f"olist_{tgt}_batch_def_name"
    full_table_batch = get_full_table_batch(data_asset, batch_definition_name)
    # The preview is discarded; the call is kept because it reads from the
    # table before any validation objects are built.
    # NOTE(review): presumably a connectivity smoke test; confirm it is still wanted.
    full_table_batch.head()

    exp_suite_name = f"olist_{tgt}_suite"
    exp_suite = get_exp_suite(context, exp_suite_name, mod_name, func_name)

    batch_definition = get_batch_defintion(
        context, data_source_name, asset_name, batch_definition_name
    )

    validation_definition_name = f"BQ_{tgt}_validation_definition"
    validation_definitions = get_validation_definitions(
        context, validation_definition_name, batch_definition, exp_suite
    )

    checkpoint_name = f"olist_{tgt}_checkpoint"
    checkpoint = get_checkpoint(context, checkpoint_name, site_name, validation_definitions)

    result = checkpoint.run()
    if open_docs:
        context.open_data_docs()
    return result


In [30]:
# Validate the dim_customers table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_customers",
    "gxe_01_olist_dim_customers_suite",
    "build_expectations_gxe_01_olist_dim_customers_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.11it/s]
Calculating Metrics: 100%|██████████| 30/30 [00:05<00:00,  5.29it/s]


In [31]:
# Validate the dim_date table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_date",
    "gxe_02_olist_dim_date_suite",
    "build_expectations_gxe_02_olist_dim_date_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:01<00:00,  1.40s/it]
Calculating Metrics: 100%|██████████| 53/53 [00:08<00:00,  6.57it/s]


In [32]:
# Validate the dim_time table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_time",
    "gxe_03_olist_dim_time_suite",
    "build_expectations_gxe_03_olist_dim_time_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.34it/s]
Calculating Metrics: 100%|██████████| 32/32 [00:06<00:00,  5.14it/s]


In [33]:
# Validate the dim_payments table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_payments",
    "gxe_04_olist_dim_payments_suite",
    "build_expectations_gxe_04_olist_dim_payments_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.09it/s]
Calculating Metrics:  78%|███████▊  | 32/41 [00:05<00:01,  5.53it/s]


In [34]:
# Validate the dim_order_reviews table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_order_reviews",
    "gxe_05_olist_dim_order_reviews_suite",
    "build_expectations_gxe_05_olist_dim_order_reviews_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.16it/s]
Calculating Metrics: 100%|██████████| 37/37 [00:06<00:00,  5.80it/s]


In [35]:
# Validate the dim_products table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_products",
    "gxe_06_olist_dim_products_suite",
    "build_expectations_gxe_06_olist_dim_products_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.34it/s]
Calculating Metrics: 100%|██████████| 84/84 [00:09<00:00,  9.14it/s]


In [36]:
# Validate the dim_sellers table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_sellers",
    "gxe_07_olist_dim_sellers_suite",
    "build_expectations_gxe_07_olist_dim_sellers_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.17it/s]
Calculating Metrics: 100%|██████████| 24/24 [00:05<00:00,  4.75it/s]


In [37]:
# Validate the dim_geolocation table with its dedicated expectation builder.
tgt, mod_name, func_name = (
    "dim_geolocation",
    "gxe_08_olist_dim_geolocation_suite",
    "build_expectations_gxe_08_olist_dim_geolocation_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:01<00:00,  1.05s/it]
Calculating Metrics: 100%|██████████| 24/24 [00:04<00:00,  5.08it/s]


In [38]:
# Validate the fact_sales_corrected table. The builder module is named after
# "fact_sales", without the "_corrected" suffix that the table name carries.
tgt, mod_name, func_name = (
    "fact_sales_corrected",
    "gxe_09_olist_fact_sales_suite",
    "build_expectations_gxe_09_olist_fact_sales_suite",
)
run_checkpoint(tgt, mod_name, func_name)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  1.09it/s]
Calculating Metrics: 100%|██████████| 103/103 [00:16<00:00,  6.38it/s]
