# SEGIS (社會經濟資料服務平台): downloading 行政區房屋稅籍住宅類數量依屋齡區分_鄉鎮市區 data

In [None]:
import re
import os
import time
import zipfile
import shutil
import glob
from collections import Counter

import pandas as pd
import requests
from bs4 import BeautifulSoup
import urllib.parse
from urllib.parse import unquote
from tqdm import tqdm

In [None]:
# 1. GET the query interface first to obtain the session cookie and the request token
session = requests.Session()
session.headers.update({
    "User-Agent":  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"})
r1 = session.get("https://segis.moi.gov.tw/STATCloud/QueryInterface")
soup = BeautifulSoup(r1.text, "html.parser")
token = soup.find("gs-request-token")["value"]

In [None]:
# 2. Prepare the form data (paste the raw string copied from DevTools here)
raw = ("draw=2&columns%5B0%5D%5Bdata%5D=CNAME&columns%5B0%5D%5Bname%5D=CNAME&columns%5B0%5D%5Bsearchable%5D=true&"
"columns%5B0%5D%5Borderable%5D=false&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&"
"columns%5B1%5D%5Bdata%5D=BOUNDARY_DESC&columns%5B1%5D%5Bname%5D=BOUNDARY_DESC&columns%5B1%5D%5Bsearchable%5D=true&"
"columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&"
"columns%5B2%5D%5Bdata%5D=STUNIT_NAME&columns%5B2%5D%5Bname%5D=STUNIT_NAME&columns%5B2%5D%5Bsearchable%5D=true&"
"columns%5B2%5D%5Borderable%5D=false&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&"
"columns%5B3%5D%5Bdata%5D=PRODUCT_COUNT&columns%5B3%5D%5Bname%5D=PRODUCT_COUNT&columns%5B3%5D%5Bsearchable%5D=true&"
"columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&"
"columns%5B4%5D%5Bdata%5D=PRODUCT_COUNT&columns%5B4%5D%5Bname%5D=PRODUCT_COUNT&columns%5B4%5D%5Bsearchable%5D=true&"
"columns%5B4%5D%5Borderable%5D=false&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&"
"columns%5B5%5D%5Bdata%5D=PRODUCT_COUNT&columns%5B5%5D%5Bname%5D=PRODUCT_COUNT&columns%5B5%5D%5Bsearchable%5D=true&"
"columns%5B5%5D%5Borderable%5D=false&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&start=0&length=176&"
"search%5Bvalue%5D=&search%5Bregex%5D=false&method=dataset.GetDataSetList&division=&section=0&stunit=&boundary=&yearstart=&yearend=&opentype=&"
"keyword=%E8%A1%8C%E6%94%BF%E5%8D%80%E5%BE%85%E5%94%AE%E6%96%B0%E6%88%90%E5%B1%8B%E7%B5%B1%E8%A8%88_%E9%84%89%E9%8E%AE%E5%B8%82%E5%8D%80_")
payload = dict(urllib.parse.parse_qsl(raw, encoding="utf-8"))

# 3. Add the token to the headers (or to the payload); the browser sends it as a header
session.headers.update({
    "Accept": "application/json, text/javascript, */*; q=0.01",
    "Origin": "https://segis.moi.gov.tw",
    "Referer": "https://segis.moi.gov.tw/STATCloud/QueryInterface",
    "X-Requested-With": "XMLHttpRequest",
    "Gs-Request-Token": token,
    # If DevTools also shows a value for the CSRF token, add it the same way
    # "Gs-Csrf-Token": "<value copied from DevTools>",
})

# 4. Send the POST request
api_url = "https://segis.moi.gov.tw/STATCloud/reqcontroller.go"
resp = session.post(api_url, data=payload)

print(resp.status_code)
print(resp.headers.get("Content-Type"))
print(resp.text[:500])
# Fail early on an HTTP error; a successful JSON response can then be parsed
resp.raise_for_status()
data = resp.json()
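
One detail of the form parsing in step 2 is worth checking: `urllib.parse.parse_qsl` drops fields with empty values unless `keep_blank_values=True` is passed, so the payload built from the DevTools string omits keys such as `division=` that the browser does send. Whether the server cares is not something this notebook establishes. The sketch below uses a shortened, hypothetical sample string (not the full DevTools capture) to show the difference, then decodes the captured `keyword` field to show which dataset name is being searched.

```python
from urllib.parse import parse_qsl, unquote

# Shortened sample for illustration only; the real string has many more fields.
sample = "start=0&length=176&division=&stunit=&keyword=%E8%A1%8C%E6%94%BF%E5%8D%80"

# Default behaviour: fields with empty values are dropped.
default_payload = dict(parse_qsl(sample, encoding="utf-8"))

# keep_blank_values=True preserves them as empty strings.
full_payload = dict(parse_qsl(sample, encoding="utf-8", keep_blank_values=True))

print(sorted(default_payload))
print(sorted(full_payload))

# Decode the keyword exactly as it appears in the captured form data.
encoded_keyword = ("%E8%A1%8C%E6%94%BF%E5%8D%80%E5%BE%85%E5%94%AE%E6%96%B0%E6%88%90%E5%B1%8B"
                   "%E7%B5%B1%E8%A8%88_%E9%84%89%E9%8E%AE%E5%B8%82%E5%8D%80_")
keyword = unquote(encoded_keyword)
print(keyword)
```

If the blank fields turn out to matter, passing `keep_blank_values=True` in the original `parse_qsl` call would reproduce the browser's form more closely.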


In [None]:
base = "https://segis.moi.gov.tw/STATCloud/reqcontroller.file"
output_dir = r"C:\pylabs\area-risk-flagging\data\segis\district_new_house_for_sale\raw_data"
os.makedirs(output_dir, exist_ok=True)

extracted = [
    {
        "code": item["DEC_MCOL"],
        "STTIME": item["STTIME"],
        "SUB_BOUNDARY": item["SUB_BOUNDARY_DESC"],
    }
    for item in data["data"]
]
print(extracted)
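
The list comprehension above assumes every record carries all three fields. A minimal sketch of a more defensive variant follows; `extract_items` is a hypothetical helper, and the sample records are made up for illustration (only the key names `DEC_MCOL`, `STTIME`, and `SUB_BOUNDARY_DESC` come from the notebook).

```python
REQUIRED_KEYS = ("DEC_MCOL", "STTIME", "SUB_BOUNDARY_DESC")

def extract_items(listing):
    """Return (items, skipped): download parameters, plus records missing a required key."""
    items, skipped = [], []
    for record in listing.get("data", []):
        if all(key in record for key in REQUIRED_KEYS):
            items.append({
                "code": record["DEC_MCOL"],
                "STTIME": record["STTIME"],
                "SUB_BOUNDARY": record["SUB_BOUNDARY_DESC"],
            })
        else:
            skipped.append(record)
    return items, skipped

# Made-up sample: one complete record and one that lacks STTIME.
sample_listing = {"data": [
    {"DEC_MCOL": "ABC", "STTIME": "113Y1S", "SUB_BOUNDARY_DESC": "臺北市"},
    {"DEC_MCOL": "DEF", "SUB_BOUNDARY_DESC": "高雄市"},
]}
items, skipped = extract_items(sample_listing)
print(len(items), len(skipped))
```

Collecting the skipped records instead of raising keeps one malformed entry from aborting the whole run, while still making the gap visible.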

In [None]:
for item in tqdm(extracted, desc="Downloading & Extracting"):
    params = {
        "method":       "filedown.downloadproductfile",
        "code":         item["code"],
        "STTIME":       item["STTIME"],
        "STUNIT":       "U01TO",
        "BOUNDARY":     "全國",
        "SUB_BOUNDARY": item["SUB_BOUNDARY"],
    }

    resp = session.get(base, params=params, stream=True)
    if resp.status_code != 200:
        print(f"Download failed: {item['STTIME']} ({resp.status_code})")
        continue

    # File name: STTIME_SUB_BOUNDARY.zip
    safe_boundary = re.sub(r'[<>:"/\\|?*\n\r]', "_", item["SUB_BOUNDARY"])
    filename = f"{item['STTIME']}_{safe_boundary}.zip"
    filepath = os.path.join(output_dir, filename)

    # Write the ZIP file to disk
    with open(filepath, "wb") as f:
        for chunk in resp.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    # If the file is a valid ZIP, extract it into a folder of the same name
    if zipfile.is_zipfile(filepath):
        extract_dir = os.path.join(output_dir, f"{item['STTIME']}_{safe_boundary}")
        os.makedirs(extract_dir, exist_ok=True)
        with zipfile.ZipFile(filepath, 'r') as zf:
            zf.extractall(extract_dir)
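        # Delete the ZIP file once it has been extracted
        os.remove(filepath)
    else:
        print(f"{filename} is not a valid ZIP file; skipping extraction")

    # Pause for 5 seconds after each file is downloaded and extracted
    time.sleep(5)

print("Finished downloading and extracting all files.")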
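
The validate-then-extract step in the loop above can be isolated and tried without touching the network. The sketch below is one possible refactoring, not the notebook's own code: `extract_if_zip` is a hypothetical helper, and both files are created in a temporary directory purely for the demonstration.

```python
import os
import tempfile
import zipfile

def extract_if_zip(filepath, extract_dir):
    """Extract filepath into extract_dir and delete it; return False if it is not a ZIP."""
    if not zipfile.is_zipfile(filepath):
        return False
    os.makedirs(extract_dir, exist_ok=True)
    with zipfile.ZipFile(filepath, "r") as zf:
        zf.extractall(extract_dir)
    os.remove(filepath)
    return True

with tempfile.TemporaryDirectory() as tmp:
    # A real archive holding one small CSV.
    good = os.path.join(tmp, "good.zip")
    with zipfile.ZipFile(good, "w") as zf:
        zf.writestr("data.csv", "a,b\n1,2\n")
    # A file that only has a .zip name, such as an HTML error page saved to disk.
    bad = os.path.join(tmp, "bad.zip")
    with open(bad, "wb") as f:
        f.write(b"<html>error page</html>")

    good_ok = extract_if_zip(good, os.path.join(tmp, "good"))
    bad_ok = extract_if_zip(bad, os.path.join(tmp, "bad"))
    extracted_names = os.listdir(os.path.join(tmp, "good"))
    zip_removed = not os.path.exists(good)
    bad_kept = os.path.exists(bad)

print(good_ok, bad_ok, extracted_names)
```

Leaving the invalid file in place, as the original loop does, makes it possible to open it afterwards and see what the server actually returned.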

In [None]:
raw_root = r"C:\pylabs\area-risk-flagging\data\segis\district_new_house_for_sale"
source_dir = os.path.join(raw_root, "source")
os.makedirs(source_dir, exist_ok=True)

for dirpath, dirnames, filenames in os.walk(raw_root):
    # Skip the source folder itself
    if os.path.abspath(dirpath) == os.path.abspath(source_dir):
        continue

    for fn in filenames:
        if fn.lower().endswith(".csv"):
            src = os.path.join(dirpath, fn)
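            # If different subfolders contain files with the same name, prefix the
            # subfolder name to avoid overwriting:
            # sub = os.path.basename(dirpath)
            # dst_name = f"{sub}_{fn}"
            # Or simply keep the original file name: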
            dst_name = fn
            dst = os.path.join(source_dir, dst_name)
            shutil.copy2(src, dst)

print(f"Copied all CSV files to: {source_dir}")
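
Because the loop above copies every CSV under its original name, two extracted folders that hold a file of the same name would overwrite each other. The commented-out prefix option can be sketched as follows; `collect_csvs` is a hypothetical helper and the folder layout is invented for the demonstration.

```python
import os
import shutil
import tempfile

def collect_csvs(root, dest):
    """Copy every CSV under root into dest, prefixing each with its parent folder name."""
    os.makedirs(dest, exist_ok=True)
    dest_abs = os.path.abspath(dest)
    copied = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune the destination folder so its contents are never re-copied.
        dirnames[:] = [d for d in dirnames
                       if os.path.abspath(os.path.join(dirpath, d)) != dest_abs]
        for fn in filenames:
            if fn.lower().endswith(".csv"):
                dst_name = f"{os.path.basename(dirpath)}_{fn}"
                shutil.copy2(os.path.join(dirpath, fn), os.path.join(dest, dst_name))
                copied.append(dst_name)
    return sorted(copied)

with tempfile.TemporaryDirectory() as root:
    # Two subfolders that each contain a file called data.csv.
    for sub in ("q1", "q2"):
        os.makedirs(os.path.join(root, "raw_data", sub))
        with open(os.path.join(root, "raw_data", sub, "data.csv"), "w", encoding="utf-8") as f:
            f.write(sub)
    copied = collect_csvs(root, os.path.join(root, "source"))

print(copied)
```

Note that prefixed names would change what the later file-name regular expressions see, so the period pattern anchored at the start of the name would need adjusting if this variant were adopted.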

In [None]:

# 1. Set the path to your source folder
source_dir = r"C:\pylabs\area-risk-flagging\data\segis\district_new_house_for_sale\source"

# 2. List all CSV file names
filenames = [fn for fn in os.listdir(source_dir) if fn.lower().endswith(".csv")]

# 3. Prepare the Counter objects
time_counter = Counter()
county_counter = Counter()

# 4. Define two regular expressions
#    - Period: the "<digits>年第<digits>季" (year and quarter) prefix at the start of the file name
#    - County/city: the trailing "_XXX縣.csv" or "_XXX市.csv"
re_time   = re.compile(r"^(\d+年第\d+季)")
re_county = re.compile(r"_([^_]+[縣市])\.csv$")

# 5. Process each file name
for fn in filenames:
    # Period
    m_t = re_time.match(fn)
    if m_t:
        time_counter[m_t.group(1)] += 1
    else:
        time_counter["(unparsed)"] += 1

    # County/city
    m_c = re_county.search(fn)
    if m_c:
        county_counter[m_c.group(1)] += 1
    else:
        county_counter["(unparsed)"] += 1

# 6. Print the results
print("=== Occurrences per period ===")
for period, cnt in time_counter.most_common():
    print(f"{period}: {cnt}")

print("\n=== Occurrences per county/city ===")
for county, cnt in county_counter.most_common():
    print(f"{county}: {cnt}")
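
To see what the two patterns capture, they can be run against a sample name. The file name below is hypothetical (the actual names produced by SEGIS are not shown in this notebook); it only follows the shape that the comments in step 4 describe.

```python
import re

re_time = re.compile(r"^(\d+年第\d+季)")
re_county = re.compile(r"_([^_]+[縣市])\.csv$")

# Hypothetical file name, for illustration only.
fn = "113年第1季_行政區待售新成屋統計_鄉鎮市區_臺北市.csv"

period = re_time.match(fn).group(1)
county = re_county.search(fn).group(1)
print(period, county)

# A name whose last segment does not end in 縣 or 市 is not matched.
unmatched = re_county.search("113年第1季_summary.csv")
print(unmatched)
```

The `市` inside `鄉鎮市區` does not produce a false match, because the pattern requires `.csv` to follow the captured segment immediately.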


In [None]:

# Merge the CSV files
source_dir = r"C:\pylabs\area-risk-flagging\data\segis\district_new_house_for_sale\source"

raw_root = r"C:\pylabs\area-risk-flagging\data\segis\district_new_house_for_sale"
processed_dir = os.path.join(raw_root, "processed")
os.makedirs(processed_dir, exist_ok=True)
output_fname = "房屋稅籍住宅類數量依屋齡區分_鄉鎮市區.csv"
output_path = os.path.join(processed_dir, output_fname)

# 1. List all CSV files, excluding the final output file
all_files = sorted(glob.glob(os.path.join(source_dir, "*.csv")))
csv_files = [f for f in all_files if os.path.basename(f) != output_fname]

if not csv_files:
    raise RuntimeError("No source CSV files found in the source folder!")

print(f"[INFO] Found {len(csv_files)} CSV files to merge:")
for f in csv_files:
    print("  ", os.path.basename(f))

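# 2. Read the first file; header=1 uses the second row as the column names and skips the first row
first_fp = csv_files[0]
df0 = pd.read_csv(
    first_fp,
    encoding="utf-8-sig",
    header=1  # use the second row (the Chinese description row) as column names; data starts at the third row
)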
columns = df0.columns.tolist()
df_list = [df0]

# 3. Remaining files: skip the first two rows, read only the data rows, and apply the same column names
for fp in csv_files[1:]:
    df = pd.read_csv(
        fp,
        encoding="utf-8-sig",
        header=None,
        skiprows=2,   # skip the first and second rows
        names=columns
    )
    df_list.append(df)

# 4. Concatenate and write the output
combined = pd.concat(df_list, ignore_index=True)
combined.to_csv(output_path, index=False, encoding="utf-8-sig")

print(f"[DONE] Merge complete; output file: {output_path}")
