<a href="https://colab.research.google.com/github/sophielu05/114-1-/blob/main/HW4_%E6%96%87%E5%AD%97%E8%B3%87%E6%96%99%E5%B0%8F%E5%88%86%E6%9E%90.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [140]:
!pip -q install gspread gspread_dataframe google-auth google-auth-oauthlib google-auth-httplib2 \
               gradio pandas beautifulsoup4 google-generativeai python-dateutil

In [141]:
import os, time, uuid, re, json, datetime
from datetime import datetime as dt, timedelta
from dateutil.tz import gettz
import pandas as pd
import gradio as gr
import requests
from bs4 import BeautifulSoup

import google.generativeai as genai

# Google Auth & Sheets
from google.colab import auth
import gspread
from gspread_dataframe import set_with_dataframe, get_as_dataframe
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.auth import default

In [142]:
from google.colab import auth
auth.authenticate_user()

import gspread
from google.auth import default
creds, _ = default()

gc = gspread.authorize(creds)

In [143]:
from google.colab import userdata

# 從 Colab Secrets 中獲取 API 金鑰
api_key = userdata.get('lty')

# 使用獲取的金鑰配置 genai
genai.configure(api_key=api_key)

model = genai.GenerativeModel('gemini-2.5-pro')

In [144]:
SHEET_URL = "https://docs.google.com/spreadsheets/d/10mSu-t3IZxMocPZyGKaP6ria3MxIlYnsTgFpU11Oh2s/edit?gid=0#gid=0"
WORKSHEET_NAME = "工作表1"
TIMEZONE = "Asia/Taipei"

In [145]:
PTT_HEADER = [
    "post_id","title","url","date","author","nrec","created_at",
    "fetched_at","content"
]
TERMS_HEADER = ["term","freq","df_count","tfidf_mean","examples"]

In [146]:
def ensure_spreadsheet(name):
    try:
        sh = gc.open(name)  # returns gspread.models.Spreadsheet
    except gspread.SpreadsheetNotFound:
        sh = gc.create(name)
    return sh

sh = ensure_spreadsheet(WORKSHEET_NAME)

In [147]:
def ensure_worksheet(sh, title, header):
    try:
        ws = sh.worksheet(title)
    except gspread.WorksheetNotFound:
        ws = sh.add_worksheet(title=title, rows="1000", cols=str(len(header)+5))
        ws.update([header])
    # 若沒有表頭就補上
    data = ws.get_all_values()
    if not data or (data and data[0] != header):
        ws.clear()
        ws.update([header])
    return ws

In [148]:
ws_ptt_posts = ensure_worksheet(sh, "ptt_movie_posts", PTT_HEADER)
ws_ptt_terms = ensure_worksheet(sh, "ptt_movie_terms", TERMS_HEADER)

In [149]:
import requests
from bs4 import BeautifulSoup
import gspread
from oauth2client.service_account import ServiceAccountCredentials
import pandas as pd
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import json
import time
import gradio as gr
from typing import Tuple, List

# --- 配置區 ---
# *** 請替換成您的 Gemini API Key (留空的話，Canvas 會自動注入) ***
GEMINI_API_KEY = "lty"
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-09-2025:generateContent"
# 爬蟲設定
PTT_URL = 'https://www.ptt.cc'
STARTING_BOARD_URL = 'https://www.ptt.cc/bbs/movie/index.html'
TARGET_PAGES = 10
# Google Sheet 設定
SHEET_NAME = "PTT Movie Analysis"
JSON_KEY_FILE = 'service_account.json'
# TF-IDF 設定
TOP_N_KEYWORDS = 20
# Gradio UI 設定
TITLE = "PTT Movie 版數據分析工具"

# --- 輔助函式：API 呼叫與重試機制 ---

