In [14]:
import os
import json
import glob
import re
import pandas as pd
from pypdf import PdfReader, PdfWriter
import pdfplumber
from tqdm import tqdm

# 設定工作目錄
TARGET_DIR = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all"

# 設定輸出檔名
OUTPUT_CSV = "final_corrected_data_incremental.csv"
ERROR_CSV = "processing_errors_incremental.csv"

def clean_text(text):
    """清洗文字"""
    if not text:
        return ""
    return re.sub(r'[^\w\u4e00-\u9fff]', '', text).lower()

def get_shingles(text, n=2):
    """切分 n-gram"""
    if len(text) < n:
        return {text}
    return set(text[i:i+n] for i in range(len(text) - n + 1))

def calculate_hybrid_score(target, page_content):
    """混合評分機制 (Chunking + Shingling)"""
    if not target or not page_content:
        return 0.0
    
    # 策略 1: 連續片段
    chunk_size = 10
    chunks = [target[i:i+chunk_size] for i in range(0, len(target), chunk_size) if len(target[i:i+chunk_size]) > 4]
    
    chunk_hits = 0
    if chunks:
        for chunk in chunks:
            if chunk in page_content:
                chunk_hits += 1
        chunk_score = chunk_hits / len(chunks)
    else:
        chunk_score = 0.0

    if chunk_score > 0.6:
        return chunk_score

    # 策略 2: 碎片集合
    target_shingles = get_shingles(target, n=2)
    if not target_shingles:
        return 0.0
        
    hit_count = 0
    for s in target_shingles:
        if s in page_content:
            hit_count += 1
            
    shingle_score = hit_count / len(target_shingles)
    
    return max(chunk_score, shingle_score)

def split_pdf(file_path, company_code_stock):
    """切分 PDF"""
    reader = PdfReader(file_path)
    base_dir = os.path.dirname(file_path)
    
    check_file = os.path.join(base_dir, f"{company_code_stock}_page_1.pdf")
    if os.path.exists(check_file):
        return len(reader.pages)

    # 增加錯誤處理，避免硬碟滿了程式崩潰
    try:
        for i, page in enumerate(reader.pages):
            writer = PdfWriter()
            writer.add_page(page)
            output_path = os.path.join(base_dir, f"{company_code_stock}_page_{i+1}.pdf")
            with open(output_path, "wb") as f:
                writer.write(f)
        return len(reader.pages)
    except OSError:
        print(f"警告：切分 {company_code_stock} 時空間不足或發生錯誤。")
        return 0

def process_single_folder(folder_path, folder_name, is_test_mode=False):
    pdf_files = glob.glob(os.path.join(folder_path, "*.pdf"))
    original_pdf = [f for f in pdf_files if "_page_" not in f]
    json_files = glob.glob(os.path.join(folder_path, "*.json"))
    
    if not original_pdf or not json_files:
        return [], [{"company": folder_name, "error": "Missing PDF or JSON"}]

    pdf_path = original_pdf[0]
    json_path = json_files[0]
    company_key = folder_name

    results = []
    errors = []

    try:
        split_pdf(pdf_path, company_key)
        
        page_texts = {}
        with pdfplumber.open(pdf_path) as pdf:
            for i, page in enumerate(pdf.pages):
                extracted = page.extract_text() or ""
                page_texts[i + 1] = clean_text(extracted)

        with open(json_path, 'r', encoding='utf-8') as f:
            data_list = json.load(f)

        if is_test_mode:
            print(f"處理中: {company_key}")

        for item in data_list:
            raw_target = item.get("data", "")
            raw_evidence = item.get("evidence_string", "")
            
            cleaned_target = clean_text(raw_target)
            cleaned_evidence = clean_text(raw_evidence)
            
            best_page = None
            best_score = 0.0
            
            # 比對 Data
            if cleaned_target:
                for p_num, p_text in page_texts.items():
                    if len(p_text) < 50: continue 
                    score = calculate_hybrid_score(cleaned_target, p_text)
                    if score > best_score:
                        best_score = score
                        best_page = p_num

            # 比對 Evidence
            if best_score < 0.85 and cleaned_evidence:
                for p_num, p_text in page_texts.items():
                    if len(p_text) < 50: continue
                    score = calculate_hybrid_score(cleaned_evidence, p_text)
                    if score > best_score:
                        best_score = score
                        best_page = p_num

            result_row = item.copy()
            if best_page and best_score >= 0.4:
                result_row["URL"] = f"local_file://{company_key}_page_{best_page}.pdf"
                result_row["page_number"] = best_page
                result_row["_status"] = "Success"
                if is_test_mode:
                    print(f"  [成功] 頁數: {best_page} (分數: {best_score:.2f})")
            else:
                result_row["_status"] = "Not Found"
                errors.append({
                    "company": company_key,
                    "target": raw_target[:30],
                    "score": best_score
                })
                if is_test_mode:
                    print(f"  [失敗] 最高分僅: {best_score:.2f}")

            results.append(result_row)

    except Exception as e:
        return [], [{"company": company_key, "error": str(e)}]

    return results, errors

