In [None]:
import os
# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime"
# abort that can occur when several numeric libraries each bundle their own
# OpenMP copy; it is set here, ahead of the numpy/torch imports below, so it is
# already in the environment when they load -- TODO confirm it is still needed.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import LabelEncoder
import gradio as gr


# Data loading: read the four CSV tables used by predict() once, at import time.
drivers_df, winners_df, teams_df, laps_df = map(
    pd.read_csv,
    (
        "./data/drivers_updated.csv",
        "./data/winners.csv",
        "./data/teams_updated.csv",
        "./data/fastest_laps_updated.csv",
    ),
)


# 模型定義
class MultiTaskLSTMClassifier(nn.Module):
    """Bidirectional LSTM encoder shared by three linear output heads.

    The last time step of the LSTM output is concatenated with a vector of
    per-sample statistic features and fed to separate driver, team and
    position heads.
    """

    def __init__(self, input_size, hidden_size, num_classes_driver, num_classes_team, num_classes_pos, stat_feature_size=1, num_layers=2):
        """Build the encoder and the three heads.

        Args:
            input_size: Number of features per time step.
            hidden_size: LSTM hidden size (per direction).
            num_classes_driver: Output width of the driver head.
            num_classes_team: Output width of the team head.
            num_classes_pos: Output width of the position head.
            stat_feature_size: Width of the ``stats`` vector given to ``forward``.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated (2 * hidden_size), then the stat
        # features are appended before the heads.
        head_in_features = 2 * hidden_size + stat_feature_size
        self.driver_head = nn.Linear(head_in_features, num_classes_driver)
        self.team_head = nn.Linear(head_in_features, num_classes_team)
        self.pos_head = nn.Linear(head_in_features, num_classes_pos)

    def forward(self, x, stats):
        """Return ``(driver_out, team_out, pos_out)`` for a batch.

        Args:
            x: Batch-first sequence tensor passed to the LSTM.
            stats: Per-sample feature tensor concatenated along dim 1.
        """
        sequence_out = self.lstm(x)[0]
        last_step = sequence_out[:, -1]  # output at the final time step
        features = torch.cat((last_step, stats), dim=1)
        driver_out = self.driver_head(features)
        team_out = self.team_head(features)
        pos_out = self.pos_head(features)
        return driver_out, team_out, pos_out






def predict(year, gp_name, show_team_rank=True, show_internal_rank=False):
    """Train (or load a cached) model for ``year`` and rank the active drivers.

    The training table is built from the module-level ``winners_df``,
    ``drivers_df``, ``laps_df`` and ``teams_df`` frames, restricted to the
    20 seasons up to and including ``year``. A checkpoint is cached under
    ``./model/model_f1_{year}_{gp_name}.pt``; ``gp_name`` is only validated and
    used in that file name -- the model features do not depend on it.

    Args:
        year: Season to predict. Coerced to ``int`` (UI widgets may send floats).
        gp_name: Grand Prix name; must occur in the filtered history.
        show_team_rank: Include the per-team confidence ranking section.
        show_internal_rank: Include the per-team driver ordering section.

    Returns:
        A multi-line ``str`` with the requested sections followed by one line
        per driver, or a short status message when the data is insufficient or
        the Grand Prix name is unknown.
    """
    # Fall back to CPU so the function still works without a CUDA device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # range() below and the checkpoint file name both need an integer year.
    year = int(year)

    # Keep the drivers' Pos column in addition to the join keys.
    drivers = drivers_df[["Driver", "Car", "year", "Pos"]].copy()
    winners = winners_df.copy()
    teams = teams_df.copy()

    winners["year"] = pd.to_datetime(winners["Date"]).dt.year
    winners = winners.rename(columns={"Winner": "Driver"})

    # Join everything once: winners -> drivers -> fastest laps -> teams.
    df = winners.merge(drivers, on=["Driver", "Car", "year"], how="left")
    df = df.merge(laps_df, on=["Grand Prix", "Driver", "Car", "year"], how="left")
    df = df.merge(teams, left_on=["Car", "year"], right_on=["Team", "year"], how="left")

    # Keep only rows whose Pos_x (the Pos column carried over from the drivers
    # merge) parses as a number.
    df = df[pd.to_numeric(df["Pos_x"], errors="coerce").notna()].copy()
    df["Pos"] = pd.to_numeric(df["Pos_x"], errors="coerce")

    # Keep only the columns that are used below.
    df = df[["Grand Prix", "Date", "Driver", "Car", "year", "Pos", "Team"]]
    df = df.dropna(subset=["Pos", "Driver", "Team", "Grand Prix"])

    start_year = max(1950, year - 20)
    df = df[(df["year"] >= start_year) & (df["year"] <= year)].copy()
    if df.empty:
        return "歷史資料不足"

    le_driver = LabelEncoder()
    le_team = LabelEncoder()
    le_gp = LabelEncoder()
    df["Driver_enc"] = le_driver.fit_transform(df["Driver"].astype(str))
    df["Team_enc"] = le_team.fit_transform(df["Team"].astype(str))
    # Kept for validation only; GP_enc is not a training feature.
    df["GP_enc"] = le_gp.fit_transform(df["Grand Prix"].astype(str))

    # Reject an unknown Grand Prix before spending time on training.
    if gp_name not in le_gp.classes_:
        return "GP 名稱無效"

    main_features = ["year"]  # GP_enc is deliberately not used
    df[main_features] = df[main_features].apply(pd.to_numeric, errors="coerce").fillna(0)

    def get_seq_features(df, driver, year, n_seq=10):
        """Return the driver's latest ``n_seq`` rows before ``year``, zero-padded at the end."""
        hist = df[(df["Driver"] == driver) & (df["year"] < year)].sort_values("year", ascending=False)
        feats = hist.head(n_seq)[main_features].values
        if len(feats) < n_seq:
            pad = np.zeros((n_seq - len(feats), len(main_features)))
            feats = np.vstack([feats, pad])
        return feats

    def get_stat_features(df, driver, year):
        """Return the driver's row count over the five seasons before ``year``."""
        recent = df[(df["Driver"] == driver) & (df["year"] >= year - 5) & (df["year"] < year)]
        total_wins = len(recent)
        return np.array([total_wins], dtype=np.float32)

    X_seq, X_stat, y_driver, y_team, y_pos = [], [], [], [], []
    for _, row in df.iterrows():
        X_seq.append(get_seq_features(df, row["Driver"], row["year"]))
        X_stat.append(get_stat_features(df, row["Driver"], row["year"]))
        y_driver.append(row["Driver"])
        y_team.append(row["Team"])
        y_pos.append(row["Pos"])

    if not X_seq:
        return "序列資料不足"

    X_seq = np.stack(X_seq)
    X_stat = np.stack(X_stat)
    y_driver_enc = le_driver.transform(y_driver)
    y_team_enc = le_team.transform(y_team)

    model = MultiTaskLSTMClassifier(
        input_size=len(main_features),
        hidden_size=128,
        num_classes_driver=len(le_driver.classes_),
        num_classes_team=len(le_team.classes_),
        num_classes_pos=1,
        stat_feature_size=X_stat.shape[1],
    ).to(device)

    model_path = f"./model/model_f1_{year}_{gp_name}.pt"
    if os.path.exists(model_path):
        state_dict = torch.load(model_path, map_location=device, weights_only=True)
        model.load_state_dict(state_dict)
        model.eval()
    else:
        X_tensor = torch.tensor(X_seq, dtype=torch.float32).to(device)
        X_stat_tensor = torch.tensor(X_stat, dtype=torch.float32).to(device)
        y_driver_tensor = torch.tensor(y_driver_enc, dtype=torch.long).to(device)
        y_team_tensor = torch.tensor(y_team_enc, dtype=torch.long).to(device)
        y_pos_tensor = torch.tensor(y_pos, dtype=torch.float32).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
        # Per-sample losses so that each row can be weighted by its age.
        criterion = nn.CrossEntropyLoss(reduction="none")
        criterion_ce = nn.CrossEntropyLoss()
        criterion_pos = nn.SmoothL1Loss()
        criterion_kl = nn.KLDivLoss(reduction="batchmean")

        DECAY = -0.08  # controls how quickly older seasons lose influence
        weights = np.exp(DECAY * (year - df["year"].values))
        weights_tensor = torch.tensor(weights, dtype=torch.float32).to(device)

        # ==== Phase 1: regular training ====
        for _ in range(500):
            model.train()
            optimizer.zero_grad()
            out_driver, out_team, out_pos = model(X_tensor, X_stat_tensor)
            loss_driver = criterion(out_driver, y_driver_tensor)
            loss_team = criterion(out_team, y_team_tensor)
            # squeeze(-1) keeps the batch dimension even for a single sample.
            loss_pos = criterion_pos(out_pos.squeeze(-1), y_pos_tensor)
            loss = 0.2 * (loss_driver * weights_tensor).mean() + \
                   0.3 * (loss_team * weights_tensor).mean() + \
                   0.5 * loss_pos
            loss.backward()
            optimizer.step()

        # ==== Build pseudo (soft) labels from the phase-1 model ====
        with torch.no_grad():
            model.eval()
            out_driver, _, _ = model(X_tensor, X_stat_tensor)
            pseudo_soft_label = torch.softmax(out_driver / 10, dim=1)

        # ==== Phase 2: distillation against the pseudo labels ====
        alpha = 0.7  # share of the hard-label loss
        for _ in range(150):  # 150 extra epochs are enough for this phase
            model.train()
            optimizer.zero_grad()
            out_driver, out_team, out_pos = model(X_tensor, X_stat_tensor)
            # Hard-label loss
            loss_driver_hard = criterion_ce(out_driver, y_driver_tensor)
            # Soft-label loss (KL divergence against the pseudo labels)
            loss_driver_soft = criterion_kl(
                torch.log_softmax(out_driver, dim=1),
                pseudo_soft_label
            )
            loss_team = criterion(out_team, y_team_tensor)
            loss_pos = criterion_pos(out_pos.squeeze(-1), y_pos_tensor)
            # Mixed loss
            loss = alpha * loss_driver_hard + (1-alpha) * loss_driver_soft \
                   + 0.3 * (loss_team * weights_tensor).mean() + \
                   0.5 * loss_pos
            loss.backward()
            optimizer.step()

        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        torch.save(model.state_dict(), model_path)

    # A driver counts as active when drivers_df lists them in any of the three
    # seasons before `year`.
    recent_years = set(range(year - 3, year))
    driver_years = drivers.groupby("Driver")["year"].apply(set).to_dict()
    active_indices = [i for i, name in enumerate(le_driver.classes_) if recent_years & driver_years.get(name, set())]
    if len(active_indices) == 0:
        return "查無足夠活躍車手"

    # Map each driver to the team of their most recent row in the winners table.
    full_df = winners.merge(teams, left_on=["Car", "year"], right_on=["Team", "year"], how="left")
    full_df = full_df.dropna(subset=["Driver", "Team"])
    last_team_map = (
        full_df.sort_values("year")
        .drop_duplicates(subset=["Driver"], keep="last")
        .set_index("Driver")["Team"]
        .to_dict()
    )

    driver_records = []
    for idx in active_indices:
        driver = le_driver.classes_[idx]
        team = last_team_map.get(driver, "未知車隊")
        input_tensor = torch.tensor(np.expand_dims(get_seq_features(df, driver, year), axis=0), dtype=torch.float32).to(device)
        stat_tensor = torch.tensor(np.expand_dims(get_stat_features(df, driver, year), axis=0), dtype=torch.float32).to(device)
        with torch.no_grad():
            out_driver, _, out_pos = model(input_tensor, stat_tensor)
            TEMPERATURE = 5
            # Probability the model assigns to this driver's own class, in percent.
            confidence = torch.softmax(out_driver / TEMPERATURE, dim=1)[0, idx].item() * 100
            position_score = out_pos.item()
        driver_records.append({"driver": driver, "team": team, "confidence": confidence, "position_score": position_score})

    df_pred = pd.DataFrame(driver_records).sort_values(by="confidence", ascending=False)

    result_lines = []
    team_scores = df_pred.groupby("team")["confidence"].sum().sort_values(ascending=False)

    if show_team_rank:
        result_lines.append("🏁 車隊總排名：")
        for idx, (team, score) in enumerate(team_scores.items(), 1):
            result_lines.append(f"{idx}. {team}（信心度 {score:.2f}%）")
        result_lines.append("")

    if show_internal_rank:
        result_lines.append("👥 車隊內部排序：")
        internal_sorted = df_pred.sort_values(["team", "confidence"], ascending=[True, False])
        for team in team_scores.index:
            result_lines.append(f"{team}：")
            for _, row in internal_sorted[internal_sorted["team"] == team].iterrows():
                result_lines.append(f"  {row['driver']}（{row['confidence']:.2f}%）")
        result_lines.append("")

    for rank, row in enumerate(df_pred.itertuples(), 1):
        result_lines.append(f"第 {rank} 名：{row.driver}（{row.team}） \t信心度：{row.confidence:.2f}% \t位置分數：{row.position_score:.2f}")

    return "\n".join(result_lines)






