## Himanshu, MDS202327

#### Installing required packages

In [0]:
%%capture
%pip install great_expectations

In [0]:
# Import required libraries
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import BatchRequest
from great_expectations.exceptions import *
import os
import pandas as pd
from pyspark.sql import SparkSession



In [0]:
import warnings
# Ignore every UserWarning raised from here on (presumably to keep the cell outputs compact).
warnings.filterwarnings("ignore", category=UserWarning)

#### Read data into data frame

In [0]:
# Load the adult CSV from DBFS into a Spark DataFrame
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
file_path = "dbfs:/FileStore/tables/adult-5.csv"
spark_df = (
    spark.read.format("csv")
    .option("header", "true")       # first line of the file supplies the column names
    .option("inferSchema", "true")  # let Spark infer column types from the data
    .load(file_path)
)

#### Preview the first rows and column names

In [0]:
# Print the first five rows to check the header and the parsed values.
spark_df.show(5)

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|           0|           0|            40| United-States| <=50K|
| 53|         Private|234721|     11th|            7|Married-civ-spouse|Handlers-cleaners|     

#### Creating a Data Source

In [0]:
# Get the context
context = gx.get_context()

# Add a Spark datasource (add_or_update, so re-running the cell does not fail on an existing name)
datasource_name = "databricks_source"
datasource = context.data_sources.add_or_update_spark(name=datasource_name)

# Create a dataframe asset; the DataFrame itself is supplied later via batch_parameters
asset_name = "adult_data"
asset = datasource.add_dataframe_asset(name=asset_name)

# Create a batch definition that treats the whole DataFrame as a single batch
batch_definition_name = "my_batch_definition"
batch_definition = asset.add_batch_definition_whole_dataframe(batch_definition_name)

# Get the batch definition again by name through the context.
# NOTE(review): this presumably returns the definition created just above, so the
# lookup only demonstrates retrieval - confirm before removing.
batch_definition = (
    context.data_sources.get(datasource_name)
    .get_asset(asset_name)
    .get_batch_definition(batch_definition_name)
)

# Now use get_batch with batch_parameters: the in-memory Spark DataFrame is bound here
batch_parameters = {"dataframe": spark_df}
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

#### Creating an Expectation Suite

In [0]:
# Create an Expectation Suite (it starts with no expectations)
expectation_suite_name = "adult_data_quality_suite"
suite = gx.ExpectationSuite(name=expectation_suite_name)

# Add the Expectation Suite to the Context.
# As the last expression of the cell, the value returned by add_or_update is shown as the cell output.
context.suites.add_or_update(suite)

Out[8]: {
  "name": "adult_data_quality_suite",
  "id": "a52ce41a-23e8-4fb4-bece-4acf384bffde",
  "expectations": [],
  "meta": {
    "great_expectations_version": "1.3.14"
  },
  "notes": null
}

#### Create a Validator

In [0]:
# Create a validator using the batch directly.
# The suite is referenced by name; the expect_* calls in later cells go through this validator.
validator = context.get_validator(
    batch=batch,
    expectation_suite_name=expectation_suite_name
)

In [0]:
# Print the first rows of the batch as seen through the validator.
print(validator.head())

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

   age         workclass  fnlwgt  education  education_num  \
0   50  Self-emp-not-inc   83311  Bachelors             13   
1   38           Private  215646    HS-grad              9   
2   53           Private  234721       11th              7   
3   28           Private  338409  Bachelors             13   
4   37           Private  284582    Masters             14   

       marital_status         occupation   relationship   race     sex  \
0  Married-civ-spouse    Exec-managerial        Husband  White    Male   
1            Divorced  Handlers-cleaners  Not-in-family  White    Male   
2  Married-civ-spouse  Handlers-cleaners        Husband  Black    Male   
3  Married-civ-spouse     Prof-specialty           Wife  Black  Female   
4  Married-civ-spouse    Exec-managerial           Wife  White  Female   

   capital_gain  capital_loss  hours_per_week native_country income  
0             0             0              13  United-States  <=50K  
1             0             0             

#### Check for order of columns

In [0]:
# Expect the batch's columns to appear in exactly this order.
# NOTE(review): the expected list is taken from spark_df itself, the same DataFrame the
# batch was built from, so this can only fail if the batch diverges from spark_df.
validator.expect_table_columns_to_match_ordered_list(
    column_list=spark_df.columns,
)

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

