In [1]:
import time
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.edge.service import Service
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from datetime import datetime
import threading
import pandas as pd 
# 設定日誌
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'currency_scraper_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# 全域變數
info_list = []
info_lock = threading.Lock()  # 用於保護 info_list
PATH = "C:/python/msedgedriver.exe"
"""初始化 WebDriver"""
def init_driver():
   
    options = webdriver.EdgeOptions()
    options.add_argument('--start-maximized')
    options.add_argument('--disable-notifications')
    options.add_experimental_option('excludeSwitches', ['enable-logging'])
    service = Service(PATH)
    return webdriver.Edge(service=service, options=options)
"""抓取單個幣別的匯率"""
def get_exchange_rate(index, max_retries=5):
    for attempt in range(max_retries):
        driver = None
        try:
            logger.info(f"開始處理第 {index + 1} 個幣別 (第 {attempt + 1} 次嘗試)")
            driver = init_driver()
            driver.get("https://rate.bot.com.tw/xrt")
            time.sleep(2 + random.random())  # 加入隨機等待

            # 找到並點擊歷史查詢按鈕
            history_buttons = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//*[@id="ie11andabove"]/div/table/tbody/tr/td[7]/a'))
            )
            
            if index >= len(history_buttons):
                logger.error(f"索引 {index} 超出範圍。總按鈕數: {len(history_buttons)}")
                return False

            # 點擊歷史查詢按鈕
            history_buttons[index].click()
            time.sleep(2 + random.random())

            # 處理新視窗
            wait = WebDriverWait(driver, 10)
            wait.until(lambda d: len(d.window_handles) > 1)
            new_window = driver.window_handles[-1]
            driver.switch_to.window(new_window)
            time.sleep(1 + random.random())

            # 點擊查詢按鈕
            search_button = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//*[@id="single"]'))
            )
            search_button.click()
            time.sleep(2 + random.random())

            # 獲取幣別名稱
            currency_element = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, '//*[@id="ie11andabove"]/div/div[2]/div[2]/div/div[2]'))
            )
            currency_name = currency_element.text.strip()

            # 等待表格加載並獲取數據
            rows = WebDriverWait(driver, 10).until(
                EC.presence_of_all_elements_located((By.XPATH, '//*[@id="ie11andabove"]/div/table/tbody/tr'))
            )

            local_info_list = []  # 暫存當前幣別的數據
            for row in rows:
                try:
                    datetime_str = row.find_element(By.XPATH, './td[1]').text.strip()
                    buy_price = row.find_element(By.XPATH, './td[3]').text.strip()
                    sell_price = row.find_element(By.XPATH, './td[4]').text.strip()

                    local_info_list.append({
                        "幣別": currency_name,
                        "更新時間": datetime_str,
                        "買入價": buy_price,
                        "賣出價": sell_price,
                        "抓取時間": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    })
                except Exception as e:
                    logger.error(f"處理行數據時發生錯誤: {str(e)}")
                    continue

            # 如果成功獲取數據，將其加入全域列表
            if local_info_list:
                with info_lock:
                    info_list.extend(local_info_list)
                logger.info(f"成功抓取 {currency_name} 的 {len(local_info_list)} 筆數據")
                return True

        except Exception as e:
            logger.error(f"第 {attempt + 1} 次嘗試失敗: {str(e)}")
            if attempt == max_retries - 1:
                logger.error(f"幣別 {index} 最終抓取失敗")
        finally:
            try:
                if driver:
                    driver.quit()
            except:
                pass
            time.sleep(2)  # 確保瀏覽器完全關閉

    return False
"""將結果保存到文件"""
def save_to_file():
    try:
        filename = f"exchange_rates_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(filename, 'w', encoding='utf-8') as f:
            for info in info_list:
                f.write(f"幣別: {info['幣別']}\n")
                f.write(f"更新時間: {info['更新時間']}\n")
                f.write(f"買入價: {info['買入價']}\n")
                f.write(f"賣出價: {info['賣出價']}\n")
                f.write(f"抓取時間: {info['抓取時間']}\n")
                f.write("-" * 50 + "\n")
        logger.info(f"數據已保存到文件: {filename}")
    except Exception as e:
        logger.error(f"保存文件時發生錯誤: {str(e)}")
"""主程序"""
def main():
    try:
        # 獲取總幣別數
        driver = init_driver()
        driver.get("https://rate.bot.com.tw/xrt")
        history_buttons = driver.find_elements(By.XPATH, '//*[@id="ie11andabove"]/div/table/tbody/tr/td[7]/a')
        total_currencies = len(history_buttons)
        driver.quit()

        logger.info(f"總共發現 {total_currencies} 個幣別需要處理")

        # 使用線程池處理
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(get_exchange_rate, i): i for i in range(total_currencies)}
            completed = 0
            
            for future in as_completed(futures):
                index = futures[future]
                try:
                    success = future.result()
                    completed += 1
                    status = "成功" if success else "失敗"
                    logger.info(f"完成第 {index + 1} 個幣別 ({completed}/{total_currencies}) - {status}")
                except Exception as e:
                    logger.error(f"處理第 {index + 1} 個幣別時發生錯誤: {str(e)}")

        logger.info(f"實際抓取到 {len(info_list)} 個幣別的數據")
        save_to_file()
        
    except Exception as e:
        logger.error(f"主程序執行錯誤: {str(e)}")
if __name__ == "__main__":
    main()
df = pd.DataFrame(info_list)
filename = f"exchange_rates_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(filename, index=False, encoding='utf-8-sig')

KeyboardInterrupt: 