In [0]:
#%pip install great_expectations
#%pip install -U jsonschema

Python interpreter will be restarted.
Collecting great_expectations
  Downloading great_expectations-0.15.50-py3-none-any.whl (5.3 MB)
Collecting marshmallow<4.0.0,>=3.7.1
  Downloading marshmallow-3.19.0-py3-none-any.whl (49 kB)
Collecting makefun<2,>=1.7.0
  Downloading makefun-1.15.1-py2.py3-none-any.whl (22 kB)
Collecting tzlocal>=1.2
  Downloading tzlocal-4.2-py3-none-any.whl (19 kB)
Collecting colorama>=0.4.3
  Downloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Collecting jsonpatch>=1.22
  Downloading jsonpatch-1.32-py2.py3-none-any.whl (12 kB)
Collecting altair<4.2.1,>=4.0.0
  Downloading altair-4.2.0-py3-none-any.whl (812 kB)
Collecting importlib-metadata>=1.7.0
  Downloading importlib_metadata-6.0.0-py3-none-any.whl (21 kB)
Collecting pydantic<2.0,>=1.10.4
  Downloading pydantic-1.10.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
Collecting ruamel.yaml<0.17.18,>=0.16
  Downloading ruamel.yaml-0.17.17-py3-none-any.whl (109 kB)
Collecting notebook>=6.4.

In [0]:
import datetime
import pytz
import pandas as pd
from ruamel import yaml
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, FilesystemStoreBackendDefaults
 
# Configurations for filestore location and site location
rootDirectory = "/dbfs/great_expectations/"
siteName = "site"
sitePath = "/dbfs/great_expectations/data_docs/"

# Single Data Docs site, keyed by siteName, whose store backend writes under sitePath.
dataDocsSites = {
    siteName: {
        "class_name": "SiteBuilder",
        "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": sitePath,
        },
        "site_index_builder": {
            "class_name": "DefaultSiteIndexBuilder",
        },
    }
}

# Store defaults are rooted at rootDirectory; the context is built from this
# in-code project config.
filesystem = FilesystemStoreBackendDefaults(root_directory=rootDirectory)
dataContext = DataContextConfig(
    store_backend_defaults=filesystem,
    data_docs_sites=dataDocsSites,
)
context = BaseDataContext(project_config=dataContext)
 
# Configuration for datasource
# Runtime connector: batches are identified by a single "runId" identifier.
databricksConnectorConfig = {
    "module_name": "great_expectations.datasource.data_connector",
    "class_name": "RuntimeDataConnector",
    "batch_identifiers": ["runId"],
}

my_spark_datasource_config = {
    "name": "databricksDataSource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {"databricksConnector": databricksConnectorConfig},
}

# Uncomment line below when needing to test the configuration (not needed for production)
# context.test_yaml_config(yaml.dump(my_spark_datasource_config))

# Adds configurations to be used
context.add_datasource(**my_spark_datasource_config)

Out[2]: <great_expectations.datasource.new_datasource.Datasource at 0x7f10c61787f0>

In [0]:
def getBatch(dataframe, testingCategory):
  """Wrap a dataframe in a RuntimeBatchRequest for the notebook's datasource.

  Args:
    dataframe: Data passed to the connector as ``batch_data``.
    testingCategory: Value used as the request's ``data_asset_name``.

  Returns:
    A RuntimeBatchRequest targeting "databricksDataSource" / "databricksConnector",
    with a "runId" batch identifier derived from the current time.
  """
  # Timestamp down to microseconds, taken from the naive current time.
  runStamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
  return RuntimeBatchRequest(
      datasource_name="databricksDataSource",
      data_connector_name="databricksConnector",
      data_asset_name=testingCategory,
      batch_identifiers={"runId": runStamp},
      runtime_parameters={"batch_data": dataframe},
  )
  
def createExpecationSuite(expectation_suite):
  """Create the named expectation suite on the notebook-level ``context``.

  The call passes ``overwrite_existing=True``.

  Args:
    expectation_suite: Name of the expectation suite to create.
  """
  context.create_expectation_suite(
      expectation_suite_name=expectation_suite,
      overwrite_existing=True,
  )
 