Out[11]: {
  "success": true,
  "expectation_config": {
    "type": "expect_table_columns_to_match_ordered_list",
    "kwargs": {
      "batch_id": "databricks_source-adult_data",
      "column_list": [
        "age",
        "workclass",
        "fnlwgt",
        "education",
        "education_num",
        "marital_status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "capital_gain",
        "capital_loss",
        "hours_per_week",
        "native_country",
        "income"
      ]
    },
    "meta": {}
  },
  "result": {
    "observed_value": [
      "age",
      "workclass",
      "fnlwgt",
      "education",
      "education_num",
      "marital_status",
      "occupation",
      "relationship",
      "race",
      "sex",
      "capital_gain",
      "capital_loss",
      "hours_per_week",
      "native_country",
      "income"
    ]
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
   

#### Check for null values

In [0]:
# Expect every row to carry an income value; the result object is displayed as the cell output.
validator.expect_column_values_to_not_be_null(column="income")

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Out[12]: {
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "batch_id": "databricks_source-adult_data",
      "column": "income"
    },
    "meta": {}
  },
  "result": {
    "element_count": 32560,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [0]:
# Examine what columns are available in our dataset.
# column_list is reused by later cells to guard column-specific expectations.
column_list = spark_df.columns
print(f"Columns in the dataset: {column_list}")

Columns in the dataset: ['age', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_per_week', 'native_country', 'income']


#### Function to run expectations

In [0]:
# Function to run expectation and print result without widget
def run_expectation(expectation_function, **kwargs):
    """Run a single expectation and print a compact plain-text summary.

    Args:
        expectation_function: Callable invoked as ``expectation_function(**kwargs)``.
            Its return value is inspected for a ``success`` attribute and a
            ``result`` attribute.
        **kwargs: Forwarded unchanged to ``expectation_function``.

    Returns:
        None. The result object is deliberately not returned, so calling this
        helper as the last expression of a cell does not display the result.

    Prints:
        ``Validation: [[PASSED]]`` when the result has a truthy ``success``
        attribute, otherwise ``Validation: [[FAILED]]``. When ``result.result``
        is a dict, a second line lists whichever of ``unexpected_count``,
        ``unexpected_percent`` and ``observed_value`` are present.
    """
    result = expectation_function(**kwargs)

    # A missing `success` attribute is reported as a failure.
    status = "[[PASSED]]" if getattr(result, "success", False) else "[[FAILED]]"
    print(f"Validation: {status}")

    # Only dict-shaped result payloads carry the metrics reported here.
    details = getattr(result, "result", None)
    if isinstance(details, dict):
        metrics = [
            f"{key}: {details[key]}"
            for key in ("unexpected_count", "unexpected_percent", "observed_value")
            if key in details
        ]
        if metrics:
            print(", ".join(metrics))

#### Add basic expectations for data structure

In [0]:
%%capture captured_output
# Check the column order through the helper; stdout is captured and printed by the next cell.
run_expectation(validator.expect_table_columns_to_match_ordered_list, column_list=column_list)

In [0]:
# Show the text captured from the column-order check.
print(captured_output.stdout)

Validation: [[PASSED]]
observed_value: ['age', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_per_week', 'native_country', 'income']



In [0]:
%%capture captured_output
# Expect the table to hold between 1,000 and 100,000 rows; stdout is captured and printed by the next cell.
run_expectation(validator.expect_table_row_count_to_be_between, min_value=1000, max_value=100000)

In [0]:
# Show the text captured from the row-count check.
print(captured_output.stdout)

Validation: [[PASSED]]
observed_value: 32560



#### Add expectations for age column

In [0]:
%%capture captured_output
if "age" in column_list:
    run_expectation(validator.expect_column_values_to_be_between, 
                   column="age", min_value=16, max_value=100)
    run_expectation(validator.expect_column_values_to_not_be_null, 
                   column="age")

In [0]:
# Show the text captured from the age checks.
print(captured_output.stdout)

Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0
Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0



#### Add expectations for education column

In [0]:
%%capture captured_output
if "education" in column_list:
    education_categories = [
        "Bachelors", "HS-grad", "Some-college", "Masters", "Doctorate", 
        "Prof-school", "11th", "10th", "9th", "12th", "Assoc-voc", 
        "Assoc-acdm", "7th-8th", "1st-4th", "5th-6th", "Preschool"
    ]
    run_expectation(validator.expect_column_values_to_be_in_set, 
                   column="education", value_set=education_categories)
    run_expectation(validator.expect_column_values_to_not_be_null, 
                   column="education")

In [0]:
# Show the text captured from the education checks.
print(captured_output.stdout)

Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0
Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0



#### Add expectations for income column

In [0]:
%%capture captured_output
if "income" in column_list:
    income_categories = ["<=50K", ">50K"]
    run_expectation(validator.expect_column_values_to_be_in_set, 
                   column="income", value_set=income_categories)
    run_expectation(validator.expect_column_values_to_not_be_null, 
                   column="income")

In [0]:
# Show the text captured from the income checks.
print(captured_output.stdout)

Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0
Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0



#### Add expectation for workclass column

In [0]:
%%capture captured_output
if "workclass" in column_list:
    workclass_categories = [
        "Private", "Self-emp-not-inc", "Local-gov", "State-gov", 
        "Self-emp-inc", "Federal-gov", "Without-pay", "Never-worked", "?"
    ]
    run_expectation(validator.expect_column_values_to_be_in_set, 
                   column="workclass", value_set=workclass_categories)

In [0]:
# Show the text captured from the workclass check.
print(captured_output.stdout)

Validation: [[PASSED]]
unexpected_count: 0, unexpected_percent: 0.0



#### Statistical expectations: median, minimum, and maximum of age

In [0]:
%%capture captured_output
if "age" in column_list:
    # Check median (50th percentile)
    run_expectation(validator.expect_column_median_to_be_between,
                   column="age", min_value=30, max_value=50)
    
    # Check min and max as proxies for percentiles
    run_expectation(validator.expect_column_min_to_be_between,
                   column="age", min_value=16, max_value=25)
    run_expectation(validator.expect_column_max_to_be_between,
                   column="age", min_value=70, max_value=100)

In [0]:
# Show the text captured from the age median/min/max checks.
print(captured_output.stdout)

Validation: [[PASSED]]
observed_value: 37.0
Validation: [[PASSED]]
observed_value: 17
Validation: [[PASSED]]
observed_value: 90



#### Cross-column analysis

In [0]:
%%capture captured_output
if "education" in column_list and "income" in column_list:
    # Alternative direct approach without row_condition
    print("\n=== Higher Education vs Income Analysis ===")
    
    # Create a crosstab analysis
    education_income_df = spark_df.crosstab("education", "income")
    
    # Show the raw crosstab data
    education_income_rows = education_income_df.collect()
    
    # Format results for better readability
    print("Education Level | <=50K | >50K | % Higher Income")
    print("------------------------------------------------")
    
    for row in education_income_rows:
        education = row["education_income"]
        if education in ["Bachelors", "Masters", "Doctorate", "Prof-school"]:
            # Get counts for each income level
            low_income = int(row["<=50K"]) if "<=50K" in row else 0
            high_income = int(row[">50K"]) if ">50K" in row else 0
            total = low_income + high_income
            
            # Calculate percentage
            if total > 0:
                high_income_pct = (high_income / total) * 100
                meets_expectation = "[[PASSED]]" if high_income_pct >= 50 else "[[FAILED]]"
                print(f"{education:15} | {low_income:5} | {high_income:4} | {high_income_pct:.2f}% {meets_expectation}")

In [0]:
# Show the text captured from the education-vs-income analysis.
print(captured_output.stdout)


=== Higher Education vs Income Analysis ===
Education Level | <=50K | >50K | % Higher Income
------------------------------------------------
Masters         |   764 |  959 | 55.66% [[PASSED]]
Bachelors       |  3133 | 2221 | 41.48% [[FAILED]]
Doctorate       |   107 |  306 | 74.09% [[PASSED]]
Prof-school     |   153 |  423 | 73.44% [[PASSED]]



In [0]:

%%capture captured_output

# Run full validation and print summary instead of building docs
results = validator.validate()
print("\n=== Validation Summary ===")
print(f"Validation successful: {results.success}")
print(f"Expectations evaluated: {len(results.results)}")
print(f"Expectations passed: {sum(1 for r in results.results if r.success)}")
print(f"Expectations failed: {sum(1 for r in results.results if not r.success)}")

# Print detailed failures
print("\n=== Failed Expectations ===")
for result in results.results:
    if not result.success:
        print(f"- {result.expectation_config.expectation_type}")
        if hasattr(result, 'result') and isinstance(result.result, dict):
            if 'unexpected_count' in result.result:
                print(f"  Unexpected values: {result.result['unexpected_count']} ({result.result.get('unexpected_percent', 'N/A')}%)")
            if 'partial_unexpected_list' in result.result:
                sample = result.result['partial_unexpected_list'][:5]
                print(f"  Sample unexpected values: {sample}")

In [0]:
# Show the text captured from the validation summary.
print(captured_output.stdout)


=== Validation Summary ===
Validation successful: True
Expectations evaluated: 12
Expectations passed: 12
Expectations failed: 0

=== Failed Expectations ===



#### Save and Validate


In [0]:
%%capture captured_output
# Save the expectation suite
validator.save_expectation_suite(discard_failed_expectations=False)

# Run validation
results = validator.validate()

# Check validation results
print(f"Validation successful: {results.success}")

# Get detailed information about failures
if not results.success:
    for result in results.results:
        if not result.success:
            print(f"Failed expectation: {result.expectation_config.expectation_type}")
            print(f"Details: {result.result}")


In [0]:
# Show the text captured from the save-and-validate cell.
print(captured_output.stdout)




#### Generate Data Documentation

In [0]:
# Build Data Docs to create HTML documentation
context.build_data_docs()


Out[35]: {'local_site': 'file:///databricks/driver/gx/uncommitted/data_docs/local_site/index.html'}

#### Create Checkpoints for Future Validation

In [0]:
%%capture captured_output

# Build a batch request from your existing batch definition
batch_request = {
    "datasource_name": "databricks_source",
    "data_asset_name": "adult_data",
    "batch_definition_name": "my_batch_definition",
    "batch_parameters": {"dataframe": spark_df}
}
try: 
    # Convert your ephemeral context to a file-backed context
    # file_context = context.convert_to_file_context()

    # Create a checkpoint for future validation
    checkpoint = context.add_or_update_checkpoint(
        name="adult_data_checkpoint",
        validations=[
            {
                "batch_request": batch_request,
                "expectation_suite_name": "adult_data_quality_suite",
            },
        ],
    )

    # checkpoint_result = checkpoint.run()
    print(f"Checkpoint validation successful: {checkpoint_result.success}")
except:
    print("An exception occurred")
