In [1]:
# !pip install requests beautifulsoup4
# !pip install selenium
# !pip install selenium
# !pip install webdriver-manager

In [2]:
from selenium.webdriver.chrome.service import Service
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from datetime import datetime, timedelta
import pandas as pd
import re
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from bs4 import BeautifulSoup
import subprocess
import sys

In [3]:
# 設置基本路徑
save_path = r'D:\Min\Python\Project\FA_Data\KY__buy_sell'
meta_data_path = r'D:\Min\Python\Project\FA_Data\meta_data'
meta_filename = os.path.join(meta_data_path, 'KY__buy_sell.csv')

# 確保目錄存在
os.makedirs(save_path, exist_ok=True)
os.makedirs(meta_data_path, exist_ok=True)

def get_chrome_options():
    """設置Chrome選項"""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-extensions')
    options.add_argument('--disable-software-rasterizer')
    options.add_argument('--disable-features=VizDisplayCompositor')
    options.add_argument('--disable-features=NetworkService')
    options.add_experimental_option('excludeSwitches', ['enable-logging'])
    return options

@contextmanager
def get_driver():
    """創建和管理WebDriver實例"""
    driver = None
    try:
        service = Service()
        options = get_chrome_options()
        driver = webdriver.Chrome(service=service, options=options)
        yield driver
    except Exception as e:
        print(f"創建driver時發生錯誤: {str(e)}")
        try:
            driver_manager = ChromeDriverManager()
            driver_path = driver_manager.install()
            service = Service(executable_path=driver_path)
            driver = webdriver.Chrome(service=service, options=options)
            yield driver
        except Exception as backup_error:
            print(f"備用方法也失敗: {str(backup_error)}")
            raise
    finally:
        if driver:
            driver.quit()

In [4]:
@contextmanager
def get_driver():
    """創建和管理WebDriver實例"""
    driver = None
    try:
        service = Service()
        options = get_chrome_options()
        driver = webdriver.Chrome(service=service, options=options)
        yield driver
    except Exception as e:
        print(f"創建driver時發生錯誤: {str(e)}")
        try:
            driver_manager = ChromeDriverManager()
            driver_path = driver_manager.install()
            service = Service(executable_path=driver_path)
            driver = webdriver.Chrome(service=service, options=options)
            yield driver
        except Exception as backup_error:
            print(f"備用方法也失敗: {str(backup_error)}")
            raise
    finally:
        if driver:
            driver.quit()

def get_existing_dates():
    """獲取已經處理過的日期列表"""
    existing_files = os.listdir(save_path)
    return {filename.split('_')[0] for filename in existing_files if filename.endswith('_buy_sell.csv')}

def fetch_and_save_data(date_str):
    """爬取並保存特定日期的數據"""
    try:
        filename = f'{save_path}\\{date_str.replace("-", "")}_buy_sell.csv'
        
        url = f'https://5850web.moneydj.com/z/zg/zgb/zgb0.djhtm?a=8450&b=0038003400350042&c=E&e={date_str}&f={date_str}'
        
        with get_driver() as driver:
            print(f"開始處理日期：{date_str}")
            driver.get(url)
            
            wait = WebDriverWait(driver, 20)
            try:
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "table")))
            except Exception as e:
                print(f"等待頁面載入時超時: {date_str}")
                return

            html = driver.page_source
        
        soup = BeautifulSoup(html, 'html.parser')
        tables = soup.find_all('table')
        
        if len(tables) >= 15:
            # 處理買超資料
            buy_table = tables[13]
            buy_rows = buy_table.find_all('tr')[2:]
            buy_data = []
            
            for row in buy_rows:
                cols = row.find_all('td')
                if len(cols) == 4:
                    buy_data.append([
                        cols[0].get_text(strip=True),
                        cols[1].get_text(strip=True).replace(',', ''),
                        cols[2].get_text(strip=True).replace(',', ''),
                        cols[3].get_text(strip=True).replace(',', '')
                    ])

            buy_df = pd.DataFrame(buy_data, columns=['券商名稱', '買進金額', '賣出金額', '差額'])

            # 處理賣超資料
            sell_table = tables[14]
            sell_rows = sell_table.find_all('tr')[2:]
            sell_data = []
            
            for row in sell_rows:
                cols = row.find_all('td')
                if len(cols) == 4:
                    sell_data.append([
                        cols[0].get_text(strip=True),
                        cols[1].get_text(strip=True).replace(',', ''),
                        cols[2].get_text(strip=True).replace(',', ''),
                        cols[3].get_text(strip=True).replace(',', '')
                    ])

            sell_df = pd.DataFrame(sell_data, columns=['券商名稱', '買進金額', '賣出金額', '差額'])

            if not buy_df.empty or not sell_df.empty:
                combined_df = pd.concat([buy_df, sell_df], keys=['買超', '賣超'])
                combined_df.to_csv(filename, index=True, encoding='utf-8-sig')
                print(f"資料已成功寫入: {date_str}")
                return True
            else:
                print(f"日期 {date_str} 沒有交易數據")
                return False
                
        else:
            print(f"找不到足夠的表格，日期：{date_str}")
            return False
            
    except Exception as e:
        print(f"處理日期 {date_str} 時發生錯誤: {str(e)}")
        return False

