In [1]:
!pip install pytest

StatementMeta(, 11c0119a-c7a5-49ed-9a8b-33faa8524d54, 3, Finished, Available, Finished)



#### Imports
----------

In [2]:
from uuid import UUID
from pyspark.sql.functions import *

StatementMeta(, 11c0119a-c7a5-49ed-9a8b-33faa8524d54, 4, Finished, Available, Finished)

#### References
----------

In [3]:
class TestEntity:
    """
    A class used to represent a test plan on a entity

    ...

    Attributes
    ----------
    workspace_id : UUID
        ID of the workspace where the model is located.
    workspace_name : str
        Name of the workspace where the model is located.
    entity_name : str
        Name of the entity
    entity : DataFrame
        Represents the content of the entity
    primary_key : str
        Name of the primary key column of the entity

    Methods
    -------
    test_number_rows()
        Contrasts the number of entity rows against expected rows
    test_missing_values()
        Checks if primary key has missing values
    test_duplicated_values()
        Checks if primary key has duplicated values
    """

    def __init__(self, workspace:UUID, lakehouse:UUID, entity_name:str):
        """
        Parameters
        ----------
        entity_name : str
            Name of the entity
        workspace : UUID
            ID of the workspace where the model is located.
        """
        self.workspace_id = workspace

        self.lakehouse_id = lakehouse
        
        self.entity_name = entity_name

        self.path = f"abfss://{self.workspace_id}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_id}/Tables/{self.entity_name}"

        # Get entity dataframe
        self.entity = spark.read.format("delta").load(self.path)

        try:
            # Look up primary key
            self.primary_key = self.entity.select([c for c in self.entity.columns if c.startswith("bk_")]).columns[0]
        except Exception as e:
            self.primary_key = ""
    
    def test_total_rows(self):
        """
        Contrasts the total rows of the entity against expected rows

        Raises
        -------
        AssertionError
            If result not equals to expected value.
        """

        rows = spark.read.format("delta").load(f"abfss://{self.workspace_id}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_id}/Tables/tablerowcount") \
                .where(col("TableName") == self.entity_name).collect()[0][1]

        try:
            assert self.entity.count() == rows, f"\033[1;31mFAILED\033[0m\n\t\033[31mFound a different number of rows in {self.entity_name} table\033[0m"
        except AssertionError as e:
            print(e)
            return 1
        else:
            print("\033[1;32mPASSED\033[0m")
            return 0


    def test_missing_values(self):
        """
        Checks if primary key has missing values

        Raises
        -------
        AssertionError
            If result not equals to expected value.
        """

        missing = self.entity.where(isnan(self.primary_key) | col(self.primary_key).isNull() | (col(self.primary_key) == "NULL")).count()
        try:
            assert missing == 0, f"\033[1;31mFAILED\033[0m\n\t\033[31mFound missing values in primary key of {self.entity_name} table\033[0m"
        except AssertionError as e:
            print(e)
            return 1
        else:
            print("\033[1;32mPASSED\033[0m")
            return 0
    
    def test_duplicated_values(self):
        """
        Checks if primary key has duplicated values

        Raises
        -------
        AssertionError
            If result not equals to expected value.
        """

        duplicates = self.entity \
                        .groupby(self.primary_key) \
                        .count() \
                        .where("count > 1").select(self.primary_key)
        
        try:
            assert duplicates.count() == 0, f"\033[1;31mFAILED\033[0m\n\t\033[31mFound the following duplicated values in primary key of {self.entity_name} table: { [ value[self.primary_key] for value in duplicates.collect()]}\033[0m"
        except AssertionError as e:
            print(e)
            return 1
        else:
            print("\033[1;32mPASSED\033[0m")
            return 0

StatementMeta(, 11c0119a-c7a5-49ed-9a8b-33faa8524d54, 5, Finished, Available, Finished)

In [4]:
tables = {"dim_currencyexchange", "dim_customer", "dim_date", "dim_product", "dim_store", "fact_orders", "fact_sales"}
errors = 0

for table_name in tables:

    entity_name = table_name
    print("\nInitializing test session...")
    test_entity = TestEntity("7ad88f57-163e-48fe-ab1b-3b3150611dad", "909d6a07-61fe-4866-b576-41cd2a0ec437", table_name)

    print("========================================= test session starts =========================================\n")
    print(f"\033[01mEntity:\033[0m {entity_name} \n")
    print("------------------------------------- TEST 01 - test_total_rows --------------------------------------\n")
    
    errors += test_entity.test_total_rows()
    
    if entity_name.startswith("dim_"):
        print("------------------------------------ TEST 02 - test_missing_values ------------------------------------\n")
        errors += test_entity.test_missing_values()
        print("----------------------------------- TEST 03 - test_duplicated_values ----------------------------------\n")
        errors += test_entity.test_duplicated_values()

    print("=======================================================================================================\n")

StatementMeta(, 11c0119a-c7a5-49ed-9a8b-33faa8524d54, 6, Finished, Available, Finished)


Initializing test session...

[01mEntity:[0m fact_sales 

------------------------------------- TEST 01 - test_total_rows --------------------------------------

[1;32mPASSED[0m


Initializing test session...

[01mEntity:[0m fact_orders 

------------------------------------- TEST 01 - test_total_rows --------------------------------------

[1;31mFAILED[0m
	[31mFound a different number of rows in fact_orders table[0m


Initializing test session...

[01mEntity:[0m dim_customer 

------------------------------------- TEST 01 - test_total_rows --------------------------------------

[1;32mPASSED[0m
------------------------------------ TEST 02 - test_missing_values ------------------------------------

[1;32mPASSED[0m
----------------------------------- TEST 03 - test_duplicated_values ----------------------------------

[1;32mPASSED[0m


Initializing test session...

[01mEntity:[0m dim_currencyexchange 

------------------------------------- TEST 01 - test_total_rows 

In [5]:
if errors > 0:
    raise Exception("At least one of the above tests has failed. Please, check it")

StatementMeta(, 11c0119a-c7a5-49ed-9a8b-33faa8524d54, 7, Finished, Available, Finished)

Exception: At least one of the above tests has failed. Please, check it