# 01 - Data Processing Pipeline

In [1]:
from __future__ import annotations

import json
import math
import re
import time
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent
OUTPUT_DIR = NOTEBOOK_DIR
CACHE_DIR = NOTEBOOK_DIR / "data" / "xpt001_tiles"
CACHE_DIR.mkdir(parents=True, exist_ok=True)

RAW_TRANSACTIONS_PATH = NOTEBOOK_DIR / "transactions.csv"
ENGLISH_PATH = NOTEBOOK_DIR / "transactions_en.csv"
XPT_POINTS_PATH = NOTEBOOK_DIR / "xpt_points.csv"

## 1. Load raw sale records

In [2]:
# load the raw Japanese transaction extract generated by 00 notebook.
if not RAW_TRANSACTIONS_PATH.exists():
    raise FileNotFoundError("transactions.csv not found in test_notebooks/. Run 00_api_examples.ipynb first.")

raw_df = pd.read_csv(
    RAW_TRANSACTIONS_PATH,
    dtype={"MunicipalityCode": str, "AreaLabel": str},
    low_memory=False,
)
raw_df["OriginalIndex"] = raw_df.index
raw_df["MunicipalityCode"] = raw_df["MunicipalityCode"].astype(str).str.zfill(5)
print(
    f"Loaded {len(raw_df):,} raw transactions spanning "
    f"{int(raw_df['YearParam'].min())}-{int(raw_df['YearParam'].max())}"
)
raw_df.head()

Loaded 485,093 raw transactions spanning 2005-2025


Unnamed: 0,PriceCategory,Type,Region,MunicipalityCode,Prefecture,Municipality,DistrictName,TradePrice,PricePerUnit,FloorPlan,...,Remarks,CityCode,AreaLabel,YearParam,QuarterParam,TradePriceValue,AreaSqM,PeriodKey,PricePerSqM,OriginalIndex
0,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,外神田,230000000,,,...,,13101,Chiyoda Ward,2005,3,230000000.0,90.0,2005-Q3,2555556.0,0
1,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田和泉町,120000000,,,...,,13101,Chiyoda Ward,2005,3,120000000.0,95.0,2005-Q3,1263158.0,1
2,不動産取引価格情報,宅地(土地),商業地,13101,東京都,千代田区,神田佐久間町,6800000000,14000000.0,,...,,13101,Chiyoda Ward,2005,3,6800000000.0,1600.0,2005-Q3,4250000.0,2
3,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田佐久間町,150000000,,,...,,13101,Chiyoda Ward,2005,3,150000000.0,120.0,2005-Q3,1250000.0,3
4,不動産取引価格情報,中古マンション等,,13101,東京都,千代田区,九段南,20000000,,１ＬＤＫ,...,,13101,Chiyoda Ward,2005,3,20000000.0,20.0,2005-Q3,1000000.0,4


In [3]:
raw_df.columns

Index(['PriceCategory', 'Type', 'Region', 'MunicipalityCode', 'Prefecture',
       'Municipality', 'DistrictName', 'TradePrice', 'PricePerUnit',
       'FloorPlan', 'Area', 'UnitPrice', 'LandShape', 'Frontage',
       'TotalFloorArea', 'BuildingYear', 'Structure', 'Use', 'Purpose',
       'Direction', 'Classification', 'Breadth', 'CityPlanning',
       'CoverageRatio', 'FloorAreaRatio', 'Period', 'Renovation', 'Remarks',
       'CityCode', 'AreaLabel', 'YearParam', 'QuarterParam', 'TradePriceValue',
       'AreaSqM', 'PeriodKey', 'PricePerSqM', 'OriginalIndex'],
      dtype='object')

## 2. Clean and standardize numeric fields

In [4]:
# helper functions reused across the cleaning pipeline.
PRICE_SPLIT_PATTERN = re.compile(r"-")
QUARTER_PATTERN = re.compile(r"(\d)(?:st|nd|rd|th) quarter (\d{4})")
ERA_OFFSET = {"Showa": 1925, "Heisei": 1988, "Reiwa": 2018}


