# SDP Integration Tests Using Expectations
This Spark Declarative Pipeline (SDP) example adds a few integration checks, implemented as expectations, to a small project in order to teach the basics. You could add further expectations or unit tests to extend the project, but this example stays deliberately simple.

Please check out the following resources for more information.

- [Manage data quality with pipeline expectations](https://docs.databricks.com/aws/en/ldp/expectations#manage-data-quality-with-pipeline-expectations)

- [Expectation recommendations and advanced patterns](https://docs.databricks.com/aws/en/ldp/expectation-patterns#expectation-recommendations-and-advanced-patterns)

- [Applying software development & DevOps best practices to Spark Declarative Pipelines](https://www.databricks.com/blog/applying-software-development-devops-best-practices-delta-live-table-pipelines)

## Obtain Configuration Variable for the Target Environment
This step reads the `target` configuration variable set in the Spark Declarative Pipeline, which is one of **development**, **stage**, or **production**.

- If the target is **development** or **stage**, run all integration tests.
- If the target is **production**, run only the gold table integration test.

In [0]:
from pyspark import pipelines as dp

## Store the target environment from the pipeline configuration in the variable target
target = spark.conf.get("target")
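
If `target` holds an unexpected value, the branches later in this notebook simply skip every test without reporting anything. The sketch below shows one way to validate the value up front. `resolve_target` and `VALID_TARGETS` are hypothetical names, not part of the original project, and the lookup function is passed in (for example `spark.conf.get`, which accepts a default as its second argument) so the block runs without a Spark session.

```python
# Hypothetical helper, not part of the original pipeline.
VALID_TARGETS = ("development", "stage", "production")

def resolve_target(conf_get, key="target"):
    """Return the configured target, or raise if it is not a known environment.

    conf_get is any callable taking (key, default), e.g. spark.conf.get.
    """
    value = conf_get(key, None)
    if value not in VALID_TARGETS:
        raise ValueError(
            f"Unexpected {key!r} value: {value!r}; expected one of {VALID_TARGETS}"
        )
    return value

# Stand-in for the pipeline configuration, used only for illustration.
example_conf = {"target": "stage"}
print(resolve_target(example_conf.get))  # stage
```

In the pipeline itself, the call would look like `target = resolve_target(spark.conf.get)`.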

### Create a Dictionary for Integration Test Values

Create a dictionary holding the expected values for the integration tests in the **development** and **stage** environments. There are several ways to do this; a plain nested dictionary is the most straightforward.

For more information, refer to the [Portable and Reusable Expectations](https://docs.databricks.com/en/delta-live-tables/expectation-patterns.html#portable-and-reusable-expectations) documentation.



In [0]:
## Based on the deployed target, obtain the specific validation metrics for the tables.
target_integration_tests_validation = {
    'development': {
        'health_bronze': {
            'total_rows': 7500
        },
        'health_silver': {
            'total_rows': 7500
        }
    },
    'stage': {
        'health_bronze': {
            'total_rows': 35000
        },
        'health_silver': {
            'total_rows': 35000
        }
    }
}


## Store the expected total row counts for the tables in variables when the target is development or stage
if target in ('development', 'stage'):
    total_expected_bronze = target_integration_tests_validation[target]['health_bronze']['total_rows']
    total_expected_silver = target_integration_tests_validation[target]['health_silver']['total_rows']
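
The same lookup can be wrapped in a small function so that a missing environment or table yields `None` instead of a `KeyError`. This is only a sketch: `expected_total_rows` is a hypothetical helper, and the dictionary is repeated under a shorter name just so the block runs on its own.

```python
# Copy of the validation values above, repeated so this sketch is self-contained.
validation = {
    "development": {
        "health_bronze": {"total_rows": 7500},
        "health_silver": {"total_rows": 7500},
    },
    "stage": {
        "health_bronze": {"total_rows": 35000},
        "health_silver": {"total_rows": 35000},
    },
}

def expected_total_rows(values, target, table_name):
    """Return the expected row count, or None when no value is configured."""
    return values.get(target, {}).get(table_name, {}).get("total_rows")

print(expected_total_rows(validation, "stage", "health_bronze"))       # 35000
print(expected_total_rows(validation, "production", "health_bronze"))  # None
```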

### Create a Function to Count the Total Number of Rows in a Table
The `test_count_table_total_rows` function creates a materialized view that counts the rows in the specified table and fails the update if the count differs from the expected value.

In [0]:
def test_count_table_total_rows(table_name, total_count, target):
    '''
    Count the number of rows in the specified table and compare with the expected values for development and stage data. 
    Fail the update if the count does not match the specified values.
    '''
    @dp.table(
        name=f"TEST_{target}_{table_name}_total_rows_verification",
        comment=f"Confirms all rows were ingested from the {target} raw data to {table_name}"
    )
    @dp.expect_all_or_fail({"valid count": f"total_rows = {total_count}"})
    def count_table_total_rows():
        return spark.sql(f"""
            SELECT COUNT(*) AS total_rows FROM LIVE.{table_name}
        """)

### Create a Function to Confirm the Column Values in the Gold Materialized View
The `test_gold_table_columns` function creates a materialized view that checks the values in the columns **Age_Group** and **HighCholest_Group** in **chol_age_agg**.

In [0]:
def test_gold_table_columns():
    '''
    This function will check unique values in the columns Age_Group and HighCholest_Group in the gold table chol_age_agg.

    This confirms that the distinct values for these columns in the gold table are correct.
    ''' 
    ## Set expectations for the columns
    check_silver_calc_columns = {
        "valid age group": "Age_Group in ('0-9', '10-19', '20-29', '30-39', '40-49', '50+', 'Unknown')",
        "valid cholest group": "HighCholest_Group in ('Normal', 'Above Average', 'High', 'Unknown')"
    }

    @dp.table(comment="Check age group and high cholest group in the gold table")
    ## Fail if expectations are not met
    @dp.expect_all_or_fail(check_silver_calc_columns)
    def test_calculated_columns_age_cholesterol():
        return (dp
                .read("chol_age_agg")
                .select("Age_Group", "HighCholest_Group")
            )
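
Expectation constraints are plain SQL strings, so the `in (...)` clauses can be generated from Python lists rather than typed by hand. A minimal sketch follows, assuming a hypothetical helper `allowed_values_expectation`. It does not escape quotes inside values, so it only suits simple literals like the ones used here.

```python
def allowed_values_expectation(column, allowed):
    """Build a SQL constraint string of the form: column in ('a', 'b')."""
    quoted = ", ".join(f"'{value}'" for value in allowed)
    return f"{column} in ({quoted})"

# Rebuild the two constraints used for the gold table check.
checks = {
    "valid age group": allowed_values_expectation(
        "Age_Group", ["0-9", "10-19", "20-29", "30-39", "40-49", "50+", "Unknown"]
    ),
    "valid cholest group": allowed_values_expectation(
        "HighCholest_Group", ["Normal", "Above Average", "High", "Unknown"]
    ),
}
print(checks["valid cholest group"])
# HighCholest_Group in ('Normal', 'Above Average', 'High', 'Unknown')
```

The resulting dictionary has the same shape as `check_silver_calc_columns` and could be passed to `dp.expect_all_or_fail` in the same way.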

### Execute the Specified Integration Tests
Execute the specified integration tests based on the target environment.

In [0]:
## Run the specified tests based on the target environment (development, stage or production)

if target in ('development','stage'):  ## Dynamic integration test for dev or stage tables
    test_count_table_total_rows('health_bronze',  total_expected_bronze, target)
    test_count_table_total_rows('health_silver',  total_expected_silver, target)
    test_gold_table_columns()
elif target == 'production':  ## Only test the gold table in production
    test_gold_table_columns()
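
The `if`/`elif` above does nothing when `target` holds an unrecognized value. One way to make that case explicit is to plan the tests per environment and raise on anything else. In the sketch below, `plan_tests` is a hypothetical helper that only returns the names and table arguments of the tests to run, so it can be executed without a pipeline.

```python
def plan_tests(target):
    """Return the integration tests to run for a target as (function name, args) tuples."""
    gold_only = [("test_gold_table_columns", ())]
    if target in ("development", "stage"):
        return [
            ("test_count_table_total_rows", ("health_bronze",)),
            ("test_count_table_total_rows", ("health_silver",)),
        ] + gold_only
    if target == "production":
        return gold_only
    # Unknown targets fail loudly instead of silently running no tests.
    raise ValueError(f"No integration tests defined for target {target!r}")

for name, args in plan_tests("production"):
    print(name, args)
```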