# === Gradio UI ===
# Build the Grand Prix choices for the dropdown. This is best-effort: any
# problem reading or parsing the CSV falls back to a single "ERROR" entry so the
# UI can still be constructed, but interrupts are no longer swallowed.
try:
    winners_ui = pd.read_csv("./data/winners.csv")
    gp_names_for_ui = sorted(winners_ui["Grand Prix"].dropna().unique().tolist())
except (OSError, ValueError, KeyError, TypeError):
    # OSError: missing/unreadable file; ValueError: pandas parser/empty-data
    # errors; KeyError: no "Grand Prix" column; TypeError: unsortable values.
    gp_names_for_ui = []

# The dropdown default indexes element 0, so never leave the list empty.
if not gp_names_for_ui:
    gp_names_for_ui = ["ERROR"]

def _format_prediction(full_text, driver_count_option):
    """Trim the per-driver ranking in ``full_text`` and re-attach the team sections."""
    lines = full_text.splitlines()

    # Per-driver ranking lines all start with "第 ".
    ranked = [line for line in lines if line.startswith("第 ")]
    if not ranked:
        # No ranking lines means the text is a status message rather than a
        # result; show it unchanged instead of an empty summary.
        return full_text

    # Unknown options (e.g. "全部") map to None, i.e. no truncation.
    limit = {"Top 5": 5, "Top 10": 10}.get(driver_count_option)
    core = ranked[:limit]

    # Collect the team ranking / internal ordering sections: capturing starts at
    # a section header and stops at the first per-driver ranking line.
    others = []
    capture = False
    for line in lines:
        if line.startswith(("🏁", "👥")):
            capture = True
        elif line.startswith("第 "):
            capture = False
        if capture:
            others.append(line)

    result = "\n".join(core + [""] + others)
    result += f"\n\n🚥 預測車手總數：{len(core)} 位"
    return result


