# HDB Technical Test for Senior Data Engineer — Code Execution

End-to-end pipeline: Download → Merge → DQC → Separate valid/failed → Transform

All results are saved to the corresponding `data/` subfolders.

## 0. Setup

In [1]:
import sys
import os
sys.path.insert(0, '..')

import requests
import pandas as pd
import hashlib
from pathlib import Path
from datetime import datetime
from dateutil.relativedelta import relativedelta

print("Working directory:", os.getcwd())

Working directory: /Users/jxy/Desktop/500k/HDB/notebooks


## 1. Download Data

### Define Variables

In [182]:
# Endpoint of the data.gov.sg datastore search API used by download().
API_BASE_URL = "https://data.gov.sg/api/action/datastore_search"

# Credentials must not live in the notebook: read the key from the environment.
# An unset/empty variable falls back to unauthenticated requests (empty headers).
# NOTE(review): a key was previously committed on this line; treat it as
# exposed and revoke/rotate it.
API_KEY = os.environ.get("DATA_GOV_SG_API_KEY", "")
headers = {"x-api-key": API_KEY} if API_KEY else {}

# Output file stem -> data.gov.sg resource id, one entry per source dataset.
DATASET_ID = {
    "1990_1999":    "d_ebc5ab87086db484f88045b47411ebc5",
    "2000_2012feb": "d_43f493c6c50d54243cc1eab0df142d6a",
    "2012mar_2014": "d_2d5ff9ea31397b66239f245f57751537",
    "2015_2016":    "d_ea9ed51da2787afaf8e51f827c304208",
    "2017_onwards": "d_8b84c4ee58e3cfc0ece0d773c8ca6abc",
}

# Output folders, relative to the notebook's working directory.
RAW_DATA_DIR    = "../data/raw"
STAGE_DATA_DIR  = "../data/stage"
PROD_DATA_DIR   = "../data/prod"
FAILED_DATA_DIR = "../data/failed"
# Folder for the per-check 0/1 result files written by the DQC cells.
DQC_RESULTS_DIR = f"{STAGE_DATA_DIR}/dqc_results"

### Define Download Function

In [188]:
import time

def download(resource_id, limit=10000, sleep_secs=12):
    """
    Fetch all records for a resource_id via paginated API calls.

    Requests pages of `limit` rows from API_BASE_URL (sending the module-level
    `headers`) and pauses `sleep_secs` seconds between pages. Paging stops when
    the reported total has been reached or the API returns an empty page.

    Args:
        resource_id: value sent as the `resource_id` query parameter.
        limit: page size requested per call.
        sleep_secs: pause between consecutive page requests, in seconds.

    Returns:
        pandas.DataFrame with one row per record returned by the API.

    Raises:
        requests.HTTPError: the API answered with a 4xx/5xx status.
        RuntimeError: the response body carries no `result.records` payload.
    """
    records = []
    offset  = 0

    while True:
        # Let requests build and encode the query string.
        response = requests.get(
            API_BASE_URL,
            params={"resource_id": resource_id, "limit": limit, "offset": offset},
            headers=headers,
            timeout=60,
        )
        # Surface throttling / auth failures instead of failing later on a missing key.
        response.raise_for_status()

        payload = response.json()
        result = payload.get("result") if isinstance(payload, dict) else None
        if not isinstance(result, dict) or "records" not in result:
            raise RuntimeError(
                f"Unexpected API response for resource_id={resource_id!r} at offset={offset}: {payload!r}"
            )

        batch = result["records"]
        total = result.get("total")  # None when the API does not report a total

        if not batch:
            break

        records.extend(batch)
        offset += len(batch)
        total_label = f"{total:,}" if total is not None else "?"
        print(f"  fetched {offset:>7,} / {total_label}", end="\r")

        # Without a reported total, keep paging until an empty page comes back
        # rather than stopping after the first page.
        if total is not None and offset >= total:
            break

        time.sleep(sleep_secs)

    return pd.DataFrame(records)

### Download and Save data into raw folder