def split_broker_name(broker_name):
    """分拆券商名稱為證券代號和證券名稱"""
    match = re.match(r'([\dA-Z]+)([^\dA-Z]+)', broker_name)
    if match:
        return match.groups()
    else:
        return '', broker_name

def merge_data_files():
    """合併所有數據文件到元數據檔案"""
    print("開始合併數據文件...")
    
    # 獲取所有文件並排序
    files = []
    for file in os.listdir(save_path):
        if file.endswith('_buy_sell.csv'):
            date_str = file.split('_')[0]
            try:
                date = datetime.strptime(date_str, '%Y%m%d')
                files.append((file, date))
            except ValueError:
                print(f"忽略無效的文件名格式: {file}")
    
    files.sort(key=lambda x: x[1])
    
    if not files:
        print("沒有找到可處理的數據文件")
        return
    
    print(f"找到 {len(files)} 個數據文件")
    
    # 讀取現有的元數據（如果存在）
    existing_dates = set()
    if os.path.exists(meta_filename):
        try:
            existing_meta_df = pd.read_csv(meta_filename)
            existing_dates = set(existing_meta_df['日期'].unique())
            print(f"現有元數據包含 {len(existing_dates)} 個交易日")
        except Exception as e:
            print(f"讀取現有元數據時發生錯誤: {str(e)}")
            if os.path.exists(meta_filename):
                backup_file = meta_filename + '.bak'
                os.rename(meta_filename, backup_file)
                print(f"已備份原文件為: {backup_file}")
    
    # 處理新文件
    new_data = []
    for file_name, file_date in files:
        date_str = file_date.strftime('%Y-%m-%d')
        
        if date_str in existing_dates:
            print(f"日期 {date_str} 已存在，跳過")
            continue
            
        try:
            # 讀取CSV文件
            df = pd.read_csv(os.path.join(save_path, file_name))
            
            if df.empty:
                continue
                
            # 處理多層索引
            if 'Unnamed: 0' in df.columns:  # 檢查是否有索引列
                # 假設這是交易類型列
                df['交易類型'] = df['Unnamed: 0'].map({'買超': '買超', '賣超': '賣超'})
                df = df.drop(columns=['Unnamed: 0'])
            
            # 添加日期
            df['日期'] = date_str
            
            # 處理券商名稱
            if '券商名稱' in df.columns:
                split_result = df['券商名稱'].apply(split_broker_name)
                df['證券代號'] = split_result.apply(lambda x: x[0])
                df['證券名稱'] = split_result.apply(lambda x: x[1])
                df = df.drop(columns=['券商名稱'])
            
            # 確保所有需要的列都存在
            required_columns = ['日期', '交易類型', '證券代號', '證券名稱', '買進金額', '賣出金額', '差額']
            for col in required_columns:
                if col not in df.columns:
                    print(f"文件 {file_name} 缺少列 {col}，嘗試修復...")
                    if col == '交易類型':
                        # 根據數據特徵判斷交易類型
                        df['交易類型'] = df.apply(
                            lambda row: '買超' if pd.to_numeric(row['差額'].replace(',', '')) > 0 else '賣超', 
                            axis=1
                        )
            
            # 統一數據類型
            for col in ['買進金額', '賣出金額', '差額']:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', ''), errors='coerce')
            
            # 確保列的順序一致
            df = df.reindex(columns=required_columns)
            
            new_data.append(df)
            print(f"處理完成: {file_name}")
            
        except Exception as e:
            print(f"處理文件 {file_name} 時發生錯誤: {str(e)}")
            continue
    
    if new_data:
        try:
            # 合併所有新數據
            new_data_df = pd.concat(new_data, ignore_index=True)
            
            # 合併新舊數據
            if existing_dates:
                final_df = pd.concat([existing_meta_df, new_data_df], ignore_index=True)
            else:
                final_df = new_data_df
            
            # 確保所有必要的列都存在
            for col in ['日期', '交易類型', '證券代號', '證券名稱', '買進金額', '賣出金額', '差額']:
                if col not in final_df.columns:
                    print(f"最終數據缺少列 {col}")
                    if col == '交易類型':
                        final_df['交易類型'] = final_df.apply(
                            lambda row: '買超' if pd.to_numeric(row['差額'].replace(',', '')) > 0 else '賣超', 
                            axis=1
                        )
            
            # 排序和去重
            final_df = final_df.sort_values(['日期', '交易類型', '證券代號'])
            final_df = final_df.drop_duplicates()
            
            # 保存
            final_df.to_csv(meta_filename, index=False, encoding='utf-8-sig')
            print(f"元數據更新完成: {meta_filename}")
            print(f"總記錄數: {len(final_df)}")
            print(f"交易日數: {len(final_df['日期'].unique())}")
            
        except Exception as e:
            print(f"保存元數據時發生錯誤: {str(e)}")
            print("錯誤詳情:")
            import traceback
            traceback.print_exc()
    else:
        print("沒有新的數據需要處理")