def parse_price(value) -> float:
    if pd.isna(value) or value == "":
        return np.nan
    text = str(value).replace(",", "").strip()
    if text.isdigit():
        return float(text)
    if "-" in text:
        parts = [p for p in text.split("-") if p.strip().isdigit()]
        if parts:
            return sum(float(p) for p in parts) / len(parts)
    return np.nan


def parse_area(value) -> float:
    if pd.isna(value) or value == "":
        return np.nan
    text = str(value).replace(",", "").strip()
    try:
        return float(text)
    except ValueError:
        return np.nan


def parse_period(row) -> str:
    period_text = row.get("Period")
    year = int(row["YearParam"])
    quarter = int(row["QuarterParam"])
    if pd.notna(period_text):
        match = QUARTER_PATTERN.search(str(period_text))
        if match:
            return f"{match.group(2)}-Q{match.group(1)}"
    return f"{year}-Q{quarter}"


def parse_building_year(text) -> float:
    if pd.isna(text) or text == "":
        return np.nan
    text = str(text)
    match = re.search(r"(19|20)\d{2}", text)
    if match:
        return float(match.group())
    for era, offset in ERA_OFFSET.items():
        if era in text:
            digits = re.findall(r"\d+", text)
            if digits:
                return offset + float(digits[0])
    return np.nan

In [5]:
# clean and derive transaction-level metrics.
clean_df = raw_df.copy()
clean_df["TradePriceValue"] = clean_df["TradePrice"].apply(parse_price)
clean_df["AreaSqM"] = clean_df["Area"].apply(parse_area)
clean_df = clean_df.dropna(subset=["TradePriceValue", "AreaSqM", "YearParam", "QuarterParam"]).copy()
clean_df["YearParam"] = clean_df["YearParam"].astype(int)
clean_df["QuarterParam"] = clean_df["QuarterParam"].astype(int)
clean_df["PeriodKey"] = clean_df.apply(parse_period, axis=1)
clean_df["BuildingYearNum"] = clean_df["BuildingYear"].apply(parse_building_year)
clean_df["PricePerSqM"] = clean_df["TradePriceValue"] / clean_df["AreaSqM"]
clean_df = clean_df.dropna(subset=["PeriodKey"]).copy()
clean_df["SaleYear"] = clean_df["PeriodKey"].str.slice(0, 4).astype(int)
clean_df["BuildingAge"] = clean_df["SaleYear"] - clean_df["BuildingYearNum"]
clean_df.loc[clean_df["BuildingAge"] < 0, "BuildingAge"] = np.nan
clean_df["LogPrice"] = np.log(clean_df["TradePriceValue"])
clean_df["PeriodNum"] = clean_df["SaleYear"] * 4 + clean_df["QuarterParam"] - 1
print(f"{len(clean_df):,} transactions after cleaning ({len(clean_df)/len(raw_df):.1%} of input)")
clean_df.head()

485,093 transactions after cleaning (100.0% of input)


Unnamed: 0,PriceCategory,Type,Region,MunicipalityCode,Prefecture,Municipality,DistrictName,TradePrice,PricePerUnit,FloorPlan,...,TradePriceValue,AreaSqM,PeriodKey,PricePerSqM,OriginalIndex,BuildingYearNum,SaleYear,BuildingAge,LogPrice,PeriodNum
0,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,外神田,230000000,,,...,230000000.0,90.0,2005-Q3,2555556.0,0,1987.0,2005,18.0,19.25359,8022
1,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田和泉町,120000000,,,...,120000000.0,95.0,2005-Q3,1263158.0,1,1990.0,2005,15.0,18.603002,8022
2,不動産取引価格情報,宅地(土地),商業地,13101,東京都,千代田区,神田佐久間町,6800000000,14000000.0,,...,6800000000.0,1600.0,2005-Q3,4250000.0,2,,2005,,22.640188,8022
3,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田佐久間町,150000000,,,...,150000000.0,120.0,2005-Q3,1250000.0,3,1992.0,2005,13.0,18.826146,8022
4,不動産取引価格情報,中古マンション等,,13101,東京都,千代田区,九段南,20000000,,１ＬＤＫ,...,20000000.0,20.0,2005-Q3,1000000.0,4,2000.0,2005,5.0,16.811243,8022


