#### **This Notebook is to perform cleaning functions to all the csv files made from the Models Cars Data **

#### **install profiler to have a great understanding of the data**

In [0]:
%pip install sweetviz
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


#### **Import libraries**

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import sweetviz as sv

In [0]:

# Create Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Data Cleaning and Ingestion") \
    .getOrCreate()

def load_data(path):
    df = spark.read.csv(path, header=True, inferSchema=True)
    # df.show(10)
    # df.printSchema()
    return df

def to_pandas(spark_df):
    pandas_df = spark_df.toPandas()
    return pandas_df



In [0]:
%fs ls dbfs:/tmp

path,name,size,modificationTime
dbfs:/tmp/cleaned_customers.csv,cleaned_customers.csv,11756,1728725311441
dbfs:/tmp/cleaned_employees.csv,cleaned_employees.csv,1781,1728725310472
dbfs:/tmp/cleaned_offices.csv,cleaned_offices.csv,597,1728725313181
dbfs:/tmp/cleaned_orderdetails.csv,cleaned_orderdetails.csv,2728,1728725317407
dbfs:/tmp/cleaned_orders.csv,cleaned_orders.csv,7592,1728725315808
dbfs:/tmp/cleaned_payments.csv,cleaned_payments.csv,3315,1728725312355
dbfs:/tmp/cleaned_productlines.csv,cleaned_productlines.csv,3474,1728725314014
dbfs:/tmp/cleaned_products.csv,cleaned_products.csv,27666,1728725314885
dbfs:/tmp/cleaned_warehouses.csv,cleaned_warehouses.csv,86,1728725316596
dbfs:/tmp/customers.csv,customers.csv,11446,1728405968451


In [0]:
TABLES = {"customers", "employees", "offices", "orderdetails", "orders", "payments", "productlines", "products", "warehouses"}
for table in TABLES:
    input_path = f"dbfs:/tmp/{table}.csv"
    spark_df = load_data(input_path)
    df = to_pandas(spark_df)
    # print(f"Analyzing Table: {table}")
    # report = sv.analyze(df)
    # report.show_notebook()

#### **Clean All Tables**

In [0]:
# Define a mapping of SQL column types to Pandas data types for each table
SCHEMA = {
    "customers": {
        "customer_number": "int64",
        "customer_name": "str",
        "contact_last_name": "str",
        "contact_first_name": "str",
        "phone": "str",
        "address_line1": "str",
        "address_line2": "str",
        "city": "str",
        "state": "str",
        "postal_code": "str",
        "country": "str",
        "sales_rep_employee_number": "float64",  # Using float64 since it's INT in SQL but may contain NaN
        "credit_limit": "float64"
    },
    "employees": {
        "employee_number": "float64",
        "last_name": "str",
        "first_name": "str",
        "extension": "str",
        "email": "str",
        "office_code": "str",
        "reports_to": "float64",
        "job_title": "str"
    },
    "offices": {
        "office_code": "str",
        "city": "str",
        "phone": "str",
        "address_line1": "str",
        "address_line2": "str",
        "state": "str",
        "country": "str",
        "postal_code": "str",
        "territory": "str"
    },
    "orderdetails": {
        "order_number": "int64",
        "product_code": "str",
        "quantity_ordered": "int64",
        "price_each": "float64",
        "orderline_number": "int64"
    },
    "orders": {
        "order_number": "int64",
        "order_date": "datetime64",
        "required_date": "datetime64",
        "shipped_date": "datetime64",
        "status": "str",
        "comments": "str",
        "customer_number": "int64"
    },
    "payments": {
        "customer_number": "int64",
        "check_number": "str",
        "payment_date": "datetime64",
        "amount": "float64"
    },
    "productlines": {
        "product_line": "str",
        "text_description": "str",
        "html_description": "str",
        "image": "str"
    },
    "products": {
        "product_code": "str",
        "product_name": "str",
        "product_line": "str",
        "product_scale": "str",
        "product_vendor": "str",
        "product_description": "str",
        "quantity_instock": "int64",
        "warehouse_code": "str",
        "buy_price": "float64",
        "msrp": "float64"
    },
    "warehouses": {
        "warehouse_code": "str",
        "warehouse_name": "str",
        "warehousepctcap": "float64"
    }
}

