In [3]:
!pip install google-api-python-client jieba torch transformers datasets



In [None]:
# flask應用
from flask import Flask, request, jsonify
from flask_cors import CORS

import os
import json
import googleapiclient.discovery
import jieba
import time
import torch
import logging
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset
import random

# Set environment variables intended to prevent OpenMP conflicts.
os.environ.update(
    KMP_DUPLICATE_LIB_OK='True',
    KMP_INIT_AT_FORK='FALSE',
)

# Set the logging level: show INFO and above.
logging.basicConfig(level=logging.INFO)


In [2]:
# Flask application object, with cross-origin requests allowed.
app = Flask(__name__)
CORS(app)

# Logging is already configured by logging.basicConfig() in the imports cell,
# which must run before this one (it provides Flask and CORS). A repeated
# basicConfig() call is a no-op once the root logger has a handler, so it is
# not duplicated here.

# Initialise the YouTube API client
def get_youtube_client(api_key):
    """Build a 'youtube' / 'v3' API client via googleapiclient.

    Args:
        api_key: Developer key passed to the client builder as ``developerKey``.

    Returns:
        The object returned by ``googleapiclient.discovery.build``.
    """
    build_client = googleapiclient.discovery.build
    return build_client('youtube', 'v3', developerKey=api_key)
# Fetch the replies to a top-level comment
def get_comment_replies(parent_id, youtube):
    """Return the text of every reply under one top-level comment.

    Pages through ``youtube.comments().list(parentId=parent_id, ...)`` in
    batches of up to 100 until the response carries no ``nextPageToken``.

    Args:
        parent_id: ID of the parent (top-level) comment.
        youtube: API client exposing ``comments().list(...).execute()``.

    Returns:
        list: ``textDisplay`` of each reply, in the order the API returned them.
    """
    collected = []
    page_token = None
    has_more = True

    while has_more:
        page = youtube.comments().list(
            part="snippet",
            parentId=parent_id,
            pageToken=page_token,
            maxResults=100,
            textFormat="plainText",
        ).execute()

        collected.extend(entry['snippet']['textDisplay'] for entry in page['items'])

        # An absent or empty token means this was the last page.
        page_token = page.get('nextPageToken')
        has_more = bool(page_token)

    return collected


In [3]:
# Fetch every comment on a video, together with the replies
def get_video_comments(video_id, api_key):
    """Return all top-level comments of a video, each followed by its replies.

    Pages through ``commentThreads().list(videoId=video_id, ...)`` in batches
    of up to 100. For every thread whose ``totalReplyCount`` is positive, the
    replies are fetched with ``get_comment_replies`` and appended directly
    after the thread's top-level comment.

    Args:
        video_id: ID of the video whose comment threads are listed.
        api_key: Developer key handed to ``get_youtube_client``.

    Returns:
        list: Comment texts (``textDisplay``) in retrieval order.
    """
    client = get_youtube_client(api_key)
    gathered = []
    page_token = None

    while True:
        page = client.commentThreads().list(
            part="snippet",
            videoId=video_id,
            pageToken=page_token,
            maxResults=100,
            textFormat="plainText",
        ).execute()

        for thread in page['items']:
            thread_info = thread['snippet']
            gathered.append(thread_info['topLevelComment']['snippet']['textDisplay'])

            # Threads without replies need no extra API call.
            if thread_info['totalReplyCount'] <= 0:
                continue
            gathered.extend(get_comment_replies(thread['id'], client))

        page_token = page.get('nextPageToken')
        if not page_token:
            return gathered


In [4]:
# Segment the comments into words with jieba
def segment_comments(comments):
    """Segment each comment with ``jieba.lcut`` and re-join tokens with spaces.

    Args:
        comments: Iterable of comment strings.

    Returns:
        list: One space-separated token string per input comment, same order.
    """
    joined = []
    for text in comments:
        tokens = jieba.lcut(text)
        joined.append(' '.join(tokens))
    return joined
