# User data

In [1]:
import pandas as pd

user = pd.read_csv("../../cleaned_data/user_cleaned.csv")

pd.set_option("display.max_columns", None)
user.head()

Unnamed: 0,experiment_date,user_id,treatment,source,ops_type_merged,city
0,2025-12-22,5145040,15x2元1張,隨機組,14天在其他尖峰預估車資,臺北市
1,2025-11-17,302812,15x2元1張,隨機組,14天在其他尖峰預估車資,新北市
2,2025-12-01,4375821,15x2元1張,隨機組,14天在其他尖峰預估車資,新北市
3,2025-11-24,2273154,15x2元1張,隨機組,14天在其他尖峰預估車資,臺北市
4,2025-12-22,433188,15x2元1張,隨機組,14天在其他尖峰預估車資,臺北市


# Trip data
## RDS trip (nonrepeat_cnt)

In [3]:
import pandas as pd
import numpy as np

# -----------------------
# Config (改這裡)
# -----------------------
USER_PATH = "../../cleaned_data/user_cleaned.csv"
TRIP_PATH = "../../merged_data/RDS_trip_merged.csv"
TEST_TRIP_PATH = "../../data/test_trip.csv"
OUT_PATH = "../data/cleaned_v1.csv"

START_DATE = pd.Timestamp("2025-07-28")
END_DATE   = pd.Timestamp("2026-01-11")  # 含 1/11
CHUNKSIZE = 2_000_000  # 依你機器記憶體調整：50萬~300萬都常見

# -----------------------
# Helper
# -----------------------
def week_monday(d: pd.Series) -> pd.Series:
    """把日期對齊到週一作為 experiment_date（週起始=週一）"""
    return d - pd.to_timedelta(d.dt.weekday, unit="D")

# -----------------------
# 1) Load user_cleaned (週 -> 類別)
# -----------------------
user = pd.read_csv(
    USER_PATH,
    usecols=["experiment_date", "user_id", "treatment", "source", "ops_type_merged", "city"],
    dtype={"user_id": "int64"},
)
user["experiment_date"] = pd.to_datetime(user["experiment_date"])
# 建索引方便 merge（其實 merge 已夠用，但保持乾淨）
user = user.drop_duplicates(["experiment_date", "user_id"])

# -----------------------
# 2) Load test_trip ids (排除清單)
# -----------------------
test_ids = pd.read_csv(TEST_TRIP_PATH, usecols=["trip_id"])
# 用 Index 省記憶體 + isin 也快
test_id_index = pd.Index(test_ids["trip_id"].astype("int64", errors="ignore"))

# -----------------------
# 3) Chunk read RDS_trip and aggregate
# -----------------------
agg_parts = []  # 也可以改成 dict 累加；先用 concat 再 groupby 方式較直覺

usecols = ["trip_id", "user_id", "year", "month", "day", "duplicate_id"]
dtypes = {
    "trip_id": "int64",
    "user_id": "int64",
    "year": "int16",
    "month": "int8",
    "day": "int8",
    "duplicate_id": "int8",
}

# END_DATE 要含當天，所以用 < END_DATE+1
end_exclusive = END_DATE + pd.Timedelta(days=1)

for i, chunk in enumerate(pd.read_csv(TRIP_PATH, usecols=usecols, dtype=dtypes, chunksize=CHUNKSIZE)):
    # 組出日期（用 year/month/day 避免解析 request_time）
    chunk["day_date"] = pd.to_datetime(
        dict(year=chunk["year"], month=chunk["month"], day=chunk["day"]),
        errors="coerce"
    )
    # 篩日期區間
    chunk = chunk[(chunk["day_date"] >= START_DATE) & (chunk["day_date"] < end_exclusive)]
    if chunk.empty:
        continue

    # 條件：duplicate_id = 0
    chunk = chunk[chunk["duplicate_id"].fillna(1).astype("int64") == 0]
    if chunk.empty:
        continue

    # 條件：trip_id 不在 test_trip
    # （注意：如果 trip_id 有缺失或型別怪，先 coerce）
    chunk = chunk[~chunk["trip_id"].isin(test_id_index)]
    if chunk.empty:
        continue

    # 轉成週（週一）
    chunk["experiment_date"] = week_monday(chunk["day_date"])

    # merge 出類別
    merged = chunk.merge(
        user,
        on=["experiment_date", "user_id"],
        how="left",
        validate="many_to_one",  # 一個 user 在一個 experiment_date 應該只有一筆類別
    )

    # groupby 計數 trip_id（每日每組 nonrepeat_cnt）
    g = (
        merged.groupby(["day_date", "treatment", "source", "ops_type_merged", "city"], dropna=False)["trip_id"]
        .nunique()   # trip_id 通常唯一；nunique 更保險
        .reset_index(name="nonrepeat_cnt")
    )
    agg_parts.append(g)

    print(f"[chunk {i}] kept_rows={len(merged):,} groups={len(g):,}")

# -----------------------
# 4) Final aggregate (合併各 chunk 結果)
# -----------------------
if len(agg_parts) == 0:
    out = pd.DataFrame(columns=["day", "treatment", "source", "ops_type_merged", "city", "nonrepeat_cnt"])
