In [10]:
import pandas as pd
import os
import json
import glob
import logging

# Send every validation message through one timestamped logger for this notebook
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("DataValidationNotebook")

# Resolve <cwd>/../root/dags/data; the result therefore depends on where the kernel was started
current_dir = os.path.abspath('')
project_root = os.path.join(current_dir, os.pardir)  # parent of the working directory
data_folder = os.path.normpath(os.path.join(project_root, 'root', 'dags', 'data'))
logger.info(f"Data folder path: {data_folder}")

# Report (but do not abort on) a missing data folder; later cells re-check individual files
if not os.path.exists(data_folder):
    logger.error("Data folder does not exist.")
else:
    logger.info("Data folder exists.")
    

# Function to check if a file exists and log the result
def check_file_exists(filepath):
    """Report whether ``filepath`` exists and log the outcome.

    Args:
        filepath: Path to test. Any existing filesystem entry counts,
            because the check uses ``os.path.exists``.

    Returns:
        bool: True when the path exists (logged at INFO), otherwise
        False (logged at ERROR).
    """
    found = os.path.exists(filepath)
    if found:
        logger.info(f"File exists: {filepath}")
    else:
        logger.error(f"File does not exist: {filepath}")
    return found


2024-11-11 15:30:52,998 - INFO - Data folder path: c:\Users\Vicente Renart\Desktop\bees_data_pipeline\root\dags\data
2024-11-11 15:30:52,999 - INFO - Data folder exists.


In [11]:
# Bronze Layer
def load_bronze_data():
    """Loads Bronze layer data and returns it as a DataFrame.

    Reads ``bronze_breweries.json`` from ``data_folder``, parses it as JSON
    and wraps the parsed content in a ``pandas.DataFrame``. A sample of the
    frame is logged for visual inspection.

    Returns:
        pandas.DataFrame | None: The Bronze data, or None when the file is
        missing or cannot be read/parsed (the reason is logged).
    """
    bronze_path = os.path.join(data_folder, "bronze_breweries.json")
    if not check_file_exists(bronze_path):
        return None

    try:
        # Decode explicitly as UTF-8: without it, open() falls back to the
        # platform locale (e.g. cp1252 on Windows) and non-ASCII brewery
        # names are corrupted or raise UnicodeDecodeError.
        with open(bronze_path, 'r', encoding='utf-8') as file:
            bronze_data = json.load(file)

        # The file handle is no longer needed once the JSON is parsed.
        bronze_df = pd.DataFrame(bronze_data)
        logger.info("Bronze Layer Data Loaded Successfully")
        logger.info(f"Bronze Layer Data Sample:\n{bronze_df.head()}")
        return bronze_df
    except Exception as e:
        # Best-effort loader: report the problem and let the caller handle None.
        logger.error(f"Error loading Bronze layer data: {e}")
        return None

# Load Bronze data once for this notebook session.
# bronze_df is None when the Bronze file is missing or could not be parsed.
bronze_df = load_bronze_data()

2024-11-11 15:30:55,991 - INFO - File exists: c:\Users\Vicente Renart\Desktop\bees_data_pipeline\root\dags\data\bronze_breweries.json
2024-11-11 15:30:56,002 - INFO - Bronze Layer Data Loaded Successfully
2024-11-11 15:30:56,014 - INFO - Bronze Layer Data Sample:
                                     id                     name brewery_type  \
0  5128df48-79fc-4f0f-8b52-d06be54d0cec         (405) Brewing Co        micro   
1  9c5a66c8-cc13-416f-a5d9-0a769c87d318         (512) Brewing Co        micro   
2  34e8c68b-6146-453f-a4b9-1f6cd99a5ada  1 of Us Brewing Company        micro   
3  ef970757-fe42-416f-931d-722451f1f59c     10 Barrel Brewing Co        large   
4  6d14b220-8926-4521-8d19-b98a2d6ec3db     10 Barrel Brewing Co        large   

               address_1 address_2 address_3            city state_province  \
0         1716 Topeka St      None      None          Norman       Oklahoma   
1  407 Radam Ln Ste F200      None      None          Austin          Texas   
2    8100 Wa

