## Automate Data Quality Checks with Great Expectations
**Introduction**: In this activity, you will learn how to automate data quality checks using the Great Expectations framework. This includes setting up expectations and generating validation reports.

### Task 1: Setup and Initial Expectations

1. Objective: Set up Great Expectations and create initial expectations for a dataset.
2. Steps:
    - Install Great Expectations using pip.
    - Initialize a data context.
    - Create basic expectations on a sample dataset.
    - Eg., Implement a basic setup and expectation for column presence and type.

In [None]:
# Write your code from here

### Task 2: Validate Datasets and Generate Reports

1. Objective: Validate a dataset against defined expectations and generate a report.
2. Steps:
    - Execute the validation process on the dataset.
    - Review the validation results and generate a report.
    - Eg., Validate completeness and consistency expectations, and view the results.


In [None]:
# Write your code from here

### Task 3: Advanced Expectations and Scheduling

1. Objective: Create advanced expectations for conditional checks and automate the validation.
2. Steps:
    - Define advanced expectations based on complex conditions.
    - Use scheduling tools to automate periodic checks.
    - E.g., an expectation that customer IDs must be unique and schedule a daily check.

In [None]:
# Write your code from here

In [1]:
# This script demonstrates how to automate data quality checks using Great Expectations.
# It covers setting up a data context, defining expectations, validating data,
# and generating data quality reports.

# --- Prerequisites ---
# Before running this code, you need to install Great Expectations.
# Open your terminal or command prompt and run:
# pip install great_expectations pandas

import pandas as pd
import great_expectations as ge
from great_expectations.data_context import DataContext
import os
import shutil

# --- Task 1: Setup and Initial Expectations ---

print("--- Task 1: Setting up Great Expectations and Initial Expectations ---")

# Define a directory for the Great Expectations project.
# This will create a 'great_expectations' subdirectory in your current working directory.
ge_project_dir = "my_ge_project"

# Clean up previous GE project if it exists for a fresh start
if os.path.exists(ge_project_dir):
    print(f"Removing existing Great Expectations project directory: {ge_project_dir}")
    shutil.rmtree(ge_project_dir)

# Initialize a Great Expectations data context.
# This creates the 'great_expectations' directory with configuration files.
# The `ge.data_context.DataContext.create()` method is used for programmatic initialization.
# For interactive setup, you would typically run `great_expectations init` in the terminal.
print(f"Initializing Great Expectations data context in '{ge_project_dir}'...")
context = ge.data_context.DataContext.create(project_dir=ge_project_dir)
print("Great Expectations data context initialized.")

# Create a sample dataset using Pandas
print("\nCreating a sample dataset...")
data = {
    'customer_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 11], # Duplicate customer_id 1
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace', 'Heidi', 'Ivan', 'Judy', 'Alice', 'Kyle'],
    'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com', 'david@example.com', 'eve@example.com',
              'frank@example.com', 'grace@example.com', 'heidi@example.com', 'ivan@example.com', 'judy@example.com',
              'alice@example.com', 'kyle@example.com'],
    'age': [25, 30, 35, 40, 28, 32, 29, 45, 22, 38, 25, None], # Missing age for Kyle
    'city': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix', 'Philadelphia', 'San Antonio', 'San Diego', 'Dallas', 'San Jose', 'New York', 'Austin'],
    'registration_date': ['2023-01-15', '2022-03-20', '2023-07-01', '2021-11-10', '2023-02-28',
                          '2022-09-05', '2023-04-12', '2021-06-30', '2023-05-18', '2022-01-01',
                          '2023-01-15', '2023-10-25'],
    'order_count': [5, 12, 8, 20, 3, 15, 7, 25, 2, 10, 5, 6],
    'is_active': [True, True, False, True, True, False, True, True, True, False, True, True]
}
df = pd.DataFrame(data)
print(df.head())

# Add the DataFrame to a Great Expectations Data Source.
# Here we use an in-memory Pandas DataFrame as a data source.
# In a real-world scenario, you might connect to a database or files.
datasource_name = "my_pandas_datasource"
data_asset_name = "customer_data"

# Configure the datasource in the context
context.add_datasource(
    name=datasource_name,
    class_name="PandasDatasource",
    batch_spec_passthrough={"reader_method": "dataframe"},
)

# Get a batch of data from the datasource
# This is how Great Expectations interacts with your data for defining expectations.
batch_request = ge.core.batch_request.BatchRequest(
    datasource_name=datasource_name,
    data_asset_name=data_asset_name,
    data_connector_name="default_runtime_data_connector",
    data_connector_query={"batch_filter_parameters": {"batch_data": df}},
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_data_suite"
)