else:
    out = pd.concat(agg_parts, ignore_index=True)
    out = (
        out.groupby(["day_date", "treatment", "source", "ops_type_merged", "city"], dropna=False)["nonrepeat_cnt"]
        .sum()
        .reset_index()
        .rename(columns={"day_date": "day"})
        .sort_values(["day", "treatment", "source", "ops_type_merged", "city"])
    )

out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print("Saved:", OUT_PATH, "rows:", len(out))


[chunk 0] kept_rows=981,933 groups=2,972
[chunk 1] kept_rows=1,684,483 groups=16,252
[chunk 2] kept_rows=1,651,557 groups=29,406
[chunk 3] kept_rows=210,530 groups=8,091
Saved: ../data/cleaned_v1.csv rows: 44969


## trip (is_match)

In [None]:
import os
import pandas as pd
import numpy as np

# -----------------------
# Config（改路徑）
# -----------------------
USER_PATH = "../../cleaned_data/user_cleaned.csv"
TRIP_MERGED_PATH = "../../merged_data/trip_merged.csv"
RDS_TRIP_MERGED_PATH = "../../merged_data/RDS_trip_merged.csv"  # 用來取 duplicate_id
TEST_TRIP_PATH = "../../data/test_trip.csv"

OUT_PATH = "../data/cleaned_v2.csv"
TMP_PATH = "_tmp_filtered_trip_merged.csv"  # 暫存 Step A 篩完的最小資料

START_DATE = pd.Timestamp("2025-07-28")
END_DATE   = pd.Timestamp("2026-01-11")  # 含 1/11
REQ_LOOKBACK_DAYS = 60

CHUNKSIZE = 2_000_000
TZ = "Asia/Taipei"

# -----------------------
# Helpers
# -----------------------
def to_taipei(ts: pd.Series) -> pd.Series:
    return pd.to_datetime(ts, utc=True, errors="coerce").dt.tz_convert(TZ)

def week_monday(d: pd.Series) -> pd.Series:
    return d - pd.to_timedelta(d.dt.weekday, unit="D")

def is_nonempty_str(s: pd.Series) -> pd.Series:
    ss = s.astype("string")
    return ss.notna() & (ss.str.strip() != "")

# ---- City group mapping（三區）
north = {'臺北市','新北市','基隆市','桃園市','宜蘭縣','花蓮縣','新竹縣','新竹市'}
central = {'苗栗縣','臺中市','南投縣','彰化縣','雲林縣','嘉義縣','嘉義市'}
south = {'臺南市','高雄市','屏東縣','臺東縣','澎湖縣','金門縣','連江縣'}

def map_city_group(city):
    if pd.isna(city) or city == "":
        city = "臺北市"
    if city in north:
        return "北區"
    if city in central:
        return "中區"
    if city in south:
        return "南區"
    return "北區"

# -----------------------
# 0) 時間窗（台北時區）
# -----------------------
reserve_start = pd.Timestamp(START_DATE.date(), tz=TZ)
reserve_end_excl = pd.Timestamp((END_DATE + pd.Timedelta(days=1)).date(), tz=TZ)
req_min = pd.Timestamp((START_DATE - pd.Timedelta(days=REQ_LOOKBACK_DAYS)).date(), tz=TZ)

# 針對「最後輸出」的 day 下限（拿掉 7/28 以前）
DAY_MIN = pd.Timestamp("2025-07-28")

# -----------------------
# 1) Load user_cleaned（週 -> 類別）
# -----------------------
user = pd.read_csv(
    USER_PATH,
    usecols=["experiment_date", "user_id", "treatment", "source", "ops_type_merged", "city"],
    dtype={"user_id": "int64"},
)
user["experiment_date"] = pd.to_datetime(user["experiment_date"])
user = user.drop_duplicates(["experiment_date", "user_id"])

# （可選）省記憶體
for c in ["treatment", "source", "ops_type_merged", "city"]:
    user[c] = user[c].astype("category")

# -----------------------
# 2) Load test_trip ids（排除）
# -----------------------
test_ids = pd.read_csv(TEST_TRIP_PATH, usecols=["trip_id"])
test_ids["trip_id"] = pd.to_numeric(test_ids["trip_id"], errors="coerce").astype("Int64")
exclude_trip = set(test_ids["trip_id"].dropna().astype("int64").tolist())

# -----------------------
# Step A) 掃 trip_merged.csv：時間條件 + 排除 test_trip，存最小暫存檔
# -----------------------
if os.path.exists(TMP_PATH):
    os.remove(TMP_PATH)

usecols_a = ["trip_id", "user_id", "driver_id", "request_time", "reserve_time"]
dtype_a = {"trip_id": "int64", "user_id": "int64", "driver_id": "string"}

kept_trip_ids = set()

