In [1]:
import datetime
import json
import os

from pyspark.sql import SparkSession, DataFrame

from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.profile.json_schema_profiler import JsonSchemaProfiler
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)


In [2]:
def main(
    root_directory: str = "/dbfs/great_expectations/",
    data_path: str = "s3a://confessions-of-a-data-guy/*divvy-tripdata.csv",
) -> "DataFrame":
    """Load the Divvy trip CSV files into a Spark DataFrame.

    Args:
        root_directory: Not used by this function; kept so existing calls
            that pass it keep working.
        data_path: Location of the CSV file(s) to read. May contain a glob,
            as the default does.

    Returns:
        The DataFrame read with a header row and an inferred schema, so the
        caller can hand it on for validation.
    """
    spark = (
        SparkSession.builder
        .appName('TestGreatExpectations')
        .enableHiveSupport()
        .getOrCreate()
    )
    trips_df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(data_path)
    )
    # Return the frame instead of dropping it when the function exits.
    return trips_df

In [3]:
def prepare_ge_context(root_dir: str) -> BaseDataContext:
    """Create a Great Expectations context that uses filesystem store defaults.

    Args:
        root_dir: Directory passed to ``FilesystemStoreBackendDefaults`` as
            its ``root_directory``.

    Returns:
        A ``BaseDataContext`` built from the resulting project configuration.
    """
    store_defaults = FilesystemStoreBackendDefaults(root_directory=root_dir)
    project_config = DataContextConfig(store_backend_defaults=store_defaults)
    return BaseDataContext(project_config=project_config)

In [6]:
def prepare_get_datasource(dname: str = 'DataFrame_Trips_Source') -> dict:
    """Build the configuration dict for the Spark runtime datasource.

    Args:
        dname: Value stored under the ``"name"`` key of the configuration.

    Returns:
        A new dict describing a ``Datasource`` with a ``SparkDFExecutionEngine``
        and a single ``RuntimeDataConnector`` named
        ``DataFrame_Trips_Data_Connector``.
    """
    # The connector declares the two batch identifier keys a batch request
    # for this datasource is expected to supply.
    runtime_connector = {
        "module_name": "great_expectations.datasource.data_connector",
        "class_name": "RuntimeDataConnector",
        "batch_identifiers": ["trips_source", "divvy_bike_trips"],
    }
    return {
        "name": dname,
        "class_name": "Datasource",
        "execution_engine": {"class_name": "SparkDFExecutionEngine"},
        "data_connectors": {"DataFrame_Trips_Data_Connector": runtime_connector},
    }

In [4]:
# The Great Expectations project lives in "great_exp" under the current working directory.
root_directory = os.path.join(os.getcwd(), "great_exp")

In [5]:
# Create the context rooted at the project directory computed above.
ge_context = prepare_ge_context(root_dir=root_directory)

In [7]:
# Register the datasource built by prepare_get_datasource() (default name).
# The call is the cell's last expression, so its return value is what the cell displays.
ge_context.add_datasource(**prepare_get_datasource())

22/12/15 15:25:51 WARN Utils: Your hostname, wagner-desktop resolves to a loopback address: 127.0.1.1; using 192.168.0.12 instead (on interface enp5s0)
22/12/15 15:25:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/15 15:25:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<great_expectations.datasource.new_datasource.Datasource at 0x7fa52528a980>

In [8]:
def prepare_checkpoint() -> dict:
    """Return the settings for the ``trip_check`` checkpoint.

    Returns:
        A new dict with the checkpoint name, config version, checkpoint class
        name and the strftime-style template used for run names.
    """
    return dict(
        name="trip_check",
        config_version=1.0,
        class_name="SimpleCheckpoint",
        run_name_template="%Y%m%d-%H%M%S-trip-run",
    )

In [9]:
# Build the checkpoint settings, then register them with the context.
# add_checkpoint is the last expression, so its result is the cell's displayed output.
trips_check = prepare_checkpoint()
ge_context.add_checkpoint(**trips_check)


{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction",
        "site_names": []
      }
    }
  ],
  "batch_request": {},
  "class_name": "Checkpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "module_name": "great_expectations.checkpoint",
  "name": "trip_check",
  "profilers": [],
  "run_name_template": "%Y%m%d-%H%M%S-trip-run",
  "runtime_configuration": {},
  "validations": []
}

