# Create a new spark Datasource

In [118]:
import os
from ruamel import yaml
import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.exceptions import DataContextError
from great_expectations.data_context.util import file_relative_path
from great_expectations.validator.validator import Validator

from great_expectations.checkpoint import SimpleCheckpoint

In [103]:
# instantiate the great expectations context. This is very similar to activating a session 
context = gx.get_context()
gx_data_context = gx.data_context.DataContext()

## Customize Your Datasource Configuration

**If you are new to Great Expectations Datasources,** you should check out our [how-to documentation](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview)

**My configuration is not so simple - are there more advanced options?**
Glad you asked! Datasources are versatile. Please see our [How To Guides](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/connect_to_data_overview)!

Give your datasource a unique name:

In [104]:
datasource_yaml = rf"""
name: adidas_retail_sales
class_name: Datasource
execution_engine:
    class_name: SparkDFExecutionEngine
data_connectors:
    default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
            - default_identifier_name
    default_inferred_data_connector_name:
        class_name: InferredAssetFilesystemDataConnector
        base_directory:  ../data/
        default_regex:
            group_names:
                - data_asset_name
            pattern: (.*)\.csv
"""

context.test_yaml_config(yaml_config=datasource_yaml)
context.add_datasource(**yaml.load(datasource_yaml))


Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
	Successfully instantiated Datasource


ExecutionEngine class name: SparkDFExecutionEngine
Data Connectors:
	default_inferred_data_connector_name : InferredAssetFilesystemDataConnector

	Available data_asset_names (2 of 2):
		adidas_us_retail_sales_data-converted (1 of 1): ['adidas_us_retail_sales_data-converted.csv']
		adidas_us_retail_sales_data-raw (1 of 1): ['adidas_us_retail_sales_data-raw.csv']

	Unmatched data_references (0 of 0):[]

	default_runtime_data_connector_name:RuntimeDataConnector

	Available data_asset_names (0 of 0):
		Note : RuntimeDataConnector will not have data_asset_names until they are passed in through RuntimeBatchRequest

	Unmatched data_references (0 of 0): []



<great_expectations.datasource.new_datasource.Datasource at 0x7fabe006ffd0>

# Connect to our Data 

#### RuntimeDataConnector and RuntimeBatchRequest
* If we analyzed the yaml the configured yaml file above we can see that under `data_connectors` we have our class_name set as RuntimeDataConnector. This is a special kind of connector used to enable RuntimeBatchRequest, that allow you to use in_memory dataframes, sql queries, and even a file path to use are your data source when validating the expectations. 
* Below we will the configuration required to process an in_memory dataframe used to pass this request into the context validator. 

In [105]:
# here you could actually pass all of the file inside of the file path you provide under runtime paramters. 
# we just want to use the converted version 
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate()

suite_name = "adidas_test_suite"

in_memory_df = spark.read.csv("../../data/adidas_us_retail_sales_data-converted.csv", header=True, sep=',')

path = "../../data/adidas_us_retail_sales_data-converted.csv"

runtime_batch_request = RuntimeBatchRequest(
    datasource_name="adidas_retail_sales",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="adidas_retail_sales",  # This can be anything that identifies this data_asset for you
    runtime_parameters={"path": path},  # Add your path here.
    batch_identifiers={"default_identifier_name": "adidas.retail_sales"},
    batch_spec_passthrough={
        "reader_method": "csv", 
        "reader_options": {"sep": ",", "header": True},
    }
)

# Create Expectation Suite 
### What are we doing? 
I am using spark to connect to our filesystem and wanted to provide a set of paramater to my batch request. The RuntimeBatchRequest is not as limited as a BatchRequest. We provide similar paramters as BatchRequest but can also apply the reader method, and options in our request. I found this highly useful, since there are time where we are expecting files with column names, and not just default index value names when validating data sources. 

In [106]:
# validator replaces the batch in v3 version 
# first we create our suite

context.create_expectation_suite(
    expectation_suite_name = suite_name, 
    overwrite_existing=True
)

validator = context.get_validator(
    batch_request=runtime_batch_request, 
    expectation_suite_name=suite_name
)

validator_df = validator.active_batch.data.dataframe
print(validator_df.count())
column_names = [column for column in validator.columns()]
print(column_names)

validator_df.limit(5).show(truncate=False)

9648


Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

['invoice_date', 'retailer', 'retailer_id', 'region', 'state', 'city', 'product', 'price_per_unit', 'units_sold', 'total_sales', 'operating_profit', 'operating_margin', 'sales_method']
+------------+-----------+-----------+---------+--------+--------+-------------------------+--------------+----------+-----------+----------------+----------------+------------+
|invoice_date|retailer   |retailer_id|region   |state   |city    |product                  |price_per_unit|units_sold|total_sales|operating_profit|operating_margin|sales_method|
+------------+-----------+-----------+---------+--------+--------+-------------------------+--------------+----------+-----------+----------------+----------------+------------+
|2020-01-01  |Foot Locker|1185732    |Northeast|New York|New York|Men's Street Footwear    |50.0          |1200.0    |600000.0   |300000.0        |50.0            |In-store    |
|2020-01-02  |Foot Locker|1185732    |Northeast|New York|New York|Men's Athletic Footwear  |50.0       

