In [1]:
import great_expectations as gx
import pandas as pd

# Sample DataFrame
data = {
    "id": [1, 2, 3, 4],
    "age": [25, 30, 31, None],
    "name": ["Rohan", "Amit", "Neha", "Sam"]
}
df = pd.DataFrame(data)

# Context
context = gx.get_context()

# Data Source (Pandas)
data_source = context.data_sources.add_pandas(name="inventory_parts")

# Add Data Asset **with the dataframe directly**
data_asset = data_source.add_dataframe_asset(
    name="inventory_parts_asset"
)
assert type(context).__name__ == "EphemeralDataContext"

# Create Batch Definition
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "inventory_parts_batch"
)

# Pass dataframe here
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

# Create Expectation Suite
suite = gx.ExpectationSuite(name="inventory_parts_suite")

suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="id"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="age"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="age"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="age",
    min_value=18,
    max_value=60
))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column="name",
    type_="object"
))
suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchOrderedList(
    column_list=["id", "age", "name"]
))

# Add suite
context.suites.add(suite)

# Validate
validation_results = batch.validate(suite)

# Clean output formatting (fix: use expectation_config.type)
print("\n===== Validation Summary =====")
print("Success:", validation_results.success)
for r in validation_results.results:
    print(f"{r.expectation_config.type} => {'PASSED' if r.success else 'FAILED'}")


Calculating Metrics: 100%|██████████| 26/26 [00:00<00:00, 1663.77it/s]


===== Validation Summary =====
Success: False
expect_column_values_to_not_be_null => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => PASSED
expect_column_values_to_be_unique => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_not_be_null => FAILED
expect_column_values_to_be_between => PASSED
expect_table_columns_to_match_ordered_list => PASSED





In [2]:
context.add_data_docs_site(
    site_name="local_site",
    site_config={
        "class_name": "SiteBuilder",
        "store_backend": {"class_name": "TupleFilesystemStoreBackend", "base_directory": "uncommitted/data_docs/local_site"},
        "renderer": {"module_name": "great_expectations.render.renderer"},
        "view": {"module_name": "great_expectations.render.view"},
    }
)


InvalidKeyError: Data Docs Site `local_site` already exists in the Data Context.

In [3]:
print(context.list_data_docs_sites())


{'local_site': {'class_name': 'SiteBuilder', 'show_how_to_buttons': True, 'store_backend': {'class_name': 'TupleFilesystemStoreBackend', 'base_directory': '/tmp/tmp99hunnrg'}, 'site_index_builder': {'class_name': 'DefaultSiteIndexBuilder'}}}


In [4]:
context.build_data_docs()


{'local_site': 'file:///tmp/tmp99hunnrg/index.html'}

In [None]:
context.open_data_docs()


gio: file:///tmp/tmp99hunnrg/index.html: Failed to find default application for content type ‘text/html’


In [None]:
from pyspark.sql import SparkSession 
 
spark = (
    SparkSession.builder
    .appName("PyDeequ_AdvancedChecks")
    .master("local[*]") 
    .getOrCreate()
)

product = spark.read.csv(
    "/home/rohan/brazilian-ecommerce/data/product_category_name_translation.csv" ,
    header=True,
    inferSchema=True
)

In [13]:
product.show()

+---------------------+-----------------------------+
|product_category_name|product_category_name_english|
+---------------------+-----------------------------+
|         beleza_saude|                health_beauty|
| informatica_acess...|         computers_accesso...|
|           automotivo|                         auto|
|      cama_mesa_banho|               bed_bath_table|
|     moveis_decoracao|              furniture_decor|
|        esporte_lazer|               sports_leisure|
|           perfumaria|                    perfumery|
| utilidades_domest...|                   housewares|
|            telefonia|                    telephony|
|   relogios_presentes|                watches_gifts|
|    alimentos_bebidas|                   food_drink|
|                bebes|                         baby|
|            papelaria|                   stationery|
| tablets_impressao...|         tablets_printing_...|
|           brinquedos|                         toys|
|       telefonia_fixa|     

In [14]:
context = gx.get_context()

# Data Source (Pandas)
data_source = context.data_sources.add_spark(name="spark_inventory_parts")

# Add Data Asset **with the dataframe directly**
data_asset = data_source.add_dataframe_asset(
    name="spark_inventory_parts_asset"
)
assert type(context).__name__ == "EphemeralDataContext"

# Create Batch Definition
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "spark_inventory_parts_batch"
)

# Pass dataframe here
batch = batch_definition.get_batch(batch_parameters={"dataframe": product})

# Create Expectation Suite
suite = gx.ExpectationSuite(name="spark_inventory_parts_suite")

suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="id"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="product_category_name"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="product_category_name_english"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="age"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="age",
    min_value=18,
    max_value=60
))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column="name",
    type_="object"
))
suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchOrderedList(
    column_list=["id", "age", "name"]
))

# Add suite
context.suites.add(suite)

# Validate
validation_results = batch.validate(suite)

# Clean output formatting (fix: use expectation_config.type)
print("\n===== Validation Summary =====")
print("Success:", validation_results.success)
for r in validation_results.results:
    print(f"{r.expectation_config.type} => {'PASSED' if r.success else 'FAILED'}")

25/11/10 15:27:54 WARN CacheManager: Asked to cache already cached data.
Calculating Metrics:  16%|█▌        | 5/31 [00:00<00:00, 26.07it/s] 