In [192]:
# DataFrame.to_csv does not create missing folders, so make sure the target exists.
Path(RAW_DATA_DIR).mkdir(parents=True, exist_ok=True)

# Dataset name -> raw DataFrame, kept in memory for the merge step.
raw_dfs = {}
for name, resource_id in DATASET_ID.items():
    print(f"Downloading {name}...")
    raw_df = download(resource_id)
    raw_dfs[name] = raw_df

    # Persist each dataset as downloaded, one CSV per source.
    out_path = f"{RAW_DATA_DIR}/{name}.csv"
    raw_df.to_csv(out_path, index=False)
    print(f"  done: {len(raw_df):>7,} rows  → {out_path}")

Downloading 1990_1999...
  done: 287,196 rows  → ../data/raw/1990_1999.csv
Downloading 2000_2012feb...
  done: 369,651 rows  → ../data/raw/2000_2012feb.csv
Downloading 2012mar_2014...
  done:  52,203 rows  → ../data/raw/2012mar_2014.csv
Downloading 2015_2016...
  done:  37,153 rows  → ../data/raw/2015_2016.csv
Downloading 2017_onwards...
  done: 225,421 rows  → ../data/raw/2017_onwards.csv


## 2. Merge

### Pre-merge Cleaning: Align Columns Across Datasets

In [239]:
# Remove the _id column from every dataset that has one
for name, frame in list(raw_dfs.items()):
    if "_id" not in frame.columns:
        continue
    raw_dfs[name] = frame.drop(columns="_id")
    print(f"Dropped _id from {name}")

# remaining_lease is recalculated uniformly in the transform step, so drop the
# source-provided column from the two datasets listed here
for name in ("2015_2016", "2017_onwards"):
    frame = raw_dfs[name]
    if "remaining_lease" not in frame.columns:
        continue
    raw_dfs[name] = frame.drop(columns=["remaining_lease"])
    print(f"Dropped remaining_lease from {name}")

Dropped _id from 1990_1999
Dropped _id from 2000_2012feb
Dropped _id from 2012mar_2014
Dropped _id from 2015_2016
Dropped _id from 2017_onwards


In [241]:
# Print each dataset's column list so any schema mismatch is visible before merging
for dataset_name in raw_dfs:
    column_names = list(raw_dfs[dataset_name].columns)
    print(f"{dataset_name:15s}: {column_names}")

1990_1999      : ['month', 'town', 'flat_type', 'block', 'street_name', 'storey_range', 'floor_area_sqm', 'flat_model', 'lease_commence_date', 'resale_price']
2000_2012feb   : ['month', 'town', 'flat_type', 'block', 'street_name', 'storey_range', 'floor_area_sqm', 'flat_model', 'lease_commence_date', 'resale_price']
2012mar_2014   : ['month', 'town', 'flat_type', 'block', 'street_name', 'storey_range', 'floor_area_sqm', 'flat_model', 'lease_commence_date', 'resale_price']
2015_2016      : ['month', 'town', 'flat_type', 'block', 'street_name', 'storey_range', 'floor_area_sqm', 'flat_model', 'lease_commence_date', 'resale_price']
2017_onwards   : ['month', 'town', 'flat_type', 'block', 'street_name', 'storey_range', 'floor_area_sqm', 'flat_model', 'lease_commence_date', 'resale_price']


### Merge the 5 Datasets and Save the Result into the Raw Folder

In [243]:
# Stack the per-period frames into a single table with a fresh 0..n-1 index
df = pd.concat(list(raw_dfs.values()), ignore_index=True, sort=False)
n_rows, n_cols = df.shape
print(f"Merged: {n_rows:,} rows, {n_cols} columns")

# Keep a copy of the merged, still-unvalidated data next to the per-dataset files
merged_path = f"{RAW_DATA_DIR}/merged_raw.csv"
df.to_csv(merged_path, index=False)
print(f"Saved → {merged_path}")

Merged: 971,624 rows, 10 columns
Saved → ../data/raw/merged_raw.csv


