In [7]:
import os
import glob
import polars as pl
import pyarrow.csv as pv
import pyarrow.parquet as pq
import pyarrow.compute as pc
import pyarrow as pa

# --- 設定路徑 ---
# 假設你的102個CSV檔都放在這個資料夾
CSV_FOLDER = '../data' 
# 我們要將轉換後的Parquet檔存在這裡
PARQUET_OUTPUT_FOLDER = '../data/metro_data.parquet' 

# 確保輸出資料夾存在
os.makedirs(PARQUET_OUTPUT_FOLDER, exist_ok=True)

# --- 取得所有CSV檔案的路徑 ---
csv_files = glob.glob(f"{CSV_FOLDER}/臺北捷運*.csv")
print(f"找到 {len(csv_files)} 個CSV檔案。")

找到 102 個CSV檔案。


In [8]:
# --- 開始轉換 ---
for i, file_path in enumerate(csv_files):
    print(f"處理中 ({i+1}/{len(csv_files)}): {os.path.basename(file_path)}")
    try:
        # 1) 讀 CSV：使用 schema_overrides（新版），先都當成字串或明確型別
        df_pl = pl.read_csv(
            file_path,
            schema_overrides={
                "日期": pl.Utf8,     # 先以字串讀入，稍後轉型
                "時段": pl.Utf8,     # 確保是字串
                "進站": pl.Utf8,
                "出站": pl.Utf8,
                "人次": pl.Int64
            },
            null_values=[""]        # 空字串視為 null
        )

        # 2) 清理與轉型：補人次的空值、將日期轉成 pl.Date（嘗試多種格式）
        df_pl = df_pl.with_columns(
            pl.col("人次").fill_null(0),
            pl.coalesce([
                pl.col("日期").str.strptime(pl.Date, format="%Y-%m-%d", strict=False),
                pl.col("日期").str.strptime(pl.Date, format="%Y/%m/%d", strict=False),
                pl.col("日期").str.strptime(pl.Date, format="%Y%m%d",   strict=False),
            ]).alias("日期")
        )

        # （可選）時段左側補零，統一成兩位數字串
        df_pl = df_pl.with_columns(
            pl.col("時段").str.zfill(2)
        )

        # 3) 轉成 PyArrow Table
        table = df_pl.to_arrow()  # 日期會是 date32 或 timestamp，兩者 pc.year 都支援

        # 4) year / month：先算，再 cast 到你要的寬度；用 append_column（不用 Field）
        years  = pc.cast(pc.year(table["日期"]),  pa.int16())
        months = pc.cast(pc.month(table["日期"]), pa.int8())

        table = table.append_column("year",  years)
        table = table.append_column("month", months)

        # 5) 寫入分區 Parquet 資料集
        pq.write_to_dataset(
            table,
            root_path=PARQUET_OUTPUT_FOLDER,
            partition_cols=["year", "month"],
            existing_data_behavior="overwrite_or_ignore"
        )

    except Exception as e:
        print(f"處理檔案 {os.path.basename(file_path)} 時發生致命錯誤: {e}")
        break

print("\n轉換流程完成！")
print(f"資料已儲存至: {PARQUET_OUTPUT_FOLDER}")