In [107]:
# access our suite
suite = validator._expectation_suite
print(vars(suite))

{'data_asset_type': None, 'evaluation_parameters': {}, 'expectation_suite_name': 'adidas_test_suite', 'ge_cloud_id': None, 'expectations': [], 'meta': {'great_expectations_version': '0.15.43'}, '_data_context': {
  "anonymous_usage_statistics": {
    "explicit_id": true,
    "explicit_url": false,
    "enabled": true,
    "usage_statistics_url": "https://stats.greatexpectations.io/great_expectations/v1/usage_statistics",
    "data_context_id": "3faccf3c-3a91-4315-a9b8-c8646e5a54dd"
  },
  "checkpoint_store_name": "checkpoint_store",
  "config_variables_file_path": "uncommitted/config_variables.yml",
  "config_version": 3.0,
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "uncommitted/data_docs/local_site/"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "dataso

In [122]:
# expectation to expect all columns to be in the order display in the the column list under the kwargs property. 
column_list_expectation_configs = ExpectationConfiguration(
   expectation_type="expect_table_columns_to_match_ordered_list",
   kwargs={
      "column_list": ['invoice_date', 'retailer', 'retailer_id', 
                      'region', 'state', 'city', 'product', 'price_per_unit',
                      'units_sold', 'total_sales', 'operating_profit', 
                      'operating_margin', 'sales_method']
   },
   meta={
      "notes": {
         "format": "markdown",
         "content": "columns must appear in this order"
      }
   }
)

suite.add_expectation(expectation_configuration=column_list_expectation_configs)

{"kwargs": {"column_list": ["invoice_date", "retailer", "retailer_id", "region", "state", "city", "product", "price_per_unit", "units_sold", "total_sales", "operating_profit", "operating_margin", "sales_method"]}, "expectation_type": "expect_table_columns_to_match_ordered_list", "meta": {"notes": {"format": "markdown", "content": "columns must appear in this order"}}}

In [123]:
# expectations to expect all values to not be null 
not_null_column_expectation_configs = ExpectationConfiguration(
   expectation_type="expect_column_values_to_not_be_null",
   kwargs={
      "column": "retailer_id",
      "mostly": 1.0,
   },
   meta={
      "notes": {
         "format": "markdown",
         "content": "We need all sales records to have a retailer id"
      }
   }
)
suite.add_expectation(expectation_configuration=not_null_column_expectation_configs)

{"kwargs": {"column": "retailer_id", "mostly": 1.0}, "expectation_type": "expect_column_values_to_not_be_null", "meta": {"notes": {"format": "markdown", "content": "We need all sales records to have a retailer id"}}}

In [None]:
expectation_configs = ExpectationConfiguration(
   expectation_type="expect_column_distinct_values_to_be_in_set",
   kwargs={
      "column": "retailers",
       
   },
   meta={
      "notes": {
         "format": "markdown",
         "content": "We need all sales records to have a retailer id"
      }
   }
)
suite.add_expectation(expectation_configuration=expectation_configs)

In [111]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [112]:
# we can access the already configured expectations like this or develop some more complex expectations ourselves. 
print(vars(validator))

{'_data_context': {
  "anonymous_usage_statistics": {
    "explicit_id": true,
    "explicit_url": false,
    "enabled": true,
    "usage_statistics_url": "https://stats.greatexpectations.io/great_expectations/v1/usage_statistics",
    "data_context_id": "3faccf3c-3a91-4315-a9b8-c8646e5a54dd"
  },
  "checkpoint_store_name": "checkpoint_store",
  "config_variables_file_path": "uncommitted/config_variables.yml",
  "config_version": 3.0,
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "uncommitted/data_docs/local_site/"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "datasources": {
    "adidas_retail_sales": {
      "data_connectors": {
        "default_runtime_data_connector_name": {
          "name": "default_runtime_data_connector_name",
          "class_name

In [116]:
print(validator_df.select('retailer').distinct().count())

6


In [120]:
# build the checkpoint configuration
checkpoint_config = {
    "class_name": "SimpleCheckpoint", 
    "validations":[
        {
            "batch_request":runtime_batch_request, 
            "expectation_suite_name": suite_name
        }
    ]
}

checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{suite_name}", 
    gx_data_context, 
    **checkpoint_config
)

checkpoint_result = checkpoint.run()

context.build_data_docs()

validation_result_identifier = checkpoint_result.list_validation_result_identifiers()[0]

gx_data_context.open_data_docs(resource_identifier=validation_result_identifier)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]