def crawl_data(days=150):
    """執行爬蟲任務"""
    print(f"開始爬取最近 {days} 天的數據...")
    
    # 設置日期範圍
    end_date = datetime.today() + timedelta(days=1)
    start_date = end_date - timedelta(days=days)
    
    # 獲取已存在的檔案
    existing_dates = get_existing_dates()
    
    # 建立需要處理的日期列表
    dates_to_process = []
    current_date = start_date
    while current_date <= end_date:
        if current_date.weekday() < 5:  # 只處理週一到週五
            date_str = current_date.strftime('%Y%m%d')
            if date_str not in existing_dates:
                dates_to_process.append(current_date.strftime('%Y-%m-%d'))
        current_date += timedelta(days=1)
    
    if not dates_to_process:
        print("沒有新的日期需要處理")
        return
    
    print(f"需要處理 {len(dates_to_process)} 天的數據")
    
    # 使用線程池處理
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = []
        for date_str in dates_to_process:
            futures.append(executor.submit(fetch_and_save_data, date_str))
            time.sleep(random.uniform(3, 7))  # 隨機延遲
        
        # 等待所有任務完成
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"執行任務時發生錯誤: {str(e)}")

# 分別定義爬蟲和合併的函數，方便在notebook中分別呼叫
def run_crawler(days=150):
    """執行爬蟲"""
    try:
        crawl_data(days)
        print("爬蟲完成")
    except Exception as e:
        print(f"爬蟲過程中發生錯誤: {str(e)}")

def run_merge():
    """執行合併"""
    try:
        merge_data_files()
        print("合併完成")
    except Exception as e:
        print(f"合併過程中發生錯誤: {str(e)}")

def run_all(days=150):
    """執行完整流程"""
    run_crawler(days)
    run_merge()

In [6]:
if __name__ == "__main__":
    run_all()

開始爬取最近 150 天的數據...
需要處理 18 天的數據
開始處理日期：2024-07-24
日期 2024-07-24 沒有交易數據
開始處理日期：2024-07-25
日期 2024-07-25 沒有交易數據
開始處理日期：2024-09-17
日期 2024-09-17 沒有交易數據
開始處理日期：2024-10-02
日期 2024-10-02 沒有交易數據
開始處理日期：2024-10-03
日期 2024-10-03 沒有交易數據
開始處理日期：2024-10-10
日期 2024-10-10 沒有交易數據
開始處理日期：2024-10-29
資料已成功寫入: 2024-10-29
開始處理日期：2024-10-30
資料已成功寫入: 2024-10-30
開始處理日期：2024-10-31
日期 2024-10-31 沒有交易數據
開始處理日期：2024-11-01
資料已成功寫入: 2024-11-01
開始處理日期：2024-11-04
資料已成功寫入: 2024-11-04
開始處理日期：2024-11-05
資料已成功寫入: 2024-11-05
開始處理日期：2024-11-06
資料已成功寫入: 2024-11-06
開始處理日期：2024-11-07
資料已成功寫入: 2024-11-07
開始處理日期：2024-11-08
資料已成功寫入: 2024-11-08
開始處理日期：2024-11-11
資料已成功寫入: 2024-11-11
開始處理日期：2024-11-12
資料已成功寫入: 2024-11-12
開始處理日期：2024-11-13
日期 2024-11-13 沒有交易數據
爬蟲完成
開始合併數據文件...
找到 492 個數據文件
現有元數據包含 482 個交易日
日期 2022-10-28 已存在，跳過
日期 2022-10-31 已存在，跳過
日期 2022-11-01 已存在，跳過
日期 2022-11-02 已存在，跳過
日期 2022-11-03 已存在，跳過
日期 2022-11-04 已存在，跳過
日期 2022-11-07 已存在，跳過
日期 2022-11-08 已存在，跳過
日期 2022-11-09 已存在，跳過
日期 2022-11-10 已存在，跳過
日期 2022-11-11 已存在，跳