In [245]:
# Preview the first rows of the merged frame (displayed as the cell's last expression)
df.head()

Unnamed: 0,month,town,flat_type,block,street_name,storey_range,floor_area_sqm,flat_model,lease_commence_date,resale_price
0,1990-01,ANG MO KIO,1 ROOM,309,ANG MO KIO AVE 1,10 TO 12,31,IMPROVED,1977,9000
1,1990-01,ANG MO KIO,1 ROOM,309,ANG MO KIO AVE 1,04 TO 06,31,IMPROVED,1977,6000
2,1990-01,ANG MO KIO,1 ROOM,309,ANG MO KIO AVE 1,10 TO 12,31,IMPROVED,1977,8000
3,1990-01,ANG MO KIO,1 ROOM,309,ANG MO KIO AVE 1,07 TO 09,31,IMPROVED,1977,6000
4,1990-01,ANG MO KIO,3 ROOM,216,ANG MO KIO AVE 1,04 TO 06,73,NEW GENERATION,1976,47200


## 3. DQC — same checks as the DAG

Each check function returns a boolean mask (True = row failed).
We also write a 0/1 result file per check to `data/stage/dqc_results/` — exactly what the production DAG does, so `separate_valid_failed()` can consume them.

### Define DQC Config

In [None]:
# Row-level checks, keyed by check type. "null" is a plain list of columns; the
# other sections map column -> keyword arguments for the matching check function.
# Column names must match the merged frame exactly (the floor area column is
# `floor_area_sqm`).
# NOTE(review): the categorical check is an exact, case-sensitive `isin` match,
# so any value not listed below is flagged as a failure — confirm these lists
# against the distinct values present in the merged data.
DQ_CHECKS = {
    "null": [
        "month", "town", "flat_type", "block", "street_name",
        "storey_range", "floor_area_sqm", "flat_model",
        "lease_commence_date", "resale_price",
    ],
    "categorical": {
        "town": {"allowed_values": [
            "ANG MO KIO", "BEDOK", "BISHAN", "BUKIT BATOK", "BUKIT MERAH",
            "BUKIT PANJANG", "BUKIT TIMAH", "CENTRAL AREA", "CHOA CHU KANG",
            "CLEMENTI", "GEYLANG", "HOUGANG", "JURONG EAST", "JURONG WEST",
            "KALLANG/WHAMPOA", "LIM CHU KANG", "MARINE PARADE", "PASIR RIS",
            "QUEENSTOWN", "SEMBAWANG", "SENGKANG", "SERANGOON", "TAMPINES",
            "TOA PAYOH", "WOODLANDS", "YISHUN",
        ]},
        "flat_type": {"allowed_values": [
            "1 ROOM", "2 ROOM", "3 ROOM", "4 ROOM", "5 ROOM",
            "EXECUTIVE", "MULTI-GENERATION",
        ]},
        "flat_model": {"allowed_values": [
            "2-ROOM", "APARTMENT", "IMPROVED", "IMPROVED-MAISONETTE",
            "MAISONETTE", "MODEL A", "MODEL A-MAISONETTE", "MULTI GENERATION",
            "NEW GENERATION", "PREMIUM APARTMENT", "SIMPLIFIED",
            "STANDARD", "TERRACE",
        ]},
    },
    "string_format": {
        # Two digits, " TO ", two digits — e.g. "04 TO 06"
        "storey_range": {"pattern": r"^\d{2} TO \d{2}$"},
    },
    "date_format": {
        "month":               {"fmt": "%Y-%m"},
        "lease_commence_date": {"fmt": "%Y"},
    },
}

# key_columns=None lets check_duplicates fall back to its default key
# (every column except resale_price).
DUPLICATE_CHECK = {"key_columns": None}

# A price is flagged when it lies more than threshold_pct away from the mean
# price of its group.
RESALE_PRICE_OUTLIER_CHECK = {
    "column":        "resale_price",
    "threshold_pct": 0.20,
    "group_by":      ["month", "flat_type", "block", "street_name", "storey_range", "floor_area_sqm"],
}

