In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count
from features.data_quality_validation.schema_definition import WINE_QUALITY_TYPE_SCHEMA
from features.data_quality_validation.schema_validation import SchemaValidation
from features.etl.etl_validation import ETLValidation
from features.utils.logger_config import logger
from features.utils.path_utils import FileLoader

# Source CSV for this validation run (path is relative to the working directory).
wine_path = "./datasets/wine_quality_corrupted.csv"

# Reuse the active Spark session if one exists; otherwise start a new one.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# Read the comma-delimited file through the project's FileLoader helper.
df = FileLoader.load_csv_with_delimiter(spark, wine_path, delimiter=",")

25/04/04 01:43:17 WARN Utils: Your hostname, adonis-6.local resolves to a loopback address: 127.0.0.1; using 192.168.0.165 instead (on interface en0)
25/04/04 01:43:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/04 01:43:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/04 01:43:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# ETL Transformation: Add 'alcohol_category' column
#
# "High" when alcohol is strictly greater than the threshold, "Low" otherwise.
# Rows whose alcohol value is NULL keep a NULL category: a comparison against
# NULL is never true, so a plain when(...).otherwise("Low") would silently
# label missing measurements as "Low".
HIGH_ALCOHOL_THRESHOLD = 10

alcohol_col = col("alcohol")
df_transformed = df.withColumn(
    "alcohol_category",
    # The outer when() has no otherwise(), so rows that fail the isNotNull()
    # test (i.e. NULL alcohol) evaluate to NULL.
    when(
        alcohol_col.isNotNull(),
        when(alcohol_col > HIGH_ALCOHOL_THRESHOLD, "High").otherwise("Low"),
    ),
)

In [6]:
# Compare the source frame with its transformed counterpart.
etl_validator = ETLValidation(df, df_transformed)

# Each check is evaluated and logged in turn; keeping the result in a named
# variable leaves it available for inspection in later cells.
row_count_result = etl_validator.validate_row_count()
logger.info(f"Row Count Match: {row_count_result}")

column_result = etl_validator.validate_columns()
logger.info(f"Column Validation: {column_result}")

alcohol_aggregate_result = etl_validator.validate_aggregates('alcohol')
logger.info(f"Alcohol Aggregation Check: {alcohol_aggregate_result}")

quality_completeness_result = etl_validator.validate_completeness('quality')
logger.info(f"Completeness Check: {quality_completeness_result}")

01:43:22 INFO: Row Count Match: (1158, 1158, True)
01:43:22 INFO: Column Validation: {'missing': [], 'extra': ['alcohol_category']}
01:43:22 INFO: Alcohol Aggregation Check: {'sum_difference': 0.0, 'avg_difference': 0.0}
01:43:22 INFO: Completeness Check: {'missing_keys': []}


In [7]:
# Validate the transformed frame against the declared schema and business rules.
schema_validator = SchemaValidation(df_transformed)

# Column data types versus WINE_QUALITY_TYPE_SCHEMA.
# NOTE(review): the recorded run reported DoubleType where the schema expects
# FloatType for every column — confirm whether the schema or the loader is wrong.
dtype_report = schema_validator.validate_data_types(WINE_QUALITY_TYPE_SCHEMA)
logger.info(f"Data Type Validation: {dtype_report}")

# Business rule: alcohol must be strictly positive.
positive_alcohol_result = schema_validator.validate_business_rules(
    'alcohol', lambda x: x > 0
)
logger.info(f"Alcohol > 0 Check: {positive_alcohol_result}")

# Business rules on the derived category column, one per label.
high_category_result = schema_validator.validate_business_rules(
    'alcohol_category', lambda x: x == 'High'
)
logger.info(f"High Alcohol Check (High): {high_category_result}")

low_category_result = schema_validator.validate_business_rules(
    'alcohol_category', lambda x: x == 'Low'
)
logger.info(f"High Alcohol Check (Low): {low_category_result}")


01:43:22 INFO: Data Type Validation: {'fixed acidity': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'volatile acidity': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'citric acid': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'residual sugar': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'chlorides': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'free sulfur dioxide': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'total sulfur dioxide': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'density': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'pH': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'sulphates': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'alcohol': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'quality': {'expected': 'FloatType()', 'actual': 'DoubleType()'}, 'Id': {'expected': 'FloatType()', 'actual': 'DoubleType()'}}
01:43:23 INFO: Alcohol > 0 Check: (0, 1106)
01:43:23 INFO: 

In [8]:
# Stop the Spark session created at the top of the notebook.
# NOTE(review): cells that use `spark`, `df` or `df_transformed` presumably
# cannot be re-run after this without re-creating the session first — confirm.
spark.stop()