<a href="https://colab.research.google.com/github/t8101349/group-project-202503/blob/main/gradio_web_0326.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pandas
!pip install numpy
!pip install rdkit
!pip install scikit-learn
!pip install xgboost
!pip install lightgbm
!pip install psutil

In [None]:
# 以上套件放進 huggingface spaces requirement.txt

In [None]:
# 放到 huggingface spaces app.py
import os
import tempfile
import gradio as gr
from gradio.themes import Base
from process import predict_process
from file_validator import validate_file

# ----------------------- 核心功能函數 -----------------------

def run_prediction(state):
    """
    執行機器學習模型預測

    參數:
    - state: 應用程式狀態字典

    返回:
    - 預測結果狀態訊息
    - 更新後的狀態字典
    """
    # 檢查是否已上傳檔案
    if not state["file_uploaded"] or state["df"] is None:
        return "❌ 請先上傳並確認檔案！", state

    try:
        # 使用機器學習模型進行預測
        result_df = predict_process(state["df"])

        # 更新狀態
        state["result_df"] = result_df
        state["prediction_done"] = True

        return f"✅ 預測完成！共處理 {len(result_df)} 筆資料", state

    except Exception as e:
        return f"❌ 預測錯誤：{str(e)}", state

def generate_file(format_choice, state):
    """
    生成預測結果檔案

    參數:
    - format_choice: 檔案格式 ("CSV" 或 "Parquet")
    - state: 應用程式狀態字典

    返回:
    - 檔案路徑
    - 檔案生成狀態訊息
    - 更新後的狀態字典
    """
    # 檢查是否已完成預測
    if not state["prediction_done"] or state["result_df"] is None:
        return None, "❌ 請先執行預測！", state

    try:
        # 建立臨時目錄
        temp_dir = tempfile.gettempdir()

        # 根據選擇的格式儲存檔案
        if format_choice == "CSV":
            filename = "prediction.csv"
            filepath = os.path.join(temp_dir, filename)
            with open(filepath, 'w') as f:
                state["result_df"].to_csv(f, index=False)
        else:
            filename = "prediction.parquet"
            filepath = os.path.join(temp_dir, filename)
            with open(filepath, 'wb') as f:
                state["result_df"].to_parquet(f, index=False)

        return filepath, f"✅ 已生成 {filename}，點擊下方按鈕即可下載", state

    except Exception as e:
        return None, f"❌ 生成檔案錯誤：{str(e)}", state

def download_file(filepath):
    """
    處理檔案下載

    參數:
    - filepath: 要下載的檔案路徑

    返回:
    - 下載狀態訊息
    """
    if filepath:
        filename = os.path.basename(filepath)
        return f"✅ 已下載 {filename}"
    return "❌ 下載失敗"

# ----------------------- 事件處理函數 -----------------------

def handle_confirm(file, state):
    """
    處理檔案確認事件

    參數:
    - file: 上傳的檔案
    - state: 應用程式狀態字典

    返回:
    - 檔案驗證狀態訊息
    - 更新後的狀態字典
    - 預測按鈕狀態
    - 確認按鈕狀態
    """
    # 驗證上傳的檔案
    status, updated_state = validate_file(file, state)

    return (
        status,
        updated_state,
        gr.update(interactive=updated_state["file_uploaded"]),  # 預測按鈕
        gr.update(interactive=False)  # 確認按鈕
    )

def handle_predict(state):
    """
    處理預測事件

    參數:
    - state: 應用程式狀態字典

    返回:
    - 預測狀態訊息
    - 更新後的狀態字典
    - 生成按鈕狀態
    - 預測按鈕狀態
    """
    # 執行預測
    status, updated_state = run_prediction(state)

    return (
        status,
        updated_state,
        gr.update(interactive=updated_state["prediction_done"]),  # 生成按鈕
        gr.update(interactive=False)  # 預測按鈕
    )

def handle_generate(format_choice, state):
    """
    處理生成檔案事件

    參數:
    - format_choice: 檔案格式
    - state: 應用程式狀態字典

    返回:
    - 檔案路徑
    - 生成狀態訊息
    - 更新後的狀態字典
    - 下載按鈕狀態
    - 生成按鈕狀態
    """
    # 生成預測結果檔案
    filepath, status, updated_state = generate_file(format_choice, state)

    return (
        filepath,
        status,
        updated_state,
        gr.update(value=filepath, interactive=filepath is not None),  # 下載按鈕
        gr.update(interactive=False)  # 生成按鈕
    )