for i, ch in enumerate(pd.read_csv(TRIP_MERGED_PATH, usecols=usecols_a, dtype=dtype_a, chunksize=CHUNKSIZE)):
    # 排除 test_trip（先做，最快降量）
    ch = ch[~ch["trip_id"].isin(exclude_trip)]
    if ch.empty:
        continue

    # 轉台北時區
    ch["request_local"] = to_taipei(ch["request_time"])
    ch["reserve_local"] = to_taipei(ch["reserve_time"])
    ch = ch.dropna(subset=["request_local", "reserve_local"])
    if ch.empty:
        continue

    # request_time >= START-60d
    ch = ch[ch["request_local"] >= req_min]
    if ch.empty:
        continue

    # reserve_time in [START, END]
    ch = ch[(ch["reserve_local"] >= reserve_start) & (ch["reserve_local"] < reserve_end_excl)]
    if ch.empty:
        continue

    # day = request_time(台北) 的日期
    ch["day"] = ch["request_local"].dt.floor("D")

    # experiment_date（週一），用 day 的 naive 算週
    day_naive = ch["day"].dt.tz_localize(None)
    ch["experiment_date"] = week_monday(day_naive)

    # 只留最小欄位
    out_ch = ch[["trip_id", "user_id", "driver_id", "day", "experiment_date"]].copy()

    # append 寫入暫存
    out_ch.to_csv(TMP_PATH, mode="a", header=not os.path.exists(TMP_PATH), index=False, encoding="utf-8-sig")

    kept_trip_ids.update(out_ch["trip_id"].unique().tolist())
    print(f"[A chunk {i}] kept_rows={len(out_ch):,} unique_trip_ids={len(kept_trip_ids):,}")

print("Step A done. filtered trips:", len(kept_trip_ids))

# -----------------------
# Step B) 掃 RDS_trip_merged.csv：建立 trip_id -> duplicate_id mapping
# -----------------------
dup_map = {}

usecols_b = ["trip_id", "duplicate_id"]
dtype_b = {"trip_id": "int64", "duplicate_id": "int8"}

for j, ch in enumerate(pd.read_csv(RDS_TRIP_MERGED_PATH, usecols=usecols_b, dtype=dtype_b, chunksize=CHUNKSIZE)):
    ch = ch[ch["trip_id"].isin(kept_trip_ids)]
    if ch.empty:
        continue

    for tid, did in zip(ch["trip_id"].tolist(), ch["duplicate_id"].tolist()):
        if tid not in dup_map:
            dup_map[tid] = int(did)

    print(f"[B chunk {j}] found={len(ch):,} mapped={len(dup_map):,}")

print("Step B done. dup_map size:", len(dup_map))

# -----------------------
# Step C) 讀暫存檔：補 duplicate_id -> filter duplicate_id=0 -> merge user -> 算媒合率
# -----------------------
parts = []

usecols_c = ["trip_id", "user_id", "driver_id", "day", "experiment_date"]
dtype_c = {"trip_id": "int64", "user_id": "int64", "driver_id": "string", "day": "string", "experiment_date": "string"}

for k, ch in enumerate(pd.read_csv(TMP_PATH, usecols=usecols_c, dtype=dtype_c, chunksize=CHUNKSIZE)):

    # day：統一解析成 UTC 再轉台北，最後取當日 00:00
    ch["day"] = pd.to_datetime(ch["day"], errors="coerce", utc=True).dt.tz_convert(TZ).dt.floor("D")

    # experiment_date：naive datetime
    ch["experiment_date"] = pd.to_datetime(ch["experiment_date"], errors="coerce")

    ch = ch.dropna(subset=["day", "experiment_date"])
    if ch.empty:
        continue

    # duplicate_id
    ch["duplicate_id"] = ch["trip_id"].map(dup_map)
    ch = ch.dropna(subset=["duplicate_id"])
    ch = ch[ch["duplicate_id"].astype("int64") == 0]
    if ch.empty:
        continue

    # merge 類別
    m = ch.merge(user, on=["user_id", "experiment_date"], how="left", validate="many_to_one")

    # success
    m["is_success"] = is_nonempty_str(m["driver_id"])

    # 城市分區（三區）
    m["city_group"] = m["city"].map(map_city_group)

    # groupby 計數
    g = (
        m.groupby(["day", "treatment", "source", "ops_type_merged", "city_group"], dropna=False)
         .agg(total_cnt=("trip_id", "size"), success_cnt=("is_success", "sum"))
         .reset_index()
    )
    parts.append(g)
    print(f"[C chunk {k}] kept_rows={len(m):,} groups={len(g):,}")

# -----------------------
# Final aggregate & compute rate
# -----------------------
if not parts:
    out = pd.DataFrame(columns=["day", "treatment", "source", "ops_type_merged", "city_group", "is_match_rate"])
else:
    out = pd.concat(parts, ignore_index=True)
    out = (
        out.groupby(["day", "treatment", "source", "ops_type_merged", "city_group"], dropna=False)[["total_cnt", "success_cnt"]]
           .sum()
           .reset_index()
    )
    out["is_match_rate"] = np.where(out["total_cnt"] > 0, out["success_cnt"] / out["total_cnt"], np.nan)

    # 轉 day 成 YYYY-MM-DD 字串
    out["day"] = pd.to_datetime(out["day"], utc=True, errors="coerce").dt.tz_convert(TZ).dt.strftime("%Y-%m-%d")

    # 只保留輸出欄位
    out = out[["day", "treatment", "source", "ops_type_merged", "city_group", "is_match_rate"]].copy()

# -----------------------
# FINAL FILTER BEFORE SAVING（主動把不合條件的拿掉 + 拿掉 7/28 以前）
# -----------------------
# 乾淨字串
for c in ["day", "treatment", "source", "ops_type_merged", "city_group"]:
    out[c] = out[c].astype("string").str.strip()

