In [13]:
import random
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Start (or reuse, via getOrCreate) the Spark session for this notebook.
spark = SparkSession.builder.appName("LargeDatasetWithNoneExample").getOrCreate()

# Explicit schema for the orders CSV; every column is nullable so rows with
# missing values load as NULL instead of being inferred as strings.
order_fields = [
    ("order_id", IntegerType()),
    ("customer_id", IntegerType()),
    ("order_date", DateType()),
    ("order_amount", DoubleType()),
    ("order_status", StringType()),
]
schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in order_fields])

# Location of the orders data (read with a header row and the schema above).
ORDERS_PATH = r"data/orders.csv/"
df = spark.read.csv(ORDERS_PATH, schema=schema, header=True)

df.show(5)


+--------+-----------+----------+------------+------------+
|order_id|customer_id|order_date|order_amount|order_status|
+--------+-----------+----------+------------+------------+
|       1|         23|2024-04-05|      426.48|     pending|
|       2|        316|2024-05-08|      483.77|     pending|
|       3|        721|2023-12-14|      291.96|     pending|
|       4|        481|2024-01-21|       53.94|   cancelled|
|       5|        847|2024-05-15|      472.08|   cancelled|
+--------+-----------+----------+------------+------------+
only showing top 5 rows



In [15]:
import great_expectations as ge
from great_expectations.dataset import SparkDFDataset
from great_expectations.core.batch import BatchRequest
import yaml

from great_expectations.exceptions import DataContextError

# Initialize a DataContext for the current Great Expectations project.
context = ge.get_context()

# Spark datasource whose RuntimeDataConnector accepts in-memory DataFrames;
# they are supplied later through RuntimeBatchRequest.runtime_parameters.
datasource_config = {
    "name": "pyspark_df_orders",
    "class_name": "Datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SparkDFExecutionEngine"
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": ["default_identifier_name"]  # Provide a valid list for batch identifiers
        }
    }
}

# Add the datasource
context.add_datasource(**datasource_config)

# Create the expectation suite. add_expectation_suite raises DataContextError
# when a suite with this name is already stored (e.g. from a previous run of
# this notebook), so fall back to loading it; the next cell replaces its
# expectations anyway, which keeps Restart & Run All working.
expectation_suite_name = "01_ORDERS_TEST_SUITE"
try:
    suite = context.add_expectation_suite(expectation_suite_name)
except DataContextError:
    suite = context.get_expectation_suite(expectation_suite_name)

You appear to be using a legacy capability with the latest config version (3.0).
    Your data context with this configuration version uses validation_operators, which are being deprecated.  Please consult the V3 API migration guide https://docs.greatexpectations.io/docs/guides/miscellaneous/migration_guide#migrating-to-the-batch-request-v3-api and update your configuration to be compatible with the version number 3.
    (This message will appear repeatedly until your configuration is updated.)



In [16]:
# Reuse the `context` from the previous cell instead of calling ge.get_context()
# again: a fresh context re-reads the project config from disk and may not see a
# datasource that was registered in memory but not persisted.
from great_expectations.core.batch import RuntimeBatchRequest

# Wrap the in-memory Spark DataFrame `df` as a runtime batch. The datasource name
# is taken from `datasource_config` so this request always targets the datasource
# that was actually registered; the connector name matches the key in that config.
batch_request = RuntimeBatchRequest(
    datasource_name=datasource_config["name"],
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="df",
    runtime_parameters={"batch_data": df},
    batch_identifiers={"default_identifier_name": "default_identifier_name"},
)

You appear to be using a legacy capability with the latest config version (3.0).
    Your data context with this configuration version uses validation_operators, which are being deprecated.  Please consult the V3 API migration guide https://docs.greatexpectations.io/docs/guides/miscellaneous/migration_guide#migrating-to-the-batch-request-v3-api and update your configuration to be compatible with the version number 3.
    (This message will appear repeatedly until your configuration is updated.)



In [None]:
from great_expectations.core.expectation_configuration import ExpectationConfiguration

# Load the suite and replace its expectations wholesale, so re-running this cell
# yields the same suite rather than accumulating duplicates.
suite = context.get_expectation_suite(expectation_suite_name)

expectation_specs = [
    {
        "expectation_type": "expect_column_to_exist",
        "kwargs": {"column": "order_id"}
    },
    {
        "expectation_type": "expect_column_to_exist",
        "kwargs": {"column": "customer_id"}
    },
    # NOTE(review): the status counts shown later sum to exactly 1000 rows, i.e.
    # right at max_value (inclusive) -- any growth in the data fails this check.
    {
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {"min_value": 10, "max_value": 1000}
    },
    {
        "expectation_type": "expect_column_values_to_be_unique",
        "kwargs": {"column": "order_id"}
    },
    {
        "expectation_type": "expect_column_min_to_be_between",
        "kwargs": {"column": "order_amount", "min_value": 0, "max_value": 1000}
    },
    {
        "expectation_type": "expect_column_mean_to_be_between",
        "kwargs": {"column": "order_amount", "min_value": 100, "max_value": 500}
    },
    # Anchored, one-or-more digits: the previous "[0-9]*" also matches the empty
    # string, so it accepted every value and could never fail.
    {
        "expectation_type": "expect_column_values_to_match_regex",
        "kwargs": {"column": "customer_id", "regex": "^[0-9]+$"}
    },
]

# ExpectationSuite.expectations holds ExpectationConfiguration objects, so build
# them explicitly instead of storing raw dicts.
suite.expectations = [ExpectationConfiguration(**spec) for spec in expectation_specs]
context.save_expectation_suite(suite, expectation_suite_name)


