In [48]:
import pandas as pd
from datetime import datetime
import os
import shutil

In [49]:
#importing the extracted files for transformation 
root_extract_dir = '../extract'
customers_data = pd.read_csv(f'{root_extract_dir}/customers_large.csv')
orders_data = pd.read_csv(f'{root_extract_dir}/orders_large.csv')
returns_data = pd.read_csv(f'{root_extract_dir}/returns_large.csv')

In [50]:
#converting the data to pd dataframe
customer_df = pd.DataFrame(customers_data)
orders_df = pd.DataFrame(orders_data)
returns_df = pd.DataFrame(returns_data)

In [51]:
# Initializing tracking dictionaries
rows_in = {'customers':len(customer_df),'orders':len(orders_df),'returns':len(returns_df)}
rows_out = {'customers':0,'orders':0,'returns':0}
processed_time = datetime.now()
summary={'customers':{},'orders':{},'returns':{}}

In [52]:
#customers Transformation

#Trim whitespace in name
customer_df['name'] = customer_df['name'].str.strip()
#Convert signup_date to datetime (handle format inconsistencies)
customer_df['signup_date'] = pd.to_datetime(customer_df['signup_date'], errors='coerce')
count_of_invalid_customer_date = customer_df['signup_date'].isna().sum()

#Drop invalid signup_date rows
customer_df = customer_df[customer_df['signup_date'].notna()]

#Drop rows with missing customer_id or name
missing_id_count = customer_df['customer_id'].isna().sum()
missing_name_count = customer_df['name'].isna().sum()
total_null_count = missing_id_count + missing_name_count

summary['customers']['missing_id_name_count']=total_null_count
summary['customers']['count_of_invalid_customer_date']=count_of_invalid_customer_date

customer_df.dropna(subset=['name','customer_id'], inplace=True)
customer_df.drop_duplicates(inplace=True)
rows_out['customers'] = len(customer_df)




In [53]:
#orders Transformation
#Convert order_date to datetime (log and drop invalid)
orders_df['order_date'] = pd.to_datetime(orders_df['order_date'], errors='coerce')
count_of_invalid_order_date = orders_df['order_date'].isna().sum()
orders_df.dropna(subset=['order_date'], inplace=True)

#Convert amount to float (handle missing or malformed values)
orders_df['amount'] = pd.to_numeric(orders_df['amount'], errors='coerce')
count_of_invalid_order_amount = orders_df['amount'].isna().sum()

#count of missing order_id
count_of_missing_order_id = orders_df['order_id'].isna().sum()
count_of_missing_customer_id = orders_df['customer_id'].isna().sum()
total_missing_orders_nulls = count_of_missing_order_id + count_of_missing_customer_id

#Drop rows with missing amount
orders_df.dropna(subset=['amount'], inplace=True)

#Drop rows with missing order_id or customer_id
orders_df.dropna(subset=['order_id','customer_id'], inplace=True)
#collect duplicates count before dropping
count_of_duplicates = orders_df.duplicated().sum()
# drop duplicates
orders_df.drop_duplicates(inplace=True)
#Normalise product_category to lowercase with underscores
orders_df['product_category'] = orders_df['product_category'].str.strip().str.lower().str.replace(' ', '_')

rows_out['orders']=len(orders_df)
summary['orders']['count_of_invalid_order_amount']=count_of_invalid_order_amount
summary['orders']['count_of_invalid_order_date']=count_of_invalid_order_date
summary['orders']['missing_order_id_customer_id_count']=total_missing_orders_nulls
summary['orders']['count_of_duplicates']=count_of_duplicates

In [54]:
# Returns Transformation
#remove rows with order_id not found in orders
count_missing_order_ids = returns_df['order_id'].isna().sum()
summary['returns']['missing_order_id_count']=count_missing_order_ids

#Ensure return_date is parsed to datetime
returns_df['return_date'] = pd.to_datetime(returns_df['return_date'], errors='coerce') #coerce errors to NaT
count_of_invalid_return_date = returns_df['return_date'].isna().sum()
summary['returns']['count_of_invalid_return_date']=count_of_invalid_return_date