# (4) keys 不可為空
key_ok = (
    out["day"].notna() & (out["day"] != "") &
    out["treatment"].notna() & (out["treatment"] != "") &
    out["source"].notna() & (out["source"] != "") &
    out["ops_type_merged"].notna() & (out["ops_type_merged"] != "") &
    out["city_group"].notna() & (out["city_group"] != "")
)

# 拿掉 2025-07-28 以前
day_dt = pd.to_datetime(out["day"], errors="coerce")
rule_day_min = day_dt >= DAY_MIN

src = out["source"]
trt = out["treatment"]

# 用 contains 比較穩（避免 "控制組 " 或 "隨機組-xxx"）
is_control = src.str.contains("控制", na=False)
is_random  = src.str.contains("隨機", na=False)

# (1) 控制組 -> treatment 必須是 不發
rule1 = (~is_control) | (trt == "不發")

# (2) 隨機組 -> treatment 一定不能是 不發
rule2 = (~is_random) | (trt != "不發")

# (3) 隨機組日期門檻
has_x2 = trt.str.contains("x2元", na=False)
cut_no_x2 = pd.Timestamp("2025-10-06")
cut_has_x2 = pd.Timestamp("2025-11-03")

rule3 = (
    (~is_random) |
    ((~has_x2) & (day_dt >= cut_no_x2)) |
    (has_x2 & (day_dt >= cut_has_x2))
)

out = out[key_ok & rule_day_min & rule1 & rule2 & rule3].copy()

# 排序 & 輸出
out = out.sort_values(["day", "treatment", "source", "ops_type_merged", "city_group"]).reset_index(drop=True)
out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")

print("Saved:", OUT_PATH, "rows:", len(out))

## trip_label (trip_cnt)

In [2]:
import pandas as pd
import numpy as np

# -----------------------
# Config (改這裡)
# -----------------------
USER_PATH = "../../cleaned_data/user_cleaned.csv"
TRIP_LABEL_PATH = "../../merged_data/trip_label_merged.csv"
OUT_PATH = "../data/cleaned_v3.csv"

START_DATE = pd.Timestamp("2025-07-28")
END_DATE   = pd.Timestamp("2026-01-11")  # 含 1/11
CHUNKSIZE = 2_000_000

# -----------------------
# Helper
# -----------------------
def week_monday(d: pd.Series) -> pd.Series:
    """把日期對齊到週一作為 experiment_date（週起始=週一）"""
    return d - pd.to_timedelta(d.dt.weekday, unit="D")

# -----------------------
# 1) Load user_cleaned (週 -> 類別)
# -----------------------
user = pd.read_csv(
    USER_PATH,
    usecols=["experiment_date", "user_id", "treatment", "source", "ops_type_merged", "city"],
    dtype={"user_id": "int64"},
)
user["experiment_date"] = pd.to_datetime(user["experiment_date"], errors="coerce")
user = user.dropna(subset=["experiment_date"])
user = user.drop_duplicates(["experiment_date", "user_id"])

# -----------------------
# 2) Chunk read trip_label_merged and aggregate trip_cnt
# -----------------------
agg_parts = []

usecols = ["trip_id", "user_id", "trip_date"]
dtypes = {"trip_id": "int64", "user_id": "int64"}

end_exclusive = END_DATE + pd.Timedelta(days=1)

for i, chunk in enumerate(pd.read_csv(TRIP_LABEL_PATH, usecols=usecols, dtype=dtypes, chunksize=CHUNKSIZE)):
    # trip_date -> day_date
    chunk["day_date"] = pd.to_datetime(chunk["trip_date"], errors="coerce")
    chunk = chunk.dropna(subset=["day_date"])
    if chunk.empty:
        continue

    # 篩日期區間
    chunk = chunk[(chunk["day_date"] >= START_DATE) & (chunk["day_date"] < end_exclusive)]
    if chunk.empty:
        continue

    # 轉成週（週一）用來 merge user_cleaned
    chunk["experiment_date"] = week_monday(chunk["day_date"])

    # merge 出類別
    merged = chunk.merge(
        user,
        on=["experiment_date", "user_id"],
        how="left",
        validate="many_to_one",
    )

    # groupby 計數 trip_id（每日每組 trip_cnt）
    g = (
        merged.groupby(["day_date", "treatment", "source", "ops_type_merged", "city"], dropna=False)["trip_id"]
        .nunique()  # trip_id 通常唯一；nunique 更保險
        .reset_index(name="trip_cnt")
    )
    agg_parts.append(g)

    print(f"[chunk {i}] kept_rows={len(merged):,} groups={len(g):,}")

# -----------------------
# 3) Final aggregate (合併各 chunk 結果)
# -----------------------
if len(agg_parts) == 0:
    out = pd.DataFrame(columns=["day", "treatment", "source", "ops_type_merged", "city", "trip_cnt"])
else:
    out = pd.concat(agg_parts, ignore_index=True)
    out = (
        out.groupby(["day_date", "treatment", "source", "ops_type_merged", "city"], dropna=False)["trip_cnt"]
        .sum()
        .reset_index()
        .rename(columns={"day_date": "day"})
        .sort_values(["day", "treatment", "source", "ops_type_merged", "city"])
    )

