## 預售屋市場檢視

In [None]:
import os
import re
from typing import Generator, List, Any, Literal
import pandas as pd
from fuzzywuzzy import fuzz, process
from tqdm import tqdm
from IPython.display import display
import warnings
# Installs a blanket "ignore" filter: no category or module is given, so every
# warning raised after this line is suppressed for the rest of the session.
# NOTE(review): this also hides pandas deprecation / dtype warnings — consider
# narrowing the filter to the specific category that is noisy.
warnings.filterwarnings('ignore')

import sys
from pathlib import Path
# Path.cwd() is the directory the notebook is executed from; .parent moves
# exactly one level up, and that directory is treated as the project root.
# NOTE(review): the previous comment described climbing several levels, but
# only one .parent is applied — confirm this is the intended directory.
project_root = Path.cwd().parent
print(project_root)

# Register the root for imports only once, so re-running this cell does not
# keep appending duplicate entries to sys.path.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

In [None]:
from utils.helper_function import csv_extractor  # 取得及載入資料

### 讀取資料

In [None]:
# Directories holding the processed presale CSV exports.
comm_dir = r"C:\pylabs\housing-market-insights\data\lvr_moi\ps_community\processed"
transaction_dir = r"C:\pylabs\housing-market-insights\data\lvr_moi\ps_transaction\processed"

# File names inside those directories.
comm_fn = "ps_community_processed.csv"
transaction_fn = "ps_transaction_processed.csv"

# Full paths: community table first, then transaction table.
comm_path = os.path.join(comm_dir, comm_fn)
transaction_path = os.path.join(transaction_dir, transaction_fn)