In [15]:
# Function to load and compare data between Bronze and Silver layers
def validate_state_data(selected_state):
    """Validates that data for a specific state in the Silver Layer matches the Bronze Layer.

    Two checks are performed and their results are logged:

    1. The Silver file for ``selected_state`` contains that state and no other.
    2. When both layers expose an ``id`` column, the set of ids in the Silver
       file equals the set of ids Bronze holds for the same state.

    Args:
        selected_state: State name; also the stem of the Silver file
            ``silver_breweries/<selected_state>.parquet``.

    Returns:
        None. Outcomes are reported through ``logger`` only.
    """
    bronze_file_path = os.path.join(data_folder, 'bronze_breweries.json')
    silver_file_path = os.path.join(data_folder, 'silver_breweries', f"{selected_state}.parquet")

    # Check both paths unconditionally so every missing file gets its own log line
    # (a short-circuiting `or` would hide a missing Silver file behind a missing Bronze file).
    files_present = [check_file_exists(path) for path in (bronze_file_path, silver_file_path)]
    if not all(files_present):
        return

    try:
        # Load Bronze and Silver data
        df_bronze = pd.read_json(bronze_file_path)
        df_silver = pd.read_parquet(silver_file_path)

        # Filter Bronze data for the selected state
        df_bronze_state = df_bronze[df_bronze['state'] == selected_state]

        # Display sample rows from both datasets for comparison
        logger.info(f"Bronze Layer Data for state '{selected_state}':\n{df_bronze_state.head()}")
        logger.info(f"Silver Layer Data for state '{selected_state}':\n{df_silver.head()}")

        # Check 1: the Silver file must hold the selected state and nothing else
        silver_states = df_silver['state'].unique()
        state_ok = selected_state in silver_states and len(silver_states) == 1
        if not state_ok:
            logger.warning(f"Validation failed: Silver Layer data for '{selected_state}' contains data from other states or is incomplete.")

        # Check 2: compare Silver with Bronze record by record.
        # Id sets (not row counts) are compared so duplicate rows in either
        # layer do not produce a false mismatch.
        ids_ok = True
        if 'id' in df_bronze_state.columns and 'id' in df_silver.columns:
            bronze_ids = set(df_bronze_state['id'])
            silver_ids = set(df_silver['id'])
            missing_in_silver = bronze_ids - silver_ids
            unexpected_in_silver = silver_ids - bronze_ids
            if missing_in_silver or unexpected_in_silver:
                ids_ok = False
                logger.warning(
                    f"Validation failed: Silver Layer ids for '{selected_state}' do not match Bronze Layer "
                    f"({len(missing_in_silver)} missing from Silver, {len(unexpected_in_silver)} not present in Bronze)."
                )
        else:
            logger.warning(f"No 'id' column available in both layers; record-level comparison skipped for '{selected_state}'.")

        if state_ok and ids_ok:
            logger.info(f"Validation passed: Data in Silver Layer for '{selected_state}' is consistent with Bronze Layer.")

    except Exception as e:
        logger.error(f"Error comparing data for state '{selected_state}': {e}")

# Example usage: checks silver_breweries/California.parquet against the Bronze file.
# The function returns nothing; read the result from the log output below.
validate_state_data("California")  # Change the state here to validate a different one

2024-11-11 15:32:42,308 - INFO - File exists: c:\Users\Vicente Renart\Desktop\bees_data_pipeline\root\dags\data\bronze_breweries.json
2024-11-11 15:32:42,309 - INFO - File exists: c:\Users\Vicente Renart\Desktop\bees_data_pipeline\root\dags\data\silver_breweries\California.parquet
2024-11-11 15:32:42,321 - INFO - Bronze Layer Data for state 'California':
                                      id                        name  \
3   ef970757-fe42-416f-931d-722451f1f59c        10 Barrel Brewing Co   
12  5ae467af-66dc-4d7f-8839-44228f89b596   101 North Brewing Company   
30  4788221a-a03b-458c-9084-4cadd69ade6d  14 Cannons Brewing Company   
44  fe6b9893-b93e-43d5-a9f6-3e0c89a3f13c        1850 Brewing Company   

   brewery_type                  address_1 address_2  address_3  \
3         large                  1501 E St      None        NaN   
12       closed        1304 Scott St Ste D      None        NaN   
30        micro  31125 Via Colinas Ste 907      None        NaN   
44        micr