## 3. Mesh enrichment via cached XPT001 tiles

In [6]:
# Step 5 - Bring in tile coordinates and construct 250m mesh IDs.
if not XPT_POINTS_PATH.exists():
    raise FileNotFoundError("xpt_points.csv not found. Provide the cache from 00 notebook.")

xpt_points_df = pd.read_csv(XPT_POINTS_PATH, dtype={"city_code": str}, low_memory=False)
xpt_points_df = xpt_points_df.rename(
    columns={
        "city_code": "MunicipalityCode",
        "city_name_ja": "Municipality",
        "prefecture_name_ja": "Prefecture",
        "district_name_ja": "DistrictName",
    }
)
for col in ["MunicipalityCode", "DistrictName"]:
    if col in xpt_points_df.columns:
        xpt_points_df[col] = xpt_points_df[col].astype(str)

xpt_points_df["MunicipalityCode"] = xpt_points_df["MunicipalityCode"].astype(str).str.zfill(5)

district_lookup = (
    xpt_points_df
    .dropna(subset=["Latitude", "Longitude"])
    .groupby(["MunicipalityCode", "DistrictName", "Year", "Quarter"], dropna=False)
    .agg(
        Latitude=("Latitude", "median"),
        Longitude=("Longitude", "median"),
        TileCount=("Latitude", "size"),
    )
    .reset_index()
)
print(f"Matched coordinates for {len(district_lookup):,} district-year-quarter combinations")

# Municipality-level fallback if a district name is blank or unmatched.
municipality_lookup = (
    district_lookup
    .groupby(["MunicipalityCode", "Year", "Quarter"], dropna=False)
    .agg(
        MunicipalityLat=("Latitude", "median"),
        MunicipalityLon=("Longitude", "median"),
        MunicipalityTileCount=("TileCount", "sum"),
    )
    .reset_index()
)

enriched_df = clean_df.merge(
    district_lookup,
    left_on=["MunicipalityCode", "DistrictName", "YearParam", "QuarterParam"],
    right_on=["MunicipalityCode", "DistrictName", "Year", "Quarter"],
    how="left",
)

enriched_df = enriched_df.merge(
    municipality_lookup,
    on=["MunicipalityCode", "Year", "Quarter"],
    how="left"
)

enriched_df.rename(columns={"Latitude": "RepLat", "Longitude": "RepLon"}, inplace=True)

enriched_df["Latitude"] = enriched_df["RepLat"].fillna(enriched_df["MunicipalityLat"])
enriched_df["Longitude"] = enriched_df["RepLon"].fillna(enriched_df["MunicipalityLon"])
enriched_df["TileCount"] = enriched_df["TileCount"].fillna(enriched_df["MunicipalityTileCount"])
enriched_df.drop(
    columns=["Year", "Quarter", "RepLat", "RepLon", "MunicipalityLat", "MunicipalityLon", "MunicipalityTileCount"],
    inplace=True,
    errors="ignore"
)

def meshcode_250m(lat: float, lon: float) -> str:
    lat1 = int(lat * 1.5)
    lon1 = int(lon - 100)
    lat2 = int((lat * 1.5 - lat1) * 8)
    lon2 = int((lon - 100 - lon1) * 8)
    lat3 = int(((lat * 1.5 - lat1) * 8 - lat2) * 10)
    lon3 = int(((lon - 100 - lon1) * 8 - lon2) * 10)
    return f"{lat1:02d}{lon1:02d}{lat2}{lon2}{lat3}{lon3}"

has_coord = enriched_df["Latitude"].notna() & enriched_df["Longitude"].notna()
enriched_df.loc[has_coord, "Mesh250m"] = enriched_df.loc[has_coord, ["Latitude", "Longitude"]].apply(
    lambda row: meshcode_250m(row["Latitude"], row["Longitude"]), axis=1
)