In [None]:
# Load the presale community CSV and report row count, column count and
# deep memory usage in MB.
# NOTE(review): csv_extractor is assumed to return a pandas DataFrame (the
# code relies on .shape and .memory_usage) — confirm against the helper.
comm_extracted = csv_extractor(comm_path)
print(f" 預售社區逐筆交易資料載入成功:{comm_extracted.shape[0]}筆，合計{comm_extracted.shape[1]}欄位 \n 記憶體使用: {comm_extracted.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Load the presale transaction CSV and print the same summary.
# The message previously read "合探" (a typo); it now says "合計" like the
# community message above.
transaction_extracted = csv_extractor(transaction_path)
print(f" 預售交易逐筆交易資料載入成功: {transaction_extracted.shape[0]}筆，合計{transaction_extracted.shape[1]}欄位 \n 記憶體使用: {transaction_extracted.memory_usage(deep=True).sum() / 1024**2:.2f} MB")


In [None]:
# Take explicit deep copies of both loaded tables under the names used by
# the rest of the notebook.
community_df = comm_extracted.copy(deep=True)
transaction_df = transaction_extracted.copy(deep=True)

In [None]:
# Show the column index of each table, community first.
for frame in (community_df, transaction_df):
    display(frame.columns)

In [None]:
# === 2. Build composite keys ===
# Progress banner for this step.
print("\n 建立複合鍵...")

def create_composite_key(df, city_col='縣市', district_col='行政區', community_col='社區名稱'):
    """Return a copy of ``df`` with an added ``複合鍵`` column.

    The key is ``<city>_<district>_<community>``; missing values in any of
    the three source columns become empty strings. The input frame is not
    modified.

    Args:
        df: Table containing the three source columns.
        city_col: Name of the city column.
        district_col: Name of the district column.
        community_col: Name of the community-name column.

    Returns:
        A new DataFrame with the extra ``複合鍵`` column.
    """
    keyed = df.copy()

    def _as_text(column_name):
        # Missing values -> '' and everything else -> str, so "+" concatenates.
        return keyed[column_name].fillna('').astype(str)

    city_part = _as_text(city_col)
    district_part = _as_text(district_col)
    community_part = _as_text(community_col)
    keyed['複合鍵'] = city_part + '_' + district_part + '_' + community_part
    return keyed


# Attach the composite key to both tables (each call returns a new frame).
community_with_key, transaction_with_key = (
    create_composite_key(frame) for frame in (community_df, transaction_df)
)

# Distinct composite keys present in each table.
community_keys = community_with_key['複合鍵'].unique()
transaction_keys = transaction_with_key['複合鍵'].unique()

print(f"唯一社區複合鍵: {len(community_keys):,} 個")
print(f"唯一交易複合鍵: {len(transaction_keys):,} 個")

In [None]:
# === 3. Build the community lookup table ===
print("\n 建立社區資料查找表...")

# Keep only the first row for every composite key.
community_unique = community_with_key.drop_duplicates(subset=['複合鍵'])

# Map composite key -> {'戶數': ..., '建設公司': ...} for that community.
lookup_keys = community_unique['複合鍵']
lookup_records = community_unique[['戶數', '建設公司']].to_dict('records')
community_lookup = {key: record for key, record in zip(lookup_keys, lookup_records)}

print(f" 建立 {len(community_lookup):,} 個社區查找記錄")

In [None]:
# === 4. Multi-stage matching ===
# Progress banner for this step.
print("\n開始多階段比對...")

def normalize_key(key):
    """Return a normalized form of a composite key for tolerant comparison.

    Missing values and the empty string are returned unchanged. Anything else
    is converted to ``str``, lower-cased, stripped, and then cleared of every
    character that is not a word character, a CJK ideograph in
    U+4E00–U+9FFF, or an underscore (the separator is kept).
    """
    is_blank = pd.isna(key) or key == ''
    if is_blank:
        return key

    cleaned = str(key).lower().strip()
    # Drop punctuation and whitespace but keep the "_" separators.
    return re.sub(r'[^\w\u4e00-\u9fff_]', '', cleaned)

def extract_community_name(composite_key):
    """Return the third ``_``-separated field of a composite key.

    For keys shaped like ``<city>_<district>_<community>`` this is the
    community name.

    Args:
        composite_key: The key to split; may be missing (None / NaN).

    Returns:
        The community-name part, or ``''`` when the key is missing or has
        fewer than three fields.
    """
    if pd.isna(composite_key):
        return ''
    # Split at most twice so a community name that itself contains "_" is
    # returned whole instead of being cut at its first underscore.
    parts = str(composite_key).split('_', 2)
    return parts[2] if len(parts) == 3 else ''

# Matching state: transaction key -> matched community key.
final_mapping = {}

# One match counter per stage, all starting at zero.
stage_stats = dict.fromkeys(
    ('stage1_exact', 'stage2_normalized', 'stage3_community'),
    0,
)

# Every unique transaction key starts out unmatched.
unmatched_keys = list(transaction_keys)


In [None]:
    # 第1階段：完全匹配
    print("   🎯 第1階段：完全匹配")
    matched_in_stage1 = []
    for key in unmatched_keys:
        if key in community_lookup:
            final_mapping[key] = key
            stage_stats['stage1_exact'] += 1
            matched_in_stage1.append(key)
    
    unmatched_keys = [k for k in unmatched_keys if k not in matched_in_stage1]
    print(f"      ✅ 匹配: {stage_stats['stage1_exact']:,} 個")
    

In [None]:
# Stage 2: match keys that become identical after normalize_key().
print("   🔧 第2階段：標準化匹配")

# Normalized form -> original community key; setdefault keeps the first
# community key seen for each normalized form.
community_normalized_map = {}
for key in community_keys:
    community_normalized_map.setdefault(normalize_key(key), key)

matched_in_stage2 = []
for key in unmatched_keys:
    normalized = normalize_key(key)
    if normalized in community_normalized_map:
        final_mapping[key] = community_normalized_map[normalized]
        stage_stats['stage2_normalized'] += 1
        matched_in_stage2.append(key)

# Filter against a set so pruning is linear rather than quadratic in the
# number of keys.
matched_stage2_set = set(matched_in_stage2)
unmatched_keys = [k for k in unmatched_keys if k not in matched_stage2_set]
print(f"      ✅ 匹配: {stage_stats['stage2_normalized']:,} 個")

In [None]:
# Stage 3: fall back to comparing only the community-name part of the key.
print("   🏘️ 第3階段：社區名稱匹配")

# Community name -> every community key that carries that name.
community_name_map = {}
for key in community_keys:
    community_name = extract_community_name(key)
    if community_name and community_name.strip():
        community_name_map.setdefault(community_name, []).append(key)

matched_in_stage3 = set()
for key in unmatched_keys:
    community_name = extract_community_name(key)
    if community_name and community_name in community_name_map:
        # The first candidate is taken as-is.
        # NOTE(review): the city/district parts of the key are not compared
        # here, so same-named communities in different areas can be linked —
        # confirm this is acceptable.
        final_mapping[key] = community_name_map[community_name][0]
        stage_stats['stage3_community'] += 1
        matched_in_stage3.add(key)

# Remove this stage's matches from the pending list. Without this, re-running
# the cell counts the same keys again and unmatched_keys still contains keys
# that were in fact matched.
unmatched_keys = [k for k in unmatched_keys if k not in matched_in_stage3]

print(f"      ✅ 匹配: {stage_stats['stage3_community']:,} 個")

In [None]:
# === 5. Merge data ===
# Progress banner for this step.
print("\n🔗 合併資料...")

def get_community_info(composite_key):
    """Return the community record linked to a transaction composite key.

    The key is first translated through ``final_mapping``; the mapped key is
    then looked up in ``community_lookup``. When either step finds nothing, a
    record with both fields set to ``None`` is returned instead.
    """
    matched_key = final_mapping.get(composite_key)
    if not matched_key or matched_key not in community_lookup:
        return {'戶數': None, '建設公司': None}
    return community_lookup[matched_key]

# Look up the community record for every transaction row via its composite key.
community_info_list = [
    get_community_info(row_key) for row_key in transaction_with_key['複合鍵']
]

# Spread each looked-up field into its own column of the transaction table.
for column in ('戶數', '建設公司'):
    transaction_with_key[column] = [info[column] for info in community_info_list]

transaction_with_key
# Dropping the temporary composite-key column is currently disabled:
# merged_df = transaction_with_key.drop('複合鍵', axis=1)

In [None]:
# Bare expression: renders transaction_with_key as this cell's output.
transaction_with_key

In [None]:
print("\n📈 統計結果...")

# Share (in percent) of unique transaction keys matched by any stage.
total_unique_keys = len(transaction_keys)
total_matched = sum(stage_stats.values())
if total_unique_keys > 0:
    match_rate = total_matched / total_unique_keys * 100
else:
    # No transaction keys at all: report 0 rather than dividing by zero.
    match_rate = 0

In [None]:
# Bare expression: renders match_rate (a percentage) as this cell's output.
match_rate