In [10]:
# Expectation definitions for the bike-trip data, kept as a plain dict.
trips_expect = {
    # JSON-schema style header: an "object" with no declared properties.
    "properties": {},
    "type": "object",
    "data_asset_type": "",
    "expectation_suite_name": "bikes",
    "expectations": [
        {
            # The table's columns must match this list, in this order.
            "expectation_type": "expect_table_columns_to_match_ordered_list",
            "kwargs": {
                "column_list": [
                    "ride_id",
                    "rideable_type",
                    "started_at",
                    "ended_at",
                    "start_station_name",
                    "start_station_id",
                    "end_station_name",
                    "end_station_id",
                    "start_lat",
                    "start_lng",
                    "end_lat",
                    "end_lng",
                    "member_casual",
                ],
            },
            "meta": {},
        },
        {
            # Row count must fall between min_value and max_value.
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {
                "max_value": 1000000,
                "min_value": 1000,
            },
            "meta": {},
        },
    ],
    "ge_cloud_id": "",
    "meta": {
        "citations": [
            {
                "batch_request": {
                    "data_asset_name": "trip_data_batch",
                    "data_connector_name": "DataFrame_Trips_Data_Connector",
                    "datasource_name": "DataFrame_Trips_Source",
                    "limit": 1000,
                },
                "citation_date": "2022-06-02",
                "comment": "Created suite ",
            },
        ],
        "great_expectations_version": "0.14.10",
    },
}

In [11]:
# Create and save the expectation suite.
# The suite is built directly from trips_expect["expectations"].
# JsonSchemaProfiler derives expectations only from a schema's "properties",
# which is empty in trips_expect, so profiling that dict yields a suite without
# the column-order and row-count checks and the checkpoint validates nothing.
suite = ge_context.create_expectation_suite(
    expectation_suite_name=trips_expect["expectation_suite_name"],
    overwrite_existing=True,  # keeps the cell re-runnable
)
for expectation in trips_expect["expectations"]:
    suite.add_expectation(
        ExpectationConfiguration(
            expectation_type=expectation["expectation_type"],
            kwargs=expectation["kwargs"],
            meta=expectation["meta"],
        )
    )
ge_context.save_expectation_suite(suite)

In [13]:
def prepare_runtime_batch(df: DataFrame):
    """Wrap an in-memory DataFrame in a ``RuntimeBatchRequest``.

    Args:
        df: The DataFrame passed as ``batch_data`` in the runtime parameters.

    Returns:
        A ``RuntimeBatchRequest`` addressed to the ``DataFrame_Trips_Source``
        datasource and its ``DataFrame_Trips_Data_Connector`` connector.
    """
    # Each batch identifier is simply given its own key as the value.
    identifiers = {key: key for key in ("trips_source", "divvy_bike_trips")}
    return RuntimeBatchRequest(
        datasource_name="DataFrame_Trips_Source",
        data_connector_name="DataFrame_Trips_Data_Connector",
        # Free-form label for this data asset.
        data_asset_name="trip_data_batch",
        batch_identifiers=identifiers,
        runtime_parameters={"batch_data": df},
    )


def run_checkpoint(context, batch_request):
    """Run the ``trip_check`` checkpoint against the ``bikes`` suite.

    Args:
        context: Object exposing ``run_checkpoint(checkpoint_name=..., validations=...)``.
        batch_request: Batch request to validate.

    Returns:
        Whatever ``context.run_checkpoint`` returns.
    """
    validation = {
        "batch_request": batch_request,
        "expectation_suite_name": "bikes",
    }
    return context.run_checkpoint(
        checkpoint_name="trip_check",
        validations=[validation],
    )

In [None]:
# Prepare Batch and Validate
# No earlier cell binds `df` at notebook scope, so the trip data is loaded here;
# otherwise this cell raises NameError on a fresh kernel.
spark = SparkSession.builder.appName('TestGreatExpectations').enableHiveSupport().getOrCreate()
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("s3a://confessions-of-a-data-guy/*divvy-tripdata.csv")
)
trips_batch_request = prepare_runtime_batch(df)
validation_results = run_checkpoint(ge_context, trips_batch_request)
print(validation_results)