### Define DQC Functions

In [None]:
# Check functions — mirrors data_operations/validate.py

def check_null(df, column):
    """Return a boolean Series that is True where `column` holds a missing value."""
    values = df[column]
    return values.isnull()

def check_categorical(df, column, allowed_values):
    """Return a boolean Series that is True where `column` is not one of `allowed_values`."""
    is_allowed = df[column].isin(allowed_values)
    return ~is_allowed

def check_string_format(df, column, pattern):
    """Return a boolean Series that is True where the text form of `column` does not match `pattern`."""
    as_text = df[column].astype(str)
    matches = as_text.str.match(pattern, na=False)
    return ~matches

def check_date_format(df, column, fmt):
    """Return a boolean Series that is True where the text form of `column` cannot be parsed with `fmt`."""
    as_text = df[column].astype(str)
    parsed = pd.to_datetime(as_text, format=fmt, errors="coerce")
    return parsed.isna()

def check_duplicates(df, key_columns=None):
    """
    Flag duplicate rows, keeping the highest-priced row of each duplicate group.

    Rows count as duplicates when they agree on every column in `key_columns`.
    Within a group the row with the highest `resale_price` is kept (not
    flagged) and all others are flagged.

    Args:
        df: frame containing a `resale_price` column plus the key columns.
        key_columns: columns that define a duplicate; None means every column
            of `df` except `resale_price`.

    Returns:
        Boolean Series in the original row order, True = flagged duplicate.
    """
    if key_columns is None:
        key_columns = [c for c in df.columns if c != "resale_price"]

    # Rank by numeric value: prices may still be strings at this point, and a
    # string sort would place "9000" above "47200". Unparseable prices become
    # NaN and sort last.
    price = pd.to_numeric(df["resale_price"], errors="coerce")
    # mergesort is stable, so equal prices are ordered deterministically.
    # Selection is positional so it does not rely on the index labels.
    positions = (
        price.reset_index(drop=True)
        .sort_values(ascending=False, kind="mergesort")
        .index
    )
    df_sorted = df.iloc[positions]

    is_duplicate = df_sorted.duplicated(subset=key_columns, keep="first")
    # Restore the original row order so the mask lines up with df.
    return is_duplicate.sort_index()

def check_resale_price_outlier(df, column, threshold_pct, group_by):
    """
    Return a boolean Series that is True where `column` falls outside the
    band [mean * (1 - threshold_pct), mean * (1 + threshold_pct)], where mean
    is the average of `column` within the row's `group_by` group.
    """
    group_mean = df.groupby(group_by)[column].transform("mean")
    lower_bound = group_mean * (1 - threshold_pct)
    upper_bound = group_mean * (1 + threshold_pct)
    within_band = df[column].between(lower_bound, upper_bound)
    return ~within_band

print("Check functions defined.")

In [None]:
# DQC Group 1 — null, categorical, string_format, date_format
# Each check saves its 0/1 result to dqc_results/ AND accumulates into fail_sum

# Dispatch table: DQ_CHECKS section name -> check function.
ROW_CHECKS = {
    "null":          check_null,
    "categorical":   check_categorical,
    "string_format": check_string_format,
    "date_format":   check_date_format,
}

# DataFrame/Series.to_csv does not create missing folders.
Path(DQC_RESULTS_DIR).mkdir(parents=True, exist_ok=True)

# Reset the counter so re-running this cell starts from a clean slate.
df["fail_sum"] = 0