def getTestingSuite(dataframe, testingCategory, expectationSuite):
  """Create an expectation suite and a validator bound to ``dataframe``.

  Args:
    dataframe: Data to validate; forwarded to getBatch as the batch data.
    testingCategory: Forwarded to getBatch as the data asset name.
    expectationSuite: Name of the expectation suite to create and attach.

  Returns:
    A ``(validator, batch)`` tuple: the validator obtained from the context for
    the new batch request, and the batch request itself.
  """
  createExpecationSuite(expectationSuite)
  # Build the batch from the argument. The previous code read the notebook-global
  # `df` here, silently ignoring whatever dataframe the caller passed in.
  batch = getBatch(dataframe, testingCategory)
  validator = context.get_validator(
      batch_request=batch,
      expectation_suite_name=expectationSuite,
  )
  return validator, batch
  
def runTests(tests, batch):
  """Save the validator's suite, register a checkpoint for it, and run it.

  Args:
    tests: Validator holding the declared expectations (as returned by
      getTestingSuite); its ``expectation_suite_name`` names the checkpoint.
    batch: Batch request the checkpoint validates.

  Returns:
    The object returned by ``context.run_checkpoint``, so callers can inspect the
    outcome. End the calling cell with ``;`` to suppress its display.
  """
  # %Z yields the zone abbreviation in effect (EST or EDT); a hard-coded "EST"
  # mislabels runs made during daylight saving time.
  eastern = pytz.timezone('America/New_York')
  runName = datetime.datetime.now(eastern).strftime('%Y-%m-%d_%H_%M_%S_%Z')

  # Persist every declared expectation, including ones that failed interactively.
  tests.save_expectation_suite(discard_failed_expectations=False)

  suiteName = tests.expectation_suite_name
  checkpoint_name = f"{suiteName}_checkpoint"
  checkpoint_config = {
      "name": checkpoint_name,
      "config_version": 1.0,
      "class_name": "SimpleCheckpoint",
      "run_name_template": runName
  }

  context.add_checkpoint(**checkpoint_config)
  return context.run_checkpoint(
      checkpoint_name=checkpoint_name,
      validations=[
          {
              "batch_request": batch,
              "expectation_suite_name": suiteName,
          }
      ],
  )

In [0]:
# Load the table to validate. `spark` is not defined in this notebook — presumably the
# session object provided by the Databricks runtime; confirm before running elsewhere.
df = spark.read.table("irs_group_catalog.silver.efile_core_2012")

In [0]:
# These tests are run at the table level. testingCategory can be a layer name such as
# bronze, silver, or gold.
# `tests` is the validator used to declare expectations; `batch` is the batch request
# that is later handed to runTests.

tests, batch = getTestingSuite(dataframe = df, testingCategory = "SilverLayer", expectationSuite = "efile_core_2012")

In [0]:
#list of possible expectations 
#https://legacy.docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html

# EIN should be (almost) unique across the table.
tests.expect_column_values_to_be_unique(
    column = "EIN",
    mostly = .99
)

# The first 25 columns should be at least 95% populated.
for column in df.columns[0:25]:
    tests.expect_column_values_to_not_be_null(
        column = column,
        mostly = 0.95
    )


# Make sure column values are of certain data type
tests.expect_column_values_to_be_of_type(
    column = "GROSSRECEIPTS",
    type_ = "DoubleType"
)

tests.expect_column_values_to_be_of_type(
    column = "EIN",
    type_ = "LongType"
)

# Make sure table row count is correct value

tests.expect_table_row_count_to_equal(
    value = 249983
)

# Does data follow valid zipcode format
# Raw string: "\d" and "\s" are invalid escape sequences in a normal string literal
# and trigger a warning; the regex value itself is unchanged.
tests.expect_column_values_to_match_regex(
    column = "ZIP",
    regex = r"^\d{5}(?:[-\s]\d{4})?$",
    mostly = .9
)

# Validate length of EIN
tests.expect_column_value_lengths_to_be_between(
    column = "EIN",
    min_value = 8,
    max_value = 9,
    mostly = 0.95
)

# Validate that column values are in the value set
tests.expect_column_values_to_be_in_set(
    column = "TERMINATED",
    value_set = ["Y", "N", "NA"],
)