def handle_download(filepath):
    """
    處理下載事件

    參數:
    - filepath: 要下載的檔案路徑

    返回:
    - 下載狀態訊息
    - 下載按鈕狀態
    """
    # 處理檔案下載
    status = download_file(filepath)

    return status, gr.update(interactive=False)  # 下載按鈕

# ----------------------- Gradio 界面 -----------------------

def create_gradio_interface():
    """
    建立 Gradio 互動介面

    返回:
    - Gradio 應用程式實例
    """
    # 使用自訂主題和樣式建立 Gradio 界面
    with gr.Blocks(
        delete_cache=(3600, 7200),  # 緩存清理時間
        theme=Base(primary_hue="cyan", secondary_hue="teal", neutral_hue="gray"),
        title="新藥預測工具",
        css="""
        .gradio-container { width: 100% !important; max-width: 800px !important; margin: 0 auto !important; padding: 0.5rem; }
        .gradient-title h1 {
            background: linear-gradient(45deg, #13A9E6, #3DD69E);
            -webkit-background-clip: text;
            background-clip: text;
            color: transparent;
            text-align: center;
            font-size: clamp(2rem, 5vw, 3.5rem);
            font-weight: bold;
        }
        footer { display: none !important; }
        .file-status { font-weight: bold; }
        """
    ) as demo:
        # 標題
        gr.Markdown("# 新藥預測工具", elem_classes=["gradient-title"])

        # 工具說明手風琴
        with gr.Accordion("點此查看工具詳細說明", open=False):
            gr.Markdown("""
                **功能介紹：**
                此工具可將 SMILES 字串的分子資料集根據機器學習模型預測出每個分子是否與指定的
                蛋白質標靶(sEH, BRD4, HSA 其中之一)結合，快速篩選出可能的藥物分子資料集。

                **操作說明：**
                1. 上傳分子數據集
                2. 確認檔案
                3. 執行預測
                4. 選擇下載格式
                5. 產生預測檔案
                6. 下載預測檔案

                **上傳資料限制：**
                1. 支援 CSV 與 Parquet 格式
                2. 包含必要欄位
                3. 檔案大小上限50MB
                4. 資料筆數上限50萬筆

                **必要欄位說明：**
                - molecule_smiles: 分子的 SMILES 字串
                - protein_name: 蛋白質名稱 (必須為 sEH, BRD4, HSA 其中之一)
            """)

        # 初始化應用程式狀態
        state = gr.State(value={
            "df": None,
            "result_df": None,
            "file_uploaded": False,
            "prediction_done": False
        })

        # 界面元件
        with gr.Column():
            # 檔案上傳
            file_input = gr.File(
                label="上傳分子數據集",
                file_types=[".csv", ".parquet"],
                type="filepath"
            )

            # 確認檔案按鈕
            confirm_btn = gr.Button("確認檔案", variant="primary")

            # 檔案狀態顯示
            file_status = gr.Textbox(
                label="檔案確認狀態",
                value="",
                elem_classes=["file-status"],
                interactive=False
            )

            # 預測按鈕
            predict_btn = gr.Button("執行預測", variant="primary", interactive=False)

            # 預測狀態顯示
            predict_status = gr.Textbox(
                label="預測狀態",
                value="",
                elem_classes=["file-status"],
                interactive=False
            )

            # 下載格式選擇
            download_format = gr.Radio(
                choices=["CSV", "Parquet"],
                label="選擇下載格式",
                value="CSV"
            )

            # 生成檔案按鈕
            generate_btn = gr.Button("產生下載檔案", variant="primary", interactive=False)

            # 檔案生成狀態
            generate_status = gr.Textbox(
                label="檔案生成狀態",
                value="",
                elem_classes=["file-status"],
                interactive=False
            )

            # 下載按鈕
            download_btn = gr.DownloadButton(
                label="下載預測結果",
                variant="primary",
                interactive=False
            )

            # 下載狀態
            download_status = gr.Textbox(
                label="下載狀態",
                value="",
                elem_classes=["file-status"],
                interactive=False
            )

        # 事件綁定
        confirm_btn.click(
            fn=handle_confirm,
            inputs=[file_input, state],
            outputs=[file_status, state, predict_btn, confirm_btn]
        )

        predict_btn.click(
            fn=handle_predict,
            inputs=[state],
            outputs=[predict_status, state, generate_btn, predict_btn]
        )

        generate_btn.click(
            fn=handle_generate,
            inputs=[download_format, state],
            outputs=[download_btn, generate_status, state, download_btn, generate_btn]
        )

        download_btn.click(
            fn=handle_download,
            inputs=[download_btn],
            outputs=[download_status, download_btn]
        )

    return demo

