# HDB Resale Flat Prices — ETL Pipeline

This notebook runs the full end-to-end ETL pipeline for HDB resale flat prices (Mar 2012 – Dec 2016).

### Pipeline Stages
| Stage | Description |
|-------|-------------|
| **0. Download** | Fetch raw CSVs from data.gov.sg via API |
| **1. Configure** | Imports, output directories, input paths, validation reference sets |
| **2. Load** | Read and align both CSV snapshots |
| **2.5. Profile (Raw)** | Statistical summary of raw master dataset (pre-cleaning baseline) |
| **3. Clean** | Type casting, null drops, lease recomputation |
| **4. Deduplicate** | Remove duplicate records, save audit file |
| **5. Validate** | Apply business rules, flag violations |
| **6. Anomaly Detection** | 3-sigma price outlier detection per town/flat type |
| **7. Profile (Cleaned)** | Statistical summary of final cleaned dataset (post-cleaning) |
| **8. Transform** | Create synthetic Resale Identifier |
| **9. Hash** | SHA-256 hash the Resale Identifier |

> **Prerequisites:** Ensure `download_hdb_data.py` is in the same folder as this notebook.
> Install dependencies: `pip install pandas requests`

## Stage 0 — Download Raw Data

> **How this works:** `%run` executes `download_hdb_data.py` directly inside the notebook kernel,
> so all `print()` output appears here in real time.

> **Requirement:** `download_hdb_data.py` must be in the **same folder** as this notebook.

The script will:
1. Connect to the data.gov.sg API
2. Auto-discover matching datasets by keyword
3. Download both CSV files into the `hdb_data/` folder

In [None]:
import os

# -- Preflight check --
# The downloader script is looked up relative to the current working
# directory, so the notebook must be started from the folder that holds it.
DOWNLOADER = 'download_hdb_data.py'

# Fail fast with an actionable message instead of letting %run error out.
if not os.path.isfile(DOWNLOADER):
    raise FileNotFoundError(
        f"'{DOWNLOADER}' not found in '{os.getcwd()}'.\n"
        f"Please copy download_hdb_data.py into the same folder as this notebook."
    )

print(f'‚úì {DOWNLOADER} found ‚Äî starting download...')
print('=' * 60)

# -- Run downloader in-kernel so all output is visible --
# NOTE: `%run` is an IPython magic; this cell is not valid plain Python and
# only works inside a Jupyter/IPython kernel.
%run download_hdb_data.py

In [None]:
# -- Verify downloaded files exist before proceeding --
# The two snapshot CSVs the rest of the notebook reads, relative to the
# current working directory.
EXPECTED_FILES = [
    os.path.join('hdb_data', 'Resale_Flat_Prices_Based_on_Registration_Date_From_Mar_2012_to_Dec_2014.csv'),
    os.path.join('hdb_data', 'Resale_Flat_Prices_Based_on_Registration_Date_From_Jan_2015_to_Dec_2016.csv'),
]

print('Checking downloaded files...')
all_found = True
# Report every file (present or missing) before deciding whether to abort,
# so the user sees the full picture in one run.
for f in EXPECTED_FILES:
    if os.path.isfile(f):
        size_kb = os.path.getsize(f) / 1024
        print(f'  ‚úì {f}  ({size_kb:.1f} KB)')
    else:
        print(f'  ‚ùå MISSING: {f}')
        all_found = False

# Stop the notebook here if anything is missing; later stages read these paths.
if not all_found:
    raise RuntimeError('Some files are missing. Re-run the download cell above before continuing.')
else:
    print('\n‚úÖ All files present ‚Äî safe to proceed to Stage 1.')

## Stage 1 — Imports & Configuration

In [None]:
import os
import sys
import pandas as pd
import hashlib
import re
import time
from datetime import datetime

print('‚úì Libraries imported')

In [None]:
# -- Output directories --
# RAW_DIR is where the downloader places the source CSVs; everything the
# pipeline writes goes under OUTPUT_DIR, one sub-folder per artefact type.
RAW_DIR          = 'hdb_data'
OUTPUT_DIR       = 'output'
RAW_OUT_DIR      = os.path.join(OUTPUT_DIR, 'raw')
CLEANED_OUT_DIR  = os.path.join(OUTPUT_DIR, 'cleaned')
TRANSFORM_OUT_DIR= os.path.join(OUTPUT_DIR, 'transformed')
HASHED_OUT_DIR   = os.path.join(OUTPUT_DIR, 'hashed')
FAILED_OUT_DIR   = os.path.join(OUTPUT_DIR, 'failed')
AUDIT_OUT_DIR    = os.path.join(OUTPUT_DIR, 'audit')
PROFILE_OUT_DIR  = os.path.join(OUTPUT_DIR, 'profiling')

