In [16]:
# Importing necessary libraries
import os

from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException

## Problem Statement 1: Ensuring Data Accuracy
**Objective: Ensure that data across different files is accurate and correctly linked.**

Description:

1.	Load Data: Load data from all files into data frames using PySpark.
2.	Initial Validation: Check that data has been ingested correctly into data frames.
3.	Verify Data Accuracy:
    -	Confirm that every Customer_ID in transactions.csv and interactions.csv exists in customers.csv.
    -	Check that Product_ID in transactions.csv is valid according to products.csv.
    -	Ensure that Sales_Rep_ID in transactions.csv matches entries in sales_team.csv.

In [17]:
# Execute the shared utility notebook inside this kernel.
# NOTE(review): initialize_spark_session and initialize_logger are not defined in
# this notebook; presumably they come from this utility notebook — confirm.
%run utilities/common_utility.ipynb

24/09/20 05:08:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Initialize Spark session

In [18]:
# Session handle read as a global by load_data_files further down.
# NOTE(review): the argument looks like the Spark application name, and
# "Interaction Analysis" does not match this notebook's topic (data accuracy
# checks) — confirm the intended value against initialize_spark_session.
spark = initialize_spark_session("Interaction Analysis")

24/09/20 05:08:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Logging Configuration

In [19]:
# Path handed to initialize_logger; relative to the notebook's working directory.
# NOTE(review): presumably initialize_logger writes to this file — confirm that it
# creates the logs/ directory when it does not exist.
log_file_path = 'logs/analysis.log'
logger = initialize_logger(log_file_path)

# Emit one record right away so a misconfigured logger is noticed here rather
# than in a later cell.
logger.info("Logger initialized with dynamic path!")