處理中 (1/102): 臺北捷運每日分時各站OD流量統計資料_201701.csv
處理中 (2/102): 臺北捷運每日分時各站OD流量統計資料_201702.csv
處理中 (3/102): 臺北捷運每日分時各站OD流量統計資料_201703.csv
處理中 (4/102): 臺北捷運每日分時各站OD流量統計資料_201704.csv
處理中 (5/102): 臺北捷運每日分時各站OD流量統計資料_201705.csv
處理中 (6/102): 臺北捷運每日分時各站OD流量統計資料_201706.csv
處理中 (7/102): 臺北捷運每日分時各站OD流量統計資料_201707.csv
處理中 (8/102): 臺北捷運每日分時各站OD流量統計資料_201708.csv
處理中 (9/102): 臺北捷運每日分時各站OD流量統計資料_201709.csv
處理中 (10/102): 臺北捷運每日分時各站OD流量統計資料_201710.csv
處理中 (11/102): 臺北捷運每日分時各站OD流量統計資料_201711.csv
處理中 (12/102): 臺北捷運每日分時各站OD流量統計資料_201712.csv
處理中 (13/102): 臺北捷運每日分時各站OD流量統計資料_201801.csv
處理中 (14/102): 臺北捷運每日分時各站OD流量統計資料_201802.csv
處理中 (15/102): 臺北捷運每日分時各站OD流量統計資料_201803.csv
處理中 (16/102): 臺北捷運每日分時各站OD流量統計資料_201804.csv
處理中 (17/102): 臺北捷運每日分時各站OD流量統計資料_201805.csv
處理中 (18/102): 臺北捷運每日分時各站OD流量統計資料_201806.csv
處理中 (19/102): 臺北捷運每日分時各站OD流量統計資料_201807.csv
處理中 (20/102): 臺北捷運每日分時各站OD流量統計資料_201808.csv
處理中 (21/102): 臺北捷運每日分時各站OD流量統計資料_201809.csv
處理中 (22/102): 臺北捷運每日分時各站OD流量統計資料_201810.csv
處理中 (23/102): 臺北捷運每日分時各站OD流量統計資料_201811.c

In [10]:
import duckdb
import pandas as pd

PARQUET_PATH = r"../data/metro_data.parquet"  # ← 改成你的 Parquet 根目錄

con = duckdb.connect()

# 1) 先把「日期+時段」合成 time，並把人次彙總為每站×每小時的進/出
#    注意：用 strptime()，且對「時段」左側補零成兩位數
sql = f"""
WITH base AS (
  SELECT
    CAST(日期 AS DATE) AS 日期,
    lpad(CAST(時段 AS VARCHAR), 2, '0') AS 時段,
    進站,
    出站,
    人次
  FROM parquet_scan('{PARQUET_PATH}/**/*.parquet')
),
io AS (
  -- 進站：以「進站」欄位當車站
  SELECT
    strptime(CAST(日期 AS VARCHAR) || ' ' || 時段 || ':00:00', '%Y-%m-%d %H:%M:%S') AS time,
    進站 AS station,
    SUM(人次) AS entries,
    0::BIGINT AS exits
  FROM base
  GROUP BY 1,2

  UNION ALL

  -- 出站：以「出站」欄位當車站
  SELECT
    strptime(CAST(日期 AS VARCHAR) || ' ' || 時段 || ':00:00', '%Y-%m-%d %H:%M:%S') AS time,
    出站 AS station,
    0::BIGINT AS entries,
    SUM(人次) AS exits
  FROM base
  GROUP BY 1,2
),
agg AS (
  SELECT
    time,
    station,
    SUM(entries) AS total_entries,
    SUM(exits)   AS total_exits
  FROM io
  GROUP BY 1,2
)

-- 這裡不 pivot，先拉回 Python 做寬表
SELECT * FROM agg
ORDER BY time, station
"""

agg_df = con.execute(sql).df()

# 2) 進站寬表：row=time, columns=station, values=total_entries
entries_df = (
    agg_df.loc[:, ["time", "station", "total_entries"]]
          .pivot(index="time", columns="station", values="total_entries")
          .sort_index()
    .fillna(0)
)

# 3) 出站寬表：row=time, columns=station, values=total_exits
exits_df = (
    agg_df.loc[:, ["time", "station", "total_exits"]]
          .pivot(index="time", columns="station", values="total_exits")
          .sort_index()
    .fillna(0)
)

# 4) 依需求轉型（可選）：整數更直觀；若有 NA 已被填 0
entries_df = entries_df.astype("int64")
exits_df   = exits_df.astype("int64")

# 5) 輸出 CSV（含索引當作 time 欄）
entries_out = "../data/entries_by_station_hour.csv"
exits_out   = "../data/exits_by_station_hour.csv"
entries_df.to_csv(entries_out, encoding="utf-8-sig", index=True, index_label="time")
exits_df.to_csv(exits_out, encoding="utf-8-sig", index=True, index_label="time")

print("Saved:", entries_out, exits_out)

Saved: ../data/entries_by_station_hour.csv ../data/exits_by_station_hour.csv