def save_chunk(data, filename):
    """
    【關鍵功能】將資料寫入 CSV
    - 如果檔案不存在：建立新檔並寫入 Header
    - 如果檔案已存在：用 Append 模式寫入 (不寫 Header)
    """
    if not data:
        return
    
    df = pd.DataFrame(data)
    
    if not os.path.isfile(filename):
        # 檔案不存在，寫入 Header
        df.to_csv(filename, index=False, encoding='utf-8-sig')
    else:
        # 檔案存在，追加寫入，不寫 Header
        df.to_csv(filename, mode='a', header=False, index=False, encoding='utf-8-sig')

def main():
    if os.path.exists(TARGET_DIR):
        os.chdir(TARGET_DIR)
    
    # 初始化：如果之前有舊的檔案，先刪除，避免重複追加
    if os.path.exists(OUTPUT_CSV):
        print(f"刪除舊檔案: {OUTPUT_CSV}，重新開始...")
        os.remove(OUTPUT_CSV)
    if os.path.exists(ERROR_CSV):
        os.remove(ERROR_CSV)

    all_folders = [d for d in os.listdir('.') if os.path.isdir(d)]
    
    print("1. 測試單一 (Debug)")
    print("2. 執行全部 (邊跑邊存)")
    mode = input("Select: ").strip()

    targets = all_folders[:1] if mode == '1' else all_folders

    # 批次處理
    print(f"開始處理，結果將即時儲存至: {OUTPUT_CSV}")
    
    for folder in tqdm(targets):
        path = os.path.abspath(folder)
        
        # 處理單一公司
        res, err = process_single_folder(path, folder, is_test_mode=(mode=='1'))
        
        # 【立即存檔】
        save_chunk(res, OUTPUT_CSV)
        save_chunk(err, ERROR_CSV)

    print(f"\n全數完成！")
    print(f"結果檔案: {os.path.abspath(OUTPUT_CSV)}")
    if os.path.exists(ERROR_CSV):
        print(f"錯誤紀錄: {os.path.abspath(ERROR_CSV)}")

if __name__ == "__main__":
    main()

1. 測試單一 (Debug)
2. 執行全部 (邊跑邊存)
開始處理，結果將即時儲存至: final_corrected_data_incremental.csv


100%|██████████| 50/50 [36:47<00:00, 44.15s/it] 


全數完成！
結果檔案: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_corrected_data_incremental.csv
錯誤紀錄: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\processing_errors_incremental.csv





- 錯誤重跑

In [16]:
import pandas as pd
import os
import glob
import re
import pdfplumber
from tqdm import tqdm
import warnings

# 忽略 pandas 的未來警告
warnings.simplefilter(action='ignore', category=FutureWarning)

# ================= 設定區 =================
INPUT_CSV = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_corrected_data_incremental.csv"
ROOT_DIR = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all"
OUTPUT_CSV = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_best_guess.csv"
# =========================================

def clean_text(text):
    """
    修正版：強制轉型為字串，避免 NaN (float) 導致報錯
    """
    if pd.isna(text) or text is None:
        return ""
    
    # 強制轉成字串
    text_str = str(text)
    
    # 移除所有非中英數的字元
    return re.sub(r'[^\w\u4e00-\u9fff]', '', text_str).lower()

def get_shingles(text, n=2):
    if len(text) < n: return {text}
    return set(text[i:i+n] for i in range(len(text) - n + 1))