out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print("Saved:", OUT_PATH, "rows:", len(out))

[chunk 0] kept_rows=1,578,262 groups=10,849
[chunk 1] kept_rows=1,864,410 groups=31,181
[chunk 2] kept_rows=61,235 groups=2,858
Saved: ../data/cleaned_v3.csv rows: 39334


## merged trip data

In [4]:
import pandas as pd
import numpy as np

# -----------------------
# Config (改路徑)
# -----------------------
V1_PATH = "../data/cleaned_v1.csv"  # nonrepeat_cnt (city)
V2_PATH = "../data/cleaned_v2.csv"  # is_match_rate (city_group)
V3_PATH = "../data/cleaned_v3.csv"  # trip_cnt (city)

OUT_PATH = "../data/daily_trip_merged.csv"

# -----------------------
# City -> City_group mapping（三區）
# -----------------------
north = {'臺北市','新北市','基隆市','桃園市','宜蘭縣','花蓮縣','新竹縣','新竹市'}
central = {'苗栗縣','臺中市','南投縣','彰化縣','雲林縣','嘉義縣','嘉義市'}
south = {'臺南市','高雄市','屏東縣','臺東縣','澎湖縣','金門縣','連江縣'}

def map_city_group(city):
    if pd.isna(city) or city == "":
        city = "臺北市"
    if city in north:
        return "北區"
    if city in central:
        return "中區"
    if city in south:
        return "南區"
    return "北區"

def strip_cols(df, cols):
    for c in cols:
        if c in df.columns:
            df[c] = df[c].astype("string").str.strip()
    return df

def nonempty(s: pd.Series) -> pd.Series:
    ss = s.astype("string").str.strip()
    return ss.notna() & (ss != "")

# -----------------------
# 1) Load
# -----------------------
v1 = pd.read_csv(V1_PATH)
v2 = pd.read_csv(V2_PATH)
v3 = pd.read_csv(V3_PATH)

key_base = ["day", "treatment", "source", "ops_type_merged"]

v1 = strip_cols(v1, key_base + ["city"])
v2 = strip_cols(v2, key_base + ["city_group"])
v3 = strip_cols(v3, key_base + ["city"])

v1["day"] = pd.to_datetime(v1["day"], errors="coerce")
v2["day"] = pd.to_datetime(v2["day"], errors="coerce")
v3["day"] = pd.to_datetime(v3["day"], errors="coerce")

# -----------------------
# 2) v1/v3: city -> city_group，再聚合到 city_group 層級
# -----------------------
v1["city_group"] = v1["city"].map(map_city_group)
v3["city_group"] = v3["city"].map(map_city_group)

v1_g = (
    v1.groupby(key_base + ["city_group"], dropna=False, as_index=False)["nonrepeat_cnt"]
      .sum()
)

v3_g = (
    v3.groupby(key_base + ["city_group"], dropna=False, as_index=False)["trip_cnt"]
      .sum()
)

# -----------------------
# 3) merge: v1_g + v3_g + v2
# -----------------------
m = v3_g.merge(v1_g, on=key_base + ["city_group"], how="outer")
m = m.merge(
    v2.rename(columns={"is_match_rate": "match_rate"}),
    on=key_base + ["city_group"],
    how="outer"
)

# -----------------------
# 4) format + drop empty keys + save
# -----------------------
m = m[["day","treatment","source","ops_type_merged","city_group","trip_cnt","nonrepeat_cnt","match_rate"]].copy()

# 轉 day 回字串（保留排序前先不轉也可以；這裡先轉，方便輸出）
m["day"] = pd.to_datetime(m["day"], errors="coerce").dt.strftime("%Y-%m-%d")

# 數值欄位轉 numeric
for c in ["trip_cnt", "nonrepeat_cnt", "match_rate"]:
    m[c] = pd.to_numeric(m[c], errors="coerce")

# ✅ 拿掉 treatment/source/ops_type_merged/city_group 任何空值或空字串
m = strip_cols(m, ["treatment","source","ops_type_merged","city_group"])
mask_keep = (
    nonempty(m["treatment"]) &
    nonempty(m["source"]) &
    nonempty(m["ops_type_merged"]) &
    nonempty(m["city_group"])
)
m = m[mask_keep].copy()

# 排序
m = m.sort_values(["day","treatment","source","ops_type_merged","city_group"], na_position="last").reset_index(drop=True)

m.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print("Saved:", OUT_PATH, "rows:", len(m))

Saved: ../data/daily_trip_merged.csv rows: 20160


# Weekend/Holiday data

In [6]:
import pandas as pd

# -----------------------
# Config (改路徑)
# -----------------------
IN_PATH  = "../data/daily_trip_merged.csv"
OUT_PATH = "../data/daily_trip_merged_with_weekend.csv"

# -----------------------
# Holidays (固定日 + 指定國定假日)
# -----------------------
HOLIDAYS = {
    "2025-09-29",
    "2025-10-06",
    "2025-10-10",
    "2025-10-24",
    "2025-12-25",
    "2026-01-01",
}

# -----------------------
# Load
# -----------------------
df = pd.read_csv(IN_PATH)

