# 特徴量エンジニアリング

## Library Import

In [1]:
# データの取り扱いに関するライブラリ
import numpy as np # 高速計算
import pandas as pd # 表データの扱い

import datetime as dt

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Directory that holds the intermediate parquet files (adjust to your own layout).
intermediate_path = '../output/intermediate_file/'

# Version tags of the pipeline scripts. preprocessing_ver and geo_ver are
# interpolated into the input file names read below.
preprocessing_ver, geo_ver, fe_ver = 4, 1, 3

## File Import

In [3]:
# Read the preprocessed train/test tables for the configured version.
train_df_fe, test_df_fe = (
    pd.read_parquet(path)
    for path in (
        f'{intermediate_path}train_df_preprocessed_v{preprocessing_ver}.parquet',
        f'{intermediate_path}test_df_preprocessed_v{preprocessing_ver}.parquet',
    )
)

# Snapshot of the test column names before any feature is added.
fe_cols = list(test_df_fe.columns)

In [4]:
# Names of the year-month column and of the prediction target.
date_col, target_col = 'target_ym', 'money_room'

## 国土数値情報と結合

In [5]:
# Read the geo feature tables for the configured geo_ver.
train_df_geo, test_df_geo = (
    pd.read_parquet(path)
    for path in (
        f'{intermediate_path}train_df_geo_v{geo_ver}.parquet',
        f'{intermediate_path}test_df_geo_v{geo_ver}.parquet',
    )
)

In [6]:
# Composite key shared by the preprocessed table and the geo table.
pkey_cols = ['target_ym', 'building_id', 'unit_id']

# NOTE(review): merge defaults to an inner join, so rows without a geo match
# are dropped from train/test — confirm that every key is present on both sides.
train_df_fe = pd.merge(train_df_fe, train_df_geo, on=pkey_cols)
test_df_fe = pd.merge(test_df_fe, test_df_geo, on=pkey_cols)

## 都道府県・市区町村情報のTE

In [7]:
# Features created: 'Prefecture name_te', 'City/town/village name_te'
adress_cols = ['Prefecture name', 'City/town/village name']

# Fallback value for rows whose category has no train-side mean.
global_mean = train_df_fe[target_col].mean()

for col in adress_cols:
    # Mean target per category, computed on train only.
    mapping = train_df_fe.groupby(col)[target_col].mean()

    # Encode train and test with the same map AND the same fallback.
    # Previously only test was filled with global_mean, so a missing category
    # was NaN in train but log1p(global_mean) in test.
    # NOTE(review): the train encoding is in-sample (each row contributes to its
    # own category mean); consider out-of-fold encoding if leakage matters.
    for frame in (train_df_fe, test_df_fe):
        frame[col + '_te'] = np.log1p(frame[col].map(mapping).fillna(global_mean))

## 面積比

In [8]:
# Feature created: 'area_ratio' (senyu_area / kukaku_area)
for frame in (train_df_fe, test_df_fe):
    frame['area_ratio'] = frame['senyu_area'] / frame['kukaku_area']
    # A non-positive denominator gives inf or a sign-flipped ratio; blank it out,
    # matching the guard already applied to 'unit_land_density'.
    frame.loc[frame['kukaku_area'] <= 0, 'area_ratio'] = np.nan

## 相対階数

In [9]:
# Feature created: 'relative_floor' (room_floor / floor_count)
# NOTE(review): floor_count == 0 is not guarded here and would yield inf/NaN.
for frame in (train_df_fe, test_df_fe):
    frame['relative_floor'] = frame['room_floor'].div(frame['floor_count'])

## 密度

In [10]:
# Features created: 'unit_land_density', 'area_per_room'
# Each entry maps output column -> (numerator column, denominator column).
density_specs = {
    'unit_land_density': ('senyu_area', 'kukaku_area'),      # private area per lot area
    'area_per_room': ('senyu_area', 'madori_number_all'),    # private area per room
}

for df in (train_df_fe, test_df_fe):
    for feature, (numerator, denominator) in density_specs.items():
        df[feature] = df[numerator] / df[denominator]
        # Non-positive denominators are treated as missing.
        df.loc[df[denominator] <= 0, feature] = np.nan

## 豪邸検出

In [11]:
# Feature created: 'land_building_ratio' (kukaku_area / nobeyuka_area)
for df in (train_df_fe, test_df_fe):
    ratio = df['kukaku_area'] / df['nobeyuka_area']
    # Rows with a non-positive total floor area become NaN.
    df['land_building_ratio'] = ratio.mask(df['nobeyuka_area'] <= 0)

## 面積と築年の交互作用

In [12]:
# Features created: 'senyu_area_x_built_diff', 'area_per_room_x_built_diff'
# Output column -> column multiplied by 'built_diff'.
interaction_specs = {
    'senyu_area_x_built_diff': 'senyu_area',
    # area per room x built_diff: intended to capture a premium for roomy layouts at equal age
    'area_per_room_x_built_diff': 'area_per_room',
}

for df in (train_df_fe, test_df_fe):
    for feature, base_col in interaction_specs.items():
        df[feature] = df[base_col].mul(df['built_diff'])

## building_idごとの統合特徴量

In [13]:
# Features created: 'building_senyu_area_median', 'building_room_floor_max', 'building_unit_count'
len_train = len(train_df_fe)

# Stack train on top of test so the per-building statistics use every row.
combined_df = pd.concat([train_df_fe, test_df_fe], ignore_index=True)

# One pass over building_id: median private area, highest room floor,
# and number of non-null unit_id values.
building_stats = combined_df.groupby('building_id').agg(
    building_senyu_area_median=('senyu_area', 'median'),
    building_room_floor_max=('room_floor', 'max'),
    building_unit_count=('unit_id', 'count'),
)

# Broadcast the per-building statistics back onto each row.
combined_df = combined_df.join(building_stats, on='building_id')

# Split back using the original train row count.
train_df_fe = combined_df.iloc[:len_train].copy()
test_df_fe = combined_df.iloc[len_train:].copy()


## 近傍価格特徴量

In [14]:
from sklearn.neighbors import BallTree

def _neighbor_price_features(neigh_list, prices_all, cats_all, fallback,
                             radius_m, group_ids=None):
    """Aggregate neighbour prices for every query point at one radius.

    Args:
        neigh_list: one integer index array per query point, indexing into
            ``prices_all`` / ``cats_all`` (as returned by ``query_radius``).
        prices_all: target values of the reference (train) rows.
        cats_all: building category of the reference rows.
        fallback: dict with keys 'mean', 'median', 'log_mean', 'log_median',
            'std', 'iqr'; used wherever no neighbour is available.
        radius_m: radius in metres; only used to build the column names.
        group_ids: optional array aligned with both the query points and the
            reference rows. When given, neighbours whose id equals the query
            point's id are dropped (this also drops the point itself).

    Returns:
        dict mapping output column name -> 1-D array, in column order.
    """
    categories = ('house', 'mansion')
    n_query = len(neigh_list)

    mean_all = np.full(n_query, fallback['mean'])
    median_all = np.full(n_query, fallback['median'])
    mean_log_all = np.full(n_query, fallback['log_mean'])
    median_log_all = np.full(n_query, fallback['log_median'])
    std_all = np.full(n_query, fallback['std'])
    iqr_all = np.full(n_query, fallback['iqr'])
    cnt_all = np.zeros(n_query, dtype=int)

    cat_mean = {c: np.full(n_query, fallback['mean']) for c in categories}
    cat_mean_log = {c: np.full(n_query, fallback['log_mean']) for c in categories}
    cat_count = {c: np.zeros(n_query, dtype=int) for c in categories}

    for i, neigh_idx in enumerate(neigh_list):
        if group_ids is not None:
            neigh_idx = neigh_idx[group_ids[neigh_idx] != group_ids[i]]
        if len(neigh_idx) == 0:
            continue  # keep the global fallback values and a zero count

        prices = prices_all[neigh_idx]
        mean_val = prices.mean()
        median_val = np.median(prices)

        mean_all[i] = mean_val
        median_all[i] = median_val
        mean_log_all[i] = np.log1p(mean_val)
        median_log_all[i] = np.log1p(median_val)
        cnt_all[i] = len(neigh_idx)

        # Spread statistics need at least two neighbours; otherwise keep the fallback.
        if len(neigh_idx) > 1:
            std_all[i] = prices.std()
            q75, q25 = np.percentile(prices, [75, 25])
            iqr_all[i] = q75 - q25

        neigh_cats = cats_all[neigh_idx]
        for cat in categories:
            cat_prices = prices[neigh_cats == cat]
            if len(cat_prices) > 0:
                cat_mean_val = cat_prices.mean()
                cat_mean[cat][i] = cat_mean_val
                cat_mean_log[cat][i] = np.log1p(cat_mean_val)
                cat_count[cat][i] = len(cat_prices)

    features = {
        f'mean_price_{radius_m}m': mean_all,
        f'median_price_{radius_m}m': median_all,
        f'mean_price_{radius_m}m_log': mean_log_all,
        f'median_price_{radius_m}m_log': median_log_all,
        f'std_price_{radius_m}m': std_all,
        f'iqr_price_{radius_m}m': iqr_all,
        f'count_neighbors_{radius_m}m': cnt_all,
    }
    for cat in categories:
        features[f'mean_price_{radius_m}m_{cat}'] = cat_mean[cat]
        features[f'mean_price_{radius_m}m_{cat}_log'] = cat_mean_log[cat]
        features[f'count_neighbors_{radius_m}m_{cat}'] = cat_count[cat]
    return features