for check_type, check_config in DQ_CHECKS.items():
    # Fail loudly on an unrecognised section instead of silently reusing the
    # mask left over from a previous check.
    if check_type not in ROW_CHECKS:
        raise ValueError(f"Unknown DQ check type: {check_type!r}")
    check_fn = ROW_CHECKS[check_type]

    # A list section (e.g. "null") carries no per-column parameters.
    items = [(col, {}) for col in check_config] if isinstance(check_config, list) else check_config.items()
    for column, params in items:
        mask = check_fn(df, column, **params)

        # Save result file (mirrors production DAG)
        result_file = f"{DQC_RESULTS_DIR}/{check_type}__{column}.csv"
        mask.astype(int).to_csv(result_file, index=True, header=True)

        fails = mask.sum()
        df["fail_sum"] += mask.astype(int)
        print(f"{check_type:15s} | {column:25s} | {fails:6,} fails")

In [None]:
# DQC Group 2 — duplicates
# Hide the fail_sum bookkeeping column from the duplicate comparison: with
# key_columns=None every column except resale_price is part of the duplicate
# key, and fail_sum is a pipeline counter that this very cell modifies, so
# leaving it in would make the result depend on how often the cell has run.
mask = check_duplicates(df.drop(columns=["fail_sum"], errors="ignore"), **DUPLICATE_CHECK)

# Save result file in the same 0/1 layout as the other checks
mask.astype(int).to_csv(f"{DQC_RESULTS_DIR}/duplicates.csv", index=True, header=True)

dup_fails = mask.sum()
df["fail_sum"] += mask.astype(int)
print(f"Duplicates: {dup_fails:,} rows flagged")

In [None]:
# DQC Group 3 — resale price outlier
# The group mean needs numeric prices; values that cannot be parsed become NaN
df["resale_price"] = pd.to_numeric(df["resale_price"], errors="coerce")

mask = check_resale_price_outlier(df, **RESALE_PRICE_OUTLIER_CHECK)
outlier_flags = mask.astype(int)

# Persist the 0/1 result in the same layout as the other checks
outlier_result_file = f"{DQC_RESULTS_DIR}/resale_price_outlier.csv"
outlier_flags.to_csv(outlier_result_file, index=True, header=True)

outlier_fails = mask.sum()
df["fail_sum"] = df["fail_sum"] + outlier_flags
print(f"Price outliers: {outlier_fails:,} rows flagged")

In [None]:
# DQC summary: overall counts, then how many rows failed 0, 1, 2, ... checks
has_failure = df["fail_sum"].gt(0)
print(f"Total rows:      {len(df):,}")
print(f"Rows with fails: {has_failure.sum():,}")
print(f"\nfail_sum distribution:")
fail_sum_counts = df["fail_sum"].value_counts()
print(fail_sum_counts.sort_index())

## 4. Separate Valid / Non-valid

Rows with `fail_sum > 0` failed at least one check.
Valid rows are saved to `data/stage/validated.csv`; rows that failed any check are saved to `data/failed/non_valid_records.csv`.

In [None]:
# DataFrame.to_csv does not create missing folders, so make sure both targets exist.
for out_dir in (STAGE_DATA_DIR, FAILED_DATA_DIR):
    Path(out_dir).mkdir(parents=True, exist_ok=True)

# Valid rows drop the bookkeeping column; failed rows keep fail_sum so the
# number of failed checks stays visible.
df_valid     = df[df["fail_sum"] == 0].drop(columns=["fail_sum"]).reset_index(drop=True)
df_non_valid = df[df["fail_sum"] > 0].reset_index(drop=True)

# Every row must land in exactly one of the two outputs.
assert len(df_valid) + len(df_non_valid) == len(df), "rows lost while splitting valid / non-valid"

validated_path  = f"{STAGE_DATA_DIR}/validated.csv"
non_valid_path  = f"{FAILED_DATA_DIR}/non_valid_records.csv"

df_valid.to_csv(validated_path, index=False)
df_non_valid.to_csv(non_valid_path, index=False)

print(f"Valid:     {len(df_valid):,}  → {validated_path}")
print(f"Non-valid: {len(df_non_valid):,}  → {non_valid_path}")

df_valid.head()

In [None]:
# Inspect non-valid rows: show the first 20 records that failed at least one check.
# fail_sum is kept on this frame, so the number of failed checks per row is visible.
print("Sample non-valid rows:")
df_non_valid.head(20)