# Train the model
def train_model(train_texts, train_labels, model_save_path,
                model_name='bert-base-chinese', num_labels=6):
    """Fine-tune a BERT sequence classifier and save it with its tokenizer.

    Args:
        train_texts: List of training strings.
        train_labels: List of integer class ids, one per entry of
            ``train_texts``; each must satisfy ``0 <= label < num_labels``.
        model_save_path: Directory used as the Trainer ``output_dir`` and as
            the destination for the final model and tokenizer files.
        model_name: Pretrained checkpoint to start from.
        num_labels: Number of output classes of the classification head.

    Returns:
        The ``BertForSequenceClassification`` instance that was handed to the
        Trainer.

    Raises:
        ValueError: If there is no training data, if texts and labels differ
            in length, or if a label lies outside ``[0, num_labels)``.
    """
    # Fail early with a readable message instead of an opaque tokenizer,
    # Arrow, or loss-function error further down.
    if len(train_texts) == 0:
        raise ValueError("train_texts is empty; at least one training example is required")
    if len(train_texts) != len(train_labels):
        raise ValueError(
            f"train_texts and train_labels differ in length: "
            f"{len(train_texts)} != {len(train_labels)}"
        )
    if any(not 0 <= label < num_labels for label in train_labels):
        raise ValueError(f"every label must satisfy 0 <= label < {num_labels}")

    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)

    # Tokenize and encode the texts.
    logging.info("Tokenizing the data")
    train_encodings = tokenizer(train_texts, truncation=True, padding=True, max_length=512)

    # Assemble the training dataset from the encodings and labels.
    train_dataset = Dataset.from_dict({
        'input_ids': train_encodings['input_ids'],
        'attention_mask': train_encodings['attention_mask'],
        'labels': train_labels
    })

    # Training hyper-parameters.
    # NOTE(review): with a small training set the total number of optimizer
    # steps may be below warmup_steps=500 — confirm this is intended.
    training_args = TrainingArguments(
        output_dir=model_save_path,
        num_train_epochs=3,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,  # log every 10 steps
    )

    # Run training through the Trainer.
    logging.info("Starting training")
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
    )
    trainer.train()

    # Save the model and the tokenizer into the same directory so both can be
    # reloaded from model_save_path.
    trainer.save_model(model_save_path)
    tokenizer.save_pretrained(model_save_path)

    return model


In [None]:
# Load the model
def load_model(model_path):
    """Load a saved sequence classifier and its tokenizer from ``model_path``.

    Args:
        model_path: Location passed to both ``from_pretrained`` calls.

    Returns:
        tuple: ``(model, tokenizer)``.
    """
    return (
        BertForSequenceClassification.from_pretrained(model_path),
        BertTokenizer.from_pretrained(model_path),
    )
