In [18]:
import great_expectations as ge

In [19]:
import pyspark

In [20]:
from pyspark.sql import SparkSession

In [21]:
spark=SparkSession.builder.appName('ge').getOrCreate()

In [22]:
spark.sparkContext.setLogLevel("WARN")

In [23]:
spark

In [24]:
from pyspark.sql.functions import *

In [25]:
df=spark.read.option('header','true').csv('opening_balance_raw_20221116.csv')

In [26]:
df.show()

+-----------------+----------------+---+------+-------------------+
|               ID|            name|age|gender|              email|
+-----------------+----------------+---+------+-------------------+
|                1|     Peter Poole| 30|  Male|      po@zalowpo.az|
|                2|   Herbert Payne| 46|Female|      zos@toilet.bn|
|                3|      Callie Roy| 57|  Male|kapkapor@reckeho.ye|
|                4|  Isabella Wolfe| 61|Female| atuigome@asedas.io|
|                5|     Sara Knight| 53|Female| dudnukob@ucboki.tg|
|                6| Raymond Herrera| 23|  Male|       tolpij@ni.lu|
|                7|  Jeffery Brooks| 65|  Male|   anirus@ajjuhu.mw|
|                8|  Lillie Barnett| 55|  Male|      besi@vodje.hr|
|                9|    Emma Alvarez| 21|  Male|          ze@ebo.jm|
|               10|  Estella Rhodes| 27|  Male|         ez@jewe.si|
|               11|     Eugene West| 18|Female| hohobum@janucke.vn|
|               12|     Ollie Blake| 57|  Male| 

In [27]:
import datetime
 

from ruamel import yaml
from great_expectations import *
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
 
root_directory = r"C:\Users\hp\Desktop\ge1"

In [28]:
#create Data Context
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)

In [29]:
my_spark_datasource_config = {
    "name": "insert_your_datasource_name_here",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "insert_your_data_connector_name_here": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
context.test_yaml_config(yaml.dump(my_spark_datasource_config))
 
context.add_datasource(**my_spark_datasource_config)
 
batch_request = RuntimeBatchRequest(
    datasource_name="insert_your_datasource_name_here",
    data_connector_name="insert_your_data_connector_name_here",
    data_asset_name="<YOUR_MEANGINGFUL_NAME>",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  
)
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
	Successfully instantiated Datasource


ExecutionEngine class name: SparkDFExecutionEngine
Data Connectors:
	insert_your_data_connector_name_here:RuntimeDataConnector

	Available data_asset_names (0 of 0):
		Note : RuntimeDataConnector will not have data_asset_names until they are passed in through RuntimeBatchRequest

	Unmatched data_references (0 of 0): []



In [16]:
# Databricks notebook source
import json
import re  # regular expressions

# !!! This giant block of imports should be something simpler, such as:
from great_expectations import *
from great_expectations.execution_engine import (
    PandasExecutionEngine,
    SparkDFExecutionEngine,
    SqlAlchemyExecutionEngine,
)
from great_expectations.expectations.expectation import (
    ColumnMapExpectation,
    Expectation,
    ExpectationConfiguration,
)
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial,
)
from great_expectations.expectations.registry import (
    _registered_expectations,
    _registered_metrics,
    _registered_renderers,
)
from great_expectations.expectations.util import render_evaluation_parameter_string
from great_expectations.render.renderer.renderer import renderer
from great_expectations.render.types import RenderedStringTemplateContent
from great_expectations.render.util import num_to_str, substitute_none_for_missing
from great_expectations.validator.validator import Validator

EMAIL_REGEX = r"[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,7}$"


class ColumnValuesContainValidEmail(ColumnMapMetricProvider):
    # This is the id string that will be used to reference your metric.
    condition_metric_name = "column_values.valid_email"
    condition_value_keys = ()

    # This method defines the business logic for evaluating your metric when using a PandasExecutionEngine
    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        def matches_email_regex(x):
            if re.match(EMAIL_REGEX, str(x)):
                return True
            return False

        return column.apply(lambda x: matches_email_regex(x) if x else False)

    # This method defines the business logic for evaluating your metric when using a SqlAlchemyExecutionEngine
    #     @column_condition_partial(engine=SqlAlchemyExecutionEngine)
    #     def _sqlalchemy(cls, column, _dialect, **kwargs):
    #         return column.in_([3])

    # This method defines the business logic for evaluating your metric when using a SparkDFExecutionEngine
    @column_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(cls, column, **kwargs):
        return column.rlike(EMAIL_REGEX)


