In [13]:
import os
from dotenv import load_dotenv
import great_expectations as ge
from great_expectations.checkpoint import Checkpoint
from great_expectations.core.batch import BatchRequest
from datahub.integrations.great_expectations.action import DataHubValidationAction

# Load environment variables from .env file
load_dotenv()


True

In [14]:

# Step 1: Get the secrets from environment variables
pg_connection_string = os.getenv("PG_CONNECTION_STRING")
datahub_token = os.getenv("DATAHUB_TOKEN")
datahub_server_url = os.getenv("DATAHUB_SERVER_URL")


In [15]:

# Step 2: Configure the PostgreSQL Datasource with environment variable
context = ge.get_context()

datasource_config = {
    "name": "pg_datasource",
    "class_name": "Datasource",
    "execution_engine": {
        "class_name": "SqlAlchemyExecutionEngine",
        "module_name": "great_expectations.execution_engine",
        "connection_string": pg_connection_string,
    },
    "data_connectors": {
        "default_inferred_data_connector_name": {
            "class_name": "InferredAssetSqlDataConnector",
            "module_name": "great_expectations.datasource.data_connector",
            "include_schema_name": True,
            "name": "default_inferred_data_connector",
        }
    },
}

context.add_datasource(**datasource_config)


<great_expectations.datasource.new_datasource.Datasource at 0x25f147e2c20>

In [16]:

# Step 3: Create an Expectation Suite
expectation_suite_name = "test_suite"
context.create_expectation_suite(expectation_suite_name, overwrite_existing=True)


{
  "expectation_suite_name": "test_suite",
  "ge_cloud_id": null,
  "expectations": [],
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.15.50"
  }
}

In [17]:

# Step 4: Build a BatchRequest for your table in PostgreSQL
batch_request = BatchRequest(
    datasource_name="pg_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="public.upload_solar_20240726093401",  # Schema + table name
    limit=1000,  # Optional: limit the number of rows retrieved
)


In [18]:

# Step 5: Create a Validator and load the batch for validation
validator = context.get_validator(batch_request=batch_request, expectation_suite_name=expectation_suite_name)

# Add a simple validation
validator.expect_column_values_to_be_between("power", min_value=0, max_value=1000)

# Save the validation result
validator.save_expectation_suite(discard_failed_expectations=False)


Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

In [19]:

# Step 6: Register the custom DataHub action with token from the environment variable
custom_actions = {
    "send_to_datahub": {
        "class_name": "DataHubValidationAction",
        "module_name": "datahub.integrations.great_expectations.action",
        "server_url": datahub_server_url,
        "token": datahub_token,
    }
}


In [20]:

# Step 7: Define a Checkpoint that will use the custom action
checkpoint_config = {
    "name": "my_pg_checkpoint",
    "config_version": 1.0,
    "class_name": "Checkpoint",
    "run_name_template": "%Y-%m-%d-%H-%M-%S-my-checkpoint",
    "expectation_suite_name": expectation_suite_name,
    "batch_request": batch_request,
    "action_list": [
        {
            "name": "store_validation_result",
            "action": {
                "class_name": "StoreValidationResultAction",
            },
        },
        {
            "name": "store_evaluation_params",
            "action": {
                "class_name": "StoreEvaluationParametersAction",
            },
        },
        {
            "name": "send_to_datahub",
            "action": custom_actions["send_to_datahub"],
        },
    ],
}

context.add_checkpoint(**checkpoint_config)


{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "send_to_datahub",
      "action": {
        "class_name": "DataHubValidationAction",
        "module_name": "datahub.integrations.great_expectations.action",
        "server_url": "http://localhost:8080",
        "token": "eyJhbGciOiJIUzI1NiJ9.eyJhY3RvclR5cGUiOiJVU0VSIiwiYWN0b3JJZCI6ImRhdGFodWIiLCJ0eXBlIjoiUEVSU09OQUwiLCJ2ZXJzaW9uIjoiMiIsImp0aSI6IjMxYTA5MGYzLWEyYWItNDg4ZC04NTYzLWZkZjc4MDdjZGEwYiIsInN1YiI6ImRhdGFodWIiLCJpc3MiOiJkYXRhaHViLW1ldGFkYXRhLXNlcnZpY2UifQ.aqFK4kkAakjaj_sXGTOOvml6388UUla3Xsjnv_5uzhs"
      }
    }
  ],
  "batch_request": {
    "datasource_name": "pg_datasource",
    "data_connector_name": "default_inferred_data_connector_name",
    "data_asset_name": "public.

In [21]:

# Step 8: Run the Checkpoint
context.run_checkpoint(checkpoint_name="my_pg_checkpoint")


Calculating Metrics:   0%|          | 0/13 [00:00<?, ?it/s]

Datasource pg_datasource is not present in platform_instance_map


{
  "run_id": {
    "run_name": "2024-10-15-13-52-11-my-checkpoint",
    "run_time": "2024-10-15T13:52:11.451628+00:00"
  },
  "run_results": {
    "ValidationResultIdentifier::test_suite/2024-10-15-13-52-11-my-checkpoint/20241015T135211.451628Z/bdbccb8766e556f19e843cd2f9024e4c": {
      "validation_result": {
        "success": true,
        "results": [
          {
            "success": true,
            "expectation_config": {
              "expectation_type": "expect_column_values_to_be_between",
              "kwargs": {
                "column": "power",
                "max_value": 1000,
                "min_value": 0,
                "batch_id": "bdbccb8766e556f19e843cd2f9024e4c"
              },
              "meta": {}
            },
            "result": {
              "element_count": 2024,
              "unexpected_count": 0,
              "unexpected_percent": 0.0,
              "partial_unexpected_list": [],
              "missing_count": 0,
              "missing_perc