In [None]:
# Bind the runtime Spark batch to the stored expectation suite, then run every
# expectation in that suite against the batch.
validator = context.get_validator(
    expectation_suite_name=expectation_suite_name,
    batch_request=batch_request,
)
validation_results = validator.validate()

# Plain-text dump of the suite-level validation result.
print(validation_results)

In [None]:
import json

# Reuse the DataContext created earlier instead of instantiating a second one
# through the deprecated ge.data_context.DataContext() constructor.
data_context = context

# A Validation Operator orchestrates validation of the given assets: it runs the
# validation, saves the results and triggers the actions configured for it.
# NOTE(review): validation operators are deprecated in the V3 API (see the
# warning printed by the earlier cells); a Checkpoint is the replacement.
#
# The result gets its own name: rebinding `validation_results` here would replace
# the suite-level result from validator.validate(), and the later cells that read
# validation_results["results"] / validation_results["success"] would break.
operator_results = data_context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[validator],
    run_id="my_run_id",
)

# Convert the operator results to a JSON-serializable dict and show it.
validation_results_json = operator_results.to_json_dict()
print(validation_results_json)

# Save the results to a JSON file (explicit encoding for portability).
with open("validation_results.json", "w", encoding="utf-8") as json_file:
    json.dump(validation_results_json, json_file, indent=2)

# Build Data Docs, then open them for viewing.
data_context.build_data_docs()
data_context.open_data_docs()

In [30]:
# Expectation types of every validation result that succeeded, in result order.
passed_test_cases = [
    result["expectation_config"]["expectation_type"]
    for result in validation_results["results"]
    if result["success"]
]

# Report the passing expectations, or say that none passed.
if not passed_test_cases:
    print("No test cases passed successfully.")
else:
    print("The following test cases passed successfully:")
    for passed_type in passed_test_cases:
        print("- ", passed_type)

The following test cases passed successfully:
-  expect_column_to_exist
-  expect_column_values_to_be_unique
-  expect_column_to_exist
-  expect_column_values_to_match_regex
-  expect_table_row_count_to_be_between
-  expect_column_min_to_be_between
-  expect_column_mean_to_be_between


In [32]:
# Gate the rest of the notebook on the suite result: stop here if any
# expectation failed.
if validation_results['success']:
    print("All test cases passed successfully.")
else:
    failed_test_cases = [
        result["expectation_config"]["expectation_type"]
        for result in validation_results["results"]
        if not result["success"]
    ]
    # RuntimeError instead of a bare Exception (still caught by `except Exception`),
    # and name the failing expectations so the error is actionable.
    raise RuntimeError(
        "Not all test cases passed. Please check the validation results and address the issues. "
        f"Failed expectations: {', '.join(failed_test_cases) or 'unknown'}"
    )

All test cases passed successfully.


In [35]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [36]:
# Use case 1: Calculate the total order amount for each customer.
# F.sum is Spark's aggregate; qualifying it avoids relying on the star import,
# which shadows Python's builtin sum(). Rows with a NULL customer_id are grouped
# together under a single NULL key.
total_order_amount_per_customer = df.groupBy("customer_id").agg(
    F.sum("order_amount").alias("total_order_amount")
)
total_order_amount_per_customer.show()

+-----------+------------------+
|customer_id|total_order_amount|
+-----------+------------------+
|        858|            375.33|
|        243|            407.73|
|        883|            907.66|
|         65|             254.2|
|        879|            464.96|
|        588|           1098.02|
|        853|            618.22|
|        133|            708.42|
|        513|            749.76|
|        918|            236.09|
|        322|            115.82|
|        362|             137.3|
|        961|             804.9|
|        876|             44.33|
|        683|           1083.38|
|        108|            758.12|
|        530|            362.97|
|        126|            539.95|
|        115|           1218.51|
|        210|            407.12|
+-----------+------------------+
only showing top 20 rows



In [37]:
# Use case 2: Determine the number of orders by status.
# F.count(column) counts non-NULL order_id values per group; rows whose status
# is NULL form their own group (shown as NULL in the output).
order_count_by_status = df.groupBy("order_status").agg(
    F.count("order_id").alias("order_count")
)
order_count_by_status.show()

+------------+-----------+
|order_status|order_count|
+------------+-----------+
|   completed|        313|
|        NULL|         57|
|   cancelled|        326|
|     pending|        304|
+------------+-----------+



In [38]:
# Use case 3: Find the average order amount by order status.
# F.avg ignores NULL order_amount values; NULL statuses form their own group.
avg_order_amount_by_status = df.groupBy("order_status").agg(
    F.avg("order_amount").alias("avg_order_amount")
)
avg_order_amount_by_status.show()

+------------+------------------+
|order_status|  avg_order_amount|
+------------+------------------+
|   completed|229.82463333333337|
|        NULL|257.70779999999996|
|   cancelled|264.07883495145626|
|     pending|251.38595238095238|
+------------+------------------+



In [39]:
# Use case 4: Identify the top 5 customers by total order amount.
# Orders without a customer_id aggregate into one NULL group, which is not a real
# customer, so it is excluded. customer_id breaks ties so the top 5 is
# deterministic across runs.
top_customers = (
    total_order_amount_per_customer
    .where(F.col("customer_id").isNotNull())
    .orderBy(F.col("total_order_amount").desc(), F.col("customer_id").asc())
    .limit(5)
)
top_customers.show()

+-----------+------------------+
|customer_id|total_order_amount|
+-----------+------------------+
|        485|           1722.69|
|        455|           1693.28|
|        365|            1388.1|
|          1|           1336.23|
|        478|           1324.34|
+-----------+------------------+