def wrapped_predict(year, gp_name, driver_count_option, show_team_rank, show_internal_rank):
    """Run ``predict`` and format its text for the UI.

    Args:
        year: Season passed through to ``predict``.
        gp_name: Grand Prix name passed through to ``predict``.
        driver_count_option: "Top 5", "Top 10", or anything else for all drivers.
        show_team_rank: Passed through to ``predict``.
        show_internal_rank: Passed through to ``predict``.

    Returns:
        The selected ranking lines, then the team sections, then a driver-count
        summary line. If ``predict`` returned a status message (no ranking
        lines), that message is returned unchanged.
    """
    full_text = predict(year, gp_name, show_team_rank, show_internal_rank)
    return _format_prediction(full_text, driver_count_option)




# Gradio front end: collects the inputs, calls wrapped_predict on click and
# shows the returned text.
with gr.Blocks() as demo:
    gr.Markdown("# 🏎️ F1 排名預測")
    # precision=0 makes the component deliver an int; the year is used in
    # range() and in the checkpoint file name downstream.
    year = gr.Number(label="年份", value=2025, precision=0)
    gp_name = gr.Dropdown(choices=gp_names_for_ui, label="Grand Prix 名稱", value=gp_names_for_ui[0])
    driver_count_option = gr.Dropdown(choices=["Top 5", "Top 10", "全部"], label="預測車手數量", value="全部")
    show_team_rank = gr.Checkbox(label="顯示車隊總排名", value=True)
    show_internal_rank = gr.Checkbox(label="顯示車隊內排序", value=False)
    output = gr.Textbox(label="預測結果", lines=30)
    btn = gr.Button("預測")
    # The inputs are passed positionally, in wrapped_predict's parameter order.
    btn.click(
        wrapped_predict,
        inputs=[year, gp_name, driver_count_option, show_team_rank, show_internal_rank],
        outputs=output
    )

demo.launch()



* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