def add_multi_radius_neighbor_features(
    train_df,
    test_df,
    target_col='money_room',
    lat_col='lat',
    lon_col='lon',
    building_id_col='building_id',
    building_category_col='building_category',
    radius_list_m=(300, 500, 1000, 2000)
):
    """
    Add neighbourhood price aggregates for several radii, overall and per
    building category ('house', 'mansion').

    For each radius ``r`` (metres) the following columns are added to copies
    of both frames:
      - mean_price_{r}m, median_price_{r}m, and their log1p versions (*_log)
      - std_price_{r}m, iqr_price_{r}m
      - count_neighbors_{r}m
      - mean_price_{r}m_house(_log), count_neighbors_{r}m_house
      - mean_price_{r}m_mansion(_log), count_neighbors_{r}m_mansion

    Neighbours are always train rows, found with a haversine BallTree over
    (lat, lon) given in degrees. For train rows, neighbours with the same
    ``building_id_col`` value are excluded. Rows without neighbours receive
    the global train statistics and a count of 0.

    Args:
        train_df, test_df: input frames (not modified).
        target_col: price column in ``train_df``.
        lat_col, lon_col: coordinate columns, in degrees.
        building_id_col: column used to exclude same-building neighbours.
        building_category_col: column holding 'house' / 'mansion' labels.
        radius_list_m: iterable of radii in metres.

    Returns:
        (train_df, test_df) copies with the new columns.
    """
    train_df = train_df.copy()
    test_df = test_df.copy()

    # Haversine distance expects coordinates in radians.
    train_coords = np.radians(train_df[[lat_col, lon_col]].to_numpy())
    test_coords = np.radians(test_df[[lat_col, lon_col]].to_numpy())

    y_train = train_df[target_col].to_numpy()
    cats = train_df[building_category_col].to_numpy()
    bids = train_df[building_id_col].to_numpy()

    tree = BallTree(train_coords, metric='haversine')
    earth_radius_km = 6371.0

    # Global train statistics, used when a row has no usable neighbours.
    g_mean = np.nanmean(y_train)
    g_median = np.nanmedian(y_train)
    fallback = {
        'mean': g_mean,
        'median': g_median,
        'log_mean': np.log1p(g_mean),
        'log_median': np.log1p(g_median),
        'std': np.nanstd(y_train),
        'iqr': np.nanpercentile(y_train, 75) - np.nanpercentile(y_train, 25),
    }

    for r in radius_list_m:
        # Metres -> kilometres -> angle on the unit sphere.
        r_rad = (r / 1000) / earth_radius_km

        # Train: query against itself, excluding rows of the same building.
        train_neigh = tree.query_radius(train_coords, r=r_rad, return_distance=False)
        train_feats = _neighbor_price_features(
            train_neigh, y_train, cats, fallback, r, group_ids=bids
        )
        for name, values in train_feats.items():
            train_df[name] = values

        # Test: every train row within the radius counts as a neighbour.
        test_neigh = tree.query_radius(test_coords, r=r_rad, return_distance=False)
        test_feats = _neighbor_price_features(
            test_neigh, y_train, cats, fallback, r
        )
        for name, values in test_feats.items():
            test_df[name] = values

    return train_df, test_df


In [15]:
# Attach the multi-radius neighbourhood price features to both frames.
neighbor_kwargs = {
    'target_col': 'money_room',
    'lat_col': 'lat',
    'lon_col': 'lon',
}
train_df_fe, test_df_fe = add_multi_radius_neighbor_features(
    train_df_fe, test_df_fe, **neighbor_kwargs
)

## 市区町村ごとの緯度・経度の中心

In [16]:
# Features created: 'city_lat', 'city_lon'
city_col = 'City/town/village name'
n_train_rows = len(train_df_fe)

# Stack train on top of test so each municipality's centre uses every row.
combined_df = pd.concat([train_df_fe, test_df_fe], axis=0, ignore_index=True)

for coord in ('lat', 'lon'):
    combined_df[coord] = combined_df[coord].astype(float)

# Per-municipality median coordinate; rows that get no value fall back to
# the overall median of that coordinate.
for coord, city_coord_col in (('lat', 'city_lat'), ('lon', 'city_lon')):
    per_city_median = combined_df.groupby(city_col)[coord].median()
    city_values = combined_df[city_col].map(per_city_median).astype('float')
    combined_df[city_coord_col] = city_values.fillna(combined_df[coord].median())

# Split back into train / test.
train_df_fe = combined_df.iloc[:n_train_rows].copy()
test_df_fe = combined_df.iloc[n_train_rows:].copy()

## タグ情報のPCA

In [17]:
from sklearn.decomposition import PCA

# --- 1) Stack train on top of test ---
combined_df = pd.concat([train_df_fe, test_df_fe], ignore_index=True)

# --- 2) Tag column groups: group name -> column-name prefix ---
tag_prefixes = {
    'land_price': '土地価格_',                                   # land-related tags
    'building_struct': '建物構造・性能_',                         # building structure / performance
    'infra': '建物設備（給排水・インフラ）_',                      # plumbing / infrastructure equipment
    'location_premium': '立地プレミアム_',                        # location premium tags
    'environment': '環境プレミアム_',                             # environment premium tags
    'senyu': '専有部分設備',                                      # private-area equipment
    'sales_status': '用途・投資セグメント_売買ステータス_',        # use / investment segment: sales status
    'certificate': '用途・投資セグメント_不動産の証明書・性能評価_',  # certificates / performance ratings
}
tag_groups = {
    group: [c for c in combined_df.columns if c.startswith(prefix)]
    for group, prefix in tag_prefixes.items()
}

# --- 2.5) Number of principal components per group ---
# 'sales_status' has no entry here, so the driver loop skips it.
pca_dims = dict(
    land_price=1,
    building_struct=3,
    infra=3,
    location_premium=1,
    environment=2,
    senyu=5,
    certificate=3,
)

# --- 3) PCA + 累積寄与率を計算する関数 ---
def add_pca_features_and_report(df, cols, prefix, n_components):
    """Fit a PCA on ``df[cols]`` and append the component scores to ``df``.

    Missing values are replaced with 0 before fitting. Scores are written
    in place as ``{prefix}_pca_1`` ... ``{prefix}_pca_k``, and the explained
    variance ratio (per component and cumulative) is printed.

    Args:
        df: DataFrame to extend (modified in place and also returned).
        cols: columns fed to the PCA; nothing is done when empty.
        prefix: name used for the output columns and the report header.
        n_components: requested number of components, capped at ``len(cols)``.

    Returns:
        The same ``df`` object.
    """
    if len(cols) == 0:
        print(f'[SKIP] {prefix}: No columns')
        return df

    # PCA cannot produce more components than there are input columns.
    n_components = min(n_components, len(cols))
    if n_components <= 0:
        print(f'[SKIP] {prefix}: n_components <= 0 after adjustment')
        return df

    features = df[cols].fillna(0).astype(float)
    model = PCA(n_components=n_components, random_state=42)
    scores = model.fit_transform(features)

    for k in range(n_components):
        df[f'{prefix}_pca_{k+1}'] = scores[:, k]

    ratios = model.explained_variance_ratio_
    running_total = np.cumsum(ratios)

    print(f'\n=== {prefix} PCA Explained Variance (n_components={n_components}) ===')
    for rank, (ratio, total) in enumerate(zip(ratios, running_total), start=1):
        print(f'PC{rank}: {ratio:.4f},  Cumulative: {total:.4f}')
    print('========================================\n')

    return df


# --- 4) Run the PCA for every group that has a configured dimension ---
for prefix, cols in tag_groups.items():
    n_comp = pca_dims.get(prefix, 0)
    if n_comp > 0:
        combined_df = add_pca_features_and_report(
            combined_df, cols, prefix, n_components=n_comp
        )
    else:
        print(f'[SKIP] {prefix}: n_components <= 0 (not defined)')

# --- 5) Split back into train / test ---
train_len = len(train_df_fe)
train_df_fe, test_df_fe = (
    combined_df.iloc[:train_len].copy(),
    combined_df.iloc[train_len:].copy(),
)


=== land_price PCA Explained Variance (n_components=1) ===
PC1: 0.9536,  Cumulative: 0.9536


=== building_struct PCA Explained Variance (n_components=3) ===
PC1: 0.3215,  Cumulative: 0.3215
PC2: 0.1049,  Cumulative: 0.4264
PC3: 0.0547,  Cumulative: 0.4811


=== infra PCA Explained Variance (n_components=3) ===
PC1: 0.4508,  Cumulative: 0.4508
PC2: 0.1842,  Cumulative: 0.6350
PC3: 0.1323,  Cumulative: 0.7673


=== location_premium PCA Explained Variance (n_components=1) ===
PC1: 0.9414,  Cumulative: 0.9414


=== environment PCA Explained Variance (n_components=2) ===
PC1: 0.6547,  Cumulative: 0.6547
PC2: 0.0933,  Cumulative: 0.7480


=== senyu PCA Explained Variance (n_components=5) ===
PC1: 0.3034,  Cumulative: 0.3034
PC2: 0.0859,  Cumulative: 0.3893
PC3: 0.0522,  Cumulative: 0.4415
PC4: 0.0413,  Cumulative: 0.4828
PC5: 0.0365,  Cumulative: 0.5193

[SKIP] sales_status: n_components <= 0 (not defined)

=== certificate PCA Explained Variance (n_components=3) ===
PC1: 0.3726,  Cumulativ

## 交通情報