In [18]:
# Function to validate all states in Silver Layer against the Gold Layer
def validate_all_states_in_gold_layer():
    """Validates that data for each state in the Silver Layer is correctly represented in the Gold Layer.

    Every ``*.parquet`` file in ``silver_breweries`` is aggregated to
    ``(state, brewery_type, brewery_count)`` rows, where the state is the file
    name without its extension. The combined aggregate is outer-merged with the
    Gold table; any row present on only one side is reported as a mismatch.

    Returns:
        None. Results are reported through ``logger``. The summary is:
        failed states (if any), "passed" only when a comparison actually
        took place, and "inconclusive" when no Silver data could be compared.
    """
    silver_path = os.path.join(data_folder, 'silver_breweries')
    gold_path = os.path.join(data_folder, "gold_breweries_aggregated.parquet")

    # Check if Gold Layer file exists
    if not check_file_exists(gold_path):
        logger.error("Gold Layer validation cannot proceed without the Gold file.")
        return

    # Load Gold Layer data
    try:
        gold_df = pd.read_parquet(gold_path)
        logger.info("Gold Layer Data Loaded Successfully.")
        logger.info(f"Gold Layer Data Sample:\n{gold_df.head()}")  # Display sample of Gold Layer Data
    except Exception as e:
        logger.error(f"Error loading Gold layer data: {e}")
        return

    # os.listdir() below would raise an unhandled FileNotFoundError without this guard
    if not os.path.isdir(silver_path):
        logger.error(f"Silver Layer directory does not exist: {silver_path}")
        return

    # States with validation issues, and per-state aggregates awaiting comparison
    invalid_states = []
    silver_aggregated_data = []

    # Sorted so log output and the combined frame are deterministic across platforms
    for file_name in sorted(os.listdir(silver_path)):
        # splitext removes only the trailing extension; str.replace would also
        # strip ".parquet" occurring elsewhere in the file name
        state_name, extension = os.path.splitext(file_name)
        if extension != ".parquet":
            continue

        silver_file_path = os.path.join(silver_path, file_name)

        try:
            # Load Silver Layer data for the specific state
            df_silver = pd.read_parquet(silver_file_path)
            logger.info(f"Silver Layer Data for state '{state_name}' Loaded Successfully.")
            logger.info(f"Silver Layer Data Sample for state '{state_name}':\n{df_silver.head()}")  # Display sample of Silver Layer Data

            # Aggregate Silver data by brewery_type for the current state
            state_aggregated = (
                df_silver.groupby("brewery_type")
                .size()
                .reset_index(name="brewery_count")
            )
            state_aggregated["state"] = state_name  # Add state column to match Gold format

            # Keep the state aggregate for the combined comparison below
            silver_aggregated_data.append(state_aggregated)
            logger.info(f"Aggregated Silver Data for state '{state_name}':\n{state_aggregated}")  # Display aggregated data for the state

        except Exception as e:
            logger.error(f"Error loading or validating data for state '{state_name}': {e}")
            invalid_states.append(state_name)

    # Tracks whether Silver was really compared with Gold, so the summary
    # cannot report success when nothing was checked
    comparison_performed = False

    # Combine all aggregated Silver data for final comparison with the Gold Layer
    if silver_aggregated_data:
        silver_aggregated_df = pd.concat(silver_aggregated_data, ignore_index=True)
        logger.info("Combined Aggregated Silver Data:\n")
        logger.info(silver_aggregated_df.head())  # Display combined aggregated Silver data

        # Rows found on only one side (left_only / right_only) are discrepancies
        mismatches = silver_aggregated_df.merge(
            gold_df,
            on=["state", "brewery_type", "brewery_count"],
            how="outer",
            indicator=True
        ).query('_merge != "both"')
        comparison_performed = True

        # Log mismatch details if there are discrepancies
        if not mismatches.empty:
            logger.warning("Validation issues found between Silver and Gold layers for the following records:")
            logger.warning(mismatches)
            invalid_states.extend(mismatches["state"].unique())
        else:
            logger.info("Validation passed: All states and brewery types in the Silver Layer are correctly represented in the Gold Layer.")
    else:
        logger.warning("No Silver data found for aggregation or comparison with the Gold Layer.")

    # Final summary of validation results
    if invalid_states:
        logger.warning("Validation failed for the following states:")
        # key=str keeps the sort safe if a null state slipped in from the Gold table
        for state in sorted(set(invalid_states), key=str):
            logger.warning(f"- {state}")
    elif comparison_performed:
        logger.info("Validation passed: All states in the Gold Layer are consistent with the Silver Layer.")
    else:
        logger.warning("Validation inconclusive: no Silver data was available to compare against the Gold Layer.")

# Run validation for all states in the Silver Layer.
# Nothing is returned; mismatches and the final summary appear in the log output below.
validate_all_states_in_gold_layer()


2024-11-11 15:34:51,377 - INFO - File exists: c:\Users\Vicente Renart\Desktop\bees_data_pipeline\root\dags\data\gold_breweries_aggregated.parquet
2024-11-11 15:34:51,381 - INFO - Gold Layer Data Loaded Successfully.
2024-11-11 15:34:51,384 - INFO - Gold Layer Data Sample:
  brewery_type           state  brewery_count
0      brewpub        Colorado              1
1      brewpub        New York              2
2      brewpub  North Carolina              1
3      brewpub            Ohio              1
4      brewpub          Oregon              2
2024-11-11 15:34:51,388 - INFO - Silver Layer Data for state 'Arizona' Loaded Successfully.
2024-11-11 15:34:51,392 - INFO - Silver Layer Data Sample for state 'Arizona':
                                      id  \
20  fb94830f-6196-4f59-9189-c9060b778085   
21  0faa0fb2-fffa-416d-9eab-46f67477c8ef   
48  4f4b5b34-d572-4dff-a18f-47e507c073e6   

                                             name brewery_type     city  \
20                        12