print("\nCreating basic expectations for the dataset...")
# Expectation 1: `customer_id` column exists
validator.expect_column_to_exist("customer_id")
# Expectation 2: `name` column exists
validator.expect_column_to_exist("name")
# Expectation 3: `age` column exists
validator.expect_column_to_exist("age")
# Expectation 4: `email` column exists
validator.expect_column_to_exist("email")
# Expectation 5: `city` column exists
validator.expect_column_to_exist("city")
# Expectation 6: `registration_date` column exists
validator.expect_column_to_exist("registration_date")
# Expectation 7: `order_count` column exists
validator.expect_column_to_exist("order_count")
# Expectation 8: `is_active` column exists
validator.expect_column_to_exist("is_active")


# Expectation 9: `customer_id` column values are integers
validator.expect_column_values_to_be_of_type("customer_id", "int")
# Expectation 10: `name` column values are strings
validator.expect_column_values_to_be_of_type("name", "str")
# Expectation 11: `age` column values are integers (allowing nulls for now)
validator.expect_column_values_to_be_of_type("age", "int")
# Expectation 12: `email` column values are strings
validator.expect_column_values_to_be_of_type("email", "str")
# Expectation 13: `city` column values are strings
validator.expect_column_values_to_be_of_type("city", "str")
# Expectation 14: `registration_date` column values match a date format
validator.expect_column_values_to_match_strftime_format("registration_date", "%Y-%m-%d")
# Expectation 15: `order_count` column values are integers
validator.expect_column_values_to_be_of_type("order_count", "int")
# Expectation 16: `is_active` column values are boolean
validator.expect_column_values_to_be_of_type("is_active", "bool")


# Save the expectation suite
validator.save_expectation_suite(discard_failed_expectations=False)
print("Initial expectation suite 'customer_data_suite' saved.")

# --- Task 2: Validate Datasets and Generate Reports ---

print("\n--- Task 2: Validating Datasets and Generating Reports ---")

# Execute the validation process
print("Executing validation process...")
# You can define a Checkpoint to run validations and build Data Docs.
# A Checkpoint is a configuration that bundles a set of validations.
checkpoint_name = "customer_data_checkpoint"
context.add_checkpoint(
    name=checkpoint_name,
    validator=validator, # Use the validator created earlier
    # You can also specify batch_request here if you want to reuse the checkpoint
    # for different batches of data.
    action_list=[
        {
            "name": "store_validation_result",
            "action_class": "StoreValidationResultAction",
        },
        {
            "name": "store_evaluation_parameter_metrics",
            "action_class": "StoreEvaluationParametersAction",
        },
        {
            "name": "update_data_docs",
            "action_class": "UpdateDataDocsAction",
        },
    ],
)

# Run the checkpoint
validation_result = context.run_checkpoint(checkpoint_name=checkpoint_name)

# Review the validation results
print("\nValidation Results Summary:")
if validation_result.success:
    print("Validation successful! All expectations passed.")
else:
    print("Validation failed. Some expectations did not pass.")
    for result in validation_result.results:
        if not result.success:
            print(f"  Failed Expectation: {result.expectation_config.expectation_type} for column '{result.expectation_config.column}'")
            print(f"    Details: {result.result}")

# Generate and open a Data Docs report
# Data Docs are automatically built when the `UpdateDataDocsAction` is included in the checkpoint.
# You can open them manually or programmatically.
print("\nGenerating and opening Data Docs report...")
# This command will open the Data Docs in your default web browser.
# In a real scenario, you might host these Data Docs on a web server.
context.open_data_docs()
print("Data Docs report generated and opened in your browser (if supported by your environment).")


# --- Task 3: Advanced Expectations and Scheduling ---

print("\n--- Task 3: Advanced Expectations and Scheduling ---")

# Define advanced expectations based on complex conditions.
# Reload the validator to add more expectations
validator_advanced = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_data_suite_advanced" # Create a new suite for advanced expectations
)

print("\nAdding advanced expectations...")

# Expectation 17: `customer_id` must be unique
validator_advanced.expect_column_values_to_be_unique("customer_id")

# Expectation 18: `age` column values are between 18 and 99
validator_advanced.expect_column_values_to_be_between("age", min_value=18, max_value=99)