In [18]:
def add_access_zone_features(df, far_thresh=5000, error_thresh=20000, walk_speed=80):
    """
    Attach station-access features to a copy of ``df``.

    Columns added:
      - walk_distance1_raw      : untouched copy of ``walk_distance1``
      - access_zone             : categorical; 'walk', 'bus', 'car', 'error' or 'unknown'
      - door_to_station_min     : walk_distance1 / walk_speed, plus ``bus_time1``
                                  when station, bus stop and bus time are all present
      - door_to_station_min_log : log1p of door_to_station_min
      - walk_distance_bin       : walk_distance1 cut into 0-300 / 300-700 /
                                  700-1500 / 1500-5000 (NaN outside those bins)

    Zone precedence, highest first: error > car > bus > walk > unknown.

    Args:
        df: input frame with ``walk_distance1``, ``eki_name1``, ``bus_stop1``,
            ``bus_time1`` and ``traffic_other``.
        far_thresh: distance from which a row is a 'car' candidate.
        error_thresh: distance from which a row is labelled 'error'.
        walk_speed: divisor turning walk distance into minutes.

    Returns:
        A new DataFrame with the columns above appended.
    """
    out = df.copy()

    # Keep the original distance under a separate name.
    out['walk_distance1_raw'] = out['walk_distance1']
    dist = out['walk_distance1_raw']

    station_known = out['eki_name1'].notnull()
    bus_stop_known = out['bus_stop1'].notnull()
    bus_time_known = out['bus_time1'].notnull()
    other_note_known = out['traffic_other'].notnull()

    # ---- 1) access_zone ----
    station_only = station_known & ~bus_stop_known
    too_far = dist >= error_thresh               # suspected input error
    # Car zone: far from the station and either a traffic_other note exists
    # or the distance is still below the error threshold.
    by_car = (dist >= far_thresh) & (other_note_known | (dist < error_thresh))

    # np.select takes the first matching condition, which encodes the precedence.
    out['access_zone'] = np.select(
        [too_far, by_car, bus_stop_known, station_only],
        ['error', 'car', 'bus', 'walk'],
        default='unknown',
    )
    out['access_zone'] = out['access_zone'].astype('category')

    # ---- 2) door_to_station_min ----
    walk_minutes = dist / walk_speed
    out['door_to_station_min'] = np.nan

    # Station without a bus stop: walking time only.
    out.loc[station_only, 'door_to_station_min'] = walk_minutes[station_only]

    # Station + bus stop + bus time: walking time plus bus time.
    walk_and_bus = station_known & bus_stop_known & bus_time_known
    out.loc[walk_and_bus, 'door_to_station_min'] = (
        walk_minutes[walk_and_bus] + out.loc[walk_and_bus, 'bus_time1']
    )
    # Rows with a bus stop but no station stay NaN.

    out['door_to_station_min_log'] = np.log1p(out['door_to_station_min'])

    # ---- 3) distance buckets ----
    out['walk_distance_bin'] = pd.cut(
        dist,
        bins=[-1, 300, 700, 1500, 5000],
        labels=['0-300', '300-700', '700-1500', '1500-5000'],
    ).astype('category')

    return out

# Apply the access features to train and test independently.
train_df_fe, test_df_fe = (
    add_access_zone_features(frame) for frame in (train_df_fe, test_df_fe)
)

In [19]:
# Target-encode station and line names: '<col>_te' = log1p(mean target per category).
traffic_te_cols = ['eki_name1', 'eki_name2', 'rosen_name1', 'rosen_name2']

# Fallback value for rows whose category has no train-side mean.
global_mean = train_df_fe[target_col].mean()

for col in traffic_te_cols:
    # Mean target per category, computed on train only.
    mapping = train_df_fe.groupby(col)[target_col].mean()

    # Use the same map and the same fallback for both frames. Previously only
    # test was filled with global_mean, so a row with a missing name was NaN
    # in train but log1p(global_mean) in test.
    # NOTE(review): the train encoding is in-sample; consider out-of-fold
    # encoding if target leakage matters.
    for frame in (train_df_fe, test_df_fe):
        frame[col + '_te'] = np.log1p(frame[col].map(mapping).fillna(global_mean))

## 建物情報の拡充

In [20]:
def _positive_or_nan(series):
    """Return ``series`` with every non-positive value replaced by NaN."""
    return series.where(series > 0)


def add_building_area_features(df):
    """
    Add building-scale features to a copy of ``df``.

    Columns added:
      - density_floor_area : nobeyuka_area / tochi_area
      - unit_count_density : unit_count / tochi_area
      - unit_area_range    : unit_area_max - unit_area_min
      - empty_ratio        : empty_number / unit_count

    Ratios whose denominator is zero, negative or missing are NaN rather
    than inf, matching the guards used for the other ratio features.

    Args:
        df: input frame containing the columns referenced above.

    Returns:
        A new DataFrame with the four columns appended.
    """
    df = df.copy()

    # Denominators with non-positive values blanked out, so division
    # yields NaN instead of +/-inf.
    land_area = _positive_or_nan(df['tochi_area'])
    unit_count = _positive_or_nan(df['unit_count'])

    # 1) Total floor area per land area
    df['density_floor_area'] = df['nobeyuka_area'] / land_area

    # 2) Number of units per land area
    df['unit_count_density'] = df['unit_count'] / land_area

    # 3) Spread of unit areas (max - min)
    df['unit_area_range'] = df['unit_area_max'] - df['unit_area_min']

    # 4) Vacancy ratio; NaN when unit_count is 0
    df['empty_ratio'] = df['empty_number'] / unit_count

    return df

# Apply the building-scale features to train and test independently.
train_df_fe, test_df_fe = (
    add_building_area_features(frame) for frame in (train_df_fe, test_df_fe)
)

## リフォーム・リノベ情報の拡充

