# 03_prepare_final (housing / CSV)
**Authors**: <fill names> | **Owner**: <name> | **Reviewer**: <name> | **Date**: 2025-11-07

**Purpose:** Deterministic pipeline: combined CSV -> cleaned, standardized CSV plus a Parquet dataset

> These notebooks assume the first ingest saved **CSV** outputs into `intermediate_data/` (not Parquet).
> Paths used:
> - Raw CSV: `price_paid_records/price_paid_records.csv`
> - Combined CSV: `intermediate_data/housing_all.csv`
> - Optional partitioned by year: `intermediate_data/partitioned_csv/year=YYYY.csv`


In [6]:
!pip install pandas --quiet




In [7]:
!pip install pyarrow --quiet

from pathlib import Path
import pandas as pd
from pandas.api.types import is_numeric_dtype
import pyarrow as pa
import pyarrow.parquet as pq
import shutil

# Paths
INT_DIR = Path("../../data/intermediate/housing")
PROC_DIR = Path("../../data/housing")
INT_DIR.mkdir(parents=True, exist_ok=True)
PROC_DIR.mkdir(parents=True, exist_ok=True)

csv_path = INT_DIR / "housing_all.csv"
assert csv_path.exists(), f"Expected combined CSV at: {csv_path}"

# Outputs: ONE cleaned+standardized dataset in CSV + Parquet (dataset)
clean_csv      = PROC_DIR / "housing_cleaned_standardized.csv"
clean_parq_dir = PROC_DIR

# Reset outputs
if clean_csv.exists():
    clean_csv.unlink()
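# clean_parq_dir is PROC_DIR itself, so rmtree would also delete every other
# file stored there. Remove only stale Parquet parts instead.
clean_parq_dir.mkdir(parents=True, exist_ok=True)
for old_part in clean_parq_dir.glob("*.parquet"):
    old_part.unlink()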





In [8]:
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = (
        df.columns.str.strip()
                  .str.lower()
                  .str.replace(r"\s+", "_", regex=True)
                  .str.replace(r"[^0-9a-zA-Z_]+", "", regex=True)
    )
    return df

def pipe_clean_chunk(df: pd.DataFrame) -> pd.DataFrame:
    df = normalize_columns(df).copy()

    # price -> numeric
    if "price" in df.columns:
        df["price"] = pd.to_numeric(df["price"], errors="coerce")

    # region-like columns -> keep as string
    for c in ["region", "property_type", "tenure"]:
        if c in df.columns:
            df[c] = df[c].astype("string")

    # year / month -> numeric (we'll cast to float later)
    for c in ["year", "month"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # Chunk-level NA handling + outlier clipping for price
    if "price" in df.columns:
        median_price = df["price"].median()
        df["price"] = df["price"].fillna(median_price)

        q1, q3 = df["price"].quantile([0.25, 0.75])
        iqr = q3 - q1
        lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        df["price"] = df["price"].clip(lo, hi)

    # simple new-build flag
    df["is_new_build"] = False
    for c in df.columns:
        # only treat actual text columns as text
        if "new" in c and df[c].dtype in ("object", "string"):
            df["is_new_build"] = df[c].astype("string").str.contains("new", case=False, na=False)
            break

    # drop useless index-ish columns if present
    for c in ["unnamed_0", "index"]:
        if c in df.columns:
            df = df.drop(columns=c)

    return df
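
The cell above fills missing prices and clips outliers using statistics computed per chunk, so the result depends on where the chunk boundaries fall. If dataset-wide bounds are preferred, a two-pass variant is one option. The sketch below is not part of the original notebook: `global_price_bounds` and `apply_price_bounds` are hypothetical helper names, the toy CSV is made up, and it assumes the combined CSV already has a column literally named `price` and that this single column fits in memory.

```python
import io
import pandas as pd


def global_price_bounds(csv_source, chunksize=200_000):
    """Pass 1: read only `price` and derive dataset-wide median and IQR fences."""
    parts = []
    for chunk in pd.read_csv(csv_source, usecols=["price"], chunksize=chunksize):
        parts.append(pd.to_numeric(chunk["price"], errors="coerce"))
    prices = pd.concat(parts, ignore_index=True)

    median = prices.median()
    q1, q3 = prices.quantile([0.25, 0.75])
    iqr = q3 - q1
    return median, q1 - 1.5 * iqr, q3 + 1.5 * iqr


def apply_price_bounds(price, median, lo, hi):
    """Pass 2 (per chunk): fill gaps with the global median, then clip to the global fences."""
    return pd.to_numeric(price, errors="coerce").fillna(median).clip(lo, hi)


# Toy data standing in for housing_all.csv (values are made up for illustration).
toy_csv = "price,region\n100,a\n200,a\n300,b\n400,b\n,c\n10000,c\n"

# The bounds should not depend on how the file is chunked.
bounds_small = global_price_bounds(io.StringIO(toy_csv), chunksize=2)
bounds_large = global_price_bounds(io.StringIO(toy_csv), chunksize=1000)

median, lo, hi = bounds_small
cleaned = apply_price_bounds(pd.Series([100, None, 10000]), median, lo, hi)
print(bounds_small == bounds_large, cleaned.tolist())
```

In the notebook itself, the bounds would be computed once from `csv_path` before the chunk loop and then passed into the per-chunk cleaning step in place of the chunk-local median and quantiles.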


In [9]:
clean_header_written = False
total_clean_rows = 0

for i, chunk in enumerate(pd.read_csv(csv_path, chunksize=200_000, low_memory=False), start=1):
    # 1) Clean the chunk
    chunk_clean = pipe_clean_chunk(chunk)

    # 2) STANDARDIZE DTYPES
    for col in chunk_clean.columns:
        if is_numeric_dtype(chunk_clean[col]):
            chunk_clean[col] = chunk_clean[col].astype("float64")
        else:
            chunk_clean[col] = chunk_clean[col].astype("string")

    # 3) Append to cleaned CSV (full dataset)
    chunk_clean.to_csv(
        clean_csv,
        mode="a",
        index=False,
        header=not clean_header_written,
    )
    clean_header_written = True
    total_clean_rows += len(chunk_clean)

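    # 4) Append to Parquet *dataset* (one file per chunk)
    table_clean = pa.Table.from_pandas(chunk_clean, preserve_index=False)
    pq.write_to_dataset(
        table_clean,
        root_path=clean_parq_dir,
        # pyarrow numbers {i} from 0 within each call, so a fixed template would
        # write part-0.parquet every time and overwrite the previous chunk.
        # Prefix the chunk number via the f-string and keep a literal {i}
        # (doubled braces) for pyarrow to fill in.
        basename_template=f"part-{i:05d}-{{i}}.parquet",
    )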

    if i % 10 == 0:
        print(f"Processed {i} chunks | clean rows so far: {total_clean_rows:,}")

print("Done.")
print("Cleaned & standardized CSV ->", clean_csv)
print("Cleaned & standardized Parquet dataset dir ->", clean_parq_dir)


Processed 10 chunks | clean rows so far: 2,000,000
Processed 20 chunks | clean rows so far: 4,000,000
Processed 30 chunks | clean rows so far: 6,000,000
Processed 40 chunks | clean rows so far: 8,000,000
Processed 50 chunks | clean rows so far: 10,000,000
Processed 60 chunks | clean rows so far: 12,000,000
Processed 70 chunks | clean rows so far: 14,000,000
Processed 80 chunks | clean rows so far: 16,000,000
Processed 90 chunks | clean rows so far: 18,000,000
Processed 100 chunks | clean rows so far: 20,000,000
Processed 110 chunks | clean rows so far: 22,000,000
Processed 120 chunks | clean rows so far: 24,000,000
Processed 130 chunks | clean rows so far: 26,000,000
Processed 140 chunks | clean rows so far: 28,000,000
Processed 150 chunks | clean rows so far: 30,000,000
Processed 160 chunks | clean rows so far: 32,000,000
Processed 170 chunks | clean rows so far: 34,000,000
Processed 180 chunks | clean rows so far: 36,000,000
Processed 190 chunks | clean rows so far: 38,000,000
Proces