def call_gemini_api(prompt: str, system_instruction: str) -> str:
    """串接 Gemini API 並處理指數退避重試"""
    if not GEMINI_API_KEY:
        # 如果在 Canvas 環境運行，API Key 將自動注入
        api_key_placeholder = "lty"
    else:
        api_key_placeholder = GEMINI_API_KEY

    url = f"{GEMINI_API_URL}?key={api_key_placeholder}"

    # payload 結構
    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "systemInstruction": {"parts": [{"text": system_instruction}]},
    }

    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = requests.post(
                url,
                headers={'Content-Type': 'application/json'},
                data=json.dumps(payload)
            )
            response.raise_for_status()  # 檢查 HTTP 錯誤

            result = response.json()

            # 解析文本內容
            text = result.get('candidates', [{}])[0].get('content', {}).get('parts', [{}])[0].get('text', '').strip()

            if text:
                return text
            else:
                print(f"API 響應成功但未包含文本內容: {result}")
                return "API 返回結果結構異常，請檢查日誌。"

        except requests.exceptions.RequestException as e:
            print(f"API 呼叫失敗 (嘗試 {attempt + 1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                # 指數退避
                wait_time = 2 ** attempt
                print(f"等待 {wait_time} 秒後重試...")
                time.sleep(wait_time)
            else:
                return f"API 呼叫失敗，已達到最大重試次數。錯誤: {e}"
        except json.JSONDecodeError:
            print("API 返回的不是有效的 JSON。")
            return "API 返回資料解析失敗。"
    return "API 呼叫最終失敗。"

# --- 步驟 1: PTT 爬蟲 ---

def crawl_ptt(start_url: str, pages: int) -> pd.DataFrame:
    """爬取 PTT Movie 版指定頁數的文章列表"""
    global PTT_URL

    print(f"開始爬取 PTT Movie 版，共 {pages} 頁...")
    all_posts = []
    current_url = start_url

    for i in range(pages):
        print(f"  > 正在爬取第 {i+1} 頁: {current_url}")
        try:
            # 設置 User-Agent 模仿瀏覽器
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

            # 修正爬蟲問題：加入 over18 Cookie 以繞過 PTT 年齡確認頁面
            cookies = {'over18': '1'}

            response = requests.get(current_url, headers=headers, cookies=cookies)
            response.raise_for_status() # 檢查 HTTP 狀態碼

            soup = BeautifulSoup(response.text, 'html.parser')

            posts = soup.find_all('div', class_='r-ent')

            for post in posts:
                title_tag = post.find('div', class_='title').find('a')

                if title_tag:
                    title = title_tag.text.strip()
                    href = PTT_URL + title_tag['href']
                else:
                    # 處理被刪除的文章
                    title = '[已被刪除]'
                    href = None

                author = post.find('div', class_='author').text.strip()
                date = post.find('div', class_='date').text.strip()

                all_posts.append({
                    'Title': title,
                    'Author': author,
                    'Date': date,
                    'Href': href,
                    'Keywords': '', # 預留給分析結果
                })

            # 尋找「上頁」連結以繼續爬取
            paging = soup.find('div', class_='btn-group btn-group-paging')
            if paging:
                # PTT 頁面的第二個 'a' 連結是 '上頁' (即更舊的文章頁面)
                prev_page_btn = paging.find_all('a')[1]
                if 'href' in prev_page_btn.attrs:
                    current_url = PTT_URL + prev_page_btn['href']
                else:
                    print("  > 已到達最舊的文章頁面，停止爬蟲。")
                    break
            else:
                print("  > 無法找到分頁按鈕，停止爬蟲。")
                break

        except requests.exceptions.RequestException as e:
            print(f"爬取第 {i+1} 頁失敗: {e}")
            break
        except Exception as e:
            print(f"處理第 {i+1} 頁時發生錯誤: {e}")
            break

    df = pd.DataFrame(all_posts)
    print(f"爬蟲完成。共抓取 {len(df)} 篇文章。")
    return df

# --- 步驟 2 & 4: Google Sheets 讀寫 ---

def sheets_io_write(df_posts: pd.DataFrame, df_keywords: pd.DataFrame) -> str:
    """將爬蟲結果和關鍵字分析結果寫入 Google Sheets"""
    try:
        # 使用服務帳號認證
        scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
        # 嘗試從 JSON_KEY_FILE 載入憑證
        creds = ServiceAccountCredentials.from_json_keyfile_name(JSON_KEY_FILE, scope)
        gc = gspread.authorize(creds)

        # 開啟或建立試算表
        try:
            sh = gc.open(SHEET_NAME)
            print(f"成功開啟 Google Sheet: '{SHEET_NAME}'。")
        except gspread.SpreadsheetNotFound:
            print(f"Google Sheet '{SHEET_NAME}' 不存在，正在建立新的試算表...")
            sh = gc.create(SHEET_NAME)
            # 建議用戶手動共享： sh.share(None, perm_type='user', role='writer', emailAddress=creds.service_account_email)
            # 注意：這裡無法自動共享，使用者必須手動將服務帳號 Email 加入編輯者
            print(f"✅ 已建立新的 Google Sheet。請務必將服務帳號 {creds.service_account_email} 加入編輯者名單。")

        # 工作表標題
        worksheet_title_data = "PTT_Data"
        worksheet_title_analysis = "Keywords_Analysis"

        # --- 寫入爬蟲結果 (PTT_Data) ---
        data_msg = "PTT_Data 寫入狀態: "
        if not df_posts.empty:
            try:
                # 嘗試取得現有工作表
                worksheet_data = sh.worksheet(worksheet_title_data)
                worksheet_data.clear()
            except gspread.WorksheetNotFound:
                # 找不到則建立新的工作表
                # add_worksheet 至少需要 1 列和 1 行
                worksheet_data = sh.add_worksheet(title=worksheet_title_data, rows="1", cols="1")

            # 使用 dataframe 寫入，包含標頭
            sh.values_update(
                worksheet_title_data,
                params={'valueInputOption': 'USER_ENTERED'},
                # 結合標題行和數據行
                body={'values': [df_posts.columns.values.tolist()] + df_posts.values.tolist()}
            )
            data_msg += f"✅ 爬蟲結果已成功寫入。"
        else:
            data_msg += "❌ 爬蟲結果為空，跳過寫入。"


        # --- 寫入 TF-IDF 分析結果 (Keywords_Analysis) ---
        keywords_msg = "Keywords_Analysis 寫入狀態: "
        if not df_keywords.empty:
            try:
                worksheet_analysis = sh.worksheet(worksheet_title_analysis)
                worksheet_analysis.clear()
            except gspread.WorksheetNotFound:
                worksheet_analysis = sh.add_worksheet(title=worksheet_title_analysis, rows="1", cols="1")

            # 使用 dataframe 寫入，包含標頭
            sh.values_update(
                worksheet_title_analysis,
                params={'valueInputOption': 'USER_ENTERED'},
                # 結合標題行和數據行
                body={'values': [df_keywords.columns.values.tolist()] + df_keywords.values.tolist()}
            )
            keywords_msg += "✅ 關鍵字分析結果已成功寫入。"
        else:
            keywords_msg += "❌ 關鍵字分析結果為空，跳過寫入。"

        return f"--- Google Sheet 同步狀態 ---\n{data_msg}\n{keywords_msg}"

    except gspread.exceptions.APIError as e:
        error_details = json.loads(e.response.text).get('error', {}).get('message', '未知 API 錯誤')
        print(f"Google Sheets API 錯誤: {error_details}")
        return f"❌ Google Sheets API 錯誤！請檢查服務帳號是否有**編輯**權限。錯誤詳情：{error_details}"
    except FileNotFoundError:
        print(f"錯誤：找不到服務帳號金鑰檔案 '{JSON_KEY_FILE}'。")
        return f"❌ 錯誤：找不到金鑰檔案 '{JSON_KEY_FILE}'。請確保檔案已上傳到程式碼同一目錄。"
    except Exception as e:
        print(f"Google Sheets 操作失敗: {e}")
        return f"❌ Google Sheets 操作失敗: {e}"

# --- 步驟 3: 詞頻與關鍵字統計 (TF-IDF) ---

def analyze_text(df: pd.DataFrame, top_n: int) -> Tuple[pd.DataFrame, str]:
    """使用 TF-IDF 進行中文分詞和關鍵字提取"""

    if df.empty:
        return pd.DataFrame(), "錯誤：輸入數據為空，無法進行分析。"

    # 合併所有標題作為文本資料
    documents = df['Title'].tolist()

    # 執行中文分詞
    corpus = []
    print("開始執行 Jieba 中文分詞...")
    for doc in documents:
        # 過濾掉刪除的文章標記
        if doc == '[已被刪除]':
            continue
        # 使用 jieba.cut 進行分詞，並將結果以空格連接
        words = jieba.cut(doc)
        corpus.append(" ".join(words))

    if not corpus:
        return pd.DataFrame(), "錯誤：所有文章標題皆為空或已刪除，無法進行分析。"

    # 建立 TF-IDF 模型
    # min_df=5: 排除出現次數過少的詞彙，提升關鍵字品質
    # max_df=0.8: 排除過於常見的詞彙 (如 '的', '是', '了')
    vectorizer = TfidfVectorizer(min_df=5, max_df=0.8)

    try:
        tfidf_matrix = vectorizer.fit_transform(corpus)
    except ValueError as e:
        print(f"TFIDF fit_transform 失敗: {e}")
        return pd.DataFrame(), "錯誤：TF-IDF 處理失敗，詞彙量不足或設定太嚴格。"

    feature_names = vectorizer.get_feature_names_out()

    # 計算每個詞彙的平均 TF-IDF 分數 (作為關鍵字熱度)
    # 取矩陣的總和，除以文章數，得到平均權重
    scores = np.asarray(tfidf_matrix.sum(axis=0)).ravel()

    # 建立關鍵字 DataFrame
    df_keywords = pd.DataFrame({'Keyword': feature_names, 'TFIDF_Score': scores})

    # 排序並取得前 N 個熱詞
    df_keywords = df_keywords.sort_values(by='TFIDF_Score', ascending=False).head(top_n).reset_index(drop=True)

    # 將分數格式化為小數點後兩位
    df_keywords['TFIDF_Score'] = df_keywords['TFIDF_Score'].round(2)

    # 生成熱詞字串用於 LLM 摘要
    hot_words_list = df_keywords['Keyword'].tolist()
    hot_words_str = "、".join(hot_words_list)

    print(f"TF-IDF 分析完成。前 {top_n} 熱詞: {hot_words_str[:100]}...")

    return df_keywords, hot_words_str

# --- 步驟 5: Gemini API 摘要 ---

def generate_insight(hot_words: str, titles: List[str]) -> str:
    """使用 Gemini API 根據關鍵字和標題生成洞察摘要和結論"""
    if not hot_words:
        return "無法生成摘要，因為沒有提取到關鍵字。"

    # 將標題列表轉為單一字串，限制長度以避免超過 API 限制
    titles_sample = "\n".join(titles[:50]) # 取前 50 篇文章標題作為樣本

    system_prompt = (
        "你是一位專業的影視市場分析師，專門分析 PTT 上的電影討論熱度。你的回應必須是繁體中文。"
        "任務是根據提供的電影版文章標題和 TF-IDF 熱詞，總結出市場洞察。"
        "請嚴格按照以下格式輸出："
        "1. 五句簡潔有力的市場洞察摘要。"
        "2. 一段 120 字的總結結論。"
    )

    user_query = (
        "請根據以下的資訊，生成五句洞察摘要和一段 120 字的結論：\n\n"
        f"【關鍵字熱詞 (依熱度排序)】: {hot_words}\n\n"
        f"【近期文章標題範例 (部分)】:\n{titles_sample}\n\n"
        "請嚴格按照要求，先是五句洞察摘要，接著是一段 120 字的總結結論。"
    )

    print("呼叫 Gemini API 進行文本摘要與洞察生成...")
    llm_output = call_gemini_api(user_query, system_prompt)

    return llm_output

# --- 步驟 6: 完整執行流程 ---

def full_pipeline(pages: int, top_n: int) -> Tuple[str, str, str]:
    """執行整個爬蟲、分析、寫入和 LLM 摘要的自動化流程"""

    # 0. 初始化狀態
    df_posts_result = pd.DataFrame()
    df_keywords_result = pd.DataFrame()
    hot_words_str = ""
    llm_insight = "等待執行..."

    try:
        result_msg = "--- 執行狀態 ---\n"

        # 1. 執行 PTT 爬蟲
        df_posts_result = crawl_ptt(STARTING_BOARD_URL, pages)

        if df_posts_result.empty:
            result_msg += "❌ 爬蟲失敗或未抓取到任何文章。可能是網路問題或 PTT 頁面結構變動。\n"
            # 即使爬蟲失敗，也嘗試進行 Sheets 寫入，以便將空數據報告到 Sheets
            pass # 流程繼續
        else:
            result_msg += f"✅ 爬蟲成功：共 {len(df_posts_result)} 篇文章。\n"

        # 2. 詞頻與關鍵字統計 (TF-IDF)
        if not df_posts_result.empty:
            df_keywords_result, hot_words_str = analyze_text(df_posts_result, top_n)

            if df_keywords_result.empty:
                result_msg += "❌ TF-IDF 分析失敗。\n"
            else:
                result_msg += f"✅ TF-IDF 分析成功：已提取前 {top_n} 個熱詞。\n"
        else:
            result_msg += "ℹ️ 爬蟲結果為空，跳過 TF-IDF 分析。\n"

        # 3. 寫入 Google Sheet (回寫統計表和爬蟲結果)
        sheet_write_status = sheets_io_write(df_posts=df_posts_result, df_keywords=df_keywords_result)
        result_msg += sheet_write_status + "\n"

        # 4. 串接 Gemini API 生成摘要
        titles_for_llm = df_posts_result['Title'].tolist() if not df_posts_result.empty else []
        if hot_words_str and titles_for_llm:
            llm_insight = generate_insight(hot_words_str, titles_for_llm)
            result_msg += "✅ LLM 洞察摘要已生成。\n"
        else:
            llm_insight = "無法生成摘要，因為爬蟲或關鍵字提取失敗。"
            result_msg += "❌ LLM 摘要生成失敗。\n"

        # 5. 準備 Gradio 輸出

        # 格式化關鍵字 DataFrame 為 Markdown 輸出
        df_analysis_output = df_keywords_result.to_markdown(index=False) if not df_keywords_result.empty else "無關鍵字分析結果。"

        # 格式化 LLM 輸出
        llm_display = "--- 市場洞察與結論 ---\n" + llm_insight

        result_msg += "\n--- 流程完成 ---"
        return result_msg, df_analysis_output, llm_display

    except Exception as e:
        error_msg = f"致命錯誤：流程執行中發生未預期的錯誤 -> {e}"
        print(error_msg)
        return error_msg, "發生錯誤", "發生錯誤"

# --- Gradio 介面 ---

# 提示使用者準備金鑰
initial_message = (
    "## ⚙️ 環境準備與除錯\n"
    "如果您遇到 Google Sheet 寫入失敗，請務必檢查以下兩點：\n"
    "1. **Google Sheets 服務帳號金鑰**：名為 `service_account.json` 的檔案是否已上傳到此腳本的同一目錄。\n"
    "2. **Sheet 權限**：請確認您服務帳號的 Email 已被加入您 Google Sheet 的**編輯者**名單中。\n"
    "3. **Jieba 詞典**：程式會自動使用 Jieba 進行中文分詞。\n"
)

# 定義 Gradio 介面
with gr.Blocks(title=TITLE) as demo:
    gr.Markdown(f"# {TITLE}")
    gr.Markdown(initial_message)

    with gr.Row():
        pages_input = gr.Slider(
            minimum=1,
            maximum=20,
            step=1,
            value=TARGET_PAGES,
            label="爬蟲頁數 (PTT Movie 版)",
            info="選擇從最新頁開始往前抓取的頁數。"
        )
        keywords_input = gr.Slider(
            minimum=5,
            maximum=50,
            step=1,
            value=TOP_N_KEYWORDS,
            label="TF-IDF 熱詞數量 (N)",
            info="選擇要提取和回寫到 Google Sheet 的熱門關鍵字數量。"
        )

    run_button = gr.Button("一鍵執行爬蟲、分析與摘要")

    # 狀態輸出
    status_output = gr.Textbox(label="執行狀態與進度（包含 Google Sheet 寫入狀態）", lines=10, show_copy_button=True)

    # 分析結果輸出
    with gr.Row():
        keyword_output = gr.Markdown(label="TF-IDF 關鍵字統計結果 (Markdown)", show_copy_button=True)
        llm_output = gr.Textbox(label="Gemini AI 洞察摘要與結論", lines=15, show_copy_button=True)

    # 綁定按鈕和執行函數
    # full_pipeline 輸出 (status_msg, df_markdown, llm_insight)
    run_button.click(
        full_pipeline,
        inputs=[pages_input, keywords_input],
        outputs=[status_output, keyword_output, llm_output]
    )

# 在 Jupyter/Colab/Canvas 環境中，需要呼叫 launch() 來顯示介面
if __name__ == "__main__":
    demo.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://cfcdd2d1c06846c269.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
