In [93]:
from googleapiclient.discovery import build
import datetime
import time
API_KEY = 'AIzaSyBBjHEU2XTcP1hn9GVEHt_f4I9jMSIFaPs'
youtube = build('youtube', 'v3', developerKey=API_KEY)


### 爬取影片資料

In [94]:
def get_video_data(video_id):
    request = youtube.videos().list(
        part='statistics, snippet',
        id=video_id
    )
    response = request.execute()

    if 'items' not in response or len(response['items']) == 0:
        return None  # Return None if no data

    item = response['items'][0]
    statistics = item['statistics']
    snippet = item['snippet']

    # Extracting required data
    title = snippet['title']
    views = int(statistics.get('viewCount', 0))
    likes = int(statistics.get('likeCount', 0))
    comments = int(statistics.get('commentCount', 0))
    publish_date = snippet['publishedAt']
    publish_days = (datetime.datetime.now() - datetime.datetime.fromisoformat(publish_date[:-1])).days

    return {
        'title': title,
        'views': views,
        'likes': likes,
        'comments': comments,
        'publish_days': publish_days,
    }

### 解析video_id

In [95]:
def get_video_ids_from_channel(channel_id, max_results=500):
    video_ids = []
    next_page_token = None

    while len(video_ids) < max_results:
        # Fetching at most 50 results at a time
        request = youtube.search().list(
            part='id',
            channelId=channel_id,
            maxResults=50,
            pageToken=next_page_token
        )
        response = request.execute()

        # Extract video IDs
        for item in response['items']:
            if item['id']['kind'] == 'youtube#video':
                video_ids.append(item['id']['videoId'])

        # Check if there's another page of results
        next_page_token = response.get('nextPageToken')
        if not next_page_token:
            break

    return video_ids[:max_results]

###                                                             批量存取影片資料

In [96]:
def batch_video_data(video_ids):
    data = []
    for i in range(0, len(video_ids), 50):
        batch_ids = video_ids[i:i + 50]
        request = youtube.videos().list(
            part='statistics, snippet',
            id=','.join(batch_ids)
        )
        response = request.execute()

        for item in response['items']:
            snippet = item['snippet']
            statistics = item['statistics']
            publish_date = snippet['publishedAt']
            publish_days = (datetime.datetime.now() - datetime.datetime.fromisoformat(publish_date[:-1])).days
            views = int(statistics.get('viewCount', 0))

            # Filter out videos with 0 views
            if views > 0:
                data.append({
                    'video_id': item['id'],  # 添加 video_id
                    'title': snippet['title'],
                    'views': views,
                    'likes': int(statistics.get('likeCount', 0)),
                    'comments': int(statistics.get('commentCount', 0)),
                    'publish_days': publish_days
                })

        # 在每次处理一批视频后延时
        time.sleep(2.0)  # 延时 2 秒

    return data


In [97]:
def get_channel_video_data(channel_id, max_results=5):
    video_ids = get_video_ids_from_channel(channel_id, max_results)
    video_data = batch_video_data(video_ids)
    return video_data

### 訂閱數資料

In [98]:
def get_channel_subscriber_count(channel_id):
    request = youtube.channels().list(
        part='statistics',
        id=channel_id
    )
    response = request.execute()

    if 'items' not in response or len(response['items']) == 0:
        return None

    return int(response['items'][0]['statistics'].get('subscriberCount', 0))

### 由handle來找channel資訊

In [99]:
from googleapiclient.errors import HttpError
def get_channel_id_by_handle(handle):
    try:
        request = youtube.channels().list(
            part='snippet',
            forHandle=handle
        )
        response = request.execute()

        if 'items' in response and len(response['items']) > 0:
            return response['items'][0]['id']
        print("無法找到頻道，請檢查 Handle 是否正確。")
        return None
    except HttpError as e:
        print(f"API 錯誤：{e}")
        return None


In [100]:
from tqdm import tqdm
import os
import pandas as pd


def batch_main(handles, csv_file='youtube_video_data.csv'):
    all_video_data = []

    # 使用 tqdm 進度條
    for handle in tqdm(handles, desc="處理頻道進度", unit="頻道"):
        print(f"正在處理頻道: {handle}")
        # 獲取頻道 ID
        channel_id = get_channel_id_by_handle(handle)
        if not channel_id:
            print(f"無法找到頻道: {handle}")
            continue

        # 獲取頻道視頻數據
        video_data = get_channel_video_data(channel_id)
        if not video_data:
            print(f"未能獲取 {handle} 的任何視頻數據。")
            continue

        # 獲取訂閱數
        subscribers = get_channel_subscriber_count(channel_id)
        for item in video_data:
            item['subscribers'] = subscribers
        # 將數據追加到總數據中
        all_video_data.extend(video_data)

    # 使用 tqdm 進度條寫入數據
    print("正在保存數據到 CSV 文件...")
    with tqdm(total=len(all_video_data), desc="保存數據進度", unit="影片") as pbar:
        if not os.path.exists(csv_file):
            # 如果文件不存在，創建並寫入表頭
            df = pd.DataFrame(all_video_data)
            df.to_csv(csv_file, index=False, mode='w', header=True)
        else:
            # 如果文件存在，追加數據（避免重複）
            existing_df = pd.read_csv(csv_file)
            new_df = pd.DataFrame(all_video_data)
            combined_df = pd.concat([existing_df, new_df]).drop_duplicates(subset='video_id')
            combined_df.to_csv(csv_file, index=False, mode='w', header=True)

        pbar.update(len(all_video_data))

    print("所有數據已成功保存到 CSV 文件：")
    print(pd.read_csv(csv_file))