2024-09-20 05:08:36,201 - logger - INFO - [92mLogger initialized with dynamic path![0m


In [20]:

def print_green(text: str):
    """
    Write ``text`` to stdout wrapped in ANSI green escape codes and log it at INFO level.

    Any exception raised while printing or logging is caught and reported
    through ``logger.error`` instead of propagating to the caller.
    """
    green, reset = "\033[92m", "\033[0m"
    try:
        print(f"{green}{text}{reset}")
        logger.info(f"{text}")
    except Exception as err:
        logger.error(f"Failed to print text: {err}")

def validate_ids(df1: DataFrame, df2: DataFrame, df1_col: str, df2_col: str, id_name: str):
    """
    Report values of ``df1[df1_col]`` that have no matching value in ``df2[df2_col]``.

    Args:
        df1: DataFrame whose IDs are being checked.
        df2: DataFrame holding the reference IDs.
        df1_col: Name of the ID column in ``df1``.
        df2_col: Name of the ID column in ``df2``.
        id_name: Label for the ID, used in messages and as the suffix of the
            output column (``Missing_<id_name>``).

    Returns:
        None. Findings are shown on stdout and written to the log. Errors
        (including a missing column) are logged, not raised.
    """
    try:
        # Ensure columns exist in both DataFrames
        if df1_col not in df1.columns or df2_col not in df2.columns:
            raise ValueError(f"Column '{df1_col}' or '{df2_col}' not found in the DataFrames.")

        # Left anti join keeps only the df1 rows for which no df2 row satisfies
        # the join condition, i.e. the IDs that are missing from df2.
        unmatched = df1.join(df2, df1[df1_col] == df2[df2_col], "left_anti")
        missing_ids = unmatched.select(df1[df1_col].alias(f"Missing_{id_name}"))

        # Keep the count so it can be reported below without counting again.
        missing_count = missing_ids.count()

        if missing_count > 0:
            # This function only knows column names, not file names, so the
            # message refers to the columns. Deriving "<prefix>.csv" from the
            # column name produced names such as "Customer.csv" that do not exist.
            print_green(
                f"{id_name}s found in column '{df1_col}' of the checked DataFrame "
                f"but missing from column '{df2_col}' of the reference DataFrame:"
            )
            missing_ids.show()
            # print_green already logged the headline; log the size of the
            # problem instead of repeating the same line.
            logger.info(f"{missing_count} row(s) reference a {id_name} that has no match.")
        else:
            print(f"No missing {id_name}s found.")
            logger.info(f"No missing {id_name}s found between {df1_col} and {df2_col}.")

    except AnalysisException as ae:
        logger.error(f"Error during DataFrame operation: {ae}")
    except ValueError as ve:
        logger.error(f"Validation error: {ve}")
    except Exception as e:
        logger.error(f"Unexpected error during validation: {e}")

def load_data_files(file_paths: dict) -> dict:
    """
    Read each CSV listed in ``file_paths`` into a Spark DataFrame.

    Args:
        file_paths: Mapping of a short dataset name to the CSV path to read.

    Returns:
        Mapping of dataset name to DataFrame for every file that loaded.
        Files that are missing or fail to load are logged and left out.
    """
    loaded = {}
    for label, path in file_paths.items():
        try:
            if os.path.exists(path):
                # First row is the header; column types are inferred from the data.
                loaded[label] = spark.read.csv(path, header=True, inferSchema=True)
                logger.info(f"Loaded {path} successfully.")
            else:
                raise FileNotFoundError(f"File '{path}' not found.")
        except FileNotFoundError as missing_file:
            logger.error(missing_file)
        except Exception as err:
            logger.error(f"Error loading {path}: {err}")

    return loaded

def display_dataframes(dfs: dict):
    """
    Show the first 5 rows of every DataFrame in ``dfs``.

    A ``None`` entry or any failure while displaying is logged as an error,
    and the loop moves on to the next DataFrame.
    """
    for label, frame in dfs.items():
        try:
            if frame is None:
                raise ValueError(f"DataFrame '{label}' is empty or not loaded.")

            title = label.capitalize()
            print_green(f"{title} DataFrame:")
            # truncate=False prints full cell values instead of shortening them.
            frame.show(5, truncate=False)
            logger.info(f"Displayed first 5 records for {title} DataFrame.")
        except ValueError as invalid:
            logger.error(invalid)
        except Exception as err:
            logger.error(f"Error displaying DataFrame {label}: {err}")

def validate_data(dfs: dict, validations: list):
    """
    Run each ID check described in ``validations`` against the loaded DataFrames.

    Args:
        dfs: Mapping of dataset name to DataFrame.
        validations: Iterable of ``(checked_name, reference_name, id_column)``
            tuples. The same ``id_column`` is used on both sides of the check.

    A rule that names a dataset absent from ``dfs`` is logged as an error and
    skipped; the remaining rules still run.
    """
    for checked, reference, id_col in validations:
        try:
            print_green(f"Verifying {id_col} matches between {checked}.csv and {reference}.csv...")
            logger.info(f"Starting validation of {id_col} between {checked}.csv and {reference}.csv.")

            # Both DataFrames must have been loaded before they can be compared.
            both_loaded = checked in dfs and reference in dfs
            if not both_loaded:
                raise ValueError(f"One of the DataFrames '{checked}' or '{reference}' is missing.")

            validate_ids(dfs[checked], dfs[reference], id_col, id_col, id_col)

        except ValueError as invalid:
            logger.error(invalid)
        except Exception as err:
            logger.error(f"Unexpected error during data validation: {err}")

def main():
    """
    Load the dataset CSVs, preview them, and run the cross-file ID checks.

    The run is reported as successful only when every requested file was
    loaded. Otherwise an error naming the missing datasets is logged, because
    the checks that depend on them could not run. Unexpected exceptions are
    logged rather than raised.
    """
    try:
        data_files = {
            "customers": "Dataset/customers.csv",
            "products": "Dataset/products.csv",
            "transactions": "Dataset/transactions.csv",
            "interactions": "Dataset/interactions.csv",
            "sales_team": "Dataset/sales_team.csv"
        }

        # Load DataFrames
        dfs = load_data_files(data_files)

        # load_data_files logs load failures and omits those entries from its
        # result, so a failed load is only visible as a missing key.
        not_loaded = [name for name in data_files if name not in dfs]
        if not_loaded:
            logger.error(f"Datasets that could not be loaded: {', '.join(not_loaded)}.")

        # Display initial records for each DataFrame
        display_dataframes(dfs)

        # Verify Data Accuracy: (DataFrame to check, reference DataFrame, ID column)
        validations = [
            ("transactions", "customers", "Customer_ID"),
            ("interactions", "customers", "Customer_ID"),
            ("transactions", "products", "Product_ID"),
            ("transactions", "sales_team", "Sales_Rep_ID")
        ]

        validate_data(dfs, validations)

        if not_loaded:
            # Do not claim success when some inputs were never read.
            logger.error(
                f"Data validation finished, but results are incomplete: "
                f"{len(not_loaded)} dataset(s) were not loaded."
            )
        else:
            logger.info("Data validation completed successfully.")

    except Exception as e:
        logger.error(f"Unexpected error in main execution: {e}")

if __name__ == "__main__":
    main()

2024-09-20 05:08:36,361 - logger - INFO - [92mLoaded Dataset/customers.csv successfully.[0m
2024-09-20 05:08:36,465 - logger - INFO - [92mLoaded Dataset/products.csv successfully.[0m
2024-09-20 05:08:36,557 - logger - INFO - [92mLoaded Dataset/transactions.csv successfully.[0m
2024-09-20 05:08:36,657 - logger - INFO - [92mLoaded Dataset/interactions.csv successfully.[0m
2024-09-20 05:08:36,754 - logger - INFO - [92mLoaded Dataset/sales_team.csv successfully.[0m
2024-09-20 05:08:36,756 - logger - INFO - [92mCustomers DataFrame:[0m
2024-09-20 05:08:36,797 - logger - INFO - [92mDisplayed first 5 records for Customers DataFrame.[0m
2024-09-20 05:08:36,798 - logger - INFO - [92mProducts DataFrame:[0m
2024-09-20 05:08:36,838 - logger - INFO - [92mDisplayed first 5 records for Products DataFrame.[0m
2024-09-20 05:08:36,839 - logger - INFO - [92mTransactions DataFrame:[0m
2024-09-20 05:08:36,885 - logger - INFO - [92mDisplayed first 5 records for Transactions DataFrame.[0

[92mCustomers DataFrame:[0m
+------------------------------------+-----------------+----------------------+-------------+----------------+
|Customer_ID                         |Name             |Email                 |Phone        |Country         |
+------------------------------------+-----------------+----------------------+-------------+----------------+
|a85e6a90-78d5-490c-a53f-c58b2e57c59b|Shannon Deleon   |NULL                  |5878628895   |Japan           |
|babec972-ffb3-4c56-99c3-e8e3855adf0f|Christina Sanchez|craigprice@example.org|4832368495   |Haiti           |
|d74c33bd-69d9-4718-9e00-d1895a41ddac|Thomas Brown     |vjohnson@example.org  |(276)903-7065|Pakistan        |
|ff05ceba-f459-4714-a252-e03198d9934c|Lindsey Bradford |kathryn50@example.net |NULL         |Marshall Islands|
|f20755f6-8481-4904-afe6-504451ceded5|John Boyer       |jennifer15@example.org|(749)644-5721|New Caledonia   |
+------------------------------------+-----------------+----------------------+---

2024-09-20 05:08:36,976 - logger - INFO - [92mDisplayed first 5 records for Sales_team DataFrame.[0m
2024-09-20 05:08:36,977 - logger - INFO - [92mVerifying Customer_ID matches between transactions.csv and customers.csv...[0m
2024-09-20 05:08:36,980 - logger - INFO - [92mStarting validation of Customer_ID between transactions.csv and customers.csv.[0m
2024-09-20 05:08:37,096 - logger - INFO - [92mNo missing Customer_IDs found between Customer_ID and Customer_ID.[0m
2024-09-20 05:08:37,098 - logger - INFO - [92mVerifying Customer_ID matches between interactions.csv and customers.csv...[0m
2024-09-20 05:08:37,098 - logger - INFO - [92mStarting validation of Customer_ID between interactions.csv and customers.csv.[0m


+------------------------------------+-----------------+-------------+------------+--------------+
|Sales_Rep_ID                        |Name             |Region       |Sales_Target|Sales_Achieved|
+------------------------------------+-----------------+-------------+------------+--------------+
|0437b05a-9628-43f9-ac07-0b9a0dc96dcd|Brittany Taylor  |California   |41135       |14037.0       |
|4daeb6af-d7e9-4f99-91b3-6c912f45b740|Mitchell Williams|New Hampshire|32996       |21461.0       |
|f243144e-485f-4382-81ef-2a9a3c63f172|John Terry       |Kansas       |10385       |NULL          |
|9c44ee81-8254-45e1-af23-a4608ceb126c|Carolyn Miller   |Arizona      |23754       |17149.0       |
|3e97b5d8-933a-4860-bce7-2398af6c5613|Antonio Sparks   |Washington   |27101       |36413.0       |
+------------------------------------+-----------------+-------------+------------+--------------+
only showing top 5 rows

[92mVerifying Customer_ID matches between transactions.csv and customers.csv...[0m

2024-09-20 05:08:37,219 - logger - INFO - [92mNo missing Customer_IDs found between Customer_ID and Customer_ID.[0m
2024-09-20 05:08:37,220 - logger - INFO - [92mVerifying Product_ID matches between transactions.csv and products.csv...[0m
2024-09-20 05:08:37,222 - logger - INFO - [92mStarting validation of Product_ID between transactions.csv and products.csv.[0m
2024-09-20 05:08:37,344 - logger - INFO - [92mNo missing Product_IDs found between Product_ID and Product_ID.[0m
2024-09-20 05:08:37,345 - logger - INFO - [92mVerifying Sales_Rep_ID matches between transactions.csv and sales_team.csv...[0m
2024-09-20 05:08:37,346 - logger - INFO - [92mStarting validation of Sales_Rep_ID between transactions.csv and sales_team.csv.[0m


No missing Customer_IDs found.
[92mVerifying Product_ID matches between transactions.csv and products.csv...[0m
No missing Product_IDs found.
[92mVerifying Sales_Rep_ID matches between transactions.csv and sales_team.csv...[0m


2024-09-20 05:08:37,475 - logger - INFO - [92mNo missing Sales_Rep_IDs found between Sales_Rep_ID and Sales_Rep_ID.[0m
2024-09-20 05:08:37,476 - logger - INFO - [92mData validation completed successfully.[0m


No missing Sales_Rep_IDs found.
