In [0]:
import unittest

# Table names interpolated into the Spark SQL checks in this notebook.
raw_table, data_table, aggregated_table = (
    "raw_cdp",
    "cdp_data",
    "system_tags_aggregated",
)

In [0]:
# Arrange: Pre-defined date range
# Anchor month for every "last 1 month" check: the first day of the previous
# calendar month, cast to DATE by the query and returned as its only column.
previous_month_sql = """
SELECT DATE_TRUNC('month', DATEADD(MONTH, -1, CURRENT_DATE()))::DATE
"""
(date_range,) = spark.sql(previous_month_sql).head()
print(date_range)

In [0]:
# Act: Raw data availability : Last 1 month
# Single-row result whose boolean `has_row` is true when raw_table contains at
# least one row whose billing period starts in the date_range month.
raw_data_sql = f"""
SELECT COUNT(*) > 0 AS has_row FROM {raw_table}
WHERE DATE_TRUNC('month', bill_billing_period_start_date)::DATE = '{date_range}'
"""
raw_data = spark.sql(raw_data_sql).head()
print(raw_data.has_row)

In [0]:
# Act: Data availability : Last 1 month
# Single-row result whose boolean `has_row` is true when data_table contains at
# least one row whose `Date` falls in the date_range month.
data_sql = f""" 
SELECT COUNT(*) > 0 AS has_row FROM {data_table}
WHERE DATE_TRUNC('month', Date)::DATE = '{date_range}'
"""
data = spark.sql(data_sql).head()
print(data.has_row)

In [0]:
# Act: Aggregated availability : Last 1 month
# Single-row result whose boolean `has_row` is true when aggregated_table has
# an Amazon / CDP row for YearMonth == date_range.
aggregated_sql = f""" 
SELECT COUNT(*) > 0 AS has_row FROM {aggregated_table}
WHERE Vendor = 'Amazon' 
AND System = 'CDP' 
AND YearMonth = '{date_range}'
"""
aggregated = spark.sql(aggregated_sql).head()
print(aggregated.has_row)

In [0]:
# Act: Data imported : Last 3 months
# Per-month row counts of raw_table for the `import_window_months` months
# ending at date_range (the previous month), newest first.
# The window is bounded above by date_range, so rows already present for the
# current month can neither become element [0] nor skew the average. With that
# bound, element [0] is the date_range month whenever that month has data.
# The filter lives in WHERE (not HAVING) so rows are pruned before grouping.
import_window_months = 3
data_imported_list = spark.sql(f"""
SELECT DATE_TRUNC('month', bill_billing_period_start_date)::DATE AS StartDate,
COUNT(*) AS row_count FROM {raw_table}
WHERE DATE_TRUNC('month', bill_billing_period_start_date)::DATE
    BETWEEN ADD_MONTHS(DATE '{date_range}', -{import_window_months - 1}) AND DATE '{date_range}'
GROUP BY StartDate
ORDER BY StartDate DESC
""").collect()
display(data_imported_list)

In [0]:
# Act : Latest, Previous month data and average row count
# data_imported_list is ordered newest-first (ORDER BY StartDate DESC), so
# element 0 is the latest month and element 1 the month before it. The average
# is taken over every month returned, the latest one included.
if len(data_imported_list) < 2:
    # Explicit message instead of a bare IndexError from the indexing below.
    raise ValueError(
        f"Expected at least 2 months of imported data in {raw_table}, "
        f"found {len(data_imported_list)}"
    )

# Read by column name (the query aliases COUNT(*) as row_count) rather than position.
monthly_row_counts = [row.row_count for row in data_imported_list]
latest_row_count = monthly_row_counts[0]
previous_row_count = monthly_row_counts[1]
average_row_count = round(sum(monthly_row_counts) / len(monthly_row_counts))

print(f"latest_row_count: {latest_row_count}")
print(f"previous_row_count: {previous_row_count}")
print(f"average_row_count: {average_row_count}")

In [0]:
# Act: Threshold
# Upper bound for the latest month: the window average grown by `threshold`
# percent, rounded to a whole row count.
threshold = 8  # 8% Average-Growth
percentage = 1 + threshold / 100
average_row_count_after_threshold = round(percentage * average_row_count)

for summary_line in (
    f"threshold: {threshold}%",
    f"average_row_count_after_threshold: {average_row_count_after_threshold}",
):
    print(summary_line)

In [0]:
# Assert
class TestAvailability(unittest.TestCase):

    def test_raw_data_available(self):
        assert raw_data.has_row, f"Raw data for {raw_table} from {date_range} is not available"

    def test_data_available(self):
        assert data.has_row, f"Data for {data_table} from {date_range} is not available"

    def test_aggregated_data_available(self):
        assert aggregated.has_row, f"Aggregated data for {aggregated_table} from {date_range} is not available"


In [0]:
# Assert
class TestImported_Within_Average_Threshold(unittest.TestCase):

    def test_average_row_count_after_threshold_is_greater_than_latest_row_count(self):
        assert average_row_count_after_threshold > latest_row_count, f"Average row count growth of latest row count for {raw_table} on {date_range} is not within the average {threshold}% threshold"


In [0]:
# Run Test 
# Load both test cases into one suite, run it verbosely, and fail the cell
# if any test in the suite did not pass.
loader = unittest.TestLoader()
test_availability, test_imported_within_average_threshold = (
    loader.loadTestsFromTestCase(case)
    for case in (TestAvailability, TestImported_Within_Average_Threshold)
)

suite = unittest.TestSuite([test_availability, test_imported_within_average_threshold])

run_result = unittest.TextTestRunner(verbosity=2).run(suite)
assert run_result.wasSuccessful(), "Test failed. Refer logs"