if "AreaLabel" in enriched_df.columns:
    fallback_labels = (
        enriched_df["AreaLabel"].astype("string").str.strip().where(lambda s: s.str.len() > 0)
    )

    def label_to_placeholder(label):
        if label is None or pd.isna(label):
            return None
        slug = re.sub(r"[^0-9A-Za-z]+", "_", str(label)).strip("_")
        return f"Ward::{slug}" if slug else None

    fallback_mesh = fallback_labels.map(label_to_placeholder)
    enriched_df["Mesh250m"] = enriched_df["Mesh250m"].fillna(fallback_mesh)

mesh_codes = (
    enriched_df["Mesh250m"].astype("string").str.strip()
)
valid_mask = mesh_codes.str.fullmatch(r"\d{8}") | mesh_codes.str.startswith("Ward::")
enriched_df["Mesh250m"] = mesh_codes.where(valid_mask)

enriched_df = enriched_df.dropna(subset=["Mesh250m"]).copy()
enriched_df["Mesh250m"] = enriched_df["Mesh250m"].astype(str)
invalid_values = enriched_df.loc[
    ~(
        enriched_df["Mesh250m"].str.fullmatch(r"\d{8}")
        | enriched_df["Mesh250m"].str.startswith("Ward::")
    ),
    "Mesh250m",
].unique()
if len(invalid_values) > 0:
    raise ValueError(f"Unexpected mesh ids after enrichment: {invalid_values[:5]}")

numeric_mesh_rows = int(enriched_df["Mesh250m"].str.fullmatch(r"\d{8}").sum())
placeholder_rows = int((enriched_df["Mesh250m"].str.startswith("Ward::")).sum())
print(f"{numeric_mesh_rows:,} rows with numeric meshes; {placeholder_rows:,} ward placeholders")

TRANSACTIONS_WITH_MESH = OUTPUT_DIR / "transactions_with_mesh.parquet"
enriched_df.to_parquet(TRANSACTIONS_WITH_MESH, index=False)
TRANSACTIONS_WITH_MESH

Matched coordinates for 167,473 district-year-quarter combinations
484,550 rows with numeric meshes; 543 ward placeholders


WindowsPath('c:/Users/ignit/OneDrive/Desktop/Study/GeorgiaTech/CSE6242 - Fall 2025/Project/test_notebooks/transactions_with_mesh.parquet')

In [7]:
xpt_points_df

Unnamed: 0,AreaSource,Year,Quarter,Longitude,Latitude,point_in_time_name_ja,price_information_category_name_ja,Prefecture,MunicipalityCode,Municipality,...,u_building_total_floor_area_ja,u_construction_year_ja,front_road_azimuth_name_ja,u_front_road_width_ja,front_road_type_name_ja,land_use_name_ja,u_building_coverage_ratio_ja,u_floor_area_ratio_ja,future_use_purpose_name_ja,remark_renovation_name_ja
0,Tokyo,2005,3,139.564712,35.917042,2005年第3四半期,不動産取引価格情報,埼玉県,11101,さいたま市西区,...,95㎡,2003年,北西,4.0m,私道,第１種低層住居専用地域,50%,100%,,
1,Tokyo,2005,3,139.564712,35.917042,2005年第3四半期,不動産取引価格情報,埼玉県,11101,さいたま市西区,...,,,東,4.0m,私道,第１種中高層住居専用地域,60%,200%,,
2,Tokyo,2005,3,139.564712,35.917042,2005年第3四半期,不動産取引価格情報,埼玉県,11101,さいたま市西区,...,"2,000㎡以上",1986年,南,10.0m,県道,第２種住居地域,60%,200%,,
3,Tokyo,2005,3,139.564712,35.917042,2005年第3四半期,不動産取引価格情報,埼玉県,11101,さいたま市西区,...,105㎡,1987年,北,10.0m,市道,第１種中高層住居専用地域,60%,200%,,
4,Tokyo,2005,3,139.564712,35.917042,2005年第3四半期,不動産取引価格情報,埼玉県,11101,さいたま市西区,...,110㎡,2005年,西,4.0m,市道,第１種住居地域,60%,200%,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
831165,Sendai,2025,1,141.022493,38.317636,2025年第1四半期,不動産取引価格情報,宮城県,04203,塩竈市,...,,,西,4.0m,道路,第１種低層住居専用地域,50%,80%,その他,
831166,Sendai,2025,1,141.022493,38.317636,2025年第1四半期,不動産取引価格情報,宮城県,04203,塩竈市,...,870㎡,1979年,南,24.9m,市道,商業地域,80%,500%,事務所,
831167,Sendai,2025,1,141.022493,38.317636,2025年第1四半期,不動産取引価格情報,宮城県,04203,塩竈市,...,,,南西,1.8m,私道,第１種住居地域,60%,200%,その他,
831168,Sendai,2025,1,141.022493,38.317636,2025年第1四半期,不動産取引価格情報,宮城県,04203,塩竈市,...,310㎡,,東,10.0m,市道,商業地域,80%,500%,その他,