def main():
    """
    主程式入口

    啟動 Gradio 伺服器
    限制檔案上傳大小為 50MB
    """
    demo = create_gradio_interface()
    demo.launch(max_file_size=50 * 1024 * 1024)

if __name__ == "__main__":
    main()

In [None]:
# 在 huggingface spaces 創建 process.py
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import rdFingerprintGenerator
from joblib import Parallel, delayed
from huggingface_hub import hf_hub_download
import joblib

def smiles_to_morgan_fingerprint(smiles, n_bits=2048):
    """
    將 SMILES 轉換為 Morgan 指紋。

    參數:
    - smiles: 分子 SMILES 字符串
    - n_bits: 指紋位元數，預設為 2048

    返回:
    - Morgan 指紋的 numpy 數組
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return np.zeros(n_bits, dtype=np.int8)
    generator = rdFingerprintGenerator.GetMorganGenerator(radius=2, fpSize=n_bits)
    return np.array(generator.GetFingerprint(mol), dtype=np.int8)

def parallel_smiles_conversion(smiles_series, n_jobs=2):
    """
    並行將 SMILES 系列轉換為 Morgan 指紋。

    參數:
    - smiles_series: SMILES 字符串的 pandas 系列
    - n_jobs: 並行作業數量，預設為 2

    返回:
    - Morgan 指紋的 numpy 數組
    """
    results = Parallel(n_jobs=n_jobs, backend='loky')(
        delayed(smiles_to_morgan_fingerprint)(smiles) for smiles in smiles_series
    )
    return np.array(results)

def batch_generator(df, batch_size):
    """
    將數據框分批生成。

    參數:
    - df: 數據框
    - batch_size: 每個批次的大小

    返回:
    - 批次數據框的生成器
    """
    total_rows = len(df)
    for start_idx in range(0, total_rows, batch_size):
        end_idx = min(start_idx + batch_size, total_rows)
        yield df.iloc[start_idx:end_idx]

def predict_process(df, batch_size=50000, n_jobs=2):
    """
    對輸入數據框執行預測處理。

    參數:
    - df: 包含 'molecule_smiles' 和 'protein_name' 列的數據框
    - batch_size: 每個批次的大小，預設為 50000
    - n_jobs: 並行作業數量，預設為 2

    返回:
    - 包含預測結果的數據框
    """
    # 載入模型
    model_path = hf_hub_download(repo_id='sinanju/model_voting', filename='voting_model.bin')
    model = joblib.load(model_path)

    all_results = []
    current_id = 1

    for batch_df in batch_generator(df, batch_size):
        mol_smiles = batch_df['molecule_smiles'].copy()
        protein_names = batch_df['protein_name'].copy()

        # 並行轉換 SMILES 為指紋
        batch_fingerprints = parallel_smiles_conversion(batch_df['molecule_smiles'], n_jobs=n_jobs)
        fingerprints_df = pd.DataFrame(batch_fingerprints, index=batch_df.index)

        # 對 protein_name 進行 one-hot 編碼
        protein_onehot = pd.get_dummies(protein_names, prefix='protein').astype(np.int8)
        protein_onehot.index = batch_df.index

        # 合併指紋和 one-hot 編碼
        X_test = pd.concat([fingerprints_df, protein_onehot], axis=1)
        X_test.columns = X_test.columns.astype(str)

        # 預測
        probabilities = model.predict_proba(X_test)[:, 1]
        predictions = (probabilities >= 0.5).astype(np.int8)

        # 構建結果數據框
        batch_result = pd.DataFrame({
            'id': range(current_id, current_id + len(batch_df)),
            'molecule_smiles': mol_smiles,
            'protein_name': protein_names,
            'binds': predictions
        })
        all_results.append(batch_result)
        current_id += len(batch_df)

    # 合併所有批次的結果
    result_df = pd.concat(all_results, ignore_index=True)

    return result_df

In [None]:
# 在 huggingface spaces 創建 file_validator.py
import pandas as pd
import os
import traceback
from rdkit import Chem

def generate_error_message(invalid_smiles, invalid_proteins):
    """
    根據無效的 SMILES 和蛋白質名稱生成詳細的錯誤訊息。

    Args:
        invalid_smiles (list): 包含 (索引, SMILES) 的無效 SMILES 清單
        invalid_proteins (list): 包含 (索引, protein_name) 的無效蛋白質名稱清單

    Returns:
        str: 格式化的錯誤訊息
    """
    error_msg = ""
    if invalid_smiles:
        error_msg += "❌ 以下分子 SMILES 格式無效：\n"
        for idx, smiles in invalid_smiles[:5]:
            error_msg += f"行 {idx + 1}: {smiles}\n"
        if len(invalid_smiles) > 5:
            error_msg += f"...還有 {len(invalid_smiles) - 5} 個無效 SMILES\n"

    if invalid_proteins:
        error_msg += "❌ 以下蛋白質名稱無效（僅允許 sEH, BRD4 或 HSA）：\n"
        for idx, protein in invalid_proteins[:5]:
            error_msg += f"行 {idx + 1}: {protein}\n"
        if len(invalid_proteins) > 5:
            error_msg += f"...還有 {len(invalid_proteins) - 5} 個無效蛋白質名稱\n"

    return error_msg

def validate_file(file, state):
    """
    驗證上傳的檔案是否符合要求，並更新 state。

    Args:
        file (str): 檔案路徑
        state (dict): Gradio 的狀態字典

    Returns:
        tuple: (狀態訊息, 更新後的 state)
    """
    if file is None:
        return "請上傳分子數據集！", state

    try:
        # 定義必要欄位
        required_columns = ["molecule_smiles", "protein_name"]

        # 讀取檔案
        if file.endswith('.csv'):
            encodings = ["utf-8", "Windows-1252"]  # 支援多種編碼
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(file, encoding=encoding, usecols=required_columns)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                raise ValueError("無法解析檔案編碼")
        elif file.endswith('.parquet'):
            df = pd.read_parquet(file, columns=required_columns)
        else:
            return "❌ 不支援的檔案格式（僅接受 .csv 或 .parquet）", state

        # 檢查必要欄位是否存在
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return f"檔案缺少必要的欄位：{', '.join(missing_columns)}", state

        # 檢查資料量是否超過限制
        if len(df) > 500000:
            return "資料筆數超過 50 萬筆，請減少資料量！", state

        # 檢查是否有缺值
        if df[required_columns].isnull().any().any():
            return "❌ 必要欄位含有缺值，請檢查並補充完整數據！", state

        # 定義有效蛋白質名稱
        valid_proteins = {"sEH", "BRD4", "HSA"}

        # 收集無效資料
        invalid_smiles = []
        invalid_proteins = []
        for row in df.itertuples():
            idx = row.Index
            smiles = row.molecule_smiles
            protein = row.protein_name

            # 檢查 SMILES 格式
            try:
                mol = Chem.MolFromSmiles(smiles)
                if mol is None:
                    invalid_smiles.append((idx, smiles))
            except ValueError:
                invalid_smiles.append((idx, smiles))

            # 檢查蛋白質名稱
            if protein not in valid_proteins:
                invalid_proteins.append((idx, protein))

        # 如果有錯誤，回報詳細訊息
        error_msg = generate_error_message(invalid_smiles, invalid_proteins)
        if error_msg:
            return error_msg, state

        # 更新狀態
        state["df"] = df
        state["file_uploaded"] = True
        state["prediction_done"] = False
        filename = os.path.basename(file)
        return f"✅ 已成功上傳檔案：{filename}，共 {len(df)} 筆資料", state

    except ValueError as ve:
        return f"❌ 檔案內容錯誤：{str(ve)}", state
    except Exception as e:
        error_details = traceback.format_exc()
        print(f"檔案處理錯誤：{str(e)}\n{error_details}")
        return f"❌ 檔案處理錯誤：{str(e)}", state