# This class defines the Expectation itself
# The main business logic for calculation lives here.
class ExpectColumnValuesToContainValidEmail(ColumnMapExpectation):
    # These examples will be shown in the public gallery, and also executed as unit tests for your Expectation
    examples = [
        {
            "data": {
                "fail_case_1": ["a123@something", "a123@something.", "a123."],
                "fail_case_2": ["aaaa.a123.co", "aaaa.a123.", "aaaa.a123.com"],
                "fail_case_3": ["aaaa@a123.e", "aaaa@a123.a", "aaaa@a123.d"],
                "fail_case_4": ["@a123.com", "@a123.io", "@a123.eu"],
                "pass_case_1": [
                    "a123@something.com",
                    "vinod.km@something.au",
                    "this@better.work",
                ],
                "pass_case_2": [
                    "example@website.dom",
                    "ex.ample@example.ex",
                    "great@expectations.email",
                ],
                "valid_emails": [
                    "Janedoe@company.org",
                    "someone123@stuff.net",
                    "mycompany@mycompany.com",
                ],
                "bad_emails": ["Hello, world!", "Sophia", "this should fail"],
            },
            "tests": [
                {
                    "title": "negative_test_for_no_domain_name",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "fail_case_1"},
                    "out": {
                        "success": False,
                        "unexpected_index_list": [0, 1, 2],
                        "unexpected_list": [
                            "a123@something",
                            "a123@something.",
                            "a123.",
                        ],
                    },
                },
                {
                    "title": "negative_test_for_no_at_symbol",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "fail_case_2"},
                    "out": {
                        "success": False,
                        "unexpected_index_list": [0, 1, 2],
                        "unexpected_list": [
                            "aaaa.a123.co",
                            "aaaa.a123.",
                            "aaaa.a123.com",
                        ],
                    },
                },
                {
                    "title": "negative_test_for_ending_with_one_character",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "fail_case_3"},
                    "out": {
                        "success": False,
                        "unexpected_index_list": [0, 1, 2],
                        "unexpected_list": [
                            "aaaa@a123.e",
                            "aaaa@a123.a",
                            "aaaa@a123.d",
                        ],
                    },
                },
                {
                    "title": "negative_test_for_emails_with_no_leading_string",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "fail_case_4"},
                    "out": {
                        "success": False,
                        "unexpected_index_list": [0, 1, 2],
                        "unexpected_list": [
                            "aaaa@a123.e",
                            "aaaa@a123.a",
                            "aaaa@a123.d",
                        ],
                    },
                },
                {
                    "title": "pass_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "pass_case_1"},
                    "out": {
                        "success": True,
                        "unexpected_index_list": [],
                        "unexpected_list": [],
                    },
                },
                {
                    "title": "pass_test",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "pass_case_2"},
                    "out": {
                        "success": True,
                        "unexpected_index_list": [],
                        "unexpected_list": [],
                    },
                },
                {
                    "title": "valid_emails",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "valid_emails"},
                    "out": {
                        "success": True,
                        "unexpected_index_list": [],
                        "unexpected_list": [],
                    },
                },
                {
                    "title": "invalid_emails",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "bad_emails"},
                    "out": {
                        "success": False,
                        "unexpected_index_list": [0, 1, 2],
                        "unexpected_list": [
                            "Hello, world!",
                            "Sophia",
                            "this should fail",
                        ],
                    },
                },
            ],
        }
    ]

    # This dictionary contains metadata for display in the public gallery
    library_metadata = {
        "maturity": "experimental",
        "tags": ["experimental", "column map expectation"],
        "contributors": [  # Github
            "@aworld1",
            "@enagola",
            "@spencerhardwick",
            "@vinodkri1",
            "@degulati",
            "@ljohnston931",
            "@rexboyce",
            "@lodeous",
            "@sophiarawlings",
            "@vtdangg",
        ],
    }

    # This is the id string of the Metric used by this Expectation.
    # For most Expectations, it will be the same as the `condition_metric_name` defined in your Metric class above.
    map_metric = "column_values.valid_email"

    # This is a list of parameter names that can affect whether the Expectation evaluates to True or False
    # Please see {some doc} for more information about domain and success keys, and other arguments to Expectations
    success_keys = ()

    # This dictionary contains default values for any parameters that should have default values
    default_kwarg_values = {}

    # This method defines a question Renderer
    # For more info on Renderers, see {some doc}
    # !!! This example renderer should render RenderedStringTemplateContent, not just a string


