# Import all the libraries that you need

WARNING : USE THE CUSTOM EXPECTATIONS WITH THIS NOTEBOOK IN 'plugins' folder

In [1]:
import os
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
# from expectations.expect_queried_column_values_to_exist_in_second_table_column import ExpectQueriedColumnValuesToExistInSecondTableColumn
# from great_expectations_experimental.expect_column_values_to_not_be_null_and_column_to_not_be_empty import ExpectColumnValuesToNotBeNullAndColumnToNotBeEmpty
from expectations.expect_column_values_to_equal_zero import ExpectColumnValuesToEqualZero
import pandas as pd
from openpyxl import Workbook
from datetime import datetime
import psycopg2

# Set up GX

In [2]:
context = gx.get_context()

In [3]:
print(context)

{
  "anonymous_usage_statistics": {
    "explicit_url": false,
    "enabled": true,
    "usage_statistics_url": "https://stats.greatexpectations.io/great_expectations/v1/usage_statistics",
    "explicit_id": true,
    "data_context_id": "3b6de599-bf8a-4bd3-89fa-b425a84dde11"
  },
  "checkpoint_store_name": "checkpoint_store",
  "config_version": 3.0,
  "data_docs_sites": {},
  "datasources": {
    "pandas_datasource": {
      "class_name": "Datasource",
      "name": "pandas_datasource",
      "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine"
      },
      "module_name": "great_expectations.datasource",
      "data_connectors": {
        "runtime_data_connector": {
          "class_name": "RuntimeDataConnector",
          "name": "runtime_data_connector",
          "module_name": "great_expectations.datasource.data_connector",
          "batch_identifiers": [
            "id_key_0",
            "id_key_1"


In [4]:
# Check List of Datasources
context.list_datasources()

[{'execution_engine': {'class_name': 'PandasExecutionEngine',
   'module_name': 'great_expectations.execution_engine'},
  'data_connectors': {'runtime_data_connector': {'batch_identifiers': ['id_key_0',
     'id_key_1'],
    'module_name': 'great_expectations.datasource.data_connector',
    'class_name': 'RuntimeDataConnector'}},
  'name': 'pandas_datasource',
  'module_name': 'great_expectations.datasource',
  'class_name': 'Datasource'},
 {'execution_engine': {'class_name': 'SparkDFExecutionEngine',
   'module_name': 'great_expectations.execution_engine'},
  'data_connectors': {'runtime_data_connector': {'batch_identifiers': ['id_key_0',
     'id_key_1'],
    'module_name': 'great_expectations.datasource.data_connector',
    'class_name': 'RuntimeDataConnector'}},
  'name': 'spark_datasource',
  'module_name': 'great_expectations.datasource',
  'class_name': 'Datasource'},
 {'type': 'pandas',
  'name': 'postgres_v3_customers_inner_join_orders_datasource',
  'assets': [{'name': 'postg

In [5]:
# WARNING : THIS CODE FOR DELETE DATASOURCE, PLEASE USING WISELY
# context.delete_datasource("postgres_v3b_customers_inner_join_orders_datasource")


KeyboardInterrupt



## Connect to your data

In [6]:
# Run the following Python code to create a Pandas Data Source:
pd_datasource = context.sources.add_pandas(name="postgres_v3b_customers_inner_join_orders_datasource")

# Connect to database
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="postgres",
    user="postgres",
    password="KantorAHP123!")

# Run SQL
sql_query = pd.read_sql("SELECT c.customer_id, c.first_name, c.last_name, o.credit_card_amount FROM customers c INNER JOIN orders o ON c.customer_id = o.customer_id", con=conn)

# Convert to dataframe
df = pd.DataFrame(data=sql_query, columns=['customer_id', 'first_name', 'last_name', 'credit_card_amount'])

# Set Data Asset name
data_asset_name = "postgres_v3b_customers_inner_join_orders_dataframe"

# Set Data Asset
data_asset = pd_datasource.add_dataframe_asset(name=data_asset_name)

# Set Batch Request
batch_request = data_asset.build_batch_request(dataframe=df)

# Create Expectations

You'll use a **Validator** to interact with your batch of data and generate an **Expectation Suite**.

Every time you evaluate an Expectation with `validator.expect_*`, it is immediately Validated against your data. This instant feedback helps you identify unexpected data and removes the guesswork from data exploration. The Expectation configuration is stored in the Validator. When you are finished running the Expectations on the dataset, you can use `validator.save_expectation_suite()` to save all of your Expectation configurations into an Expectation Suite for later use in a checkpoint.

In [7]:
# Run the following command to create the suite and get a Validator
expectation_suite_name = "postgres_v3b_customers_inner_join_orders_suite"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

print(validator.head())

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

   customer_id first_name last_name  credit_card_amount
0           31       Jane        G.                   0
1            2      Shawn        M.                  23
2           99       Mary        G.                  27
3           11       Fred        S.                   0
4           39     Louise        W.                  10


In [8]:
# Run the following command to use the Validator to add a few Expectations:
# credit_card_amount to be 0

# CAUTION : IF YOU SEARCH A COLUMN VALUES == 0, PLEASE USE expect_column_values_to_not_be_in_set
validator_credit_card_amount = validator.expect_column_values_to_equal_zero(
    column="credit_card_amount",
    result_format={
        "result_format" : "COMPLETE",
        "unexpected_index_column_names" : ["customer_id", "first_name", "last_name"], 
        "return_unexpected_index_query" : True,
    }, 
    catch_exception=True,
)

print(validator_credit_card_amount)

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

{
  "meta": {},
  "result": {
    "element_count": 99,
    "unexpected_count": 50,
    "unexpected_percent": 50.505050505050505,
    "partial_unexpected_list": [
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0,
      0
    ],
    "unexpected_index_column_names": [
      "customer_id",
      "first_name",
      "last_name"
    ],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 50.505050505050505,
    "unexpected_percent_nonmissing": 50.505050505050505,
    "partial_unexpected_counts": [
      {
        "value": 0,
        "count": 50
      }
    ],
    "partial_unexpected_index_list": [
      {
        "credit_card_amount": 0,
        "customer_id": 31,
        "first_name": "Jane",
        "last_name": "G."
      },
      {
        "credit_card_amount": 0,
        "customer_id": 11,
        "first_name": "Fred",
        "last_name

In [9]:
unexpected_index_query_variable_credit_card_amount_result = df.filter(items=[0, 3, 5, 7, 9, 10, 13, 14, 16, 17, 18, 20, 21, 22, 24, 25, 27, 30, 31, 35, 36, 37, 39, 40, 43, 44, 45, 46, 48, 50, 51, 53, 57, 59, 62, 63, 65, 66, 67, 68, 71, 73, 74, 76, 78, 82, 89, 90, 94, 97], axis=0)

print(unexpected_index_query_variable_credit_card_amount_result)

    customer_id first_name last_name  credit_card_amount
0            31       Jane        G.                   0
3            11       Fred        S.                   0
5            26      Aaron        R.                   0
7            94    Gregory        H.                   0
9            63     Edward        G.                   0
10           90       Paul        P.                   0
13           40      Maria        A.                   0
14            6      Sarah        R.                   0
16           27   Benjamin        B.                   0
17           71     Gerald        C.                   0
18            8      Frank        R.                   0
20           85    Theresa        M.                   0
21           63     Edward        G.                   0
22           25     Victor        H.                   0
24           35       Sara        T.                   0
25           50      Billy        L.                   0
27           21     Willie     

In [26]:
# Directory to save excel file
path = os.getcwd()+"/file_result"

# Date now
date_now = datetime.now().strftime("%Y-%m-%d")

# Unexpected Index Query from Unexpected Index Query (Great Expectations)
unexpected_index_query = unexpected_index_query_variable_credit_card_amount_result

# Get data and convert to dataframe
df = unexpected_index_query

# Save dataframe to excel file
df.to_excel(f'{path}/unexpected_index_query_variable_credit_card_amount_result_{date_now}.xlsx', sheet_name='Sheet1', index=False)

print(f'unexpected_index_query_variable_credit_card_amount_result_{date_now}.xlsx created')

unexpected_index_query_variable_credit_card_amount_result_2023-11-28.xlsx created


In [10]:
# Run the following command to get your Expectation Suite
print(validator.get_expectation_suite(discard_failed_expectations=False))

{
  "expectations": [
    {
      "kwargs": {
        "column": "credit_card_amount",
        "catch_exception": true
      },
      "meta": {},
      "expectation_type": "expect_column_values_to_equal_zero"
    }
  ],
  "expectation_suite_name": "postgres_v3b_customers_inner_join_orders_suite",
  "meta": {
    "great_expectations_version": "0.18.4"
  },
  "data_asset_type": null,
  "ge_cloud_id": null
}


In [11]:
# Run the following command to save your Expectation Suite (all the unique Expectation Configurations from each run of validator.expect_*) to your Expectation Store:
validator.save_expectation_suite("postgres_v3b_customers_inner_join_orders_great_expectations.json", discard_failed_expectations=False)

# Validate your data

You'll create and store a *Checkpoint* for your batch, which you can use to validate and run post-validation actions.

In [12]:
# Run the following command to create the Checkpoint configuration that uses your Data Context:
my_checkpoint_name = "postgres_v3b_customers_inner_join_orders_checkpoint"

checkpoint = Checkpoint(
    name=my_checkpoint_name,
    run_name_template="%Y%m%d-%H%M%S-postgres_v3b_customers_inner_join_orders_checkpoint",
    data_context=context,
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
        {
            "name": "update_data_docs", 
            "action": {"class_name": "UpdateDataDocsAction"}
        },
    ],
)

The `store_validation_result` action saves your validation results from the Checkpoint run and allows the results to be persisted for future use. The `store_evaluation_params` store evaluation parameters from a validation result. The `update_data_docs` action builds Data Docs files for the validations run in the Checkpoint.

In [13]:
# Run the following command to save the Checkpoint:
context.add_or_update_checkpoint(checkpoint=checkpoint)

{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction"
      }
    }
  ],
  "batch_request": {
    "datasource_name": "postgres_v3b_customers_inner_join_orders_datasource",
    "data_asset_name": "postgres_v3b_customers_inner_join_orders_dataframe",
    "options": {}
  },
  "class_name": "Checkpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "expectation_suite_name": "postgres_v3b_customers_inner_join_orders_suite",
  "module_name": "great_expectations.checkpoint",
  "name": "postgres_v3b_customers_inner_join_orders_checkpoint",
  "profilers": [],
  "run_name_template": "%Y%m%d-%H%M%S-postgres_v3b_customers_inner_join_orders_checkpoint",


In [14]:
# Run the following command to run the Checkpoint and pass in your Batch Request (your data) and your Expectation Suite (your tests):
checkpoint_result = checkpoint.run()

Calculating Metrics: 0it [00:00, ?it/s]

In [15]:
# View the full Checkpoint configuration
print(checkpoint.get_config().to_yaml_str())

name: postgres_v3b_customers_inner_join_orders_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-postgres_v3b_customers_inner_join_orders_checkpoint'
expectation_suite_name: postgres_v3b_customers_inner_join_orders_suite
batch_request:
  datasource_name: postgres_v3b_customers_inner_join_orders_datasource
  data_asset_name: postgres_v3b_customers_inner_join_orders_dataframe
  options: {}
  batch_slice:
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
evaluation_parameters: {}
runtime_configuration: {}
validations: []
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:



# Build and view Data Docs

Your Checkpoint contained an `UpdateDataDocsAction`, so your Data Docs have already been built from the validation you ran and your Data Docs store contains a new rendered validation result.

In [20]:
# Run the following command to open your Data Docs and review the results of your Checkpoint run:
context.open_data_docs()