# exist_ok=True makes the cell safe to re-run; existing folders (and any
# files left in them by earlier runs) are kept as-is.
for d in [RAW_OUT_DIR, CLEANED_OUT_DIR, TRANSFORM_OUT_DIR,
          HASHED_OUT_DIR, FAILED_OUT_DIR, AUDIT_OUT_DIR, PROFILE_OUT_DIR]:
    os.makedirs(d, exist_ok=True)

print('‚úì Output directories created')

In [None]:
# -- Input CSV files --
# File names of the two snapshots, resolved against RAW_DIR below.
CSV_FILES = [
    'Resale_Flat_Prices_Based_on_Registration_Date_From_Mar_2012_to_Dec_2014.csv',
    'Resale_Flat_Prices_Based_on_Registration_Date_From_Jan_2015_to_Dec_2016.csv'
]
CSV_PATHS = [os.path.join(RAW_DIR, f) for f in CSV_FILES]

# Inclusive month window used by the month-range validation rule in Stage 5.
EXPECTED_START = pd.Period('2012-03', freq='M')
EXPECTED_END   = pd.Period('2016-12', freq='M')

print('‚úì CSV paths configured')
# Informational only: a missing file is reported here but does not raise.
for p in CSV_PATHS:
    exists = '‚úì Found' if os.path.isfile(p) else '‚ùå Missing'
    print(f'  {exists}: {p}')

In [None]:
# -- Validation reference sets --
# Allowed categorical values for the Stage 5 membership rules. Matching is
# exact (case- and whitespace-sensitive) because the rules use `isin`.
VALID_TOWNS = {
    'ANG MO KIO','BEDOK','BISHAN','BUKIT BATOK','BUKIT MERAH',
    'BUKIT PANJANG','BUKIT TIMAH','CENTRAL AREA','CHOA CHU KANG','CLEMENTI',
    'GEYLANG','HOUGANG','JURONG EAST','JURONG WEST','KALLANG/WHAMPOA',
    'MARINE PARADE','PASIR RIS','PUNGGOL','QUEENSTOWN','SEMBAWANG',
    'SENGKANG','SERANGOON','TAMPINES','TOA PAYOH','WOODLANDS','YISHUN'
}

VALID_FLAT_TYPES = {'1 ROOM','2 ROOM','3 ROOM','4 ROOM','5 ROOM','EXECUTIVE','MULTI-GENERATION'}

VALID_FLAT_MODELS = {
    '2-room','Adjoined flat','Apartment','DBSS','Improved','Improved-Maisonette',
    'Maisonette','Model A','Model A2','Model A-Maisonette','Multi Generation',
    'New Generation','Premium Apartment','Premium Apartment Loft','Premium Maisonette',
    'Simplified','Standard','Terrace','Type S1','Type S2'
}

# Two digits, the literal " TO ", two digits - e.g. "01 TO 03".
VALID_STOREY_FORMAT = r'^\d{2} TO \d{2}$'

print(f'‚úì Validation sets loaded: {len(VALID_TOWNS)} towns, {len(VALID_FLAT_TYPES)} flat types, {len(VALID_FLAT_MODELS)} flat models')

## Stage 2 — Load & Align Raw Snapshots

Reads both CSV files and aligns their columns (via `reindex`) before concatenating into one master DataFrame.