# Expectation 19: `email` column values match a regex pattern for email format
validator_advanced.expect_column_values_to_match_regex("email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

# Expectation 20: `order_count` is never negative
validator_advanced.expect_column_values_to_be_between("order_count", min_value=0, max_value=None)

# Expectation 21: Conditional check: If `is_active` is True, then `order_count` must be greater than 0.
# This requires a custom expectation or a more complex approach.
# For simplicity, let's use a common pattern:
# Expectation: `order_count` column values are greater than 0 if `is_active` is True
# This is a more complex expectation that might require a custom `Expectation` class
# or a more direct data manipulation for validation.
# For demonstration, we'll use a simpler form:
validator_advanced.expect_column_pair_values_to_be_in_set(
    column_A="is_active",
    column_B="order_count",
    value_pairs=[(True, count) for count in range(1, 1000)] # Assuming order_count won't exceed 999
)
# A more robust way for conditional logic might involve filtering the DataFrame first
# and then applying expectations to the filtered subset.
# Example:
# active_customers_df = df[df['is_active'] == True]
# if not active_customers_df.empty:
#     active_customer_validator = context.get_validator(
#         batch_request=ge.core.batch_request.BatchRequest(
#             datasource_name=datasource_name,
#             data_asset_name="active_customer_data",
#             data_connector_name="default_runtime_data_connector",
#             data_connector_query={"batch_filter_parameters": {"batch_data": active_customers_df}},
#         ),
#         expectation_suite_name="active_customer_suite"
#     )
#     active_customer_validator.expect_column_values_to_be_between("order_count", min_value=1, max_value=None)
#     active_customer_validator.save_expectation_suite(discard_failed_expectations=False)


# Save the advanced expectation suite
validator_advanced.save_expectation_suite(discard_failed_expectations=False)
print("Advanced expectation suite 'customer_data_suite_advanced' saved.")

# Run validation with advanced expectations
checkpoint_advanced_name = "customer_data_advanced_checkpoint"
context.add_checkpoint(
    name=checkpoint_advanced_name,
    validator=validator_advanced,
    action_list=[
        {
            "name": "store_validation_result",
            "action_class": "StoreValidationResultAction",
        },
        {
            "name": "store_evaluation_parameter_metrics",
            "action_class": "StoreEvaluationParametersAction",
        },
        {
            "name": "update_data_docs",
            "action_class": "UpdateDataDocsAction",
        },
    ],
)
validation_result_advanced = context.run_checkpoint(checkpoint_name=checkpoint_advanced_name)

print("\nAdvanced Validation Results Summary:")
if validation_result_advanced.success:
    print("Advanced validation successful! All expectations passed.")
else:
    print("Advanced validation failed. Some expectations did not pass.")
    for result in validation_result_advanced.results:
        if not result.success:
            print(f"  Failed Expectation: {result.expectation_config.expectation_type} for column '{result.expectation_config.column}'")
            print(f"    Details: {result.result}")

context.open_data_docs() # Open Data Docs again to see the advanced suite results

# --- Automate Periodic Checks (Conceptual) ---

print("\n--- Automating Periodic Checks (Conceptual) ---")
print("Great Expectations validations can be automated using various scheduling tools.")
print("Here are some common approaches:")

print("\n1. Cron Jobs (Linux/macOS) / Task Scheduler (Windows):")
print("   You can set up a cron job to run this Python script at a specific interval (e.g., daily).")
print("   Example cron entry (runs daily at 2 AM):")
print("   0 2 * * * /usr/bin/python3 /path/to/your/script.py")

print("\n2. Apache Airflow:")
print("   For more complex data pipelines, Airflow is an excellent choice.")
print("   You would define a DAG (Directed Acyclic Graph) that includes a task to run your Great Expectations checkpoint.")
print("   Example Airflow DAG snippet:")
print("   from airflow import DAG")
print("   from airflow.operators.bash import BashOperator")
print("   from datetime import datetime")
print("\n   with DAG('data_quality_check', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:")
print("       run_ge_validation = BashOperator(")
print("           task_id='run_ge_validation',")
print("           bash_command='python /path/to/your/script.py', # Or a more specific GE command")
print("       )")

print("\n3. Prefect / Dagster / Other Orchestration Tools:")
print("   Similar to Airflow, these tools provide robust frameworks for orchestrating data workflows,")
print("   including tasks for data validation with Great Expectations.")

print("\n4. CI/CD Pipelines (e.g., Jenkins, GitLab CI, GitHub Actions):")
print("   Integrate Great Expectations validation into your CI/CD pipeline to ensure data quality")
print("   before deploying new data or models to production.")
print("   For example, run a validation step after a data transformation job completes.")

print("\nTo truly automate, you would replace the direct script execution with calls from these schedulers.")
print("The Data Docs can then be hosted on a web server for easy access to validation reports.")



ImportError: cannot import name 'DataContext' from 'great_expectations.data_context' (/home/vscode/.local/lib/python3.10/site-packages/great_expectations/data_context/__init__.py)