In [5]:
import polars as pl
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
import re

In [6]:
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)
RAW_PATH = "../data/raw/complaints.csv" 
OUTPUT_PATH = DATA_DIR / "processed/filtered_complaints.csv"
SUMMARY_PATH = DATA_DIR / "eda_summary.md"

In [7]:
CHUNK_SIZE = 100_000
TARGET_PRODUCTS = [
    "Credit card", "Personal loan", "Buy Now, Pay Later (BNPL)",
    "Savings account", "Money transfers"
]

In [8]:
total_rows = 0
null_counts = defaultdict(int)
narrative_lengths = []
product_counts = Counter()
monthly_counts = Counter()
missing_narratives = 0
duplicates = Counter()

In [10]:
def clean_narrative(text: str) -> str:
    if not text: return ""
    text = text.lower()
    text = re.sub(r"[^a-z0-9\s.,!?]", "", text)
    for phrase in [
        r"i am writing to file a complaint",
        r"please assist me",
        r"this is regarding my account"
    ]:
        text = re.sub(phrase, "", text, flags=re.IGNORECASE)
    return re.sub(r"\s+", " ", text).strip()
writer = None

In [11]:
# --- Chunked EDA ---
print("🚀 Starting chunked EDA...")
for chunk in pl.read_csv(CSV_PATH, low_memory=True, batch_size=CHUNK_SIZE).iter_chunks():
    total_rows += chunk.height

    # Nulls
    for col in chunk.columns:
        null_counts[col] += chunk[col].null_count()

    # Products
    if "Product" in chunk.columns:
        product_counts.update(chunk["Product"].drop_nulls().to_list())

    # Duplicates (approximate)
    if "Consumer complaint narrative" in chunk.columns:
        for text in chunk["Consumer complaint narrative"].drop_nulls():
            duplicates[text.strip()] += 1

    # Narrative length
    if "Consumer complaint narrative" in chunk.columns:
        for text in chunk["Consumer complaint narrative"]:
            if text is None:
                missing_narratives += 1
                continue
            words = len(str(text).split())
            narrative_lengths.append(words)

    # Time trend
    if "Date received" in chunk.columns:
        try:
            dates = pl.Series("received", chunk["Date received"]).str.strptime(pl.Date, "%Y-%m-%d")
            months = dates.dt.strftime("%Y-%m")
            monthly_counts.update(months.drop_nulls().to_list())
        except:
            pass

    # --- Filtered data for output ---
    if all(col in chunk.columns for col in ["Product", "Consumer complaint narrative"]):
        filtered = chunk.filter(
            (pl.col("Product").is_in(TARGET_PRODUCTS)) &
            (pl.col("Consumer complaint narrative").is_not_null())
        ).with_columns([
            pl.col("Consumer complaint narrative").apply(clean_narrative).alias("cleaned_narrative")
        ])
        if writer is None:
            writer = open(OUTPUT_PATH, "w", encoding="utf-8")
            writer.write(filtered.write_csv(separator=",", include_header=True))
        else:
            writer.write(filtered.write_csv(separator=",", include_header=False))

🚀 Starting chunked EDA...


NameError: name 'CSV_PATH' is not defined

In [3]:
print("🧩 Loading dataset (lazy)...")
lazy_df = pl.scan_csv(RAW_PATH)

🧩 Loading dataset (lazy)...


In [4]:
schema = lazy_df.collect_schema()
print("📘 Schema Overview:")
for col, dtype in schema.items():
    print(f" - {col}: {dtype}")

📘 Schema Overview:
 - Date received: String
 - Product: String
 - Sub-product: String
 - Issue: String
 - Sub-issue: String
 - Consumer complaint narrative: String
 - Company public response: String
 - Company: String
 - State: String
 - ZIP code: String
 - Tags: String
 - Consumer consent provided?: String
 - Submitted via: String
 - Date sent to company: String
 - Company response to consumer: String
 - Timely response?: String
 - Consumer disputed?: String
 - Complaint ID: Int64


In [None]:
total_rows = lazy_df.select(pl.len()).collect()[0, 0]
print(f"📊 Total Rows: {total_rows:,}")