tests.expect_column_values_to_be_in_set(
    column = "ORGDISSOLVED",
    value_set = ["TRUE", "FALSE", "NA"]
)

# Compare values in two columns
# Column matching also possible
# NOTE(review): the saved output lists pairs such as "6/30/2013" / "7/1/2012" as
# unexpected, which suggests both columns hold date strings that are compared as
# text — confirm, and convert them to dates before relying on this check.
tests.expect_column_pair_values_A_to_be_greater_than_B(
    column_A = "ENDYEAR",
    column_B = "STYEAR"
)


Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/12 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Out[14]: {
  "result": {
    "element_count": 249983,
    "unexpected_count": 93504,
    "unexpected_percent": 37.404143481756755,
    "partial_unexpected_list": [
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "3/31/2013",
        "4/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "3/31/2013",
        "4/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "5/31/2013",
        "6/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "5/31/2013",
        "6/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "7/31/2013",
        "8/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "6/30/2013",
        "7/1/2012"
      ],
      [
        "6/30/2013",
        "

In [0]:
# Save the suite declared on `tests` and run it through a checkpoint against `batch`.
runTests(tests, batch)

Calculating Metrics:   0%|          | 0/190 [00:00<?, ?it/s]

In [0]:
# Optional: replace the logo in the generated Data Docs HTML pages with the Deloitte logo

def addDeloitteLogo(docsRoot='/dbfs/great_expectations/data_docs',
                    logoUrl="https://logos-download.com/wp-content/uploads/2016/10/Deloitte_logo_black-700x130.png"):
    """Point the logo image of every HTML page under ``docsRoot`` at ``logoUrl``.

    Each ``*.html`` file found recursively under ``docsRoot`` is parsed. The first
    ``<img>`` carrying the ``NO-CACHE`` class gets its ``src`` replaced, and the
    prettified document is written back in place. Pages without such an image are
    left untouched.

    Args:
        docsRoot: Directory searched recursively for HTML files.
        logoUrl: URL written into the image's ``src`` attribute.
    """
    from bs4 import BeautifulSoup
    from pathlib import Path

    # Collect the paths up front: files are rewritten in place while iterating.
    htmlPaths = list(Path(docsRoot).rglob('*.html'))

    for html_doc in htmlPaths:
        # Parse inside a context manager so the read handle is always closed
        # before the same file is reopened for writing.
        with open(html_doc, 'r', encoding='utf-8') as htmlFile:
            soup = BeautifulSoup(htmlFile, 'html.parser')

        logoImages = soup.find_all('img', attrs={"class": "NO-CACHE"})
        if not logoImages:
            # No logo image on this page; skip it instead of raising IndexError.
            continue

        logoImages[0]['src'] = logoUrl
        with open(html_doc, "w", encoding='utf-8') as file:
            # prettify the soup object and convert it into a string
            file.write(str(soup.prettify()))
            
# Apply the logo replacement to the HTML files under /dbfs/great_expectations/data_docs.
addDeloitteLogo()

In [0]:
# NOTE(review): the saved output of this cell is an ExecutionError traceback — confirm the
# bucket exists and is accessible from the cluster, or drop this exploratory cell.
dbutils.fs.ls("s3://great-expectations-packages")

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
[0;32m<command-3398616893997182>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdbutils[0m[0;34m.[0m[0mfs[0m[0;34m.[0m[0mls[0m[0;34m([0m[0;34m"s3://great-expectations-packages"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/dbutils.py[0m in [0;36mf_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    360[0m                     [0mexc[0m[0;34m.[0m[0m__context__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[1;32m    361[0m                     [0mexc[0m[0;34m.[0m[0m__cause__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 362[0;31m                     [0;32mraise[0m [0mexc[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    363[0m [0;34m[0m[0m
[1;32m    364[0m             [0;32mr

In [0]:
# List the contents of /greatExpectationsPackages/.
dbutils.fs.ls("/greatExpectationsPackages/")

Out[12]: [FileInfo(path='dbfs:/greatExpectationsPackages', name='greatExpectationsPackages', size=5294514, modificationTime=1677616689000)]