## 5. Transform

Three transformations applied to valid rows only:
1. **Remaining lease** — years + months left on the 99-year HDB lease
2. **Resale identifier** — a short coded ID per transaction
3. **Hash identifier** — SHA-256 hash of the resale identifier

Results saved to `data/prod/transformed.csv` (with identifier, without hash) and `data/prod/hashed.csv` (with hash, without identifier).

In [None]:
# Transformation 1: Remaining lease
reference_date = datetime.now()


def _format_remaining_lease(commence_year):
    """Render the time left from reference_date until 99 years after 1 Jan of `commence_year`."""
    lease_end = datetime(int(commence_year), 1, 1) + relativedelta(years=99)
    time_left = relativedelta(lease_end, reference_date)
    # Negative components (lease end already passed) are clamped to zero
    years_left = time_left.years if time_left.years > 0 else 0
    months_left = time_left.months if time_left.months > 0 else 0
    return "{} years {} months".format(years_left, months_left)


df_valid["remaining_lease"] = df_valid["lease_commence_date"].apply(_format_remaining_lease)

print("Sample remaining_lease values:")
(
    df_valid[["lease_commence_date", "remaining_lease"]]
    .drop_duplicates()
    .sort_values("lease_commence_date")
    .head(10)
)

In [None]:
# Transformation 2: Resale identifier
# Format: S + block_digits(3) + avg_price_digits(2) + month_digits(2) + town_initial
df_valid["resale_price"] = pd.to_numeric(df_valid["resale_price"])

# Mean price of each (month, town, flat_type) group, broadcast back onto every row
group_avg_price = (
    df_valid.groupby(["month", "town", "flat_type"])["resale_price"].transform("mean")
)

# Digits of the block number only: first three, left-padded with zeros
block_numeric = df_valid["block"].astype(str).str.replace(r"\D", "", regex=True)
block_digits = block_numeric.str[:3].str.zfill(3)

# First two digits of the integer part of the group average
avg_as_text = group_avg_price.astype(int).astype(str)
price_digits = avg_as_text.str[:2].str.zfill(2)

# Two-digit month number of the transaction
sale_month = pd.to_datetime(df_valid["month"], format="%Y-%m")
month_digits = sale_month.dt.strftime("%m")

# First character of the town name, upper-cased
town_char = df_valid["town"].str.strip().str[0].str.upper()

df_valid["resale_identifier"] = "S" + block_digits + price_digits + month_digits + town_char

print(f"Unique identifiers: {df_valid['resale_identifier'].nunique():,} / {len(df_valid):,} rows")
df_valid[["block", "month", "town", "resale_price", "resale_identifier"]].head()

In [None]:
# Transformation 3: Hash identifier
# SHA-256 hex digest of each resale identifier (encoded with str.encode's default)
df_valid["resale_identifier_hash"] = [
    hashlib.sha256(identifier.encode()).hexdigest()
    for identifier in df_valid["resale_identifier"]
]

n_unique_hashes = df_valid["resale_identifier_hash"].nunique()
print(f"Unique hashes: {n_unique_hashes:,}")
df_valid[["resale_identifier", "resale_identifier_hash"]].head()

In [None]:
# Save final outputs
# DataFrame.to_csv does not create missing folders, so make sure the target exists.
Path(PROD_DATA_DIR).mkdir(parents=True, exist_ok=True)

# Two views of the same rows: one keeps the plain identifier, the other only its hash.
df_transformed = df_valid.drop(columns=["resale_identifier_hash"])
df_hashed      = df_valid.drop(columns=["resale_identifier"])

transformed_path = f"{PROD_DATA_DIR}/transformed.csv"
hashed_path      = f"{PROD_DATA_DIR}/hashed.csv"

df_transformed.to_csv(transformed_path, index=False)
df_hashed.to_csv(hashed_path, index=False)

print(f"transformed: {df_transformed.shape}  → {transformed_path}")
print(f"hashed:      {df_hashed.shape}  → {hashed_path}")