def calculate_hybrid_score(target, page_content):
    if not target or not page_content: return 0.0
    
    # 策略 1: Chunking
    chunk_size = 10
    chunks = [target[i:i+chunk_size] for i in range(0, len(target), chunk_size) if len(target[i:i+chunk_size]) > 4]
    chunk_score = 0.0
    if chunks:
        hits = sum(1 for c in chunks if c in page_content)
        chunk_score = hits / len(chunks)
        
    if chunk_score > 0.6: return chunk_score

    # 策略 2: Shingling
    target_shingles = get_shingles(target, n=2)
    if not target_shingles: return 0.0
    hit_count = sum(1 for s in target_shingles if s in page_content)
    return max(chunk_score, hit_count / len(target_shingles))

def get_folder_map(root_dir):
    folder_map = {}
    if not os.path.exists(root_dir): return {}
    
    dirs = [d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))]
    for d in dirs:
        match = re.search(r'(\d{4})', d)
        if match:
            code = match.group(1)
            folder_map[code] = os.path.join(root_dir, d)
            folder_map[d] = os.path.join(root_dir, d)
    return folder_map

def main():
    print(f"正在讀取資料: {INPUT_CSV}")
    try:
        df = pd.read_csv(INPUT_CSV, encoding='utf-8-sig')
    except:
        df = pd.read_csv(INPUT_CSV, encoding='utf-8')

    # 【重要修正】將所有空值填補為空字串，防止 float error
    df = df.fillna("")

    # 處理 page_number: 轉數字，無法轉的變 0
    df['page_number'] = pd.to_numeric(df['page_number'], errors='coerce').fillna(0)
    
    # 定義失敗條件：Status 是 Not Found 或者 Page 是 0
    mask = (df['_status'].astype(str).str.contains('Not Found')) | (df['page_number'] == 0)
    failed_indices = df[mask].index
    
    if len(failed_indices) == 0:
        print("恭喜！資料完整，沒有發現需要修復的項目。")
        return

    print(f"發現 {len(failed_indices)} 筆資料需要修復 (強制配對模式)...")

    folder_map = get_folder_map(ROOT_DIR)
    
    # 取出失敗的子集進行處理
    df_failed = df.loc[failed_indices].copy()
    grouped = df_failed.groupby('_company_key')

    pbar = tqdm(grouped, desc="修復進度")

    for company_key, group in pbar:
        company_key_str = str(company_key).replace(".0", "") # 去除可能的浮點數結尾
        
        target_folder = folder_map.get(company_key_str)
        if not target_folder:
            # 模糊搜尋
            for k, v in folder_map.items():
                if company_key_str in k:
                    target_folder = v
                    break
        
        if not target_folder:
            continue

        # 找 PDF
        pdf_files = glob.glob(os.path.join(target_folder, "*.pdf"))
        original_pdf = [f for f in pdf_files if "_page_" not in f]
        
        if not original_pdf:
            continue
            
        pdf_path = original_pdf[0]
        folder_name = os.path.basename(target_folder)

        # 讀 PDF
        page_texts = {}
        try:
            with pdfplumber.open(pdf_path) as pdf:
                for i, page in enumerate(pdf.pages):
                    # 這裡也加強 clean_text 防止 PDF 讀出怪東西
                    txt = clean_text(page.extract_text())
                    if len(txt) > 5:
                        page_texts[i + 1] = txt
        except Exception:
            continue

        if not page_texts:
            continue

        # 針對每一筆失敗資料進行 Best Guess
        for idx, row in group.iterrows():
            target_text = clean_text(row['data'])
            evidence_text = clean_text(row['evidence_string'])

            best_page = 1 
            best_score = -1.0
            
            # 強制遍歷所有頁面找最高分
            for p_num, p_text in page_texts.items():
                s1 = calculate_hybrid_score(target_text, p_text)
                
                s2 = 0.0
                if evidence_text:
                    s2 = calculate_hybrid_score(evidence_text, p_text)
                
                score = max(s1, s2)

                if score > best_score:
                    best_score = score
                    best_page = p_num

            # 寫回原始 df (使用 loc 更新)
            # 即使 best_score 很低，我們也接受 (因為是 Best Guess)
            new_url = f"local_file://{folder_name}_page_{best_page}.pdf"
            
            df.loc[idx, 'URL'] = new_url
            df.loc[idx, 'page_number'] = int(best_page)
            df.loc[idx, '_status'] = "Repaired"

    # 最終整理
    df['page_number'] = df['page_number'].astype(int)
    
    cols_to_keep = [
        "data", "URL", "page_number", "ESG_type", 
        "promise_status", "promise_string", "verification_timeline", 
        "evidence_status", "evidence_string", "evidence_quality", 
        "_company_key"
    ]
    
    final_cols = [c for c in cols_to_keep if c in df.columns]
    df_final = df[final_cols]

    print(f"\n全部修復完成！")
    print(f"輸出檔案: {OUTPUT_CSV}")
    df_final.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')