# Predict emotions
def predict_emotions(model, tokenizer, comments, batch_size=32):
    """Predict a class id for every comment.

    Comments are tokenized and run through the model in mini-batches so that
    memory use is bounded by ``batch_size`` rather than by the total number of
    comments. Each batch is moved to the model's device (when the model
    exposes a ``device`` attribute) before the forward pass.

    Args:
        model: Classifier whose output exposes ``.logits``.
        tokenizer: Callable turning a list of strings into a mapping of
            tensors (called with ``return_tensors='pt'``).
        comments: Sequence of comment strings.
        batch_size: Number of comments per forward pass; must be >= 1.

    Returns:
        list: ``argmax`` class index per comment, in input order. Empty input
        yields an empty list.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    comments = list(comments)
    if not comments:
        # Nothing to classify; avoid calling the tokenizer on an empty batch.
        return []

    device = getattr(model, 'device', None)
    predictions = []

    model.eval()
    with torch.no_grad():
        for start in range(0, len(comments), batch_size):
            batch = comments[start:start + batch_size]
            inputs = tokenizer(batch, return_tensors='pt', padding=True,
                               truncation=True, max_length=512)
            if device is not None:
                # Inputs must live on the same device as the model weights.
                inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

            logits = model(**inputs).logits
            predictions.extend(torch.argmax(logits, dim=1).tolist())

    return predictions



In [None]:
# Main program
def extract_video_id(video_url):
    """Extract the video id from a YouTube URL.

    Handles ``...watch?v=<id>`` (the ``v`` parameter may appear anywhere in
    the query string), ``youtu.be/<id>`` short links and
    ``/shorts|embed|live/<id>`` paths. As a last resort it falls back to the
    text between ``v=`` and the next ``&``.

    Raises:
        ValueError: If no video id can be found in ``video_url``.
    """
    from urllib.parse import urlparse, parse_qs

    cleaned = video_url.strip()
    parsed = urlparse(cleaned)
    host = (parsed.hostname or '').lower()
    path_parts = [part for part in parsed.path.split('/') if part]

    candidate = ''
    if host in ('youtu.be', 'www.youtu.be'):
        candidate = path_parts[0] if path_parts else ''
    else:
        candidate = parse_qs(parsed.query).get('v', [''])[0]
        if not candidate and len(path_parts) >= 2 and path_parts[0] in ('shorts', 'embed', 'live'):
            candidate = path_parts[1]

    if not candidate and 'v=' in cleaned:
        # Same rule the script used originally, kept for inputs that are not
        # well-formed URLs.
        candidate = cleaned.split('v=')[1].split('&')[0]

    if not candidate:
        raise ValueError(f"Could not find a video id in URL: {video_url!r}")
    return candidate


def load_training_sample(path, sample_rate, seed=None):
    """Randomly sample labelled examples from a JSON training file.

    The file is read as UTF-8 JSON holding a sequence of groups; every group
    is a sequence of records whose first element is the text and whose second
    element is converted to ``int`` and used as the label. Each record is kept
    with probability ``sample_rate``.

    Args:
        path: Path of the JSON file.
        sample_rate: Keep probability per record (``1.0`` keeps everything).
        seed: Optional seed for a private random generator so the selection
            is reproducible; ``None`` seeds from system entropy.

    Returns:
        tuple: ``(texts, labels)`` as two parallel lists.
    """
    # JSON text is UTF-8; do not rely on the platform's default encoding.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    rng = random.Random(seed)
    texts, labels = [], []
    for group in data:
        for record in group:
            if rng.random() < sample_rate:
                texts.append(record[0])
                labels.append(int(record[1]))
    return texts, labels


if __name__ == "__main__":
    # Start timing.
    logging.info("Starting the script")
    tStart = time.time()

    # Credentials must not live in the notebook: read the YouTube Data API v3
    # key from the environment, or prompt for it without echoing. Any key that
    # was previously committed in this cell should be revoked and replaced.
    api_key = os.environ.get("YOUTUBE_API_KEY")
    if not api_key:
        from getpass import getpass
        api_key = getpass("請輸入 YouTube Data API v3 金鑰: ")

    video_url = input("請輸入影片 URL: ")
    video_id = extract_video_id(video_url)

    # Fetch the comments.
    logging.info("Fetching comments")
    comments = get_video_comments(video_id, api_key)

    # Word-segment the comments.
    logging.info("Segmenting comments")
    segmented_comments = segment_comments(comments)

    logging.info("Checking for existing model")
    model_save_path = './trained_model'
    train_texts, train_labels = [], []

    # Look for the saved config file instead of just the directory: the
    # directory is also the Trainer output_dir, so it may exist without a
    # finished model in it (e.g. after an interrupted training run).
    if os.path.isfile(os.path.join(model_save_path, 'config.json')):
        logging.info("Loading existing model")
        model, tokenizer = load_model(model_save_path)
    else:
        # Training data is only needed when a model actually has to be
        # trained. A fixed seed keeps the 0.34% sample reproducible.
        train_texts, train_labels = load_training_sample("trainword.json", 0.0034, seed=42)
        if not train_texts:
            raise RuntimeError("No training examples were sampled from trainword.json")

        logging.info("Training model")
        print("train size:", len(train_texts))
        model = train_model(train_texts, train_labels, model_save_path)
        tokenizer = BertTokenizer.from_pretrained(model_save_path)

    # Predict emotions; skip the model call when there is nothing to classify.
    logging.info("Predicting emotions")
    if segmented_comments:
        predictions = predict_emotions(model, tokenizer, segmented_comments)
    else:
        predictions = []

    # Print the prediction for every comment.
    for idx, (comment, prediction) in enumerate(zip(comments, predictions), start=1):
        print(f"Comment {idx}: {comment}")
        print(f"Emotion Prediction: {prediction}")
        print("----------------------")

    # Stop timing and report the elapsed time.
    tEnd = time.time()
    logging.info(f"Execution took {tEnd - tStart} seconds.")