In [21]:
def add_effective_age(df, train_year, train_month):
    """Add reform / renovation based age features to a copy of ``df``.

    Columns added or overwritten:
      - the four date columns, parsed to datetime (unparseable -> NaT)
      - ``<date_col>_ym``       : YYYYMM number of each date column
      - first_reform_yearmonth : earliest YYYYMM among the three reform dates
      - reno_year, effective_build_year : renovation year when a renovation
        date exists, otherwise ``year_built``
      - discount               : years subtracted for reforms (interior 2,
        wet area 3, exterior 1); a reform only counts when there is no
        renovation or it is later than the renovation
      - effective_age          : max(0, train_year - effective_build_year - discount)
      - reform_newness         : months from the first reform to the reference
        month (``train_year`` / ``train_month``); NaN without any reform date
      - reform_newness_log     : log1p of reform_newness
      - reform_total_count     : row-wise sum of the numeric reform flag columns

    Args:
        df: input frame with the date columns, ``year_built`` and any
            ``reform_(exterior|wet_area|interior)*`` flag columns.
        train_year: reference year.
        train_month: reference month (1-12).

    Returns:
        A new DataFrame with the columns above.
    """
    df = df.copy()

    date_cols = ['reform_interior_date', 'reform_wet_area_date',
                 'reform_exterior_date', 'renovation_date']

    # 1) Parse the dates and derive a YYYYMM number for each of them.
    for c in date_cols:
        df[c] = pd.to_datetime(df[c], errors='coerce')
        df[c + '_ym'] = df[c].dt.year * 100 + df[c].dt.month

    # Earliest partial reform (renovation excluded).
    reform_cols_ym = ['reform_interior_date_ym', 'reform_wet_area_date_ym', 'reform_exterior_date_ym']
    df['first_reform_yearmonth'] = df[reform_cols_ym].min(axis=1)

    # 2) Count reform flags. The prefix pattern also matches the datetime
    # columns and their *_ym derivatives; pd.to_numeric would turn those into
    # nanosecond / YYYYMM integers and corrupt the sum, so they are excluded.
    derived_cols = set(date_cols) | {c + '_ym' for c in date_cols}
    reform_flag_cols = [
        c for c in df.filter(regex=r'^reform_(exterior|wet_area|interior)').columns
        if c not in derived_cols
    ]
    if reform_flag_cols:
        reform_count = (
            df[reform_flag_cols]
            .apply(pd.to_numeric, errors='coerce')
            .fillna(0)
            .astype(int)
            .sum(axis=1)
        )
    else:
        reform_count = pd.Series(0, index=df.index)

    # 3) Effective build year: the renovation year replaces year_built only
    # when a renovation date exists. (The original also computed a
    # "reform_count >= 5" full-renovation flag, but it was always AND-ed with
    # "renovation date present" and therefore never changed the result.)
    has_renovation = df['renovation_date'].notnull()
    df['reno_year'] = df['renovation_date'].dt.year
    df['effective_build_year'] = np.where(
        has_renovation,
        df['reno_year'],
        df['year_built']
    )
    df['effective_age'] = train_year - df['effective_build_year']

    # 4) Discount: vectorised form of the former row-wise apply.
    reno_ym = df['renovation_date_ym']
    no_reno = reno_ym.isna()
    discount = pd.Series(0, index=df.index)
    for ym_col, years in (
        ('reform_interior_date_ym', 2),
        ('reform_wet_area_date_ym', 3),
        ('reform_exterior_date_ym', 1),
    ):
        reform_ym = df[ym_col]
        counts = reform_ym.notna() & (no_reno | (reform_ym > reno_ym))
        discount = discount + years * counts.astype(int)

    df['discount'] = discount
    df['effective_age'] = np.maximum(0, df['effective_age'] - df['discount'])

    # 5) Months elapsed since the first reform. Both sides must be on the same
    # "total months" scale: the YYYYMM value is converted before subtracting
    # (the previous code subtracted the raw YYYYMM number, giving large
    # negative values and NaN logs).
    ref_total_months = train_year * 12 + train_month
    first_ym = df['first_reform_yearmonth']
    first_total_months = (first_ym // 100) * 12 + first_ym % 100

    df['reform_newness'] = ref_total_months - first_total_months  # NaN where no reform date
    df['reform_newness_log'] = np.log1p(df['reform_newness'])
    df['reform_total_count'] = reform_count

    return df


In [22]:
# Reference year/month = the latest target_ym in train (e.g. 202210 -> 2022, 10).
train_max_ym = train_df_fe['target_ym'].max()
train_year, train_month = divmod(train_max_ym, 100)

# Both frames are aged against the same reference date.
train_df_fe, test_df_fe = (
    add_effective_age(frame, train_year, train_month)
    for frame in (train_df_fe, test_df_fe)
)

## 掲載期間

In [23]:
# 作成する特徴量：'listing_months'
def calculate_listing_months(df):
    """Add 'listing_months': whole months from ``building_create_date`` to ``target_ym``.

    Works on a copy, like the other feature helpers in this file, so the
    caller's frame is left untouched.

    Columns added or overwritten:
      - building_create_date : parsed to datetime (unparseable -> NaT)
      - target_ym_date       : ``target_ym`` parsed with format '%Y%m'
      - listing_months       : (year difference) * 12 + (month difference);
                               NaN when either date is missing or the result
                               would be negative

    Args:
        df: input frame with ``building_create_date`` and ``target_ym``.

    Returns:
        A new DataFrame with the columns above.
    """
    df = df.copy()

    # 1) Parse both dates; values that cannot be parsed become NaT.
    df['building_create_date'] = pd.to_datetime(df['building_create_date'], errors='coerce')
    # NOTE(review): assumes target_ym renders as 'YYYYMM' via astype(str)
    # (an integer column); a float column would render as '201901.0' and fail to parse.
    df['target_ym_date'] = pd.to_datetime(df['target_ym'].astype(str), format='%Y%m', errors='coerce')

    # 2) Month difference using year and month only (day of month ignored).
    created = df['building_create_date'].dt
    target = df['target_ym_date'].dt
    months = (target.year - created.year) * 12 + (target.month - created.month)

    # 3) A negative duration is invalid: keep values >= 0, everything else -> NaN.
    df['listing_months'] = months.where(months >= 0)

    return df


# Compute the listing duration for train and test independently.
train_df_fe, test_df_fe = (
    calculate_listing_months(frame) for frame in (train_df_fe, test_df_fe)
)


## 地価の比率・平均値

In [24]:
# 作成する特徴量：'ratio_mean_land', 'ratio_weighted_land'
def add_land_price_ratios(df: pd.DataFrame) -> pd.DataFrame:
    """Add ratios of neighbourhood price to land price on a copy of ``df``.

    Columns added:
      - ratio_mean_land     : mean_price_1000m / nearest_land_price
      - ratio_weighted_land : median_price_1000m / weighted_land_price_3

    A ratio is NaN unless the land price is strictly positive and the
    neighbourhood price is present.
    """
    out = df.copy()

    # (output column, numerator column, denominator column)
    ratio_specs = (
        ('ratio_mean_land', 'mean_price_1000m', 'nearest_land_price'),
        ('ratio_weighted_land', 'median_price_1000m', 'weighted_land_price_3'),
    )

    for ratio_col, price_col, land_col in ratio_specs:
        usable = (out[land_col] > 0) & out[price_col].notna()
        out[ratio_col] = np.nan
        out.loc[usable, ratio_col] = (
            out.loc[usable, price_col] / out.loc[usable, land_col]
        )

    return out

# Add the land-price ratios to train and test independently.
train_df_fe, test_df_fe = (
    add_land_price_ratios(frame) for frame in (train_df_fe, test_df_fe)
)


## 未整理FE

In [25]:
# 作成する特徴量：'kyoueki_per_m2', 'shuuzen_per_m2', 'kyoueki_per_unit', 'shuuzen_per_unit', 'has_kyoueki', 'has_shuuzen', 'land_cheap_flag', 'land_expensive_flag', 'land_theoretical_price', 'land_theoretical_price_weighted', 'land_theoretical_price_within1km_mp',
def add_block1_2_features(train_df_fe: pd.DataFrame,
                          test_df_fe: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Add (1) common-fee / repair-fund features and (2) land-price gap
    features to copies of both frames.

    Every feature is created only when the columns it needs are present:
      - kyoueki_per_m2, shuuzen_per_m2     : fee / senyu_area
      - kyoueki_per_unit, shuuzen_per_unit : fee / unit_count
      - has_kyoueki, has_shuuzen           : 1 when the fee is > 0 (Int8)
      - land_cheap_flag, land_expensive_flag : ratio_mean_land < 0.8 / > 1.2 (Int8)
      - land_theoretical_price             : log1p(nearest_land_price * kukaku_area)
      - land_theoretical_price_weighted    : log1p(weighted_land_price_3 * kukaku_area)
      - land_theoretical_price_within1km_mp: log1p(mean_price_1000m * kukaku_area)

    Ratios and products are NaN when the area / count they depend on is not
    a positive finite number.

    Returns:
        (train, test) copies with the new columns.
    """
    train = train_df_fe.copy()
    test  = test_df_fe.copy()

    def ratio_where_positive(numerator, denominator):
        """numerator / denominator where denominator is positive and finite, else NaN."""
        return np.where(
            (denominator > 0) & np.isfinite(denominator),
            numerator / denominator,
            np.nan
        )

    def log_land_value(unit_price, area):
        """log1p(unit_price * area) where area is positive and finite, else NaN."""
        return np.where(
            (area > 0) & np.isfinite(area),
            np.log1p(unit_price * area),
            np.nan
        )

    for df in (train, test):
        # ======================
        # (1) Common fee / repair fund block
        # ======================
        house = df.get('senyu_area')
        unit_cnt = df.get('unit_count')
        kyoueki = df.get('money_kyoueki_std')
        shuuzen = df.get('money_shuuzen')

        # Per square metre
        if (house is not None) and (kyoueki is not None):
            df['kyoueki_per_m2'] = ratio_where_positive(kyoueki, house)
        if (house is not None) and (shuuzen is not None):
            df['shuuzen_per_m2'] = ratio_where_positive(shuuzen, house)

        # Per unit
        if (unit_cnt is not None) and (kyoueki is not None):
            df['kyoueki_per_unit'] = ratio_where_positive(kyoueki, unit_cnt)
        if (unit_cnt is not None) and (shuuzen is not None):
            df['shuuzen_per_unit'] = ratio_where_positive(shuuzen, unit_cnt)

        # Presence flags
        if kyoueki is not None:
            df['has_kyoueki'] = (kyoueki > 0).astype('Int8')
        if shuuzen is not None:
            df['has_shuuzen'] = (shuuzen > 0).astype('Int8')

        # ======================
        # (2) Land-price gap block
        # ======================
        nearest_lp   = df.get('nearest_land_price')
        weighted_lp  = df.get('weighted_land_price_3')
        within1km_mp = df.get('mean_price_1000m')

        # Cheap / expensive flags (thresholds 0.8 / 1.2 are provisional)
        if 'ratio_mean_land' in df.columns:
            df['land_cheap_flag'] = (df['ratio_mean_land'] < 0.8).astype('Int8')
            df['land_expensive_flag'] = (df['ratio_mean_land'] > 1.2).astype('Int8')

        # Theoretical land value (= unit price x land area), log-scaled
        land_area = df.get('kukaku_area')
        if (land_area is not None) and (nearest_lp is not None):
            df['land_theoretical_price'] = log_land_value(nearest_lp, land_area)
        if (land_area is not None) and (weighted_lp is not None):
            df['land_theoretical_price_weighted'] = log_land_value(weighted_lp, land_area)
        # Guard on the column this feature actually uses; the previous check on
        # weighted_lp raised TypeError when mean_price_1000m was missing.
        if (land_area is not None) and (within1km_mp is not None):
            df['land_theoretical_price_within1km_mp'] = log_land_value(within1km_mp, land_area)

    return train, test


In [26]:
# Add the fee and land-price gap features to both frames.
train_df_fe, test_df_fe = add_block1_2_features(
    train_df_fe=train_df_fe,
    test_df_fe=test_df_fe,
)

In [27]:
# 作成する特徴量：'parking_available', 'parking_on_site', 'parking_nearby_only', 'parking_distance_log', 'parking_money', 'parking_cost_ratio_clip_log',  'parking_cost_per_sqm_clip',
def add_parking_features(train_df_fe: pd.DataFrame,
                                  test_df_fe: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Add parking features (availability, distance, cost) to train and test.

    The frames are stacked, the columns below are computed on the stacked
    frame, and it is split back by the train row count (both outputs get a
    fresh 0..n-1 index).

    Columns added:
      - parking_available    : 1 when parking_kubun is 1, 3 or 5
      - parking_on_site      : 1 when parking_kubun is 1 or 5
      - parking_nearby_only  : 1 when parking_kubun is 3
      - parking_distance_log : log1p of parking_distance (negatives -> 0);
                               NaN when that column is absent
      - parking_cost_ratio   : parking_money_std / money_room, clipped to
                               [0, 0.1]; NaN unless both are > 0
      - parking_cost_per_sqm : parking_money_std / senyu_area, clipped at the
                               99th percentile of the stacked values; NaN
                               unless both are > 0

    NOTE(review): parking_cost_ratio divides by ``money_room``, which the rest
    of this file uses as the prediction target — confirm this is not leaking
    the target into the features.
    """
    n_train = len(train_df_fe)
    stacked = pd.concat([train_df_fe, test_df_fe], ignore_index=True)

    def column_or_nan(name):
        # Missing columns behave like an all-NaN column.
        return stacked.get(name, pd.Series(np.nan, index=stacked.index))

    # ---- availability flags from parking_kubun ----
    kubun = column_or_nan('parking_kubun')
    stacked['parking_available'] = kubun.isin([1, 3, 5]).astype(int)
    stacked['parking_on_site'] = kubun.isin([1, 5]).astype(int)
    stacked['parking_nearby_only'] = (kubun == 3).astype(int)

    # ---- distance (log scale) ----
    if 'parking_distance' not in stacked.columns:
        stacked['parking_distance_log'] = np.nan
    else:
        distance = stacked['parking_distance'].astype(float).clip(lower=0)
        stacked['parking_distance_log'] = np.log1p(distance)

    # ---- parking fee relative to the property price ----
    fee = column_or_nan('parking_money_std').astype(float)
    price = column_or_nan('money_room').astype(float)
    has_fee = fee > 0

    cost_ratio = np.where(has_fee & (price > 0), fee / price, np.nan)
    # Ratios above 10% are treated as outliers and capped.
    stacked['parking_cost_ratio'] = np.clip(cost_ratio, 0, 0.1)

    # ---- parking fee per square metre of private area ----
    if 'senyu_area' in stacked.columns:
        area = stacked['senyu_area'].astype(float)
    else:
        area = pd.Series(np.nan, index=stacked.index)

    fee_and_area = has_fee & (area > 0)
    cost_per_sqm = np.where(fee_and_area, fee / area, np.nan)
    if np.any(fee_and_area):
        cap = np.nanpercentile(cost_per_sqm[~np.isnan(cost_per_sqm)], 99)
        cost_per_sqm = np.clip(cost_per_sqm, 0, cap)
    stacked['parking_cost_per_sqm'] = cost_per_sqm

    # ---- split back into train / test ----
    train_out = stacked.iloc[:n_train].reset_index(drop=True)
    test_out = stacked.iloc[n_train:].reset_index(drop=True)
    return train_out, test_out


In [28]:
# Add the parking features to both frames.
train_df_fe, test_df_fe = add_parking_features(
    train_df_fe=train_df_fe,
    test_df_fe=test_df_fe,
)

In [29]:
# Cap the parking cost columns at the train 99th percentile; the cap learned
# on train is reused for test.
# NOTE(review): parking_cost_ratio is derived from money_room; if test has no
# money_room the test-side ratio (and its clipped/log versions) is all NaN — confirm.
for source_col in ('parking_cost_ratio', 'parking_cost_per_sqm', 'parking_money_std'):
    clipped_col = source_col + '_clip'
    cap = train_df_fe[source_col].quantile(0.99)
    for frame in (train_df_fe, test_df_fe):
        frame[clipped_col] = frame[source_col].clip(upper=cap)

# Log version of the clipped ratio.
for frame in (train_df_fe, test_df_fe):
    frame['parking_cost_ratio_clip_log'] = np.log1p(frame['parking_cost_ratio_clip'])

In [30]:
# 作成する特徴量：'log_min_amenity_distance', 'convenience_distance_log', 'super_distance_log', 'drugstore_distance_log', 'amenity_within_500m', 'amenity_within_1000m',
def add_life_convenience_features(
    train_df_fe: pd.DataFrame,
    test_df_fe: pd.DataFrame,
    amenity_cols: list[str] | None = None,
    thresholds: tuple[int, int] = (500, 1000),
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    生活利便距離の log & 最小距離 & 閾値フラグ を追加する。

    - 各 amenity 距離について log 変換列 xxx_log を追加
    - min_amenity_distance: 複数の生活利便距離の最小値
    - log_min_amenity_distance: 上記の log1p
    - amenity_within_500m, amenity_within_1000m: 閾値以下の便益フラグ
    """
    combined = pd.concat([train_df_fe, test_df_fe], ignore_index=True)

    # デフォルトの生活利便距離カラム
    if amenity_cols is None:
        amenity_cols = [
            c for c in [
                'convenience_distance',
                'super_distance',
                'hospital_distance',
                'park_distance',
                'drugstore_distance',
                'bank_distance',
                'shopping_street_distance',
                'est_other_distance',
            ]
            if c in combined.columns
        ]

    # 何もなければそのまま返す
    if len(amenity_cols) == 0:
        print('[add_life_convenience_features] amenity_cols is empty. Skip.')
        return train_df_fe, test_df_fe

    # 各 amenity 距離の log 変換
    for col in amenity_cols:
        vals = combined[col].astype(float)
        vals = vals.clip(lower=0)  # マイナスは 0 に丸める
        combined[f'{col}_log'] = np.log1p(vals)

    # 最小距離
    amenity_dist = combined[amenity_cols].astype(float)
    min_dist = amenity_dist.min(axis=1)

    combined['min_amenity_distance'] = min_dist
    combined['log_min_amenity_distance'] = np.log1p(min_dist.clip(lower=0))

    # 閾値フラグ（例: 500m, 1000m）
    th1, th2 = thresholds
    combined[f'amenity_within_{th1}m'] = (min_dist <= th1).astype(int)
    combined[f'amenity_within_{th2}m'] = (min_dist <= th2).astype(int)

    # 分割して返す
    n_train = len(train_df_fe)
    train_out = combined.iloc[:n_train].reset_index(drop=True)
    test_out  = combined.iloc[n_train:].reset_index(drop=True)

    return train_out, test_out


In [31]:
# Append the amenity-distance features using the default amenity columns and thresholds.
train_df_fe, test_df_fe = add_life_convenience_features(train_df_fe, test_df_fe)

In [32]:
# 作成する特徴量：'genkyo_flex_score', 'usable_status_score', 'usable_months_delay', 'has_management_association', 'management_form_score', 'manager_presence_score', 'management_total_score',
def add_status_and_management_features(
    train_df_fe: pd.DataFrame,
    test_df_fe: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Turn current-state, handover and building-management codes into scores.

    Source columns (each is treated as all-NaN when absent):
    - current state: ``genkyo_code``
    - handover: ``usable_status``, ``usable_date``, ``target_ym``
    - management: ``management_form``, ``management_association_flg``,
      ``house_kanrinin``

    Train and test are concatenated, the score/flag columns are appended, and
    the result is split back into two frames with a fresh 0..n-1 index.
    Codes that are missing or not listed in a score table score 0.0.
    """
    merged = pd.concat([train_df_fe, test_df_fe], ignore_index=True)

    def _as_float(name: str) -> pd.Series:
        # A missing column becomes an all-NaN series so every lookup below
        # falls through to its default.
        placeholder = pd.Series(np.nan, index=merged.index)
        return merged.get(name, placeholder).astype(float)

    # ---------------------------
    # 1) Current state (genkyo_code)
    # ---------------------------
    current_state = _as_float('genkyo_code')

    # "Flexibility" score per code
    flexibility_by_code = {
        1: 0.5,   # occupied or vacant lot (code covers both, so kept modest)
        2: 1.5,   # vacant house
        3: 0.0,   # currently rented -> low flexibility
        4: 1.0,   # unfinished
        10: 1.5,  # old house on site, can be delivered as a cleared lot
    }
    merged['genkyo_flex_score'] = current_state.map(flexibility_by_code).fillna(0.0)

    # ---------------------------
    # 2) Handover (usable_status, usable_date)
    # ---------------------------
    handover_status = _as_float('usable_status')
    handover_ym = _as_float('usable_date')
    listing_ym = _as_float('target_ym')

    # Flags
    merged['usable_immediate_flag'] = handover_status.eq(1).astype(int)
    merged['usable_fixed_date_flag'] = handover_status.eq(3).astype(int)

    # Score: immediate > fixed date > negotiable / undecided
    handover_score_by_code = {
        1: 2.0,   # immediate
        3: 1.0,   # fixed date
        2: 0.5,   # negotiable
        4: 0.0,   # undecided
    }
    merged['usable_status_score'] = handover_status.map(handover_score_by_code).fillna(0.0)

    # Difference of two yyyymm values, read roughly as a number of months.
    # e.g. target_ym=202201, usable_date=202204 -> 3 months
    if all(name in merged.columns for name in ('usable_date', 'target_ym')):
        year_gap = (handover_ym // 100).astype('Int64') - (listing_ym // 100).astype('Int64')
        month_gap = (handover_ym % 100).astype('Int64') - (listing_ym % 100).astype('Int64')
        months_delay = year_gap * 12 + month_gap
    else:
        months_delay = np.nan

    merged['usable_months_delay'] = months_delay

    # ---------------------------
    # 3) Management
    # ---------------------------
    form_code = _as_float('management_form')
    association_code = _as_float('management_association_flg')
    caretaker_code = _as_float('house_kanrinin')

    # Management association exists
    merged['has_management_association'] = association_code.eq(2).astype(int)

    # Professionally managed (partly or fully outsourced)
    merged['has_professional_management'] = form_code.isin([2, 3]).astype(int)

    # A caretaker exists
    merged['has_manager'] = caretaker_code.isin([1, 2, 3, 5]).astype(int)

    # Management-form score
    form_score_by_code = {
        1: 1.0,   # self-managed
        2: 2.0,   # partly outsourced
        3: 3.0,   # fully outsourced
    }
    merged['management_form_score'] = form_code.map(form_score_by_code).fillna(0.0)

    # Caretaker-presence score
    caretaker_score_by_code = {
        4: 0.0,   # none
        5: 0.5,   # not resident
        3: 1.0,   # patrol
        2: 1.5,   # daytime
        1: 2.0,   # resident
    }
    merged['manager_presence_score'] = caretaker_code.map(caretaker_score_by_code).fillna(0.0)

    # Total management score (a management association adds a small bonus)
    association_bonus = merged['has_management_association'] * 0.5
    merged['management_total_score'] = (
        merged['management_form_score']
        + merged['manager_presence_score']
        + association_bonus
    )

    # ---------------------------
    # 4) Split back into train / test
    # ---------------------------
    split_at = len(train_df_fe)
    train_part = merged.iloc[:split_at].reset_index(drop=True)
    test_part = merged.iloc[split_at:].reset_index(drop=True)

    return train_part, test_part


In [33]:
# Append the current-state, handover and management score features.
train_df_fe, test_df_fe = add_status_and_management_features(train_df_fe, test_df_fe)

## 持分比率

In [34]:
def make_mochibun_features(df):
    """
    Add land ownership-share (mochibun) features to a copy of ``df``.

    Columns added:
    - ``mochibun_ratio``: ``land_mochibun_b / land_mochibun_a``; missing or
      infinite results are treated as full ownership (1.0)
    - ``has_mochibun``: 1 when the ratio is below 1.0
    - ``mochibun_area``: land area multiplied by the ratio (NaN when no land
      area column is available)
    - ``mochibun_area_log``: log1p of ``mochibun_area``

    The land area is read from ``land_area`` when present, otherwise from
    ``tochi_area``, which is the land-area column name used elsewhere in this
    notebook. Without the fallback, a frame that has no ``land_area`` column
    gets an all-NaN ``mochibun_area_log``.
    NOTE(review): confirm ``tochi_area`` is the intended land-area column.

    Args:
        df: Frame containing ``land_mochibun_a`` and ``land_mochibun_b``.

    Returns:
        A new DataFrame; the input is not modified.
    """
    out = df.copy()

    # Ownership share (missing / division by zero -> assume 100% ownership)
    share = out['land_mochibun_b'] / out['land_mochibun_a']
    share = share.replace([np.inf, -np.inf], np.nan)
    out['mochibun_ratio'] = share.fillna(1.0)

    # Flag: only part of the land is owned
    out['has_mochibun'] = (out['mochibun_ratio'] < 1.0).astype(int)

    # Effective owned area, from the first land-area column that exists
    area_col = next(
        (name for name in ('land_area', 'tochi_area') if name in out.columns),
        None,
    )
    if area_col is not None:
        out['mochibun_area'] = out[area_col] * out['mochibun_ratio']
    else:
        out['mochibun_area'] = np.nan

    out['mochibun_area_log'] = np.log1p(out['mochibun_area'])

    return out

In [35]:
# Applied to each frame independently; the function works on a copy and returns it.
train_df_fe = make_mochibun_features(train_df_fe)
test_df_fe = make_mochibun_features(test_df_fe)

## 私道比率

In [36]:
def make_shidou_features(df):
    """
    Add private-road (shidou) burden features to a copy of ``df``.

    Columns added:
    - ``shidou_area_eff``: ``snapshot_land_shidou`` with missing values as 0
    - ``land_shidou_ratio``: ``land_shidou_b / land_shidou_a``; missing or
      infinite results become 0
    - ``has_shidou``: 1 when ``shidou_area_eff`` is positive
    - ``shidou_area_ratio`` (only when ``tochi_area`` exists):
      ``shidou_area_eff / tochi_area``; missing or infinite results become 0

    Infinite values are now removed from ``shidou_area_ratio`` as well, so a
    zero ``tochi_area`` no longer leaks ``inf`` into the feature, matching the
    handling of ``land_shidou_ratio``.

    Args:
        df: Frame containing ``snapshot_land_shidou``, ``land_shidou_a`` and
            ``land_shidou_b``.

    Returns:
        A new DataFrame; the input is not modified.
    """
    out = df.copy()
    infinities = [np.inf, -np.inf]

    # Private-road burden area (missing = 0)
    out['shidou_area_eff'] = out['snapshot_land_shidou'].fillna(0)

    # Private-road burden share (numerator / denominator)
    burden_share = out['land_shidou_b'] / out['land_shidou_a']
    out['land_shidou_ratio'] = burden_share.replace(infinities, np.nan).fillna(0)

    # Flag
    out['has_shidou'] = (out['shidou_area_eff'] > 0).astype(int)

    # Optional: share of the land area taken by the private road
    if 'tochi_area' in out.columns:
        area_share = out['shidou_area_eff'] / out['tochi_area']
        out['shidou_area_ratio'] = area_share.replace(infinities, np.nan).fillna(0)

    return out

In [37]:
# Applied to each frame independently; the function works on a copy and returns it.
train_df_fe = make_shidou_features(train_df_fe)
test_df_fe = make_shidou_features(test_df_fe)

## 住みやすさスコア

In [38]:
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge


# =========================================================
# ルールベースタグ重み（20選 + 必須欠落ペナルティ）
# =========================================================

# Hand-assigned weights for tag columns, keyed by the exact column name.
# "bonus" entries carry positive weights and "penalty" entries negative ones;
# compute_tag_rule_score multiplies each existing column by its weight and sums.
TAG_RULES = {
    "bonus": {
        # --- Environment / location ---
        "環境プレミアム_コンビニ 400ｍ以内": 2.0,
        "環境プレミアム_スーパー 800ｍ以内": 3.0,
        "環境プレミアム_総合病院 800ｍ以内": 2.0,
        "環境プレミアム_公園 400ｍ以内": 1.5,
        "環境プレミアム_子育てに嬉しい環境": 2.0,

        # --- Building structure / common areas ---
        "建物構造・性能_オートロック": 2.0,
        "建物構造・性能_宅配ボックス": 2.5,
        "建物構造・性能_防犯カメラ": 1.5,
        "建物構造・性能_エレベーター": 1.0,
        "建物構造・性能_管理人常駐": 1.5,
        "建物構造・性能_免震構造": 2.0,
        "建物構造・性能_耐震・制震・免震構造": 2.0,

        # --- In-unit equipment ---
        "専有部分設備_浴室・洗面_浴室乾燥機": 1.5,
        "専有部分設備_浴室・洗面_追焚機能": 1.5,
        "専有部分設備_浴室・洗面_洗面所独立": 1.0,
        "専有部分設備_キッチン_システムキッチン": 1.5,
        "専有部分設備_キッチン_食器洗い乾燥機": 1.5,
        "専有部分設備_収納_ウォークインクローゼット": 1.5,
        "専有部分設備_収納_全居室収納": 1.0,
        "専有部分設備_空調・暖房_床暖房": 2.0,
    },
    "penalty": {
        # --- Missing essentials (strong deduction) ---
        "専有部分設備_トイレ_トイレなし": -10.0,
        "専有部分設備_浴室・洗面_バスなし": -10.0,
        "専有部分設備_トイレ_共同トイレ": -5.0,
        "専有部分設備_浴室・洗面_共同バス": -5.0,
        "建物設備（給排水・インフラ）_汲取": -3.0,
        "建物設備（給排水・インフラ）_浄化槽": -2.0,
    }
}


def compute_tag_rule_score(df: pd.DataFrame, rules: dict = TAG_RULES) -> pd.Series:
    """
    Compute the rule-based tag score (bonuses plus penalties) per row.

    For every column named in ``rules["bonus"]`` and then ``rules["penalty"]``
    that exists in ``df``, the column (missing values as 0) is multiplied by
    its weight and added to the running total. Columns that are not present
    are ignored, so the function can be called on any frame.
    """
    total = pd.Series(0.0, index=df.index)

    # Bonuses first, then penalties: the same accumulation order as the rule table.
    for group in ("bonus", "penalty"):
        for tag_col, weight in rules.get(group, {}).items():
            if tag_col not in df.columns:
                continue
            total = total + df[tag_col].fillna(0).astype(float) * float(weight)

    return total


# =========================================================
# ユーティリティ関数
# =========================================================

def _zscore(series: pd.Series) -> pd.Series:
    """平均0・分散1に正規化（std=0 のときは 0 を返す）"""
    s = series.astype(float)
    std = s.std()
    if std == 0 or np.isnan(std):
        return pd.Series(0.0, index=s.index)
    return (s - s.mean()) / std


def _safe_mean(df: pd.DataFrame, cols: list[str]) -> pd.Series:
    """存在する列だけで行方向平均を取る（全て無ければ0）"""
    cols_exist = [c for c in cols if c in df.columns]
    if not cols_exist:
        return pd.Series(0.0, index=df.index)
    return df[cols_exist].mean(axis=1).fillna(0.0)


def _add_tag_score(df: pd.DataFrame,
                   prefixes: list[str],
                   new_col: str) -> pd.DataFrame:
    """指定プレフィクスの One-hot 列の平均をスコア化して new_col に追加"""
    cols = [c for c in df.columns
            if any(c.startswith(p) for p in prefixes)]
    if not cols:
        df[new_col] = 0.0
    else:
        df[new_col] = df[cols].mean(axis=1).fillna(0.0)
    return df


# =========================================================
# ① 都市度スコアの作成
# =========================================================

def make_urban_score(
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    urban_features: list[str] | None = None,
    impute_strategy: str = "median",
) -> tuple[pd.DataFrame, pd.DataFrame, dict]:

    if urban_features is None:
        candidate_cols = [
            "count_neighbors_1000m",
            "door_to_station_min_log",
            "tochi_area_log",
            "shikichi_area_log",
        ]
        urban_features = [c for c in candidate_cols if c in train_df.columns]

    train_df = train_df.copy()
    test_df = test_df.copy()

    if not urban_features:
        train_df["urban_score"] = 0.0
        test_df["urban_score"] = 0.0
        return train_df, test_df, {"scaler": None, "pca": None, "urban_features": []}

    combined = pd.concat(
        [train_df[urban_features], test_df[urban_features]],
        axis=0, ignore_index=True
    ).astype(float)

    combined = combined.replace([np.inf, -np.inf], np.nan)

    used_features = [c for c in combined.columns if combined[c].notna().any()]
    combined = combined[used_features]

    if not used_features:
        train_df["urban_score"] = 0.0
        test_df["urban_score"] = 0.0
        return train_df, test_df, {"scaler": None, "pca": None, "urban_features": []}

    if impute_strategy == "median":
        fill_values = combined.median(numeric_only=True)
    elif impute_strategy == "mean":
        fill_values = combined.mean(numeric_only=True)
    else:
        raise ValueError("impute_strategy must be 'median' or 'mean'")

    combined_imputed = combined.fillna(fill_values)

    scaler = StandardScaler()
    combined_scaled = scaler.fit_transform(combined_imputed)

    pca = PCA(n_components=1, random_state=42)
    urban_component = pca.fit_transform(combined_scaled).ravel()

    n_train = len(train_df)
    train_df["urban_score"] = urban_component[:n_train]
    test_df["urban_score"] = urban_component[n_train:]

    meta = {
        "scaler": scaler,
        "pca": pca,
        "urban_features": used_features,
        "impute_values": fill_values.to_dict(),
    }
    return train_df, test_df, meta


# =========================================================
# ② タグ＋数値から住みやすさサブスコアを作成（ルールタグを追加）
# =========================================================

def _standardize_with_train_stats(
    train_values: pd.Series,
    test_values: pd.Series,
) -> tuple[pd.Series, pd.Series]:
    """Z-score both series with the mean/std of ``train_values`` only.

    Returns all-zero series when the train standard deviation is 0 or NaN.
    """
    train_values = train_values.astype(float)
    test_values = test_values.astype(float)
    mu = train_values.mean()
    sd = train_values.std()
    if sd == 0 or np.isnan(sd):
        return (
            pd.Series(0.0, index=train_values.index),
            pd.Series(0.0, index=test_values.index),
        )
    return (train_values - mu) / sd, (test_values - mu) / sd


def make_livability_subscores(
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    use_rule_tags: bool = True,
    rule_weight_in_daily: float = 0.6,
    rule_weight_in_building: float = 0.8,
) -> tuple[pd.DataFrame, pd.DataFrame, dict]:
    """
    Build the four livability sub-scores from tag columns and numeric features.

    Steps:
    1. ``tag_*_score``: mean of the tag columns sharing a name prefix.
    2. ``tag_rule_score``: weighted rule score, standardised with train statistics
       (0.0 everywhere when ``use_rule_tags`` is False).
    3. ``access_station_score``, ``neighbor_density_score``, ``room_space_score``:
       numeric features standardised with train statistics.
    4. ``score_access``, ``score_daily``, ``score_room``, ``score_building``:
       means of the above, with the rule score added to the daily and
       building scores using the given weights.

    All standardisation uses the mean/std of the TRAIN frame for both frames.
    Scaling each frame with its own statistics would put train and test on
    different scales before the downstream regression is applied to test.
    A numeric source column missing from either frame yields 0.0 in both.

    Args:
        train_df: Training frame (not modified; a copy is returned).
        test_df: Test frame (not modified; a copy is returned).
        use_rule_tags: Whether to compute the weighted rule score.
        rule_weight_in_daily: Weight of the rule score inside ``score_daily``.
        rule_weight_in_building: Weight of the rule score inside ``score_building``.

    Returns:
        ``(train_df, test_df, meta)``; ``meta`` records the tag groups, the
        sub-score names, the rule settings and the rule tags found in train.
    """
    train_df = train_df.copy()
    test_df = test_df.copy()

    # ---------- Tag scores (mean per prefix group) ----------
    tag_groups = {
        "tag_land": ["土地価格_"],
        "tag_unit": ["専有部分設備_"],
        "tag_building": ["建物構造・性能_"],
        "tag_infra": ["建物設備（給排水・インフラ）_"],
        "tag_env": ["環境プレミアム_"],
        "tag_cert": ["用途・投資セグメント_不動産の証明書・性能評価_"],
    }

    for name, prefixes in tag_groups.items():
        train_df = _add_tag_score(train_df, prefixes, f"{name}_score")
        test_df = _add_tag_score(test_df, prefixes, f"{name}_score")

    # ---------- Rule-based tags (weighted) ----------
    if use_rule_tags:
        train_df["tag_rule_score_raw"] = compute_tag_rule_score(train_df)
        test_df["tag_rule_score_raw"] = compute_tag_rule_score(test_df)

        # Z-score on train statistics and apply the same scale to test
        train_df["tag_rule_score"], test_df["tag_rule_score"] = (
            _standardize_with_train_stats(
                train_df["tag_rule_score_raw"],
                test_df["tag_rule_score_raw"],
            )
        )
    else:
        train_df["tag_rule_score"] = 0.0
        test_df["tag_rule_score"] = 0.0

    # ---------- Numeric scores (station access / density / area) ----------
    # (source column, sign, output column); the sign flips "minutes to station"
    # so that a shorter walk gives a higher score.
    numeric_specs = (
        ("door_to_station_min_log", -1.0, "access_station_score"),
        ("count_neighbors_1000m", 1.0, "neighbor_density_score"),
    )
    for src_col, sign, score_col in numeric_specs:
        if src_col in train_df.columns and src_col in test_df.columns:
            train_df[score_col], test_df[score_col] = _standardize_with_train_stats(
                sign * train_df[src_col].astype(float),
                sign * test_df[src_col].astype(float),
            )
        else:
            train_df[score_col] = 0.0
            test_df[score_col] = 0.0

    # NOTE(review): this is a substring match, so every column whose name merely
    # contains one of these keys (e.g. an interaction column) is averaged in.
    # Confirm that is intended.
    area_keys = ("senyu_area_log", "area_per_room", "nobeyuka_area_log")
    raw_room_space = []
    for df in (train_df, test_df):
        numeric_area_cols = [
            c for c in df.columns
            if any(k in c for k in area_keys)
        ]
        raw_room_space.append(_safe_mean(df, numeric_area_cols))
    train_df["room_space_score"], test_df["room_space_score"] = (
        _standardize_with_train_stats(*raw_room_space)
    )

    # ---------- The four sub-scores ----------
    for df in (train_df, test_df):
        df["score_access"] = _safe_mean(df, [
            "access_station_score",
            "neighbor_density_score",
            "tag_env_score",
        ])

        # Daily convenience (plus a share of the rule score)
        df["score_daily"] = _safe_mean(df, [
            "tag_env_score",
            "tag_land_score",
        ]) + rule_weight_in_daily * df["tag_rule_score"]

        # In-unit comfort
        df["score_room"] = _safe_mean(df, [
            "tag_unit_score",
            "room_space_score",
        ])

        # Building quality (plus a larger share of the rule score)
        df["score_building"] = _safe_mean(df, [
            "tag_building_score",
            "tag_infra_score",
            "tag_cert_score",
        ]) + rule_weight_in_building * df["tag_rule_score"]

    meta = {
        "tag_groups": tag_groups,
        "subscores": ["score_access", "score_daily", "score_room", "score_building"],
        "use_rule_tags": use_rule_tags,
        "rule_weights": {
            "daily": rule_weight_in_daily,
            "building": rule_weight_in_building
        },
        "rule_tags_used": {
            "bonus": [k for k in TAG_RULES["bonus"].keys() if k in train_df.columns],
            "penalty": [k for k in TAG_RULES["penalty"].keys() if k in train_df.columns],
        }
    }
    return train_df, test_df, meta


# =========================================================
# ③ 都市度 × サブスコアの重みを回帰で自動学習
# =========================================================

def fit_livability_weight_model(
    train_df: pd.DataFrame,
    target_col: str = "money_room",
    alpha: float = 1.0,
) -> tuple[Ridge, list[str]]:
    """
    Fit a Ridge regression that learns the weights of the livability inputs.

    The design matrix has the five base columns (``urban_score`` and the four
    sub-scores) followed by the product of each sub-score with ``urban_score``.
    The target is standardised (mean 0, std 1) after rows with a missing
    target are dropped.

    Args:
        train_df: Training frame containing the base columns and ``target_col``.
        target_col: Name of the target column.
        alpha: Ridge regularisation strength.

    Returns:
        ``(model, feature_names)``: the fitted model and the column names of
        the design matrix, in order.

    Raises:
        KeyError: If a base column or ``target_col`` is missing.
    """
    subscore_cols = ["score_access", "score_daily", "score_room", "score_building"]
    base_cols = ["urban_score", *subscore_cols]

    for name in [*base_cols, target_col]:
        if name not in train_df.columns:
            raise KeyError(f"必要な列が存在しません: {name}")

    fit_df = train_df.dropna(subset=[target_col]).copy()
    target = fit_df[target_col].astype(float)

    # Standardised target
    target_std = (target - target.mean()) / target.std()

    # Base block first, then one interaction column per sub-score
    urban = fit_df["urban_score"]
    base_block = fit_df[base_cols].astype(float).values
    interaction_blocks = [
        (fit_df[name] * urban).values.reshape(-1, 1) for name in subscore_cols
    ]
    design = np.hstack([base_block, *interaction_blocks])
    feature_names = base_cols + [f"{name}_x_urban" for name in subscore_cols]

    model = Ridge(alpha=alpha, random_state=42)
    model.fit(design, target_std)

    return model, feature_names


# =========================================================
# ④ 最終住みやすさスコアの算出（0〜100）
# =========================================================

def apply_livability_score(
    df: pd.DataFrame,
    model: Ridge,
    feature_names: list[str],
    train_min: float,
    train_max: float,
) -> pd.Series:
    """
    Predict the raw livability value and rescale it to the 0-100 range.

    The design matrix is rebuilt in the same layout used for fitting: the five
    base columns followed by each sub-score multiplied by ``urban_score``.
    Predictions are min-max scaled with ``train_min`` / ``train_max``, clipped
    to [0, 1] and multiplied by 100. When the two bounds are equal, every row
    gets 50.0.

    Args:
        df: Frame containing ``urban_score`` and the four sub-score columns.
        model: Fitted regression model exposing ``predict``.
        feature_names: Not used by this function; kept in the signature.
        train_min: Lower bound used for scaling.
        train_max: Upper bound used for scaling.

    Returns:
        Series of scores aligned with ``df.index``.
    """
    subscore_cols = ["score_access", "score_daily", "score_room", "score_building"]
    urban = df["urban_score"]

    blocks = [df[["urban_score", *subscore_cols]].astype(float).values]
    blocks += [(df[name] * urban).values.reshape(-1, 1) for name in subscore_cols]
    raw_prediction = model.predict(np.hstack(blocks))

    if train_max == train_min:
        return pd.Series(50.0, index=df.index)

    span = train_max - train_min
    scaled = ((raw_prediction - train_min) / span).clip(0, 1) * 100.0
    return pd.Series(scaled, index=df.index)


# =========================================================
# パイプライン一括実行
# =========================================================

def add_livability_features(
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    target_col: str = "money_room",
    alpha: float = 1.0,
    use_rule_tags: bool = True,
    rule_weight_in_daily: float = 0.6,
    rule_weight_in_building: float = 0.8,
):
    """
    Run the whole livability pipeline and return the enriched frames.

    1. ``urban_score``
    2. sub-scores (with the rule-tag score added)
    3. Ridge regression to learn the weights
    4. ``livability_score`` scaled to 0-100

    The 0-100 scale is anchored on the minimum and maximum raw prediction over
    the train frame; test predictions outside that range are clipped.

    Returns:
        ``(train, test, meta)`` where ``meta`` holds the metadata of steps 1-2,
        the fitted model, its feature names and the two scaling bounds.
    """
    train_u, test_u, urban_meta = make_urban_score(train_df, test_df)

    train_s, test_s, subs_meta = make_livability_subscores(
        train_u,
        test_u,
        use_rule_tags=use_rule_tags,
        rule_weight_in_daily=rule_weight_in_daily,
        rule_weight_in_building=rule_weight_in_building,
    )

    model, feature_names = fit_livability_weight_model(
        train_s, target_col=target_col, alpha=alpha
    )

    # Raw train predictions define the min/max used for scaling.
    subscore_cols = ["score_access", "score_daily", "score_room", "score_building"]
    train_blocks = [train_s[["urban_score", *subscore_cols]].astype(float).values]
    for name in subscore_cols:
        interaction = train_s[name] * train_s["urban_score"]
        train_blocks.append(interaction.values.reshape(-1, 1))
    train_raw = model.predict(np.hstack(train_blocks))

    train_min = float(np.nanmin(train_raw))
    train_max = float(np.nanmax(train_raw))

    train_s["livability_score"] = apply_livability_score(
        train_s, model, feature_names, train_min, train_max
    )
    test_s["livability_score"] = apply_livability_score(
        test_s, model, feature_names, train_min, train_max
    )

    meta = {
        "urban_meta": urban_meta,
        "subscores_meta": subs_meta,
        "model": model,
        "feature_names": feature_names,
        "train_min": train_min,
        "train_max": train_max,
    }
    return train_s, test_s, meta


In [39]:
# Build urban_score, the sub-scores and livability_score for both frames.
# liv_meta keeps the fitted model, its feature names and the train min/max used for scaling.
train_df_fe, test_df_fe, liv_meta = add_livability_features(
    train_df_fe,
    test_df_fe,
    target_col=target_col,
    alpha=1.0,
    use_rule_tags=True,
    rule_weight_in_daily=0.6,
    rule_weight_in_building=0.8,
)

## 道路関連

In [41]:
# Flag rows whose land_road_cond equals 10 and build interaction terms that keep
# the road metrics only for those rows (0 elsewhere). Both frames are modified in place.
for df in (train_df_fe, test_df_fe):
    df['is_no_road'] = df['land_road_cond'].eq(10).astype('int8')
    for road_metric in ('road_len_density', 'road_narrow_ratio_gap'):
        gated = df[road_metric] * df['is_no_road']
        df[f'{road_metric}_x_no_road'] = gated.astype('float32')

## 特徴量の追加・削除

In [42]:
# Geo-derived columns that are appended to the final feature list below.
# NOTE(review): presumably these come from the merged geo table — confirm.
geo_cols = [
    'distance_to_landpoint_m', 'log_land_price', 'log_weighted_land_price_3',
    'land_price_yoy_nearest', 'land_price_yoy_w3', 'land_price_dlog_nearest', 'land_price_dlog_w3',
    'PTN_2020_nn', 'PTN_2020_idw3', 'RTA_2025_nn', 'RTA_2025_idw3',
    'RTB_2025_nn', 'RTB_2025_idw3', 'RTC_2025_nn', 'RTC_2025_idw3',
    'RTD_2025_nn', 'RTD_2025_idw3', 'RTE_2025_nn', 'RTE_2025_idw3',
    'pop_trend_rate_nn', 'pop_trend_rate_idw3',
    # 'road_road_len_total', 'road_road_len_wide', 'road_road_len_narrow',
    # 'road_road_wide_ratio', 'road_road_narrow_ratio',
    # 'road_road_len_total_gap', 'road_road_narrow_ratio_gap'
    'road_len_density', 'road_len_density_gap'
]

In [43]:
# Features created in this notebook that are added to the model input list.
fe_cols += [
    'Prefecture name_te', 'City/town/village name_te',
    'area_ratio', 'relative_floor',
    'unit_land_density', 'area_per_room',
    'land_building_ratio',
    'senyu_area_x_built_diff', 'area_per_room_x_built_diff',
    'building_senyu_area_median', 'building_room_floor_max', 'building_unit_count',
    'mean_price_300m_log', 'median_price_300m_log', 'std_price_300m', 'iqr_price_300m', 'count_neighbors_300m', 'mean_price_300m_house_log', 'mean_price_300m_mansion_log',
    'mean_price_500m_log', 'median_price_500m_log', 'std_price_500m', 'iqr_price_500m', 'count_neighbors_500m', 'mean_price_500m_house_log', 'mean_price_500m_mansion_log',
    'mean_price_1000m_log', 'median_price_1000m_log', 'std_price_1000m', 'iqr_price_1000m', 'count_neighbors_1000m', 'mean_price_1000m_house_log', 'mean_price_1000m_mansion_log',
    'mean_price_2000m_log', 'median_price_2000m_log', 'std_price_2000m', 'iqr_price_2000m', 'count_neighbors_2000m', 'mean_price_2000m_house_log', 'mean_price_2000m_mansion_log',
    'city_lat', 'city_lon',
    'access_zone', 'walk_distance_bin', 'door_to_station_min_log',
    'density_floor_area', 'unit_count_density', 'unit_area_range', 'empty_ratio',
    'effective_age', 'reform_total_count', 'reform_newness_log',
    'listing_months',
    'ratio_mean_land', 'ratio_weighted_land',
    'kyoueki_per_m2', 'shuuzen_per_m2', 'kyoueki_per_unit', 'shuuzen_per_unit', 'has_kyoueki', 'has_shuuzen',
    'land_cheap_flag', 'land_expensive_flag', 'land_theoretical_price', 'land_theoretical_price_weighted', 'land_theoretical_price_within1km_mp',
    'parking_available', 'parking_on_site', 'parking_nearby_only', 'parking_distance_log', 'parking_money_std_clip', 'parking_cost_ratio_clip_log', 'parking_cost_per_sqm_clip',
    'log_min_amenity_distance', 'convenience_distance_log', 'super_distance_log', 'drugstore_distance_log', 'amenity_within_500m', 'amenity_within_1000m',
    'genkyo_flex_score', 'usable_status_score', 'usable_months_delay',
    'has_management_association', 'management_form_score', 'manager_presence_score', 'management_total_score',
    'mochibun_ratio', 'mochibun_area_log',
    'land_shidou_ratio', 'has_shidou', 'shidou_area_ratio',
    'livability_score', 'urban_score', 'score_access', 'score_room', 'score_building',
    'is_no_road', 'road_len_density_x_no_road', 'road_narrow_ratio_gap_x_no_road'
] + geo_cols + [c for c in train_df_fe.columns if 'pca' in c.lower()] + [c + '_te' for c in traffic_te_cols]

# Columns to drop from the feature list
remove_cols = [
    'building_name', 'homes_building_name',
    'lon', 'lat',
    'reform_date', 'reform_interior_date', 'reform_exterior_date', 'reform_wet_area_date', 'renovation_date',
    # 'rosen_name1', 'rosen_name2', 'eki_name1', 'eki_name2',
    'walk_distance1', 'walk_distance2',
    'bus_stop1', 'bus_stop2', 'bus_time1', 'bus_time2', 'traffic_other',
    'building_tag_id', 'unit_tag_id', 'statuses',
    'building_create_date',
    'unit_area_max', 'unit_area_min',
    'parking_money', 'parking_distance',
    'management_form', 'management_association_flg',
    'genkyo_code', 'usable_status', 'usable_date',
    'convenience_distance', 'super_distance', 'drugstore_distance', 'school_ele_distance', 'school_jun_distance',
    # 'Prefecture name', 'City/town/village name',
    'senyu_area', 'nobeyuka_area', 'kukaku_area', 'tochi_area', 'shikichi_area', 'unit_count',
]

# Drop the removed columns and de-duplicate while keeping first-seen order.
# `fe_cols +=` appends again every time this cell is re-run; duplicate names
# would then select duplicate columns and make the parquet export fail.
fe_cols = [c for c in dict.fromkeys(fe_cols) if c not in remove_cols]

## 出力

In [44]:
# Write the selected features to versioned parquet files; only train keeps the target column.
train_df_fe[fe_cols + [target_col]].to_parquet(f'{intermediate_path}train_df_fe_v{fe_ver}.parquet')
test_df_fe[fe_cols].to_parquet(f'{intermediate_path}test_df_fe_v{fe_ver}.parquet')