if __name__ == "__main__":
    main()

正在讀取資料: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_corrected_data_incremental.csv
發現 89 筆資料需要修復 (強制配對模式)...


修復進度: 100%|██████████| 14/14 [11:04<00:00, 47.46s/it]


全部修復完成！
輸出檔案: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_best_guess.csv





- 用final_complete_best_guess.csv重新跑pegatron_4938_esg_report_2024.pdf

In [17]:
import pandas as pd
import os
import re
import pdfplumber
from tqdm import tqdm
import warnings

# 忽略 pandas 警告
warnings.simplefilter(action='ignore', category=FutureWarning)

# ================= 設定區 =================
# 1. 原始 CSV 路徑
INPUT_CSV = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_best_guess.csv"

# 2. 根目錄 (用來找 PDF)
ROOT_DIR = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all"

# 3. 指定要重跑的公司資料夾名稱 (必須精確)
TARGET_COMPANY_FOLDER = "pegatron_4938"
TARGET_COMPANY_KEY = 4938  # CSV 裡面的 _company_key 通常是數字

# 4. 輸出檔案 (更新後的 CSV)
OUTPUT_CSV = r"C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_pegatron_updated.csv"
# =========================================

def clean_text(text):
    """強力清洗：防呆(NaN) + 只留中英數"""
    if pd.isna(text) or text is None:
        return ""
    text_str = str(text)
    return re.sub(r'[^\w\u4e00-\u9fff]', '', text_str).lower()

def get_shingles(text, n=2):
    if len(text) < n: return {text}
    return set(text[i:i+n] for i in range(len(text) - n + 1))

def calculate_segment_score(target, page_content):
    """分段比對 (針對表格結構 | ●)"""
    segments = re.split(r'[|●•\n<br>。；;]+', str(target))
    valid_segments = [clean_text(s) for s in segments if len(clean_text(s)) > 2]
    
    if not valid_segments:
        return 0.0

    hits = sum(1 for seg in valid_segments if seg in page_content)
    return hits / len(valid_segments)

def calculate_hybrid_score(target, page_content):
    """綜合比對：取 (長句, 碎片, 分段) 三者最高分"""
    if not target or not page_content: return 0.0
    
    target = str(target)
    
    # 1. Chunking (長句)
    chunk_size = 10
    chunks = [target[i:i+chunk_size] for i in range(0, len(target), chunk_size) if len(target[i:i+chunk_size]) > 4]
    chunk_score = 0.0
    if chunks:
        hits = sum(1 for c in chunks if c in page_content)
        chunk_score = hits / len(chunks)

    # 2. Shingling (碎片)
    target_shingles = get_shingles(clean_text(target), n=2)
    shingle_score = 0.0
    if target_shingles:
        hit_count = sum(1 for s in target_shingles if s in page_content)
        shingle_score = hit_count / len(target_shingles)

    # 3. Segment (表格分段)
    segment_score = calculate_segment_score(target, page_content)

    # 回傳最高分
    return max(chunk_score, shingle_score, segment_score)

