# E-commerce Data Cleaning with DuckDB
## Data Quality Assessment and Cleaning Walkthrough

This notebook demonstrates a systematic approach to cleaning e-commerce product data 
spread across multiple files and formats. We'll use DuckDB as our primary processing 
engine, taking advantage of its ability to directly query different file formats
and perform efficient transformations.

Our source files:
1. main_catalog.csv - Primary product catalog
2. inventory_update.xlsx - Recent inventory updates
3. price_list.json - Latest pricing information
4. category_mapping.parquet - Category standardization mapping

First, let's set up our environment and file paths.

In [40]:
import duckdb
from pathlib import Path

Define project directory structure.
Note: Adjust this base path according to your setup.

In [41]:
PROJECT_ROOT = Path.home() / 'freelance'
DATA_DIR = PROJECT_ROOT / 'testing' / 'synthetic-data' / 'ecommerce'

Verify data directory exists.

In [42]:
if not DATA_DIR.exists():
    raise FileNotFoundError(f"Data directory not found: {DATA_DIR}")

Create path objects for our data files.

In [43]:
MAIN_CATALOG = DATA_DIR / 'main_catalog.csv'
INVENTORY_UPDATE = DATA_DIR / 'inventory_update.xlsx'
PRICE_LIST = DATA_DIR / 'price_list.json'
CATEGORY_MAPPING = DATA_DIR / 'category_mapping.parquet'

Verify all required files exist.

In [44]:
required_files = {
    'Main Catalog': MAIN_CATALOG,
    'Inventory Update': INVENTORY_UPDATE,
    'Price List': PRICE_LIST,
    'Category Mapping': CATEGORY_MAPPING
}

missing_files = [name for name, path in required_files.items() if not path.exists()]
if missing_files:
    raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}")

print("Data files found:")
for name, path in required_files.items():
    print(f"- {name}: {path}")

Data files found:
- Main Catalog: /home/james/freelance/testing/synthetic-data/ecommerce/main_catalog.csv
- Inventory Update: /home/james/freelance/testing/synthetic-data/ecommerce/inventory_update.xlsx
- Price List: /home/james/freelance/testing/synthetic-data/ecommerce/price_list.json
- Category Mapping: /home/james/freelance/testing/synthetic-data/ecommerce/category_mapping.parquet


Initialize DuckDB and load required extensions.

In [45]:
conn = duckdb.connect(':memory:')
conn.execute("INSTALL spatial;")
conn.execute("LOAD spatial;")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Create standardized SKUs table.

In [46]:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS clean_skus AS
WITH all_skus AS (
    SELECT DISTINCT sku FROM (
        SELECT sku FROM '{MAIN_CATALOG}'
        UNION ALL
        SELECT sku FROM st_read('{INVENTORY_UPDATE}')
        UNION ALL
        SELECT sku FROM read_json('{PRICE_LIST}', format='auto')
        UNION ALL
        SELECT sku FROM '{CATEGORY_MAPPING}'
    ) t
    WHERE sku IS NOT NULL
)
SELECT 
    sku as original_sku,
    CASE 
        WHEN LENGTH(sku) = 10 
        AND SUBSTRING(sku, 3, 1) = '-'
        AND SUBSTRING(sku, 6, 1) = '-'
        AND SUBSTRING(sku, 7) ~ '^[0-9]+$'
        THEN sku
        ELSE UPPER(SUBSTRING(sku, 1, 2)) || '-' ||
             UPPER(SUBSTRING(sku, 4, 2)) || '-' ||
             LPAD(REGEXP_REPLACE(SUBSTRING(sku, 7), '[^0-9]', '', 'g'), 4, '0')
    END as clean_sku
FROM all_skus;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Verify SKU cleaning results.

In [47]:
conn.execute("""
SELECT 
    COUNT(*) as total_skus,
    COUNT(DISTINCT original_sku) as unique_original_skus,
    COUNT(DISTINCT clean_sku) as unique_clean_skus
FROM clean_skus;
""")
print("SKU Cleaning Results:")
print(conn.fetchdf())

SKU Cleaning Results:
   total_skus  unique_original_skus  unique_clean_skus
0        9187                  9187               9187


Create standardized prices table.

In [48]:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS clean_prices AS
WITH price_sources AS (
    SELECT 
        cs.clean_sku,
        CASE 
            WHEN mc.price LIKE '$%' THEN CAST(REPLACE(mc.price, '$', '') AS DECIMAL(10,2))
            WHEN mc.price IS NOT NULL THEN CAST(mc.price AS DECIMAL(10,2))
            ELSE NULL
        END as catalog_price,
        CASE 
            WHEN pl.price LIKE '$%' THEN CAST(REPLACE(pl.price, '$', '') AS DECIMAL(10,2))
            WHEN pl.price IS NOT NULL THEN CAST(pl.price AS DECIMAL(10,2))
            ELSE NULL
        END as list_price
    FROM clean_skus cs
    LEFT JOIN '{MAIN_CATALOG}' mc ON cs.original_sku = mc.sku
    LEFT JOIN read_json('{PRICE_LIST}', format='auto') pl ON cs.original_sku = pl.sku
)
SELECT 
    clean_sku,
    COALESCE(list_price, catalog_price) as final_price,
    catalog_price IS NOT NULL as had_catalog_price,
    list_price IS NOT NULL as had_list_price