def validate_data_types(df, table_name):
    """
    Validates the data types of a DataFrame's columns based on the schema defined for the table.
    
    Args:
        df (pd.DataFrame): The DataFrame to validate.
        table_name (str): The name of the table.
    
    Returns:
        pd.DataFrame: DataFrame with coerced data types.
    """
    if table_name not in SCHEMA:
        raise ValueError(f"Schema for table '{table_name}' not found.")
    
    schema = SCHEMA[table_name]
    
    for column, expected_dtype in schema.items():
        if column in df.columns:
            print(f"Validating column {column} for table {table_name} (expected: {expected_dtype})")
            try:
                # Handle datetime columns
                if expected_dtype.startswith("datetime"):
                    df[column] = pd.to_datetime(df[column], errors='coerce')
                # Handle numeric columns
                elif expected_dtype.startswith("float") or expected_dtype.startswith("int"):
                    df[column] = pd.to_numeric(df[column], errors='coerce')
                # For strings, ensure the type is string
                elif expected_dtype == "str":
                    df[column] = df[column].astype(str)
            except Exception as e:
                print(f"Error validating column {column}: {e}")
    
    return df


In [0]:
# Function to clean and validate each table
def clean_and_validate(df, table_name):
    """
    Cleans and validates a DataFrame based on the table schema.
    
    Args:
        df (pd.DataFrame): The DataFrame to clean and validate.
        table_name (str): The name of the table.
    
    Returns:
        pd.DataFrame: Cleaned and validated DataFrame.
    """
    # Remove duplicates
    df = df.drop_duplicates()
    
    # Validate data types
    df = validate_data_types(df, table_name)
    
    # Handle missing values (optional, customize based on table/column)
    df = df.dropna(how='all')  # Drop rows where all columns are NaN
    df = df.fillna(value="")   # Example: Fill NaNs with empty strings or default values
    
    return df

# Example of how to use this for one of the tables
# Assuming df is your DataFrame for 'customers'
# df_cleaned = clean_and_validate(df, 'customers')

#### **Save data into temp_cleaned**

In [0]:
import os

# Function to save the cleaned data to DBFS
def save_cleaned_data(df, output_path):
    """
    Saves the cleaned data to the specified DBFS path.
    
    Args:
        df (pd.DataFrame): The DataFrame to save.
        output_path (str): The path in DBFS to save the cleaned DataFrame.
    """
    # Step 1: Save to a local temporary path
    local_path = "/tmp/cleaned_temp.csv"
    df.to_csv(local_path, index=False)
    
    # Step 2: Move the local file to DBFS
    dbutils.fs.cp(f"file:{local_path}", output_path)
    print(f"Cleaned data saved to {output_path} in DBFS.")

In [0]:
# Main function to run the data cleaning pipeline
def run_data_pipeline():
    for table in TABLES:
        input_path = f"dbfs:/tmp/{table}.csv"
        output_path = f"dbfs:/tmp/cleaned_{table}.csv"
        
        # Load data from DBFS
        print(f"Loading data for {table} from {input_path}...")
        spark_df = load_data(input_path)
        df = to_pandas(spark_df)
        
        # Clean and validate the data
        print(f"Analyzing and cleaning Table: {table}")
        df_cleaned = clean_and_validate(df, table)
        
        # Save the cleaned data back to DBFS
        save_cleaned_data(df_cleaned, output_path)
        print(f"Cleaned data for {table} saved to {output_path}.")

# Run the data pipeline
run_data_pipeline()

Loading data for employees from dbfs:/tmp/employees.csv...
Analyzing and cleaning Table: employees
Validating column extension for table employees (expected: str)
Validating column email for table employees (expected: str)
Cleaned data saved to dbfs:/tmp/cleaned_employees.csv in DBFS.
Cleaned data for employees saved to dbfs:/tmp/cleaned_employees.csv.
Loading data for customers from dbfs:/tmp/customers.csv...
Analyzing and cleaning Table: customers
Validating column phone for table customers (expected: str)
Validating column city for table customers (expected: str)
Validating column state for table customers (expected: str)
Validating column country for table customers (expected: str)
Cleaned data saved to dbfs:/tmp/cleaned_customers.csv in DBFS.
Cleaned data for customers saved to dbfs:/tmp/cleaned_customers.csv.
Loading data for payments from dbfs:/tmp/payments.csv...
Analyzing and cleaning Table: payments
Validating column amount for table payments (expected: float64)
Cleaned data 