def main():
    print(f"正在讀取 CSV: {INPUT_CSV}")
    try:
        df = pd.read_csv(INPUT_CSV, encoding='utf-8-sig')
    except:
        df = pd.read_csv(INPUT_CSV, encoding='utf-8')

    # 填補空值
    df = df.fillna("")

    # 篩選出 Pegatron (4938) 的資料
    # 注意：有時候 key 是 int, 有時候是 float/string，轉成 string 比對最安全
    df['_company_key'] = df['_company_key'].astype(str).str.replace(r'\.0$', '', regex=True)
    target_key_str = str(TARGET_COMPANY_KEY)

    pegatron_indices = df[df['_company_key'] == target_key_str].index

    if len(pegatron_indices) == 0:
        print(f"錯誤：在 CSV 中找不到公司代號為 {TARGET_COMPANY_KEY} 的資料。")
        print("請確認 CSV 中的 _company_key 欄位。")
        return

    print(f"找到 {len(pegatron_indices)} 筆 Pegatron 資料，準備重跑...")

    # 尋找 PDF 路徑
    # 路徑結構: .../all/pegatron_4938/xxxxx.pdf
    company_dir = os.path.join(ROOT_DIR, TARGET_COMPANY_FOLDER)
    if not os.path.exists(company_dir):
        print(f"錯誤：找不到資料夾 {company_dir}")
        return

    pdf_files = [f for f in os.listdir(company_dir) if f.endswith(".pdf") and "_page_" not in f]
    if not pdf_files:
        print("錯誤：資料夾內找不到原始 PDF 檔。")
        return
    
    pdf_path = os.path.join(company_dir, pdf_files[0])
    print(f"正在讀取 PDF: {pdf_files[0]}")

    # 讀取 PDF 全文索引
    page_texts = {}
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for i, page in enumerate(tqdm(pdf.pages, desc="解析 PDF 頁面")):
                txt = clean_text(page.extract_text())
                if len(txt) > 5:
                    page_texts[i + 1] = txt
    except Exception as e:
        print(f"PDF 讀取失敗: {e}")
        return

    print("PDF 解析完成，開始重新比對資料...")

    # 開始針對每一筆資料進行重新比對
    updated_count = 0
    
    # 這裡使用 tqdm 顯示進度
    for idx in tqdm(pegatron_indices, desc="重跑 Pegatron 資料"):
        row = df.loc[idx]
        target_text = row['data']
        evidence_text = row['evidence_string']

        best_page = 1
        best_score = -1.0
        
        # 遍歷每一頁找最高分
        for p_num, p_text in page_texts.items():
            # 算 Data 分數
            s1 = calculate_hybrid_score(target_text, p_text)
            
            # 算 Evidence 分數
            s2 = 0.0
            if evidence_text:
                s2 = calculate_hybrid_score(evidence_text, p_text)
            
            # 取兩者最高
            score = max(s1, s2)

            if score > best_score:
                best_score = score
                best_page = p_num

        # 更新 DataFrame
        new_url = f"local_file://{TARGET_COMPANY_FOLDER}_page_{best_page}.pdf"
        
        # 檢查是否跟原本的不一樣 (可以觀察有多少被修正了)
        old_page = df.loc[idx, 'page_number']
        try:
            old_page = int(float(old_page))
        except:
            old_page = -1

        if old_page != best_page:
            updated_count += 1
        
        df.loc[idx, 'URL'] = new_url
        df.loc[idx, 'page_number'] = int(best_page)
        # 標記一下狀態，讓你知道這筆是被特定重跑的
        df.loc[idx, '_status'] = f"Updated (Pegatron Re-run, Score: {best_score:.2f})"

    # 輸出
    print(f"\n重跑完成！共有 {updated_count} 筆資料的頁碼發生變更。")
    
    # 整理欄位格式
    df['page_number'] = pd.to_numeric(df['page_number'], errors='coerce').fillna(0).astype(int)
    
    cols_to_keep = [
        "data", "URL", "page_number", "ESG_type", 
        "promise_status", "promise_string", "verification_timeline", 
        "evidence_status", "evidence_string", "evidence_quality", 
        "_company_key"
    ]
    # 保留必要欄位，並加上 _status 方便你看結果 (提交時可移除)
    final_cols = [c for c in df.columns if c in cols_to_keep or c == '_status']
    
    df_final = df[final_cols]
    
    print(f"正在儲存至: {OUTPUT_CSV}")
    df_final.to_csv(OUTPUT_CSV, index=False, encoding='utf-8-sig')
    print("完成。")

if __name__ == "__main__":
    main()

正在讀取 CSV: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_best_guess.csv
找到 70 筆 Pegatron 資料，準備重跑...
正在讀取 PDF: pegatron_4938_esg_report_2024.pdf


解析 PDF 頁面: 100%|██████████| 57/57 [00:10<00:00,  5.39it/s]


PDF 解析完成，開始重新比對資料...


重跑 Pegatron 資料: 100%|██████████| 70/70 [00:01<00:00, 59.59it/s]



重跑完成！共有 54 筆資料的頁碼發生變更。
正在儲存至: C:\Users\wesley\OneDrive\桌面\LAB\ai_cup\company_data_by_annotation_group\all\final_complete_pegatron_updated.csv
完成。