FROM price_sources;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Verify price cleaning results.

In [49]:
conn.execute("""
SELECT 
    COUNT(*) as total_prices,
    COUNT(DISTINCT clean_sku) as unique_skus,
    ROUND(AVG(final_price), 2) as avg_price,
    COUNT(*) FILTER (WHERE had_catalog_price) as from_catalog,
    COUNT(*) FILTER (WHERE had_list_price) as from_price_list
FROM clean_prices;
""")
print("\nPrice Cleaning Results:")
print(conn.fetchdf())


Price Cleaning Results:
   total_prices  unique_skus  avg_price  from_catalog  from_price_list
0          9521         9187     253.87          5450             3771


Create standardized inventory table.

In [50]:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS clean_inventory AS
WITH inventory_sources AS (
    SELECT 
        cs.clean_sku,
        GREATEST(COALESCE(mc.inventory, 0), 0) as catalog_inventory,
        mc.last_updated as catalog_date,
        GREATEST(COALESCE(iu.inventory, 0), 0) as update_inventory,
        iu.last_updated as update_date
    FROM clean_skus cs
    LEFT JOIN '{MAIN_CATALOG}' mc ON cs.original_sku = mc.sku
    LEFT JOIN st_read('{INVENTORY_UPDATE}') iu ON cs.original_sku = iu.sku
)
SELECT 
    clean_sku,
    CASE 
        WHEN update_date > catalog_date OR catalog_date IS NULL THEN update_inventory
        ELSE catalog_inventory
    END as final_inventory,
    GREATEST(update_date, catalog_date) as last_updated
FROM inventory_sources;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Verify inventory cleaning results.

In [51]:
conn.execute("""
SELECT 
    COUNT(*) as total_inventory_records,
    COUNT(DISTINCT clean_sku) as unique_skus,
    ROUND(AVG(final_inventory), 2) as avg_inventory,
    COUNT(*) FILTER (WHERE final_inventory > 0) as in_stock_items
FROM clean_inventory;
""")
print("\nInventory Cleaning Results:")
print(conn.fetchdf())


Inventory Cleaning Results:
   total_inventory_records  unique_skus  avg_inventory  in_stock_items
0                     9463         9187          29.63            5620


Create standardized categories table.

In [52]:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS clean_categories AS
WITH category_sources AS (
    SELECT 
        cs.clean_sku,
        COALESCE(
            -- Custom capitalization: uppercase first letter, lowercase rest
            CONCAT(UPPER(LEFT(TRIM(cm.category), 1)), LOWER(RIGHT(TRIM(cm.category), LENGTH(TRIM(cm.category))-1))),
            CONCAT(UPPER(LEFT(TRIM(mc.category), 1)), LOWER(RIGHT(TRIM(mc.category), LENGTH(TRIM(mc.category))-1)))
        ) as category,
        COALESCE(cm.subcategory, mc.subcategory) as subcategory
    FROM clean_skus cs
    LEFT JOIN '{CATEGORY_MAPPING}' cm ON cs.original_sku = cm.sku
    LEFT JOIN '{MAIN_CATALOG}' mc ON cs.original_sku = mc.sku
)
SELECT 
    clean_sku,
    category,
    subcategory
FROM category_sources;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Verify category cleaning results.

In [53]:
conn.execute("""
SELECT 
    COUNT(*) as total_category_records,
    COUNT(DISTINCT clean_sku) as unique_skus,
    COUNT(DISTINCT category) as unique_categories,
    COUNT(DISTINCT subcategory) as unique_subcategories
FROM clean_categories;
""")
print("\nCategory Cleaning Results:")
print(conn.fetchdf())


Category Cleaning Results:
   total_category_records  unique_skus  unique_categories  \
0                    9625         9187                  7   

   unique_subcategories  
0                    26  


Let's also look at our category values to verify the capitalization worked.

In [54]:
conn.execute("""
SELECT DISTINCT category, subcategory
FROM clean_categories
WHERE category IS NOT NULL
ORDER BY category, subcategory
LIMIT 10;
""")
print("\nSample of Cleaned Categories:")
print(conn.fetchdf())


Sample of Cleaned Categories:
  category    subcategory
0             Accessories
1              Appliances
2           Arts & Crafts
3                   Audio
4                Building
5                 Cameras
6                Children
7                Clothing
8                Cookware
9                   Decor


Create final combined table.