===== Validation Summary =====
Success: False
expect_column_values_to_not_be_null => FAILED
expect_column_values_to_be_unique => FAILED
expect_column_values_to_not_be_null => FAILED
expect_column_values_to_be_between => FAILED
expect_column_values_to_be_of_type => FAILED
expect_column_to_exist => PASSED
expect_column_to_exist => PASSED
expect_table_columns_to_match_ordered_list => FAILED





In [None]:
import great_expectations as gx
from pyspark.sql import SparkSession 
 
spark = (
    SparkSession.builder
    .appName("spark_AdvancedChecks")
    .master("local[*]") 
    .getOrCreate()
)
 
data = [
    (1, "Alice", 23, "F", 50000),
    (2, "Bob", None, "M", 45000),
    (3, "Cathy", 35, "F", 62000),
    (4, "David", 17, "M", 29000),
    (5, "Eve", 29, "F", None),
]

columns = ["id", "name", "age", "gender", "salary"]
spark_df = spark.createDataFrame(data, columns)

context = gx.get_context()

# Data Source (Pandas)
data_source = context.data_sources.add_spark(name="spark_inventory_parts")

# Add Data Asset **with the dataframe directly**
data_asset = data_source.add_dataframe_asset(
    name="spark_inventory_parts_asset"
)
assert type(context).__name__ == "EphemeralDataContext"

# Create Batch Definition
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "spark_inventory_parts_batch"
)

# Pass dataframe here
batch = batch_definition.get_batch(batch_parameters={"dataframe": spark_df})

# Create Expectation Suite
suite = gx.ExpectationSuite(name="spark_inventory_parts_suite")

suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="id"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="age"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="age"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="age",
    min_value=18,
    max_value=60
))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column="name",
    type_="object"
))
suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchOrderedList(
    column_list=["id", "age", "name"]
))

# Add suite
context.suites.add(suite)

# Validate
validation_results = batch.validate(suite)

# Clean output formatting (fix: use expectation_config.type)
print("\n===== Validation Summary =====")
print("Success:", validation_results.success)
for r in validation_results.results:
    print(f"{r.expectation_config.type} => {'PASSED' if r.success else 'FAILED'}")

Calculating Metrics: 100%|██████████| 31/31 [00:01<00:00, 25.00it/s]


===== Validation Summary =====
Success: False
expect_column_values_to_not_be_null => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_values_to_be_unique => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_not_be_null => FAILED
expect_column_values_to_be_between => FAILED
expect_table_columns_to_match_ordered_list => FAILED





In [3]:
import great_expectations as gx
from pyspark.sql import SparkSession 
import yaml

# Initialize Spark
spark = (
    SparkSession.builder
    .appName("spark_AdvancedChecks")
    .master("local[*]") 
    .getOrCreate()
)

# Sample data
data = [
    (1, "Alice", 23, "F", 50000),
    (2, "Bob", None, "M", 45000),
    (3, "Cathy", 35, "F", 62000),
    (4, "David", 17, "M", 29000),
    (5, "Eve", 29, "F", None),
]

columns = ["id", "name", "age", "gender", "salary"]
spark_df = spark.createDataFrame(data, columns)

# Get Great Expectations context
context = gx.get_context()

# Method 1: Load suite from YAML file (now used as primary)
def load_suite_from_yaml(context, yaml_file_path):
    """Load expectation suite from YAML file"""
    with open(yaml_file_path, 'r') as file:
        suite_config = yaml.safe_load(file)
    
    # Create or update the suite
    try:
        suite = context.suites.add(gx.ExpectationSuite(**suite_config))
    except ValueError:
        # Suite might already exist, get it instead
        suite = context.suites.get(suite_config["name"])
    
    return suite

# Method 2: Create suite from YAML string (alternative, not used here)
def create_suite_from_yaml_string(context, yaml_string):
    """Create expectation suite from YAML string"""
    suite_config = yaml.safe_load(yaml_string)
    suite = context.suites.add(gx.ExpectationSuite(**suite_config))
    return suite

# Setup data source and batch
data_source = context.data_sources.add_spark(name="spark_inventory_parts")
data_asset = data_source.add_dataframe_asset(name="spark_inventory_parts_asset")

batch_definition = data_asset.add_batch_definition_whole_dataframe("spark_inventory_parts_batch")
batch = batch_definition.get_batch(batch_parameters={"dataframe": spark_df})

# Load suite from YAML file (ensure the file exists at the specified path)
suite = load_suite_from_yaml(context, "/home/rohan/brazilian-ecommerce/ge/spark_inventory_parts_suite.yml")

# Validate
validation_results = batch.validate(suite)

# Print results
print("\n===== Validation Summary =====")
print("Success:", validation_results.success)
for r in validation_results.results:
    print(f"{r.expectation_config.type} => {'PASSED' if r.success else 'FAILED'}")

# Stop Spark session
spark.stop()

Calculating Metrics: 100%|██████████| 51/51 [00:07<00:00,  6.63it/s]            



===== Validation Summary =====
Success: False
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_proportion_of_unique_values_to_be_between => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_value_lengths_to_be_between => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_values_to_be_between => FAILED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_values_to_be_in_set => PASSED
expect_column_to_exist => PASSED
expect_column_values_to_be_of_type => FAILED
expect_column_values_to_be_between => PASSED
expect_table_row_count_to_be_between => PASSED