#     @classmethod
#     @renderer(renderer_type="renderer.question")
#     def _question_renderer(
#         cls, configuration, result=None, language=None, runtime_configuration=None
#     ):
#         column = configuration.kwargs.get("column")
#         mostly = configuration.kwargs.get("mostly")

#         return f'Do at least {mostly * 100}% of values in column "{column}" equal 3?'

# This method defines an answer Renderer
# !!! This example renderer should render RenderedStringTemplateContent, not just a string
#     @classmethod
#     @renderer(renderer_type="renderer.answer")
#     def _answer_renderer(
#         cls, configuration=None, result=None, language=None, runtime_configuration=None
#     ):
#         column = result.expectation_config.kwargs.get("column")
#         mostly = result.expectation_config.kwargs.get("mostly")
#         regex = result.expectation_config.kwargs.get("regex")
#         if result.success:
#             return f'At least {mostly * 100}% of values in column "{column}" equal 3.'
#         else:
#             return f'Less than {mostly * 100}% of values in column "{column}" equal 3.'

# This method defines a prescriptive Renderer
#     @classmethod
#     @renderer(renderer_type="renderer.prescriptive")
#     @render_evaluation_parameter_string
#     def _prescriptive_renderer(
#         cls,
#         configuration=None,
#         result=None,
#         language=None,
#         runtime_configuration=None,
#         **kwargs,
#     ):
# !!! This example renderer should be shorter
#         runtime_configuration = runtime_configuration or {}
#         include_column_name = runtime_configuration.get("include_column_name", True)
#         include_column_name = (
#             include_column_name if include_column_name is not None else True
#         )
#         styling = runtime_configuration.get("styling")
#         params = substitute_none_for_missing(
#             configuration.kwargs,
#             ["column", "regex", "mostly", "row_condition", "condition_parser"],
#         )

#         template_str = "values must be equal to 3"
#         if params["mostly"] is not None:
#             params["mostly_pct"] = num_to_str(
#                 params["mostly"] * 100, precision=15, no_scientific=True
#             )
#             # params["mostly_pct"] = "{:.14f}".format(params["mostly"]*100).rstrip("0").rstrip(".")
#             template_str += ", at least $mostly_pct % of the time."
#         else:
#             template_str += "."

#         if include_column_name:
#             template_str = "$column " + template_str

#         if params["row_condition"] is not None:
#             (
#                 conditional_template_str,
#                 conditional_params,
#             ) = parse_row_condition_string_pandas_engine(params["row_condition"])
#             template_str = conditional_template_str + ", then " + template_str
#             params.update(conditional_params)

#         return [
#             RenderedStringTemplateContent(
#                 **{
#                     "content_block_type": "string_template",
#                     "string_template": {
#                         "template": template_str,
#                         "params": params,
#                         "styling": styling,
#                     },
#                 }
#             )
#         ]

if __name__ == "__main__":
    ExpectColumnValuesToContainValidEmail().print_diagnostic_checklist()