# day -> datetime
df["day_dt"] = pd.to_datetime(df["day"], errors="coerce")

# 週末：Saturday(5) / Sunday(6)
is_weekend = df["day_dt"].dt.dayofweek.isin([5, 6])

# 指定國定假日
is_holiday = df["day"].astype(str).isin(HOLIDAYS)

# weekend OR holiday -> 1 else 0
df["is_weekend_holiday"] = (is_weekend | is_holiday).astype("int8")

# 清掉暫存欄位
df = df.drop(columns=["day_dt"])

# Save
df.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print("Saved:", OUT_PATH, "rows:", len(df))


Saved: ../data/daily_trip_merged_with_weekend.csv rows: 20160


# Weather data

In [8]:
import pandas as pd
import numpy as np

# -----------------------
# Config (改路徑)
# -----------------------
BASE_PATH = "../data/daily_trip_merged_with_weekend.csv"
WEATHER_PATH = "../../data/weather.csv"             
OUT_PATH = "../data/daily_trip_merged_with_weekend_and_weather.csv"

# -----------------------
# City group mapping（三區，固定分母用）
# -----------------------
north = {'臺北市','新北市','基隆市','桃園市','宜蘭縣','花蓮縣','新竹縣','新竹市'}
central = {'苗栗縣','臺中市','南投縣','彰化縣','雲林縣','嘉義縣','嘉義市'}
south = {'臺南市','高雄市','屏東縣','臺東縣','澎湖縣','金門縣','連江縣'}

def map_city_group(city):
    if pd.isna(city) or city == "":
        city = "臺北市"
    if city in north:
        return "北區"
    if city in central:
        return "中區"
    if city in south:
        return "南區"
    return "北區"

GROUP_CITIES = {
    "北區": sorted(list(north)),
    "中區": sorted(list(central)),
    "南區": sorted(list(south)),
}
GROUP_DENOM = {g: len(cities) for g, cities in GROUP_CITIES.items()}

# -----------------------
# Helpers
# -----------------------
def strip_col(df, c):
    if c in df.columns:
        df[c] = df[c].astype("string").str.strip()
    return df

# -----------------------
# 1) Load base
# -----------------------
base = pd.read_csv(BASE_PATH)
base = strip_col(base, "day")
base = strip_col(base, "city_group")
base["day_dt"] = pd.to_datetime(base["day"], errors="coerce")
# 保險：若 city_group 有空/怪值，也先丟掉（你前面應該已清過）
base = base.dropna(subset=["day_dt", "city_group"]).copy()

# -----------------------
# 2) Load weather
# -----------------------
w = pd.read_csv(WEATHER_PATH)

# 只留需要欄位
need_cols = ["cityName", "forecastDate", "publishDate", "precipChance"]
w = w[need_cols].copy()

# 清理字串
w = strip_col(w, "cityName")
w = strip_col(w, "forecastDate")
w = strip_col(w, "publishDate")

# 轉日期
w["forecast_dt"] = pd.to_datetime(w["forecastDate"], errors="coerce")
w["publish_dt"]  = pd.to_datetime(w["publishDate"], errors="coerce")

# precipChance 轉數字
w["precipChance"] = pd.to_numeric(w["precipChance"], errors="coerce")

# 丟掉關鍵缺失
w = w.dropna(subset=["cityName", "forecast_dt", "publish_dt", "precipChance"]).copy()

# -----------------------
# 3) 套 publishDate == forecastDate - 7 天 的規則
# -----------------------
w["publish_expected"] = w["forecast_dt"] - pd.Timedelta(days=7)
w = w[w["publish_dt"].dt.date == w["publish_expected"].dt.date].copy()

# 如果你的 publishDate 是日期(無時間)也OK，上面用 date 對齊最穩

# -----------------------
# 4) city/day 是否下雨：同一天(早晚兩筆)只要任一筆 precipChance>=60
# -----------------------
w["is_rainy_city"] = (w["precipChance"] >= 60).astype("int8")

city_day = (
    w.groupby(["cityName", "forecast_dt"], as_index=False)["is_rainy_city"]
     .max()
)

# -----------------------
# 5) city -> city_group，算每區每日 is_rainy = rainy_city_cnt / 該區城市總數(固定分母)
# -----------------------
city_day["city_group"] = city_day["cityName"].map(map_city_group)

group_day = (
    city_day.groupby(["forecast_dt", "city_group"], as_index=False)["is_rainy_city"]
            .sum()
            .rename(columns={"is_rainy_city": "rainy_city_cnt"})
)

group_day["denom_city_cnt"] = group_day["city_group"].map(GROUP_DENOM).astype("int16")
group_day["is_rainy"] = np.where(
    group_day["denom_city_cnt"] > 0,
    group_day["rainy_city_cnt"] / group_day["denom_city_cnt"],
    np.nan
)

# 只留要 merge 的欄位
group_day = group_day.rename(columns={"forecast_dt": "day_dt"})[["day_dt", "city_group", "is_rainy"]]

# -----------------------
# 6) Merge 回主檔（by day + city_group）
# -----------------------
out = base.merge(group_day, on=["day_dt", "city_group"], how="left")