In [55]:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS combined_products AS
SELECT 
    s.clean_sku,
    s.original_sku,
    -- Get product name from main catalog
    mc.product_name,
    -- Get standardized category and subcategory
    c.category,
    c.subcategory,
    -- Get cleaned price
    p.final_price as price,
    -- Get cleaned inventory
    i.final_inventory as inventory,
    i.last_updated as inventory_date,
    -- Add data quality flags
    CASE 
        WHEN p.final_price IS NULL THEN 'Missing price'
        WHEN i.final_inventory IS NULL THEN 'Missing inventory'
        WHEN c.category IS NULL THEN 'Missing category'
        WHEN c.subcategory IS NULL THEN 'Missing subcategory'
        ELSE 'Complete'
    END as record_status
FROM clean_skus s
LEFT JOIN '{MAIN_CATALOG}' mc ON s.original_sku = mc.sku
LEFT JOIN clean_prices p ON s.clean_sku = p.clean_sku
LEFT JOIN clean_inventory i ON s.clean_sku = i.clean_sku
LEFT JOIN clean_categories c ON s.clean_sku = c.clean_sku;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Analyze the combined dataset.

In [56]:
conn.execute("""
SELECT 
    record_status,
    COUNT(*) as count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as percentage
FROM combined_products
GROUP BY record_status
ORDER BY count DESC;
""")
print("\nRecord Completeness Analysis:")
print(conn.fetchdf())


Record Completeness Analysis:
         record_status  count  percentage
0             Complete  12402       77.36
1        Missing price   2863       17.86
2  Missing subcategory    767        4.78


Check value ranges in combined data.

In [57]:
conn.execute("""
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT clean_sku) as unique_products,
    ROUND(MIN(price), 2) as min_price,
    ROUND(MAX(price), 2) as max_price,
    ROUND(AVG(price), 2) as avg_price,
    MIN(inventory) as min_inventory,
    MAX(inventory) as max_inventory,
    ROUND(AVG(inventory), 2) as avg_inventory,
    COUNT(DISTINCT category) as unique_categories,
    COUNT(DISTINCT subcategory) as unique_subcategories
FROM combined_products;
""")
print("\nCombined Dataset Statistics:")
print(conn.fetchdf())


Combined Dataset Statistics:
   total_records  unique_products  min_price  max_price  avg_price  \
0          16032             9187      10.09     504.68      265.4   

   min_inventory  max_inventory  avg_inventory  unique_categories  \
0            0.0           99.0          36.95                  7   

   unique_subcategories  
0                    26  


Export the cleaned dataset.
For Excel export, we'll cast decimals to float to avoid GDAL issues.

In [58]:
conn.execute("""
CREATE OR REPLACE VIEW export_view AS
SELECT 
    clean_sku,
    original_sku,
    product_name,
    category,
    subcategory,
    CAST(price AS FLOAT) as price,
    CAST(inventory AS INTEGER) as inventory,
    inventory_date,
    record_status
FROM combined_products;
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Export as Parquet.

In [59]:
conn.execute(f"""
COPY combined_products 
TO '{DATA_DIR}/cleaned_combined_products.parquet' 
(FORMAT PARQUET);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Export as CSV.

In [60]:
conn.execute(f"""
COPY combined_products 
TO '{DATA_DIR}/cleaned_combined_products.csv'
(HEADER, DELIMITER ',');
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7c8b421a9370>

Export as Excel (using modified view for compatibility).

In [61]:
conn.execute(f"""
COPY export_view 
TO '{DATA_DIR}/cleaned_combined_products.xlsx'
(FORMAT GDAL, DRIVER 'xlsx');
""")

print("\nExported cleaned dataset to:")
print(f"- {DATA_DIR}/cleaned_combined_products.parquet (full precision)")
print(f"- {DATA_DIR}/cleaned_combined_products.csv (full precision)")
print(f"- {DATA_DIR}/cleaned_combined_products.xlsx (floating point precision)")


Exported cleaned dataset to:
- /home/james/freelance/testing/synthetic-data/ecommerce/cleaned_combined_products.parquet (full precision)
- /home/james/freelance/testing/synthetic-data/ecommerce/cleaned_combined_products.csv (full precision)
- /home/james/freelance/testing/synthetic-data/ecommerce/cleaned_combined_products.xlsx (floating point precision)


Verify the exports worked.

In [62]:
conn.execute("""
SELECT 
    COUNT(*) as row_count,
    COUNT(DISTINCT clean_sku) as unique_skus,
    MIN(price) as min_price,
    MAX(price) as max_price,
    MIN(inventory) as min_inventory,
    MAX(inventory) as max_inventory
FROM export_view;
""")
print("\nExport Verification:")
print(conn.fetchdf())


Export Verification:
   row_count  unique_skus  min_price   max_price  min_inventory  max_inventory
0      16032         9187      10.09  504.679993              0             99