In [103]:
handles = ['@何電廠']
batch_main(handles)

處理頻道進度:   0%|          | 0/1 [00:00<?, ?頻道/s]

正在處理頻道: @何電廠


處理頻道進度: 100%|██████████| 1/1 [00:02<00:00,  2.44s/頻道]


正在保存數據到 CSV 文件...


保存數據進度: 100%|██████████| 5/5 [00:00<00:00, 1000.88影片/s]

所有數據已成功保存到 CSV 文件：
       video_id                                      title   views  likes  \
0   FgLkUspIsL0               全職獵人405話完整解說：西索本尊現身，旅團黑幫皆有行動  256441   2960   
1   Fy_bHNbWdRU            拳叔講哆啦A夢：哆啦A夢把房子變身無敵列車頭！開著房子去兜風！  154404   1217   
2   61fYLew16O8              臥底名單洩露，庫拉索的選擇，柯南劇場版20 純黑的噩夢解說   77427    653   
3   SHyMV8kt3F0                    獵人解說2:少數服從多數的陷阱，西索的放養計劃  239759   1263   
4   Htyr-4aOqe0                一反常态的基德，解救业火中的向日葵，柯南剧场版19解说   71886    557   
5   3L97GdpRfmk                                          🏀   23026    490   
6   mfZCP0mWAcY                   美食獵人解說37：金之主廚吉集現身，眾人分道揚鑣   54094    507   
7   -g4cZKqvyi8                          全職獵人408話討論：卡丁與墨蓮娜   54328    728   
8   hcuSnYbEXZU               全職獵人406話完整解說：團長盯上卡丁三大神器，送葬開始  132548   1824   
9   UcJSqVdISzk                                      毛巾拿來！  204045   6996   
10  4YZClY7CJcs                                  12強台灣奪冠瞬間  743774  23520   
11  mGGeepjpDxU         天氣炎熱 新莊棒球場涼水招待 #baseball #cpbl中華職




In [102]:
'''import json
import time 
import os
from googleapiclient.discovery import build

API_KEY = 'AIzaSyAZioI1gbrRVHsB8Dylb96npimTouclBB8'
youtube = build('youtube', 'v3', developerKey=API_KEY)

# 進度儲存檔案
PROGRESS_FILE = 'search_progress.json'

def save_progress(handles, next_page_token):
    """儲存搜尋進度到檔案"""
    progress_data = {
        "handles": sorted(list(handles)),  # 確保順序且無重複
        "nextPageToken": next_page_token
    }
    with open(PROGRESS_FILE, 'w', encoding='utf-8') as f:
        json.dump(progress_data, f, ensure_ascii=False, indent=4)

def load_progress():
    """載入搜尋進度"""
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
            progress_data = json.load(f)
            return set(progress_data["handles"]), progress_data["nextPageToken"]
    return set(), None

def get_handles_from_keyword(keyword, max_results=50):
    handles, next_page_token = load_progress()

    while len(handles) < max_results:
        # 搜尋頻道
        search_request = youtube.search().list(
            part='snippet',
            q=keyword,
            type='channel',
            maxResults=50,
            pageToken=next_page_token
        )
        search_response = search_request.execute()

        for item in search_response['items']:
            channel_id = item['snippet']['channelId']

            # 在每次调用频道 API 前加入延时
            time.sleep(3.0)  # 延时 1.5 秒

            # 獲取頻道資訊以提取 handle
            channel_request = youtube.channels().list(
                part='snippet',
                id=channel_id
            )
            channel_response = channel_request.execute()

            for channel in channel_response['items']:
                custom_url = channel['snippet'].get('customUrl', None)
                title = channel['snippet']['title']
                handle = f"@{custom_url}" if custom_url else f"@{title.replace(' ', '').replace('-', '')}"
                handles.add(handle)

        # 儲存進度
        next_page_token = search_response.get('nextPageToken')
        save_progress(handles, next_page_token)

        if not next_page_token:
            break

        # 在每次分页请求之间延时
        time.sleep(3.0)  

    return sorted(list(handles))[:max_results]


# 搜尋台灣相關的頻道
keyword = "台灣"
handles = get_handles_from_keyword(keyword, max_results=10)
print("找到的 Handles:")
batch_main(handles)
'''

'import json\nimport time \nimport os\nfrom googleapiclient.discovery import build\n\nAPI_KEY = \'AIzaSyAZioI1gbrRVHsB8Dylb96npimTouclBB8\'\nyoutube = build(\'youtube\', \'v3\', developerKey=API_KEY)\n\n# 進度儲存檔案\nPROGRESS_FILE = \'search_progress.json\'\n\ndef save_progress(handles, next_page_token):\n    """儲存搜尋進度到檔案"""\n    progress_data = {\n        "handles": sorted(list(handles)),  # 確保順序且無重複\n        "nextPageToken": next_page_token\n    }\n    with open(PROGRESS_FILE, \'w\', encoding=\'utf-8\') as f:\n        json.dump(progress_data, f, ensure_ascii=False, indent=4)\n\ndef load_progress():\n    """載入搜尋進度"""\n    if os.path.exists(PROGRESS_FILE):\n        with open(PROGRESS_FILE, \'r\', encoding=\'utf-8\') as f:\n            progress_data = json.load(f)\n            return set(progress_data["handles"]), progress_data["nextPageToken"]\n    return set(), None\n\ndef get_handles_from_keyword(keyword, max_results=50):\n    handles, next_page_token = load_progress()\n\n    while len(h