In [None]:
def timed_step(name, func, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)``, print how long it took, and return its result.

    name : label shown in the timing message.
    func : callable to execute; positional and keyword arguments are
           forwarded unchanged.

    Exceptions raised by ``func`` propagate to the caller; no timing line is
    printed in that case.
    """
    # perf_counter is monotonic and high-resolution, so the measured interval
    # cannot be distorted by system clock adjustments the way time.time() can.
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    print(f'‚è± {name} executed in {elapsed:.2f}s')
    return result

def load_and_align_snapshots():
    """
    Read every CSV listed in CSV_PATHS and stack them into one DataFrame.

    Each snapshot is reindexed onto the alphabetically sorted union of all
    column names before concatenation, so a column missing from one snapshot
    is present (as NaN) in its rows. The result has a fresh 0..n-1 index.
    """
    print('üì¶ Loading and aligning CSV snapshots...')
    frames = []
    for csv_path in CSV_PATHS:
        print(f'   - {csv_path}')
        frame = pd.read_csv(csv_path)
        print(f'     ‚Üí {len(frame):,} rows, {len(frame.columns)} columns')
        frames.append(frame)

    # Union of column names across all snapshots, in a deterministic order.
    column_union = set()
    for frame in frames:
        column_union.update(frame.columns)
    ordered_columns = sorted(column_union)

    aligned = []
    for frame in frames:
        aligned.append(frame.reindex(columns=ordered_columns))
    return pd.concat(aligned, ignore_index=True)

# Load both snapshots into the master DataFrame `df` used by later stages.
df = timed_step('Load & Align CSVs', load_and_align_snapshots)

# Persist the untouched master dataset so later stages can be audited
# against exactly what was ingested.
raw_file = os.path.join(RAW_OUT_DIR, 'hdb_resale_raw.csv')
df.to_csv(raw_file, index=False)
print(f'üíæ Raw dataset saved: {raw_file}')
print(f'\nShape: {df.shape[0]:,} rows √ó {df.shape[1]} columns')
# Last expression of the cell: rendered by the notebook as a preview table.
df.head(3)

## Stage 2.5 — Data Profiling (Pre-Cleaning Baseline)

Profiles the **raw master dataset** immediately after loading ‚Äî before any type casting,
cleaning, deduplication, or validation takes place.

This baseline snapshot serves two purposes:
1. **Audit trail** — documents the original data quality at ingestion time.
2. **Before/after comparison** — compare `profile_raw.csv` with `profile_cleaned.csv`
   (Stage 7) to quantify exactly how many rows, nulls, and duplicates were removed.

> ⚠️ `profile_dataset()` is defined in the cell below and re-defined in Stage 7, so this stage can be run in notebook order.
> Numeric stats (price min/max/mean) are computed on the columns as parsed by `pd.read_csv` — explicit type casting only happens in Stage 3.

> Output: `output/profiling/profile_raw.csv`

In [None]:
# Null counts are restricted to source columns only.
# Derived cols (remaining_lease, price_anomaly, block_numeric, year_month)
# are excluded to keep pre- and post-cleaning profiles comparable.
SOURCE_COLS_FOR_PROFILING = [
    'block', 'flat_model', 'flat_type', 'floor_area_sqm',
    'lease_commence_date', 'month', 'resale_price',
    'storey_range', 'street_name', 'town'
]

def profile_dataset(df, label='dataset'):
    """
    Build a one-row statistical profile of a DataFrame.

    df    : DataFrame to profile; it is not modified.
    label : free-text tag (e.g. 'raw_master' or 'cleaned_final') stored under
            'profile_label' so profiles can be told apart when compared.

    Returns a dict with, in order: the label, row and column counts, one
    'null_count_<col>' entry per column of SOURCE_COLS_FOR_PROFILING that is
    present, min/max/mean/median for 'resale_price' and 'floor_area_sqm'
    (when present), and the number of fully duplicated rows.
    """
    profile = {'profile_label': label}
    profile['total_rows']    = len(df)
    profile['total_columns'] = len(df.columns)

    # Null counts: source columns only, skip if column absent (raw vs cleaned differ)
    for col in SOURCE_COLS_FOR_PROFILING:
        if col in df.columns:
            profile[f"null_count_{col}"] = int(df[col].isna().sum())

    # Numeric distributions. The raw dataset is profiled before type casting,
    # so a column may still hold text; coerce first so unparseable values
    # become NaN instead of making .mean() raise on an object column.
    for col in ['resale_price', 'floor_area_sqm']:
        if col in df.columns:
            values = pd.to_numeric(df[col], errors='coerce')
            profile[f"{col}_min"]    = values.min()
            profile[f"{col}_max"]    = values.max()
            profile[f"{col}_mean"]   = round(float(values.mean()), 2)
            profile[f"{col}_median"] = values.median()

    profile['duplicate_rows'] = int(df.duplicated().sum())
    return profile

# Profile the raw master dataset and persist it as the pre-cleaning baseline.
profile_raw = timed_step('Profile Raw Dataset', profile_dataset, df, label='raw_master')
path = os.path.join(PROFILE_OUT_DIR, 'profile_raw.csv')
pd.DataFrame([profile_raw]).to_csv(path, index=False)

# This cell reports on the RAW data; the messages previously said
# "Post-cleaning" / "(Cleaned)", which mislabelled the baseline.
print(f'üìä Pre-cleaning profiling report saved: {path}')

print('\n‚îÄ‚îÄ Key Statistics (Raw) ‚îÄ‚îÄ')
print(f"  Rows          : {profile_raw['total_rows']:,}")
print(f"  Columns       : {profile_raw['total_columns']}")
print(f"  Duplicates    : {profile_raw['duplicate_rows']}")
print(f"  Price min     : ${profile_raw['resale_price_min']:,.0f}")
print(f"  Price max     : ${profile_raw['resale_price_max']:,.0f}")
print(f"  Price mean    : ${profile_raw['resale_price_mean']:,.0f}")
print(f"  Price median  : ${profile_raw['resale_price_median']:,.0f}")

## Stage 3 — Data Cleaning

- Cast `month`, `resale_price`, `floor_area_sqm` to correct types
- Drop rows with nulls in critical fields
- Recompute `remaining_lease` from `lease_commence_date`

In [None]:
# Type casting
# errors='coerce' turns unparseable values into NaT/NaN rather than raising,
# so bad cells are removed by the dropna below instead of aborting the run.
df['month']          = pd.to_datetime(df['month'], format='%Y-%m', errors='coerce')
df['resale_price']   = pd.to_numeric(df['resale_price'], errors='coerce')
df['floor_area_sqm'] = pd.to_numeric(df['floor_area_sqm'], errors='coerce')

before = len(df)
# Rows missing any of the three critical fields are discarded here.
# NOTE(review): these rows are only counted, not written to the audit or
# failed outputs - confirm that is intended.
df = df.dropna(subset=['month', 'resale_price', 'floor_area_sqm'])
after = len(df)
print(f'‚úì Type casting complete')
print(f'  Rows dropped (null critical fields): {before - after:,}')
print(f'  Rows remaining: {after:,}')

In [None]:
def recompute_remaining_lease(df, as_of=None):
    """
    Add a 'remaining_lease' text column derived from 'lease_commence_date'.

    A lease is taken to run 99 years from 1 January of its commencement
    year. The remaining time is measured in whole calendar months from the
    reference date and rendered as '<Y> years <M> months'; leases that have
    already ended yield '0 years 0 months' and missing commencement years
    yield a missing value.

    df    : DataFrame to annotate. The column is added in place and the same
            object is returned.
    as_of : optional reference date (anything pd.Timestamp accepts).
            Defaults to the current date, so by default results change from
            one run to the next; pass a fixed date for reproducible output.
    """
    today = pd.Timestamp.today() if as_of is None else pd.Timestamp(as_of)

    def _describe(lease_start):
        end = pd.Timestamp(year=int(lease_start), month=1, day=1) + pd.DateOffset(years=99)
        if end < today:
            return '0 years 0 months'
        months_remaining = (end.year - today.year) * 12 + (end.month - today.month)
        return f'{months_remaining // 12} years {months_remaining % 12} months'

    # Without the source column every row has an unknown lease.
    if 'lease_commence_date' not in df.columns:
        df['remaining_lease'] = None
        return df

    lease_years = df['lease_commence_date']
    # There are only a few dozen distinct commencement years, so compute the
    # text once per year and look it up per row instead of running a Python
    # function over every row with DataFrame.apply(axis=1).
    lookup = {year: _describe(year) for year in lease_years.dropna().unique()}
    # Missing years are never keys of `lookup`, so .get() returns None for them.
    df['remaining_lease'] = [lookup.get(year) for year in lease_years]
    return df

# Add the remaining_lease column to the working dataset.
df = recompute_remaining_lease(df)
print('‚úì remaining_lease recomputed')
print(f'  Sample values:')
# Show a handful of distinct (commencement year, remaining lease) pairs as a sanity check.
print(df[['lease_commence_date','remaining_lease']].drop_duplicates().head(5).to_string(index=False))

## Stage 4 — Deduplication

Identifies duplicate records where all fields except `resale_price` are identical.
Keeps the row with the **higher** price and saves removed duplicates to audit.

In [None]:
def deduplicate_dataset(df):
    """
    Remove duplicates sharing the same composite key (all columns except
    resale_price). Keeps the higher-priced row; discards the lower.

    Returns (df_cleaned, df_dupes). Both are ordered by descending
    resale_price. Discarded rows are tagged with
    failure_reason='duplicate_key_lower_price' for the failed dataset; when
    nothing is discarded, df_dupes is empty and carries no failure_reason
    column. The input DataFrame is not modified.
    """
    key_cols  = [c for c in df.columns if c != 'resale_price']
    # Highest price first, so keep='first' retains the higher-priced row.
    df_sorted = df.sort_values('resale_price', ascending=False)

    # Split on the positional duplicate mask rather than on index labels:
    # label membership misclassifies rows whenever the index is not unique
    # (a discarded row sharing a label with a kept row would vanish from
    # the audit output).
    is_dupe    = df_sorted.duplicated(subset=key_cols, keep='first').to_numpy()
    df_cleaned = df_sorted[~is_dupe]
    df_dupes   = df_sorted[is_dupe].copy()
    if not df_dupes.empty:
        df_dupes['failure_reason'] = 'duplicate_key_lower_price'
    return df_cleaned, df_dupes

# Split the working dataset into retained rows and discarded duplicates.
df_cleaned, df_duplicates = deduplicate_dataset(df)

print(f'‚úì Deduplication complete')
print(f'  Original rows : {len(df):,}')
print(f'  Duplicates    : {len(df_duplicates):,}')
print(f'  Clean rows    : {len(df_cleaned):,}')

# The audit file is only written when something was removed.
# NOTE(review): a duplicates.csv left by an earlier run is not deleted when
# the current run finds none - confirm whether that matters.
if not df_duplicates.empty:
    path = os.path.join(AUDIT_OUT_DIR, 'duplicates.csv')
    df_duplicates.to_csv(path, index=False)
    print(f'  ‚ö†Ô∏è  Duplicates saved: {path}')
else:
    print('  ‚úì No duplicates found')

## Stage 5 — Business Rule Validation

Applies 7 domain-specific rules, each evaluated across its whole column. Any failing row is captured with a `comments` column listing the rules it violated.

In [None]:
def extra_validation(df):
    """
    Vectorised business-rule validation. Applies each rule across the full
    column at once rather than row-by-row, which is far faster on large
    DataFrames.

    Rules:
    1. resale_price      > 0
    2. floor_area_sqm    > 0
    3. town              in VALID_TOWNS
    4. flat_type         in VALID_FLAT_TYPES
    5. flat_model        in VALID_FLAT_MODELS
    6. storey_range      matches DD TO DD format AND lower <= upper
    7. month             within EXPECTED_START .. EXPECTED_END (inclusive)

    A missing value never satisfies a rule: NaN prices/areas and NaT months
    are reported as violations rather than silently passing.

    Expects 'month' to be a datetime column (it is accessed through `.dt`).

    Returns a copy of the failing rows with two extra columns:
      comments       : '; '-joined names of the rules the row violated
      failure_reason : the constant 'rule_violation'
    The input DataFrame is not modified.
    """
    issues = pd.DataFrame(index=df.index)

    # Rules 1-2: written as "not (value > 0)" so that NaN, for which every
    # comparison is False, is flagged instead of slipping through.
    issues['invalid_resale_price']   = ~(df['resale_price'] > 0)
    issues['invalid_floor_area_sqm'] = ~(df['floor_area_sqm'] > 0)

    # Rules 3-5: categorical membership
    issues['invalid_town']       = ~df['town'].isin(VALID_TOWNS)
    issues['invalid_flat_type']  = ~df['flat_type'].isin(VALID_FLAT_TYPES)
    issues['invalid_flat_model'] = ~df['flat_model'].isin(VALID_FLAT_MODELS)

    # Rule 6: storey_range format AND logical order (lower storey <= upper storey).
    # In a well-formed "DD TO DD" value the bounds sit at characters 0-1 and
    # 6-7; anything that does not parse becomes NaN and fails the comparison.
    storey = df['storey_range'].astype(str)
    fmt_ok = storey.str.match(VALID_STOREY_FORMAT, na=False)
    lower  = pd.to_numeric(storey.str[:2], errors='coerce')
    upper  = pd.to_numeric(storey.str[6:8], errors='coerce')
    issues['invalid_storey_range'] = ~(fmt_ok & (lower <= upper))

    # Rule 7: month within expected range (NaT compares False, so it is flagged)
    month_period = df['month'].dt.to_period('M')
    in_range = (month_period >= EXPECTED_START) & (month_period <= EXPECTED_END)
    issues['month_out_of_range'] = ~in_range

    # Build failed rows with consolidated comments. Iterating the boolean
    # matrix as plain arrays avoids constructing a Series per failing row.
    rule_cols = issues.columns.tolist()
    any_fail  = issues.any(axis=1)
    df_fail   = df.loc[any_fail].copy()
    df_fail['comments'] = [
        '; '.join(rule for rule, violated in zip(rule_cols, flags) if violated)
        for flags in issues.loc[any_fail].to_numpy()
    ]
    df_fail['failure_reason'] = 'rule_violation'
    return df_fail

# Validate the deduplicated dataset; df_rules_fail holds only the failing rows.
df_rules_fail = timed_step('Business Rule Validation', extra_validation, df_cleaned)

print(f'‚úì Validation complete')
print(f'  Rule violations : {len(df_rules_fail):,} rows')

# The audit file is written only when at least one row failed.
if not df_rules_fail.empty:
    path = os.path.join(AUDIT_OUT_DIR, 'rule_violations.csv')
    df_rules_fail.to_csv(path, index=False)
    print(f'  ‚ö†Ô∏è  Violations saved: {path}')
    # Most frequent rule combinations, to show which rules dominate.
    print(df_rules_fail['comments'].value_counts().head(10))
else:
    print('  ‚úì No rule violations found')

## Stage 6 — Anomaly Detection (3-Sigma)

Detects statistically unusual resale prices using the **3-sigma (Z-score) method**.

### Why 3-Sigma?
Based on the Empirical Rule (68-95-99.7 Rule):

| Sigma | Data Coverage | Frequency | Decision |
|-------|--------------|-----------|----------|
| 1σ | ~68% | 1 in 3 | Too sensitive |
| 2σ | ~95% | 1 in 20 | Too many false positives |
| **3σ** | **~99.7%** | **1 in 370** | **Selected threshold ✓** |

### Why localised grouping (per Town + Flat Type)?
Each flat is compared only against its own peer group (same town, same flat type) —
so a \$900k Executive flat is never unfairly penalised for being more expensive than a 3-room flat next door.

> **Assumption:** Prices within each Town + Flat Type group are approximately normally distributed.
> If heavily skewed, consider Median Absolute Deviation (MAD) as a more robust alternative.

In [None]:
def detect_anomalous_prices(df, n_sigma=3, min_group_size=3):
    """
    Sigma-based price anomaly detection per (town, flat_type) peer group.

    A row is anomalous when its resale_price lies strictly outside
    mean +/- n_sigma * std of its own group (sample standard deviation).

    df             : DataFrame with 'town', 'flat_type' and 'resale_price'.
                     It is never mutated.
    n_sigma        : width of the acceptance band in standard deviations
                     (default 3, the original fixed threshold).
    min_group_size : groups with fewer rows are skipped because their
                     standard deviation is undefined or meaningless
                     (default 3).

    Returns the anomalous rows, group by group, with an added
    'failure_reason' column ('price_anomaly_3sigma' for the default). When
    nothing is flagged the result is an empty DataFrame that still has the
    input's columns plus 'failure_reason', so callers can rely on a stable
    schema.
    """
    reason = f'price_anomaly_{n_sigma:g}sigma'
    flagged_groups = []

    # Each group yielded by groupby is filtered and copied before tagging, so
    # no defensive copy of the whole input is needed.
    for _, group in df.groupby(['town', 'flat_type']):
        if len(group) < min_group_size:
            continue
        prices = group['resale_price']
        mean   = prices.mean()
        band   = n_sigma * prices.std()
        outliers = group[(prices < mean - band) | (prices > mean + band)].copy()
        if not outliers.empty:
            outliers['failure_reason'] = reason
            flagged_groups.append(outliers)

    if flagged_groups:
        return pd.concat(flagged_groups)

    # Nothing flagged: keep the schema instead of returning a bare DataFrame().
    no_anomalies = df.iloc[0:0].copy()
    no_anomalies['failure_reason'] = pd.Series(dtype='object')
    return no_anomalies

# Flag price outliers in the deduplicated dataset.
df_anomalies = timed_step('Anomaly Detection', detect_anomalous_prices, df_cleaned)

print(f'‚úì Anomaly detection complete')
print(f'  Anomalous rows: {len(df_anomalies):,}')

# The audit file and the per-town breakdown are produced only when at least
# one row was flagged.
if not df_anomalies.empty:
    path = os.path.join(AUDIT_OUT_DIR, 'anomalies.csv')
    df_anomalies.to_csv(path, index=False)
    print(f'  ‚ö†Ô∏è  Anomalies saved: {path}')
    print('\n  Top anomalous towns:')
    print(df_anomalies['town'].value_counts().head(5))
else:
    print('  ‚úì No anomalies detected')

## Stage 6b — Combine Failed Records & Finalise Cleaned Dataset

In [None]:
# Merge all failed records
# drop_duplicates only collapses rows identical in every column, so a row
# that failed both a rule and the anomaly check stays listed once per reason.
failed_records = pd.concat([df_duplicates, df_rules_fail, df_anomalies]).drop_duplicates()
print(f'Total failed records (dupes + violations + anomalies): {len(failed_records):,}')

# NOTE(review): when nothing failed, no file is written and a failed CSV
# left over from an earlier run stays on disk - confirm whether that matters.
if not failed_records.empty:
    path = os.path.join(FAILED_OUT_DIR, 'hdb_resale_failed.csv')
    failed_records.to_csv(path, index=False)
    print(f'  ‚ö†Ô∏è  Failed records saved: {path}')

# Remove failed from cleaned
# Matching is by index label, so this relies on df_cleaned and the failed
# frames all carrying the index labels of the original master DataFrame.
df_cleaned_final = df_cleaned.loc[~df_cleaned.index.isin(failed_records.index)]
cleaned_file = os.path.join(CLEANED_OUT_DIR, 'hdb_resale_cleaned.csv')
df_cleaned_final.to_csv(cleaned_file, index=False)

print(f'\nüíæ Cleaned dataset saved: {cleaned_file}')
print(f'   Final rows: {len(df_cleaned_final):,}')
# Last expression of the cell: rendered by the notebook as a preview table.
df_cleaned_final.head(3)

## Stage 7 — Data Profiling (Post-Cleaning)

Generates a statistical summary of the **final cleaned dataset** (after deduplication, validation, and anomaly removal).
Compare `profile_cleaned.csv` against `profile_raw.csv` (produced in Stage 2.5) to quantify the cleaning impact.

In [None]:
def profile_dataset(df, label="dataset"):
    """
    Build a one-row statistical profile of a DataFrame.

    This re-defines the function of the same name from Stage 2.5 so the
    stage can be run on its own once SOURCE_COLS_FOR_PROFILING exists.

    df    : DataFrame to profile; it is not modified.
    label : free-text tag stored under 'profile_label' to distinguish raw
            vs cleaned profiles.

    Returns a dict with, in order: the label, row and column counts, one
    'null_count_<col>' entry per column of SOURCE_COLS_FOR_PROFILING that is
    present, min/max/mean/median for 'resale_price' and 'floor_area_sqm'
    (when present), and the number of fully duplicated rows.
    """
    profile = {'profile_label': label}
    profile['total_rows']    = len(df)
    profile['total_columns'] = len(df.columns)

    # Null counts: source columns only, skip if column absent (raw vs cleaned differ)
    for col in SOURCE_COLS_FOR_PROFILING:
        if col in df.columns:
            profile[f"null_count_{col}"] = int(df[col].isna().sum())

    # Numeric distributions. Coerce first so a column that still holds text
    # yields NaN for unparseable values instead of making .mean() raise.
    for col in ['resale_price', 'floor_area_sqm']:
        if col in df.columns:
            values = pd.to_numeric(df[col], errors='coerce')
            profile[f"{col}_min"]    = values.min()
            profile[f"{col}_max"]    = values.max()
            profile[f"{col}_mean"]   = round(float(values.mean()), 2)
            profile[f"{col}_median"] = values.median()

    profile['duplicate_rows'] = int(df.duplicated().sum())
    return profile

# Profile the final cleaned dataset and persist it next to the raw baseline.
path = os.path.join(PROFILE_OUT_DIR, "profile_cleaned.csv")
profile_clean = timed_step("Profile Cleaned Dataset", profile_dataset, df_cleaned_final, label="cleaned_final")
pd.DataFrame([profile_clean]).to_csv(path, index=False)
# Report the save once (a second, differently worded message for the same
# file used to follow this line).
print(f"üìä Post-cleaning profile saved: {path}")

# Display key stats
print('\n‚îÄ‚îÄ Key Statistics ‚îÄ‚îÄ')
print(f"  Rows          : {profile_clean['total_rows']:,}")
print(f"  Columns       : {profile_clean['total_columns']}")
print(f"  Duplicates    : {profile_clean['duplicate_rows']}")
print(f"  Price min     : ${profile_clean['resale_price_min']:,.0f}")
print(f"  Price max     : ${profile_clean['resale_price_max']:,.0f}")
print(f"  Price mean    : ${profile_clean['resale_price_mean']:,.0f}")
print(f"  Price median  : ${profile_clean['resale_price_median']:,.0f}")

## Stage 8 — Transformation: Resale Identifier

Creates a synthetic `Resale Identifier` field encoding location, price context, and timing:

```
Format:  S + block_numeric(3) + avg_price_prefix(2) + month_num(2) + town_initial(1)
Example: S0424503A   (block 42, group average price starting "45", month 03, town starting with "A")
```

In [None]:
def create_resale_identifier(df):
    """
    Return a copy of `df` with a 'Resale Identifier' column appended.

    Identifier layout:
      S + block_numeric(3) + avg_price_prefix(2) + month_num(2) + town_initial(1)

    block_numeric    : digits of `block` only, left-padded with zeros to 3
                       characters and cut to the first 3.
    avg_price_prefix : first 2 characters of the integer-truncated mean
                       resale_price of the row's (year-month, town,
                       flat_type) group.
    month_num        : transaction month as 2 digits.
    town_initial     : first character of `town`.

    If `block` is missing it is created with the value '000'. The helper
    columns used along the way are not part of the returned frame, and the
    caller's DataFrame is left untouched.
    """
    out = df.copy()
    if 'block' not in out.columns:
        out['block'] = '000'

    # Strip every non-digit, then normalise to exactly three characters.
    digits_only = out['block'].astype(str).str.replace(r'[^\d]', '', regex=True)
    out['block_numeric'] = digits_only.str.zfill(3).str[:3]

    # Group average price per (year-month, town, flat_type), broadcast to rows.
    out['year_month'] = out['month'].dt.to_period('M')
    grouped_prices = out.groupby(['year_month', 'town', 'flat_type'])['resale_price']
    price_prefix = grouped_prices.transform('mean').astype(int).astype(str).str[:2]

    month_part = out['month'].dt.month.astype(str).str.zfill(2)
    town_part = out['town'].str[0]

    out['Resale Identifier'] = 'S' + out['block_numeric'] + price_prefix + month_part + town_part

    # Helper columns must not leak into the output files.
    return out.drop(columns=['block_numeric', 'year_month'])


def dedup_by_identifier(df):
    """
    Second deduplication pass after Resale Identifier creation.

    Rows sharing the same 'Resale Identifier' are collapsed to the one with
    the highest resale_price.

    Returns (df_clean, df_dupes), both ordered by descending resale_price.
    Discarded rows are tagged with
    failure_reason='duplicate_resale_identifier_lower_price'; when nothing
    is discarded, df_dupes is empty and carries no failure_reason column.
    The input DataFrame is not modified.
    """
    # Highest price first, so keep='first' retains the higher-priced row.
    df_sorted = df.sort_values('resale_price', ascending=False)

    # Split on the positional duplicate mask rather than on index labels,
    # which would misclassify rows if the index were not unique.
    is_dupe  = df_sorted.duplicated(subset=['Resale Identifier'], keep='first').to_numpy()
    df_clean = df_sorted[~is_dupe]
    df_dupes = df_sorted[is_dupe].copy()
    if not df_dupes.empty:
        df_dupes['failure_reason'] = 'duplicate_resale_identifier_lower_price'
    return df_clean, df_dupes


# Build identifiers on the final cleaned dataset, then collapse rows that
# ended up with the same identifier.
df_transformed = create_resale_identifier(df_cleaned_final)
df_transformed, df_id_dupes = dedup_by_identifier(df_transformed)

if not df_id_dupes.empty:
    failed_path = os.path.join(FAILED_OUT_DIR, 'hdb_resale_failed.csv')
    # Rewrite the failed dataset from the in-memory frames instead of
    # appending header-less rows: df_id_dupes has a different column layout
    # ('Resale Identifier', no 'comments') than the rows written in Stage 6b,
    # so a raw append would put values under the wrong headers. Rewriting
    # also keeps this cell idempotent when it is re-run.
    pd.concat([failed_records, df_id_dupes]).to_csv(failed_path, index=False)
    print(f'  ‚ö†Ô∏è  Identifier duplicates: {len(df_id_dupes):,} rows appended to failed dataset')

path = os.path.join(TRANSFORM_OUT_DIR, 'hdb_resale_transformed.csv')
df_transformed.to_csv(path, index=False)
print(f'üíæ Transformed dataset saved: {path}')
print(f'   Final rows: {len(df_transformed):,}')
print('\nSample Resale Identifiers:')
print(df_transformed[['town', 'flat_type', 'resale_price', 'Resale Identifier']].head(5).to_string(index=False))

## Stage 9 — Hashing

Applies **SHA-256** hashing to the `Resale Identifier` field, producing `Resale Identifier Hashed`.

- Anonymises the synthetic key while preserving uniqueness
- Deterministic: same input always produces the same 64-character hex output

In [None]:
def hash_resale_identifier(df):
    """
    Return a copy of `df` with a 'Resale Identifier Hashed' column holding
    the SHA-256 hex digest (64 characters) of each 'Resale Identifier'.

    Values are converted with str() and UTF-8 encoded before hashing, so the
    same identifier always yields the same digest. The caller's DataFrame is
    left untouched, matching the other transformation helpers in this
    notebook.

    NOTE(review): the hash is unsalted and the identifier format has a small
    value space, so digests can be reversed by enumeration; the plain
    'Resale Identifier' column is also kept alongside. Confirm this meets the
    intended anonymisation requirement.
    """
    hashed = df.copy()
    hashed['Resale Identifier Hashed'] = hashed['Resale Identifier'].apply(
        lambda x: hashlib.sha256(str(x).encode()).hexdigest()
    )
    return hashed

# Add the hashed identifier column to the transformed dataset.
df_hashed = hash_resale_identifier(df_transformed)

# The hashed output keeps every column, including the plain identifier.
path = os.path.join(HASHED_OUT_DIR, 'hdb_resale_hashed.csv')
df_hashed.to_csv(path, index=False)
print(f'üíæ Hashed dataset saved: {path}')
print('\nSample hashes:')
print(df_hashed[['Resale Identifier','Resale Identifier Hashed']].head(3).to_string(index=False))

## ✅ ETL Complete — Final Summary

In [None]:
# Final run summary: headline numbers followed by a checklist of the files
# the pipeline is expected to have produced.
print('=' * 60)
print('‚úÖ HDB RESALE ETL PIPELINE COMPLETE')
print('=' * 60)
print(f'  Final rows     : {len(df_hashed):,}')
print(f'  Final columns  : {len(df_hashed.columns)}')
print(f'  Date range     : {df_hashed["month"].min().date()} ‚Üí {df_hashed["month"].max().date()}')
print()
print('Output files:')
# Audit and failed files are only written when there is something to
# report, so a warning marker next to them is not necessarily an error.
outputs = [
    (RAW_OUT_DIR,       'hdb_resale_raw.csv'),
    (CLEANED_OUT_DIR,   'hdb_resale_cleaned.csv'),
    (TRANSFORM_OUT_DIR, 'hdb_resale_transformed.csv'),
    (HASHED_OUT_DIR,    'hdb_resale_hashed.csv'),
    (FAILED_OUT_DIR,    'hdb_resale_failed.csv'),
    (AUDIT_OUT_DIR,     'duplicates.csv'),
    (AUDIT_OUT_DIR,     'rule_violations.csv'),
    (AUDIT_OUT_DIR,     'anomalies.csv'),
    # The Stage 2.5 baseline was missing from this checklist.
    (PROFILE_OUT_DIR,   'profile_raw.csv'),
    (PROFILE_OUT_DIR,   'profile_cleaned.csv'),
]
for folder, fname in outputs:
    path = os.path.join(folder, fname)
    status = '‚úì' if os.path.isfile(path) else '‚ö†Ô∏è '
    print(f'  {status} {path}')