In [2]:
# 導入所需套件
import hashlib
import logging
import os
import random
import re
import threading
import time
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import urllib3
from bs4 import BeautifulSoup
from tqdm import tqdm

class ArticleScraper:
    """Scrape columns written by selected authors from real-estate.get.com.tw.

    Articles are discovered either from the journal listing pages
    (``scan_mode="journal"``) or by probing every article number between
    ``start_no`` and ``max_no`` (any other ``scan_mode``).  Each accepted
    article is written as a Markdown file and appended to an Excel index.
    """

    # Date layouts accepted for the article date field.
    _DATE_FORMATS = ('%Y-%m-%d', '%Y/%m/%d')

    def __init__(self, scan_mode="all", data_file="articles.xlsx"):
        """Configure the scraper and prepare directories, session and logger.

        Args:
            scan_mode: ``"journal"`` reads article numbers from the journal
                listing; any other value scans the whole number range.
            data_file: Path of the Excel workbook used as the article index.
        """
        # Basic settings
        self.base_url = "https://real-estate.get.com.tw/Columns/"
        self.detail_url = f"{self.base_url}detail.aspx"
        self.journal_url = f"{self.base_url}journal.aspx"
        self.target_authors = ["曾榮耀", "許文昌", "蘇偉強"]
        self.scan_mode = scan_mode
        self.data_file = Path(data_file)

        # Query parameters identifying the journal listing
        self.journal_params = {
            "no": "1282",
            "pno": "51121"
        }

        # Accepted publication window (roughly the last nine years)
        self.end_date = datetime.now()
        self.start_date = self.end_date - timedelta(days=9*365)

        # Performance settings
        self.batch_size = 50
        self.max_workers = 4
        self.max_retries = 5
        self.retry_delay = 3
        self.request_interval = 1.5
        self.request_timeout = 30

        # Article number range probed in "all" mode
        self.start_no = 900000
        self.max_no = 915000

        # State shared by the worker threads that run() starts
        self._rate_lock = threading.Lock()
        self._save_lock = threading.Lock()
        self.last_request_time = 0
        self.processed_articles = set()

        # Initialisation (the logger must exist before loading the index,
        # because load_processed_articles logs on failure)
        self.setup_directories()
        self.setup_session()
        self.setup_logger()
        self.load_processed_articles()

    def setup_directories(self):
        """Create the output directory tree and a placeholder image."""
        self.base_dir = Path("real_estate_articles")
        self.articles_dir = self.base_dir / "articles"
        # Images live next to (not inside) the articles directory so that the
        # "../images/" links written into the Markdown files resolve.
        self.images_dir = self.base_dir / "images"
        self.logs_dir = self.base_dir / "logs"

        for directory in [self.base_dir, self.articles_dir, self.images_dir, self.logs_dir]:
            directory.mkdir(parents=True, exist_ok=True)

        # Create the default "download failed" image once
        self.failed_image_path = self.images_dir / "image_download_failed.png"
        if not self.failed_image_path.exists():
            try:
                from PIL import Image, ImageDraw
                img = Image.new('RGB', (400, 100), color='white')
                d = ImageDraw.Draw(img)
                d.text((10, 40), "Image Download Failed", fill='black')
                img.save(self.failed_image_path)
            except Exception:
                # Pillow is optional; fall back to an empty placeholder file.
                self.failed_image_path.touch()

    def setup_session(self):
        """Create the shared HTTP session with browser-like headers."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7',
        })
        # Requests are sent with verify=False (see _get), so silence the
        # per-request InsecureRequestWarning.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def setup_logger(self):
        """Attach one file handler and one console handler to the logger."""
        self.logger = logging.getLogger('ArticleScraper')
        self.logger.setLevel(logging.INFO)

        # logging.getLogger returns the same object on every call, so handlers
        # left by an earlier instance (e.g. a re-run notebook cell) must be
        # removed or every message is emitted once per instance.
        for stale in list(self.logger.handlers):
            self.logger.removeHandler(stale)
            stale.close()

        log_file = self.logs_dir / \
            f'scraper_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')

        handlers = (
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(),
        )
        for handler in handlers:
            handler.setLevel(logging.INFO)
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def load_processed_articles(self):
        """Load the numbers of already-saved articles from the Excel index."""
        self.processed_articles = set()
        try:
            if not self.data_file.exists():
                return
            df = pd.read_excel(self.data_file)
            if '文章編號' not in df.columns:
                return
            # Normalise through int so values read back as floats ("912345.0")
            # still match the str(int) keys used everywhere else.
            numbers = pd.to_numeric(df['文章編號'], errors='coerce').dropna()
            self.processed_articles = {str(int(no)) for no in numbers}
        except Exception as e:
            self.logger.error(f"載入已處理文章清單失敗: {str(e)}")
            self.processed_articles = set()

    def wait_between_requests(self):
        """Block until ``request_interval`` seconds separate two requests.

        The lock makes the check-sleep-update sequence atomic, so the worker
        threads share one rate limit instead of racing on the timestamp.
        """
        with self._rate_lock:
            elapsed = time.monotonic() - self.last_request_time
            if elapsed < self.request_interval:
                time.sleep(self.request_interval - elapsed)
            self.last_request_time = time.monotonic()

    def _get(self, url: str, **kwargs):
        """Send a rate-limited GET through the shared session."""
        self.wait_between_requests()
        kwargs.setdefault('timeout', self.request_timeout)
        # NOTE(security): certificate verification is disabled, as in the
        # original requests; responses could be tampered with in transit.
        kwargs.setdefault('verify', False)
        return self.session.get(url, **kwargs)

    @staticmethod
    def _extract_article_no(href: str) -> Optional[int]:
        """Return the integer ``no`` query parameter of *href*, or None."""
        query = urllib.parse.urlparse(href or '').query
        values = urllib.parse.parse_qs(query).get('no')
        if not values:
            return None
        try:
            return int(values[0].strip())
        except ValueError:
            return None

    @classmethod
    def _parse_date(cls, value) -> Optional[datetime]:
        """Parse *value* with the accepted date layouts; None if unparseable."""
        if not isinstance(value, str):
            return None
        text = value.strip()
        for layout in cls._DATE_FORMATS:
            try:
                return datetime.strptime(text, layout)
            except ValueError:
                continue
        return None

    def get_max_page_number(self) -> int:
        """Return the highest page number shown in the journal pagination.

        Returns 1 when no numeric pagination link is found and 30 when the
        request fails, so that pages are not missed after a transient error.
        """
        try:
            response = self._get(self.journal_url, params=self.journal_params)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            labels = (a.get_text(strip=True)
                      for a in soup.select('.pagination a'))
            page_numbers = [int(label) for label in labels
                            if label.isdecimal()]
            return max(page_numbers, default=1)

        except Exception as e:
            self.logger.error(f"獲取最大頁數失敗: {str(e)}")
            return 30  # deliberately generous default so no page is skipped

    def get_article_urls_from_journal(self, page_no: int) -> List[int]:
        """Return the unprocessed article numbers linked from a journal page.

        Each number appears once, in page order.  An empty list is returned
        when the page cannot be fetched.
        """
        try:
            params = self.journal_params.copy()
            params['page_no'] = page_no

            response = self._get(self.journal_url, params=params)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            articles = []
            seen = set()
            for link in soup.select('a[href*="detail.aspx?no="]'):
                # Parse the query string properly so trailing parameters or
                # fragments ("no=123&pno=...") do not hide the number.
                article_no = self._extract_article_no(link.get('href', ''))
                if article_no is None or article_no in seen:
                    continue
                seen.add(article_no)
                if str(article_no) not in self.processed_articles:
                    articles.append(article_no)

            return articles

        except Exception as e:
            self.logger.error(f"獲取第 {page_no} 頁文章列表失敗: {str(e)}")
            return []

    def fetch_article(self, article_no: int) -> Optional[Dict]:
        """Download, parse and validate one article.

        Network errors are retried up to ``max_retries`` times with a
        linearly growing delay.  Returns the article dict, or None when the
        article is already processed, missing, rejected, or unreachable.
        """
        if str(article_no) in self.processed_articles:
            return None

        for retry in range(self.max_retries):
            try:
                self.logger.info(f"正在抓取文章 {article_no}")
                response = self._get(self.detail_url,
                                     params={'no': article_no})

                self.logger.info(
                    f"文章 {article_no} 回應狀態碼: {response.status_code}")

                if response.status_code == 404:
                    self.logger.info(f"文章 {article_no} 不存在")
                    return None

                response.raise_for_status()
                response.encoding = 'utf-8'

                soup = BeautifulSoup(response.text, 'html.parser')
                article_data = self.parse_article(soup, article_no)

                if article_data and self.validate_article(article_data):
                    self.logger.info(f"成功解析文章 {article_no}")
                    return article_data

                self.logger.info(f"文章 {article_no} 不符合條件或解析失敗")
                return None

            except requests.exceptions.RequestException as e:
                attempt = retry + 1
                self.logger.error(
                    f"抓取文章 {article_no} 失敗 (嘗試 {attempt}/{self.max_retries}): {str(e)}")
                failed_response = getattr(e, 'response', None)
                if failed_response is not None:
                    self.logger.error(
                        f"錯誤回應內容: {failed_response.text[:200]}")
                if retry < self.max_retries - 1:
                    time.sleep(self.retry_delay * attempt)
            except Exception as e:
                # Non-network failures will not improve on retry.
                self.logger.error(f"處理文章 {article_no} 時發生未預期錯誤: {str(e)}")
                break

        return None

    def parse_article(self, soup: "BeautifulSoup", article_no: int) -> Optional[Dict]:
        """Extract the article fields from a detail page.

        Returns None when the author is not one of ``target_authors`` or
        when parsing fails.
        """
        try:
            author = None
            article_info = {}

            for row in soup.select('.columnsDetail_tableRow'):
                th = row.select_one('.columnsDetail_tableth')
                td = row.select_one('.columnsDetail_tabletd')
                if not (th and td):
                    continue
                key = th.text.strip()
                value = td.text.strip()
                article_info[key] = value

                if key == '內文':
                    # Keep the markup so images can be located later.
                    article_info['內文HTML'] = str(td)
                elif key == '作者':
                    author = value

            # Stop before downloading images for authors we do not track.
            if not author or not any(target in author for target in self.target_authors):
                return None

            content = self.process_content(
                article_info.get('內文HTML', ''), article_no)

            return {
                '文章編號': article_no,
                '標題': article_info.get('篇名', ''),
                '作者': author,
                '日期': article_info.get('日期', ''),
                '內文': content,
                'URL': f"{self.detail_url}?no={article_no}",
                '爬取時間': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            self.logger.error(f"解析文章 {article_no} 失敗: {str(e)}")
            return None

    def _collect_paragraphs(self, node, parts: List[str]) -> None:
        """Append the text of *node* to *parts*, one entry per block.

        ``p``/``div`` elements (and any element containing one) start a new
        entry; text and inline elements between them are merged.  Walking the
        tree once avoids emitting a nested block's text for every ancestor.
        """
        inline_run = []

        def flush():
            text = ''.join(inline_run).strip()
            if text:
                parts.append(text)
            del inline_run[:]

        for child in node.children:
            tag_name = getattr(child, 'name', None)
            if tag_name is None:
                # Plain text node; comments and similar nodes are skipped.
                if type(child).__name__ in ('NavigableString', 'CData'):
                    inline_run.append(str(child))
            elif tag_name in ('p', 'div') or child.find(['p', 'div']) is not None:
                flush()
                self._collect_paragraphs(child, parts)
            else:
                inline_run.append(child.get_text())
        flush()

    def process_content(self, html_content: str, article_no: int) -> str:
        """Convert the article body HTML to text plus Markdown image links.

        Paragraph text comes first, followed by one link per downloaded
        image.  On failure the raw HTML is returned unchanged.
        """
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            content_parts = []

            # Text paragraphs
            self._collect_paragraphs(soup, content_parts)

            # Images
            for img in soup.find_all('img'):
                src = img.get('src', '')
                if not src:
                    continue

                # urljoin leaves absolute URLs untouched.
                img_url = urllib.parse.urljoin(self.base_url, src)
                if not img_url.startswith(('http://', 'https://')):
                    continue  # e.g. data: URIs cannot be downloaded

                img_filename = self.download_image(img_url, article_no)
                if img_filename:
                    content_parts.append(
                        f'\n![圖片](../images/{img_filename})\n')

            return '\n\n'.join(content_parts)

        except Exception as e:
            self.logger.error(f"處理文章 {article_no} 內容時發生錯誤: {str(e)}")
            return html_content

    def download_image(self, img_url: str, article_no: int) -> Optional[str]:
        """Download one image into ``images_dir``.

        Returns the stored file name, or None when the download fails or
        the response is not an image.
        """
        try:
            # Stable, unique name derived from the URL
            url_hash = hashlib.md5(img_url.encode()).hexdigest()
            img_ext = os.path.splitext(
                urllib.parse.urlparse(img_url).path)[1] or '.jpg'
            filename = f"article_{article_no}_{url_hash}{img_ext}"

            img_path = self.images_dir / filename

            # Already downloaded: reuse it
            if img_path.exists():
                return filename

            # Write to a temporary name first so an interrupted download is
            # never mistaken for a finished image by the check above.
            partial_path = img_path.with_name(img_path.name + '.part')

            # The context manager releases the streamed connection even when
            # the body is never read.
            with self._get(img_url, stream=True) as response:
                response.raise_for_status()

                content_type = response.headers.get('content-type', '')
                if not content_type.startswith('image/'):
                    raise ValueError(f"無效的圖片類型: {content_type}")

                try:
                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except Exception:
                    if partial_path.exists():
                        partial_path.unlink()
                    raise

            partial_path.replace(img_path)
            return filename

        except Exception as e:
            self.logger.error(f"下載圖片失敗 ({img_url}): {str(e)}")
            return None

    def save_article(self, article_data: Dict[str, Any]) -> bool:
        """Write the article as Markdown and append it to the Excel index.

        Returns True on success and False (after logging) on any failure.
        """
        try:
            article_no = article_data['文章編號']

            # Build the Markdown document
            md_content = ''.join([
                f"# {article_data['標題']}\n\n",
                f"作者: {article_data['作者']}\n",
                f"日期: {article_data['日期']}\n",
                f"來源: {article_data['URL']}\n\n",
                "*注：本文圖片存放於 ../images/ 目錄下*\n\n",
                "---\n\n",
                article_data['內文'],
            ])

            md_file = self.articles_dir / f"article_{article_no}.md"
            with open(md_file, 'w', encoding='utf-8') as f:
                f.write(md_content)

            # The workbook is read, extended and rewritten; the lock stops
            # worker threads from overwriting each other's rows.
            with self._save_lock:
                df_new = pd.DataFrame([article_data])
                if self.data_file.exists():
                    df_existing = pd.read_excel(self.data_file)
                    df = pd.concat([df_existing, df_new], ignore_index=True)
                else:
                    df = df_new

                df.to_excel(self.data_file, index=False)
                self.processed_articles.add(str(article_no))

            return True

        except Exception as e:
            self.logger.error(
                f"保存文章 {article_data.get('文章編號')} 失敗: {str(e)}")
            return False

    def validate_article(self, article_data: Dict) -> bool:
        """Return True when the article satisfies every acceptance rule.

        Rules: all required fields present, date parseable and inside
        ``[start_date, end_date]``, author matches ``target_authors``, and
        the body has at least 100 characters.
        """
        if not article_data:
            return False

        required_fields = ('文章編號', '標題', '作者', '日期', '內文')
        if any(field not in article_data for field in required_fields):
            return False

        # Date: a missing or malformed value rejects the article instead of
        # raising.
        article_date = self._parse_date(article_data['日期'])
        if article_date is None:
            return False
        if not (self.start_date <= article_date <= self.end_date):
            return False

        # Author
        author = article_data['作者'] or ''
        if not any(name in author for name in self.target_authors):
            return False

        # Body length (at least 100 characters)
        body = article_data['內文'] or ''
        return len(body.strip()) >= 100

    def process_article_batch(self, article_numbers: List[int]):
        """Fetch and save every article number in *article_numbers*."""
        for article_no in article_numbers:
            try:
                # fetch_article only returns articles that passed validation.
                article_data = self.fetch_article(article_no)
                if not article_data:
                    continue
                if self.save_article(article_data):
                    self.logger.info(f"成功保存文章 {article_no}")
                else:
                    self.logger.error(f"保存文章 {article_no} 失敗")
            except Exception as e:
                self.logger.error(f"處理文章 {article_no} 時發生錯誤: {str(e)}")

    def run(self):
        """Discover the article numbers and process them in parallel batches."""
        self.logger.info("開始執行文章爬蟲")

        try:
            if self.scan_mode == "journal":
                # Collect article numbers from the journal listing
                max_page = self.get_max_page_number()
                self.logger.info(f"找到 {max_page} 頁期刊")

                found = []
                for page in range(1, max_page + 1):
                    found.extend(self.get_article_urls_from_journal(page))

                # Drop numbers listed on several pages, keeping first-seen
                # order, so no article is fetched twice.
                all_articles = list(dict.fromkeys(found))
                self.logger.info(f"從期刊頁面找到 {len(all_articles)} 篇文章")

            else:  # scan_mode == "all"
                # Probe every possible article number
                all_articles = list(range(self.start_no, self.max_no + 1))
                self.logger.info(f"將掃描 {len(all_articles)} 個可能的文章編號")

            # Split into batches, one task per batch
            batches = [all_articles[i:i + self.batch_size]
                       for i in range(0, len(all_articles), self.batch_size)]

            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                list(tqdm(
                    executor.map(self.process_article_batch, batches),
                    total=len(batches),
                    desc="處理文章批次"
                ))

            self.logger.info("文章爬蟲執行完成")

        except Exception as e:
            self.logger.exception(f"執行過程中發生錯誤: {str(e)}")
        finally:
            self.session.close()


if __name__ == "__main__":
    # Usage example: crawl via the journal listing; pass "all" as the first
    # argument to scan the whole article-number range instead.
    scraper = ArticleScraper("journal", "articles.xlsx")
    scraper.run()

2025-02-02 16:22:09,927 - INFO - 開始執行文章爬蟲
2025-02-02 16:22:09,927 - INFO - 開始執行文章爬蟲
2025-02-02 16:22:10,434 - INFO - 找到 1 頁期刊
2025-02-02 16:22:10,434 - INFO - 找到 1 頁期刊
2025-02-02 16:22:11,481 - INFO - 從期刊頁面找到 0 篇文章
2025-02-02 16:22:11,481 - INFO - 從期刊頁面找到 0 篇文章
處理文章批次: 0it [00:00, ?it/s]
2025-02-02 16:22:11,482 - INFO - 文章爬蟲執行完成
2025-02-02 16:22:11,482 - INFO - 文章爬蟲執行完成


In [6]:
from pathlib import Path
import pandas as pd
from datetime import datetime
import re


def generate_table_of_contents(excel_path: str, markdown_dir: str, output_path: str):
    """Generate a Markdown table of contents grouped by year and month.

    Articles are listed newest first.  Rows whose date cannot be parsed and
    rows without a Markdown file on disk are left out.

    Args:
        excel_path: Path of the Excel file holding the article metadata
            (columns 文章編號, 標題, 作者, 日期).
        markdown_dir: Directory containing the article Markdown files.
        output_path: Path of the table-of-contents file to write.
    """
    import os  # local import: only relpath is needed and this cell does not import os

    df = pd.read_excel(excel_path)

    # Unparseable dates become NaT and are dropped; otherwise they would
    # either abort the run or create a "nan" heading.
    df['日期'] = pd.to_datetime(df['日期'], errors='coerce')
    df = df.dropna(subset=['日期']).sort_values('日期', ascending=False)

    markdown_root = Path(markdown_dir)
    output_root = Path(output_path).parent

    # Collect fragments and join once at the end.
    parts = ["# 文章目錄\n\n"]

    years = df['日期'].dt.year
    for year in years.unique():
        parts.append(f"\n## {year}年\n\n")
        year_df = df[years == year]

        months = year_df['日期'].dt.month
        for month in months.unique():
            parts.append(f"\n### {month}月\n\n")
            month_df = year_df[months == month]

            for _, article in month_df.iterrows():
                number = article['文章編號']
                clean_title = re.sub(r'[<>:"/\\|?*]', '', str(article['標題']))
                # Accept both "<no>_<title>.md" and the "article_<no>.md"
                # layout written by ArticleScraper.save_article.
                candidates = (
                    markdown_root / f"{number}_{clean_title}.md",
                    markdown_root / f"article_{number}.md",
                )
                md_path = next((c for c in candidates if c.exists()), None)
                if md_path is None:
                    continue

                # Link relative to the TOC file so it works for any
                # markdown_dir, not only "./articles".
                try:
                    relative_path = os.path.relpath(
                        md_path, output_root).replace(os.sep, '/')
                except ValueError:
                    # e.g. different drives on Windows
                    relative_path = md_path.as_posix()
                else:
                    if not relative_path.startswith(('./', '../')):
                        relative_path = f"./{relative_path}"

                date_str = article['日期'].strftime('%Y-%m-%d')
                title = article['標題']
                author = article['作者']
                parts.append(
                    f"- {date_str} [{title}]({relative_path}) - {author}\n")

    # Generation timestamp
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    parts.append(f"\n\n---\n最後更新時間：{generated_at}")

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(''.join(parts))

    print(f"目錄已生成至：{output_path}")


def main():
    """Build ./README.md from articles.xlsx and the ./articles directory."""
    generate_table_of_contents(
        excel_path="articles.xlsx",
        markdown_dir="./articles",
        output_path="./README.md",
    )


# Generate the table of contents only when this code runs as a script.
if __name__ == "__main__":
    main()

目錄已生成至：./README.md