# Drop rows with missing order_id
returns_df.dropna(subset=['order_id'], inplace=True)

# Drop rows with invalid return_date
returns_df.dropna(subset=['return_date'], inplace=True)

rows_out['returns']=len(returns_df)

In [55]:
# collect and collate summary statistics
report = {
    'Customers': {
        'Rows In': rows_in['customers'],
        'Rows Out': rows_out['customers'],
        'Missing ID/Name Count': summary['customers']['missing_id_name_count'],
        'Invalid Signup Date Count': summary['customers']['count_of_invalid_customer_date'],
        'Nulls Dropped': summary['customers']['missing_id_name_count'] + summary['customers']['count_of_invalid_customer_date']
    },
    'Orders': {
        'Rows In': rows_in['orders'],
        'Rows Out': rows_out['orders'],
        'Invalid Order Date Count': summary['orders']['count_of_invalid_order_date'],
        'Invalid Order Amount Count': summary['orders']['count_of_invalid_order_amount'],
        'Missing Order ID/Customer ID Count': summary['orders']['missing_order_id_customer_id_count'],
        'Duplicates Dropped': summary['orders']['count_of_duplicates'],
        'Nulls Dropped': summary['orders']['count_of_invalid_order_date'] + summary['orders']['count_of_invalid_order_amount'] + summary['orders']['missing_order_id_customer_id_count']
    },
    'Returns': {
        'Rows In': rows_in['returns'],
        'Rows Out': rows_out['returns'],
        'Missing Order ID Count': summary['returns']['missing_order_id_count'],
        'Invalid Return Date Count': summary['returns']['count_of_invalid_return_date'],
        'Nulls Dropped': summary['returns']['missing_order_id_count'] + summary['returns']['count_of_invalid_return_date']
    }
}

In [56]:

# Save Transformed DataFrames to CSV files and save into the transform directory
#Re-save data as CSV files for Loading step without index

#check to be sure the directory to save the processed files exists and create it if it does not exist and delete any existing files in the directory
cleaned_data_dir = '../data/processed'

if not os.path.exists(cleaned_data_dir):
    os.makedirs(cleaned_data_dir)
    
#delete any existing files in the directory
for filename in os.listdir(cleaned_data_dir):
    file_path = os.path.join(cleaned_data_dir, filename) # get the full file path
    try:
        if os.path.isfile(file_path) or os.path.islink(file_path): # check if it's a file or a link
            os.unlink(file_path) # delete the file or link
        elif os.path.isdir(file_path): # check if it's a directory
            shutil.rmtree(file_path) # delete the directory and its contents
    except Exception as e:
        print(f'Failed to delete existing file {file_path}. Reason: {e}') # log any errors
        SystemExit(1)
        
#save the dataframes as CSV files in the processed directory
customers_data.to_csv('../data/processed/customers_large.csv', index=False)
orders_data.to_csv('../data/processed/orders_large.csv', index=False)
returns_data.to_csv('../data/processed/returns_large.csv', index=False)
    
print("Data processed and saved successfully....................")

#print report summary
print("\nData Transformation Summary Report:")
for table, stats in report.items():
    print(f"\n{table} Table:")
    for stat_name, value in stats.items():
        print(f"  {stat_name}: {value}")


Data processed and saved successfully....................

Data Transformation Summary Report:

Customers Table:
  Rows In: 1000
  Rows Out: 985
  Missing ID/Name Count: 10
  Invalid Signup Date Count: 5
  Nulls Dropped: 15

Orders Table:
  Rows In: 5000
  Rows Out: 4788
  Invalid Order Date Count: 109
  Invalid Order Amount Count: 43
  Missing Order ID/Customer ID Count: 60
  Duplicates Dropped: 0
  Nulls Dropped: 212

Returns Table:
  Rows In: 500
  Rows Out: 500
  Missing Order ID Count: 0
  Invalid Return Date Count: 0
  Nulls Dropped: 0