In [8]:

# start from mesh-enriched transactions before layering English labels.
main_features = enriched_df.copy()
# Track whether mesh ids come from real coordinates or ward-level placeholders.
main_features["Mesh250m"] = main_features["Mesh250m"].astype("string").str.strip()
main_features["MeshSource"] = np.where(
    main_features["Mesh250m"].str.fullmatch(r"\d{8}"),
    "observed_mesh",
    "ward_placeholder",
)
main_features["MunicipalityCode"] = main_features["MunicipalityCode"].astype(str).str.zfill(5)

# Attach the optional English export when available.
ENGLISH_PATH = NOTEBOOK_DIR / "transactions_en.csv"
if ENGLISH_PATH.exists():
    trans_en = pd.read_csv(ENGLISH_PATH, dtype={"MunicipalityCode": str}, low_memory=False)
    trans_en["OriginalIndex"] = trans_en.index
    english_pairs = [
        ("Municipality", "Municipality_en"),
        ("Prefecture", "Prefecture_en"),
        ("DistrictName", "DistrictName_en"),
        ("Type", "Type_en"),
        ("Structure", "Structure_en"),
        ("Use", "Use_en"),
        ("CityPlanning", "CityPlanning_en"),
    ]
    available = [src for src, _ in english_pairs if src in trans_en.columns]
    rename_map = {src: dst for src, dst in english_pairs if src in available}
    english_subset = trans_en[available + ["OriginalIndex"]].rename(columns=rename_map)
    main_features = main_features.merge(english_subset, on="OriginalIndex", how="left")
else:
    print("transactions_en.csv not found; skipping English label enrichment.")


# helper to convert PeriodKey into a sortable integer timeline.
def period_key_to_index(period: str) -> int:
    year, quarter = period.split("-Q")
    return int(year) * 4 + int(quarter) - 1


# Aggregate transactions to mesh-quarter summaries (counts, medians, dispersion).
# Limit mesh-quarter panel to observations with true 250m mesh ids.
numeric_mesh_mask = main_features["Mesh250m"].str.fullmatch(r"\d{8}").fillna(False)
mesh_quarter = (
    main_features[numeric_mesh_mask]
    .groupby(["Mesh250m", "PeriodKey"], dropna=False)
    .agg(
        mesh_transaction_count=("PricePerSqM", "size"),
        mesh_median_ppsqm=("PricePerSqM", "median"),
        mesh_mean_ppsqm=("PricePerSqM", "mean"),
        mesh_price_std=("PricePerSqM", "std"),
        mesh_price_iqr=("PricePerSqM", lambda s: s.quantile(0.75) - s.quantile(0.25)),
        mesh_avg_age=("BuildingAge", "mean"),
        mesh_avg_area=("AreaSqM", "mean"),
    )
    .reset_index()
)
mesh_quarter["PeriodNum"] = mesh_quarter["PeriodKey"].apply(period_key_to_index)