# COMMAND ----------




Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\self_check\util.py", line 2774, in evaluate_json_test_v3_api
    check_json_test_result(
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\self_check\util.py", line 2941, in check_json_test_result
    assert result["result"]["unexpected_list"] == value, (
AssertionError: expected ['aaaa@a123.a', 'aaaa@a123.d', 'aaaa@a123.e'] but got ['@a123.com', '@a123.eu', '@a123.io']



Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\self_check\util.py", line 2774, in evaluate_json_test_v3_api
    check_json_test_result(
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\self_check\util.py", line 2878, in check_json_test_result
    assert result["success"] == value, f"{result['success']} != {value}"
AssertionError: False != True



Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\execution_engine.py", line 421, in resolve_metrics
    ] = self.resolve_metric_bundle(metric_fn_bundle=metric_fn_bundle)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\execution_engine\sparkdf_execution_engine.py", line 725, in resolve_metric_bundle
    res = df.agg(*aggregate["column_aggregates"]).collect()
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\hp\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\hp\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred

In [30]:
validator.expect_column_values_to_contain_valid_email(column="email")

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

{
  "meta": {},
  "success": false,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "result": {
    "element_count": 100,
    "unexpected_count": 1,
    "unexpected_percent": 1.0,
    "partial_unexpected_list": [
      "bocojheb@ulpeti.co.uk"
    ],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 1.0,
    "unexpected_percent_nonmissing": 1.0
  }
}

In [31]:
import great_expectations as gx
import os
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from great_expectations.render.renderer import *
from great_expectations.render.view import DefaultJinjaPageView
from great_expectations.data_context.util import *
from IPython.display import display
from IPython.core.display import HTML

In [32]:
data_context_config = DataContextConfig(
    validation_operators={
        "action_list_operator": {
            "class_name": "ActionListValidationOperator",
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
                {
                    "name": "store_evaluation_params",
                    "action": {"class_name": "StoreEvaluationParametersAction"},
                },
                {
                    "name": "update_data_docs",
                    "action": {"class_name": "UpdateDataDocsAction"},
                },
            ],
        }
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)

In [33]:
my_spark_datasource_config = {
    "name": "insert_your_datasource_name_here",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "insert_your_data_connector_name_here": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}
 
context.test_yaml_config(yaml.dump(my_spark_datasource_config))
 
context.add_datasource(**my_spark_datasource_config)
 
batch_request = RuntimeBatchRequest(
    datasource_name="insert_your_datasource_name_here",
    data_connector_name="insert_your_data_connector_name_here",
    data_asset_name="<YOUR_MEANGINGFUL_NAME>",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{datetime.date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)
 
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
# validator = context.get_validator(
#     batch_request=batch_request,
#     expectation_suite_name=expectation_suite_name,
# )

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
	Successfully instantiated Datasource


ExecutionEngine class name: SparkDFExecutionEngine
Data Connectors:
	insert_your_data_connector_name_here:RuntimeDataConnector

	Available data_asset_names (0 of 0):
		Note : RuntimeDataConnector will not have data_asset_names until they are passed in through RuntimeBatchRequest

	Unmatched data_references (0 of 0): []



{
  "meta": {
    "great_expectations_version": "0.15.36"
  },
  "expectation_suite_name": "insert_your_expectation_suite_name_here",
  "data_asset_type": null,
  "expectations": [],
  "ge_cloud_id": null
}

In [35]:
suit_lst = []
col_expectations = {"expect_column_values_to_not_be_null":{"column":"ID"},"expect_compound_columns_to_be_unique":{"column":"ID"},"expect_column_values_to_be_of_type":{"column":"name","datatype":"str"},"expect_column_values_to_be_in_set":{"column":"gender","value_set":"['Male','Female']","expect_column_values_to_contain_valid_email":{"column":"email"}}}

from great_expectations.core import ExpectationSuite, ExpectationConfiguration
for k,v in col_expectations.items():
    expectation_configuration = ExpectationConfiguration(
    expectation_type =k, kwargs = v
    )
    suit_lst.append(expectation_configuration)

suite = ExpectationSuite("custom_expec_test",expectations=suit_lst)

validator_list = []
validator = context.get_validator(batch_request= batch_request, expectation_suite= suite)

validator.save_expectation_suite(discard_failed_expectations=False)

validator_list.append(validator)
results = context.run_validation_operator("action_list_operator",assets_to_validate=validator_list)

print(results)

Calculating Metrics:   0%|          | 0/19 [00:00<?, ?it/s]

An unexpected Exception occurred during data docs rendering.  Because of this error, certain parts of data docs will not be rendered properly and/or may not appear altogether.  Please use the trace, included in this message, to diagnose and repair the underlying issue.  Detailed information follows:
        TypeError: "object of type 'NoneType' has no len()".  Traceback: "Traceback (most recent call last):
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\render\renderer\content_block\content_block.py", line 107, in _render_list
    result = content_block_fn(
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\render\renderer\renderer.py", line 14, in inner_func
    return renderer_fn(*args, **kwargs)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\expectations\expectation.py", line 118, in inner_func
    ] = render_func(*args, **kwargs)
  File "C:\Users\hp\anaconda3\lib\site-packages\great_expectations\expectations\core\expect_compoun

{
  "success": false,
  "run_id": {
    "run_time": "2022-12-14T12:07:16.670834+00:00",
    "run_name": "20221214T120716.670834Z"
  },
  "run_results": {
    "ValidationResultIdentifier::custom_expec_test/20221214T120716.670834Z/20221214T120716.670834Z/68a74190e9f8d93771db1ff953848ddd": {
      "validation_result": {
        "meta": {
          "great_expectations_version": "0.15.36",
          "expectation_suite_name": "custom_expec_test",
          "run_id": {
            "run_time": "2022-12-14T12:07:16.670834+00:00",
            "run_name": "20221214T120716.670834Z"
          },
          "batch_spec": {
            "data_asset_name": "<YOUR_MEANGINGFUL_NAME>",
            "batch_data": "SparkDataFrame"
          },
          "batch_markers": {
            "ge_load_time": "20221214T120716.490848Z"
          },
          "active_batch_definition": {
            "datasource_name": "insert_your_datasource_name_here",
            "data_connector_name": "insert_your_data_connector_name_