# 還原 day 字串、移除 day_dt
out["day"] = out["day_dt"].dt.strftime("%Y-%m-%d")
out = out.drop(columns=["day_dt"])

# -----------------------
# 7) Save
# -----------------------
out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print("Saved:", OUT_PATH, "rows:", len(out))

Saved: ../data/daily_trip_merged_with_weekend_and_weather.csv rows: 20160


# Coupon data

In [18]:
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import timedelta

# =========================================================
# Paths
# =========================================================
USER_PATH  = Path("../../cleaned_data/user_cleaned.csv")
COUPON_PATH = Path("../../merged_data/coupon_merged.csv")
CAT_PATH   = Path("../../data/coupon_category.csv")

DAILY_TRIP_PATH = Path("../data/daily_trip_merged_with_weekend_and_weather.csv")
OUT_PATH = Path("../data/daily_trip_merged_with_weekend_and_weather_with_coupon.csv")

# =========================================================
# City group mapping (your definition)
# =========================================================
north = {'臺北市','新北市','基隆市','桃園市','宜蘭縣','花蓮縣','新竹縣','新竹市'}
central = {'苗栗縣','臺中市','南投縣','彰化縣','雲林縣','嘉義縣','嘉義市'}
south = {'臺南市','高雄市','屏東縣','臺東縣','澎湖縣','金門縣','連江縣'}

def map_city_group(city):
    if pd.isna(city) or city == "":
        city = "臺北市"
    if city in north:
        return "北區"
    if city in central:
        return "中區"
    if city in south:
        return "南區"
    return "北區"

GROUP_CITIES = {
    "北區": sorted(list(north)),
    "中區": sorted(list(central)),
    "南區": sorted(list(south)),
}

# =========================================================
# Coupon type buckets (8)
# =========================================================
coupon_cols = [
    "coupon_BD", "coupon_CDP", "coupon_folk", "coupon_growth_other",
    "coupon_MGM", "coupon_MKT", "coupon_register", "coupon_daily"
]
coupon_total_cols = [f"{c}_total" for c in coupon_cols]

# =========================================================
# Helper: UTC string -> Asia/Taipei date
# =========================================================
def utcstr_to_taipei_date(series: pd.Series) -> pd.Series:
    dt = pd.to_datetime(series, utc=True, errors="coerce")
    return dt.dt.tz_convert("Asia/Taipei").dt.date

# =========================================================
# 1) Load daily trip dataset (target)
# =========================================================
daily = pd.read_csv(DAILY_TRIP_PATH)
daily["day"] = pd.to_datetime(daily["day"], errors="coerce").dt.date

need_cols = ["day", "treatment", "source", "ops_type_merged", "city_group"]
missing = [c for c in need_cols if c not in daily.columns]
if missing:
    raise ValueError(f"[daily_trip] Missing required columns: {missing}")

# Normalize city_group to your labels (in case any whitespace)
daily["city_group"] = daily["city_group"].astype(str).str.strip()

# =========================================================
# 2) Load user_cleaned (weekly assignment; experiment_date is Monday)
# =========================================================
user = pd.read_csv(USER_PATH)

user["experiment_date"] = pd.to_datetime(user["experiment_date"], errors="coerce").dt.date
user["user_id"] = pd.to_numeric(user["user_id"], errors="coerce").astype("Int64")

# Build city_group using YOUR mapping
if "city" not in user.columns:
    raise ValueError("[user_cleaned] Expected a 'city' column to map into city_group.")

user["city"] = user["city"].astype(str).str.strip().replace({"nan": ""})
user["city_group"] = user["city"].apply(map_city_group)

user_map = user[[
    "user_id", "experiment_date", "treatment", "source", "ops_type_merged", "city_group"
]].drop_duplicates()

# week_end = Sunday
user_map["week_end"] = user_map["experiment_date"].apply(lambda d: d + timedelta(days=6))

# =========================================================
# 3) Load coupon_category + filters
# =========================================================
coupon_category = pd.read_csv(CAT_PATH)

coupon_category["promo_id"] = pd.to_numeric(coupon_category["promo_id"], errors="coerce").astype("Int64")
coupon_category["enable_date"] = pd.to_datetime(coupon_category["enable_date"], errors="coerce").dt.date

pattern = r"(機場|機接|接機|送機|出國|test|測試|租車券|旅遊)"
coupon_category["promo_title"] = coupon_category["promo_title"].astype(str)

coupon_category_f = coupon_category.loc[
    (coupon_category["promo_id"].fillna(0) >= 10508458) &
    (~coupon_category["promo_title"].str.contains(pattern, case=False, regex=True, na=False)),
    ["promo_id", "coupon_category", "coupon_type", "enable_date"]
].copy()

# =========================================================
# 4) Chunk read coupon_merged and aggregate DAILY group totals (active coupons)
# Grain: day + treatment + source + ops_type_merged + city_group
# =========================================================
usecols = ["user_id", "promo_id", "expiry_date", "redeem_time"]
dtype = {
    "user_id": "Int64",
    "promo_id": "Int64",
    "expiry_date": "string",
    "redeem_time": "string",
}

acc = {}  # key=(day, treatment, source, ops_type_merged, city_group) -> np.array(8)