mesh_quarter_sorted = mesh_quarter.sort_values(["Mesh250m", "PeriodNum"]).reset_index(drop=True)
lag_cols = ["mesh_transaction_count", "mesh_median_ppsqm", "mesh_avg_age"]
# Derive lagged metrics within each mesh to support forecasting features.
mesh_lag1 = mesh_quarter_sorted.groupby("Mesh250m")[lag_cols].shift(1)
mesh_lag1.columns = [f"mesh_lag1_{col}" for col in lag_cols]
mesh_lagged = pd.concat(
    [mesh_quarter_sorted[["Mesh250m", "PeriodKey"]], mesh_lag1], axis=1
)
main_features = main_features.merge(mesh_lagged, on=["Mesh250m", "PeriodKey"], how="left")

# Label-encode categorical columns used downstream.
encoders = {
    "Municipality": "Municipality_encoded",
    "PeriodKey": "Period_encoded",
    "Type": "Type_encoded",
    "Structure": "Structure_encoded",
    "Use": "Use_encoded",
    "CityPlanning": "CityPlanning_encoded",
}

for source_col, encoded_col in encoders.items():
    if source_col in main_features.columns:
        encoder = LabelEncoder()
        main_features[encoded_col] = encoder.fit_transform(main_features[source_col].fillna("Unknown"))

main_features.head()

Unnamed: 0,PriceCategory,Type,Region,MunicipalityCode,Prefecture,Municipality,DistrictName,TradePrice,PricePerUnit,FloorPlan,...,CityPlanning_en,mesh_lag1_mesh_transaction_count,mesh_lag1_mesh_median_ppsqm,mesh_lag1_mesh_avg_age,Municipality_encoded,Period_encoded,Type_encoded,Structure_encoded,Use_encoded,CityPlanning_encoded
0,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,外神田,230000000,,,...,Commercial Zone,,,,9,0,2,8,2,1
1,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田和泉町,120000000,,,...,Commercial Zone,,,,9,0,2,23,47,1
2,不動産取引価格情報,宅地(土地),商業地,13101,東京都,千代田区,神田佐久間町,6800000000,14000000.0,,...,Commercial Zone,,,,9,0,1,0,0,1
3,不動産取引価格情報,宅地(土地と建物),商業地,13101,東京都,千代田区,神田佐久間町,150000000,,,...,Commercial Zone,,,,9,0,2,8,31,1
4,不動産取引価格情報,中古マンション等,,13101,東京都,千代田区,九段南,20000000,,１ＬＤＫ,...,Commercial Zone,,,,9,0,0,14,29,1


In [9]:
main_features[["MunicipalityCode", 'Latitude', 'Longitude', 'Mesh250m', 'LogPrice']]

Unnamed: 0,MunicipalityCode,Latitude,Longitude,Mesh250m,LogPrice
0,13101,35.698362,139.773484,53394631,19.253590
1,13101,35.698362,139.773484,53394631,18.603002
2,13101,35.698362,139.773484,53394631,22.640188
3,13101,35.698362,139.773484,53394631,18.826146
4,13101,35.691487,139.737457,53394528,16.811243
...,...,...,...,...,...
485088,04105,38.322796,140.880700,57403780,16.906553
485089,04105,38.322796,140.880700,57403780,17.727534
485090,04105,38.317872,140.882396,57403780,16.213406
485091,04105,38.317872,140.882396,57403780,16.705882


In [10]:
MAIN_PARQUET = OUTPUT_DIR / "main_features.parquet"
MAIN_CSV = OUTPUT_DIR / "main_features.csv"
MESH_CSV = OUTPUT_DIR / "mesh_quarter_features.csv"
MESH_PARQUET = OUTPUT_DIR / "mesh_quarter_features.parquet"
DATA_DICTIONARY = OUTPUT_DIR / "data_dictionary.md"

main_features.to_parquet(MAIN_PARQUET, index=False)
main_features.to_csv(MAIN_CSV, index=False)
mesh_quarter.to_csv(MESH_CSV, index=False)
mesh_quarter.to_parquet(MESH_PARQUET, index=False)