In [1]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
import os
import psutil

In [6]:
# Define file paths
csv_file = "data.csv"
parquet_file = "data.parquet"
cleaned_parquet_file = "clean_data.parquet"

# Function to check memory usage
def memory_usage():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / (1024 * 1024)  # Convert bytes to MB

### 📌 Step 2: Non-Optimized Processing (Pandas)
start_time_pandas = time.time()
memory_start_pandas = memory_usage()

# Load CSV using Pandas (Non-Optimized)
df_pandas = pd.read_csv(csv_file, low_memory=False)

# Cleaning using Pandas
df_pandas = df_pandas.drop_duplicates()  # Remove duplicates
df_pandas.fillna(df_pandas.mean(numeric_only=True), inplace=True)  # Fill NaNs for numeric columns
df_pandas.fillna(df_pandas.mode().iloc[0], inplace=True)  # Fill NaNs for categorical columns

# Outlier Removal (Z-score method)
def remove_outliers(data, threshold=3):
    mean = data.mean()
    std = data.std()
    return data[(np.abs(data - mean) / std) < threshold]

for col in df_pandas.select_dtypes(include=["float64", "int64"]).columns:
    df_pandas[col] = remove_outliers(df_pandas[col])

# Normalize Numeric Features
numeric_cols = df_pandas.select_dtypes(include=["float64", "int64"]).columns
df_pandas[numeric_cols] = (df_pandas[numeric_cols] - df_pandas[numeric_cols].mean()) / df_pandas[numeric_cols].std()

memory_end_pandas = memory_usage()
time_pandas = time.time() - start_time_pandas

print(f"🔴 Non-Optimized Processing Completed in {time_pandas:.2f} seconds")
print(f"🔴 Memory Used (Pandas): {memory_end_pandas - memory_start_pandas:.2f} MB")



### 📊 Step 5: Performance Comparison
print("\n📊 Performance Comparison")
print("------------------------------------------------")
print(f"🔴 Pandas Processing Time: {time_pandas:.2f} sec")
print(f"🔴 Pandas Memory Usage   : {memory_end_pandas - memory_start_pandas:.2f} MB")

🔴 Non-Optimized Processing Completed in 25.26 seconds
🔴 Memory Used (Pandas): 2.14 MB

📊 Performance Comparison
------------------------------------------------
🔴 Pandas Processing Time: 25.26 sec
🔴 Pandas Memory Usage   : 2.14 MB


In [7]:
### 🚀 Step 1: Convert CSV to Parquet (Fast Storage)
print("Converting CSV to Parquet...")
df = pd.read_csv(csv_file, low_memory=False)  # Load CSV
df.to_parquet(parquet_file, engine="pyarrow")  # Convert to Parquet
print("Conversion complete!")

### 🚀 Step 3: Optimized Processing (Dask)
start_time_dask = time.time()
memory_start_dask = memory_usage()

# Load Parquet Using Dask (Optimized)
df_dask = dd.read_parquet(parquet_file)

# Data Cleaning using Dask
df_dask = df_dask.drop_duplicates()

# Fill NaNs for numeric and categorical columns
for col in df_dask.columns:
    if df_dask[col].dtype in ["float64", "int64"]:
        df_dask[col] = df_dask[col].fillna(df_dask[col].mean())
    else:
        df_dask[col] = df_dask[col].fillna(df_dask[col].mode().compute()[0])

# Outlier Removal (Z-score method)
for col in df_dask.columns:
    if df_dask[col].dtype in ["float64", "int64"]:
        df_dask[col] = df_dask[col].map_partitions(remove_outliers)

# Normalize Numeric Features
df_dask = df_dask.assign(**{
    col: df_dask[col].map_partitions(lambda x: (x - x.mean()) / x.std(), meta=(col, 'f8'))
    for col in numeric_cols
})


# Compute results (converts Dask DataFrame to Pandas)
df_dask = df_dask.compute()

memory_end_dask = memory_usage()
time_dask = time.time() - start_time_dask

print(f"✅ Optimized Processing (Dask + Parquet) Completed in {time_dask:.2f} seconds")
print(f"✅ Memory Used (Dask): {memory_end_dask - memory_start_dask:.2f} MB")

### 📌 Step 4: Save Cleaned Data (Optimized)
df_dask.to_parquet(cleaned_parquet_file, engine="pyarrow")
print("Cleaned dataset saved!")

print(f"✅ Dask Processing Time  : {time_dask:.2f} sec (🔼 {time_pandas / time_dask:.2f}x Faster)")
print(f"✅ Dask Memory Usage     : {memory_end_dask - memory_start_dask:.2f} MB (🔽 Less Memory Used)")

Converting CSV to Parquet...
Conversion complete!
✅ Optimized Processing (Dask + Parquet) Completed in 121.60 seconds
✅ Memory Used (Dask): 389.48 MB
Cleaned dataset saved!
✅ Dask Processing Time  : 121.60 sec (🔼 0.21x Faster)
✅ Dask Memory Usage     : 389.48 MB (🔽 Less Memory Used)