chunksize = 1_000_000
reader = pd.read_csv(
    COUPON_PATH,
    usecols=usecols,
    dtype=dtype,
    chunksize=chunksize,
    low_memory=True
)

group_cols_day = ["day", "treatment", "source", "ops_type_merged", "city_group"]

for i, chunk in enumerate(reader, start=1):
    chunk["user_id"] = pd.to_numeric(chunk["user_id"], errors="coerce").astype("Int64")
    chunk["promo_id"] = pd.to_numeric(chunk["promo_id"], errors="coerce").astype("Int64")

    # Apply promo filter via inner join
    c = chunk.merge(coupon_category_f, on="promo_id", how="inner")
    if c.empty:
        continue

    # expiry_date_eff = redeem_time (Taipei date) if present else expiry_date (Taipei date)
    c["expiry_date_eff"] = utcstr_to_taipei_date(c["redeem_time"])
    m = c["expiry_date_eff"].isna()
    if m.any():
        c.loc[m, "expiry_date_eff"] = utcstr_to_taipei_date(c.loc[m, "expiry_date"])

    # Drop missing enable/expiry
    c = c.loc[c["enable_date"].notna() & c["expiry_date_eff"].notna()].copy()
    if c.empty:
        continue

    # Join weekly group assignment
    cu = c.merge(user_map, on="user_id", how="inner")
    if cu.empty:
        continue

    # Overlap between coupon active window and the user's week window [Mon..Sun]
    cu["overlap_start"] = cu[["enable_date", "experiment_date"]].max(axis=1)
    cu["overlap_end"]   = cu[["expiry_date_eff", "week_end"]].min(axis=1)

    cu = cu.loc[cu["overlap_start"] <= cu["overlap_end"]].copy()
    if cu.empty:
        continue

    # Bucketize (0/1 per row)
    cu["coupon_BD"] = (cu["coupon_type"].isin(["BD", "異業合作"])).astype(np.int32)
    cu["coupon_CDP"] = ((cu["coupon_type"] == "Folksonomy") & (cu["coupon_category"] == "Event-Triggered")).astype(np.int32)
    cu["coupon_folk"] = ((cu["coupon_type"] == "Folksonomy") & (cu["coupon_category"] == "Gene")).astype(np.int32)
    cu["coupon_growth_other"] = (
        (cu["coupon_type"] == "Folksonomy") &
        (~cu["coupon_category"].isin(["Event-Triggered", "Gene"]))
    ).astype(np.int32)
    cu["coupon_MGM"] = (cu["coupon_type"] == "MGM").astype(np.int32)
    cu["coupon_MKT"] = (cu["coupon_type"] == "MKT").astype(np.int32)
    cu["coupon_register"] = (cu["coupon_type"] == "新註冊").astype(np.int32)
    cu["coupon_daily"] = (cu["coupon_type"] == "天天領").astype(np.int32)

    # Expand to daily within overlap (max 7 days per row)
    cu["day"] = cu.apply(
        lambda r: pd.date_range(r["overlap_start"], r["overlap_end"], freq="D").date.tolist(),
        axis=1
    )
    cu = cu.explode("day", ignore_index=True)

    # Daily totals in this chunk
    g = cu.groupby(group_cols_day, as_index=False)[coupon_cols].sum()

    # Accumulate into dict
    for row in g.itertuples(index=False):
        key = (row.day, row.treatment, row.source, row.ops_type_merged, row.city_group)
        vec = np.array([getattr(row, col) for col in coupon_cols], dtype=np.int64)
        if key in acc:
            acc[key] += vec
        else:
            acc[key] = vec

    if i % 5 == 0:
        print(f"processed chunks: {i}, acc daily-groups: {len(acc):,}")

print(f"✅ Finished reading coupons. Total daily groups aggregated: {len(acc):,}")

# =========================================================
# 5) acc -> DataFrame (coupon_daily_totals)
# =========================================================
if len(acc) == 0:
    coupon_daily_totals = daily[need_cols].drop_duplicates().copy()
    for c in coupon_total_cols:
        coupon_daily_totals[c] = 0
else:
    rows = []
    for k, v in acc.items():
        rows.append(list(k) + list(v))
    coupon_daily_totals = pd.DataFrame(rows, columns=group_cols_day + coupon_cols)
    coupon_daily_totals = coupon_daily_totals.rename(columns={c: f"{c}_total" for c in coupon_cols})

# =========================================================
# 6) Merge into daily trip data and save
# =========================================================
out = daily.merge(
    coupon_daily_totals,
    on=["day", "treatment", "source", "ops_type_merged", "city_group"],
    how="left"
)

for c in coupon_total_cols:
    if c not in out.columns:
        out[c] = 0
    out[c] = out[c].fillna(0).astype(np.int64)

out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print(f"Saved merged output to: {OUT_PATH.resolve()}")

  (~coupon_category["promo_title"].str.contains(pattern, case=False, regex=True, na=False)),


processed chunks: 5, acc daily-groups: 17,196
processed chunks: 10, acc daily-groups: 21,130
processed chunks: 15, acc daily-groups: 21,504
✅ Finished reading coupons. Total daily groups aggregated: 21,504
Saved merged output to: /Users/shotime/Desktop/2026winter_project/day_model/data/daily_trip_merged_with_weekend_and_weather_with_coupon.csv
