In [30]:
# 初始化 PDF文件路径
pdf_path = "../../90-文档_Data/复杂PDF/billionaires_page-1-5.pdf"

# # 导入 LlamaIndex 相关模块
# from llama_index.core import Settings
# from llama_index.llms.openai import OpenAI
# from llama_index.embeddings.openai import OpenAIEmbedding
# from dotenv import load_dotenv

# # 全局设置
# load_dotenv()
# Settings.llm = OpenAI(model="gpt-3.5-turbo")
# Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

In [36]:
# 处理文档  移除页眉页脚 方法
import fitz  # PyMuPDF
from collections import defaultdict
import os

def remove_header_footer(input_pdf_path):
    """
    使用模式识别从PDF文件的每一页中移除页眉和页脚
    
    参数：
        input_pdf_path: 输入PDF文件的路径
    """
    # 打开输入的PDF
    doc = fitz.open(input_pdf_path)
    
    print(f"PDF共有 {len(doc)} 页")
    
    if len(doc) < 2:
        print("警告：PDF少于2页，模式检测可能不可靠。")
    
    # 步骤1：分析文本元素的位置以识别页眉和页脚
    # 这种方法使用更复杂的方法来查看文本位置
    
    # 获取每页的尺寸
    page_sizes = [(page.rect.width, page.rect.height) for page in doc]
    
    # 首先，分析所有页面上的文本位置
    top_blocks = []  # 存储顶部区域的块
    bottom_blocks = []  # 存储底部区域的块
    
    # 定义页面的哪个百分比应被视为页眉/页脚
    header_region = 0.15  # 顶部15%
    footer_region = 0.15  # 底部15%
    
    print(f"分析顶部{header_region*100}%的页眉和底部{footer_region*100}%的页脚")
    
    # 第一遍：收集位置信息
    for page_num in range(len(doc)):
        page = doc[page_num]
        width, height = page_sizes[page_num]
        
        print(f"\n第 {page_num+1} 页 ({width}x{height}):")
        
        # 提取带有位置信息的文本
        blocks = page.get_text("dict")["blocks"]
        
        # 处理每个文本块
        for block in blocks:
            if block.get("type") == 0:  # 文本块
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span.get("text", "").strip()
                        if not text:
                            continue
                            
                        # 获取位置信息
                        bbox = span.get("bbox")  # (x0, y0, x1, y1)
                        if not bbox:
                            continue
                            
                        # 检查是否在页眉区域
                        y0 = bbox[1]
                        y1 = bbox[3]
                        
                        # 在页眉区域？
                        if y0 < height * header_region:
                            top_blocks.append({
                                "text": text,
                                "page": page_num,
                                "bbox": bbox,
                                "rel_pos": y0 / height  # 相对位置
                            })
                            print(f"页眉候选：'{text[:30]}...' 位置y={y0:.1f}")
                        
                        # 在页脚区域？
                        if y1 > height * (1 - footer_region):
                            bottom_blocks.append({
                                "text": text,
                                "page": page_num,
                                "bbox": bbox,
                                "rel_pos": y1 / height  # 相对位置
                            })
                            print(f"页脚候选：'{text[:30]}...' 位置y={y1:.1f}")
    
    # 第二遍：识别位置和内容中的模式
    # 按相似的垂直位置（相对于页面高度）分组
    
    def group_by_position(blocks, threshold=0.02):
        """将出现在相似位置的块分组"""
        position_groups = defaultdict(list)
        
        # 首先按相似的y位置对块进行分组
        for block in blocks:
            # 将位置四舍五入到最接近的阈值进行分组
            pos_key = round(block["rel_pos"] / threshold) * threshold
            position_groups[pos_key].append(block)
        
        return position_groups
    
    # 按位置对页眉和页脚进行分组
    header_groups = group_by_position(top_blocks)
    footer_groups = group_by_position(bottom_blocks)
    
    print("\n页眉位置组:")
    for pos, blocks in header_groups.items():
        print(f"距顶部 {pos*100:.1f}% 的位置: {len(blocks)} 次出现")
        # 统计每个文本的出现次数
        text_counts = defaultdict(int)
        for block in blocks:
            text_counts[block["text"]] += 1
        
        # 查找在此位置重复出现的文本
        for text, count in text_counts.items():
            if count >= max(2, len(doc) * 0.5):  # 在至少50%的页面上出现
                print(f"  - '{text[:50]}...' 出现 {count} 次")
    
    print("\n页脚位置组:")
    for pos, blocks in footer_groups.items():
        print(f"距顶部 {pos*100:.1f}% 的位置: {len(blocks)} 次出现")
        # 统计每个文本的出现次数
        text_counts = defaultdict(int)
        for block in blocks:
            text_counts[block["text"]] += 1
        
        # 查找在此位置重复出现的文本
        for text, count in text_counts.items():
            if count >= max(2, len(doc) * 0.5):  # 在至少50%的页面上出现
                print(f"  - '{text[:50]}...' 出现 {count} 次")
    
    # 定义页眉/页脚移除标准
    # 我们将移除满足以下条件的块:
    # 1. 在多个页面的相似位置出现（至少50%的页面）
    # 2. 包含相同或相似的文本
    
    # 步骤3：创建一个移除页眉和页脚的新文档
    new_doc = fitz.open()
    
    # 跟踪我们确定为页眉/页脚的位置
    header_positions = []
    for pos, blocks in header_groups.items():
        text_counts = defaultdict(int)
        for block in blocks:
            text_counts[block["text"]] += 1
        
        # 如果任何文本在此位置至少出现在50%的页面上
        if any(count >= max(2, len(doc) * 0.5) for count in text_counts.values()):
            header_positions.append(pos)
    
    footer_positions = []
    for pos, blocks in footer_groups.items():
        text_counts = defaultdict(int)
        for block in blocks:
            text_counts[block["text"]] += 1
        
        # 如果任何文本在此位置至少出现在50%的页面上
        if any(count >= max(2, len(doc) * 0.5) for count in text_counts.values()):
            footer_positions.append(pos)
    
    print(f"\n识别出 {len(header_positions)} 个页眉位置和 {len(footer_positions)} 个页脚位置")
    
    # 创建一个没有页眉和页脚的新PDF
    for page_num in range(len(doc)):
        page = doc[page_num]
        width, height = page_sizes[page_num]
        
        # 创建一个新页面
        new_page = new_doc.new_page(width=width, height=height)
        
        # 编辑页眉和页脚
        blocks_to_redact = []
        
        # 获取此页的所有块
        blocks = page.get_text("dict")["blocks"]
        
        for block in blocks:
            if block.get("type") == 0:  # 文本块
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        bbox = span.get("bbox")
                        if not bbox:
                            continue
                            
                        # 相对位置
                        y0_rel = bbox[1] / height
                        y1_rel = bbox[3] / height
                        
                        # 检查是否在页眉位置
                        is_header = any(abs(y0_rel - pos) < 0.02 for pos in header_positions)
                        # 检查是否在页脚位置
                        is_footer = any(abs(y1_rel - pos) < 0.02 for pos in footer_positions)
                        
                        if is_header or is_footer:
                            # 为此跨度创建编辑注释
                            redact_rect = fitz.Rect(bbox)
                            page.add_redact_annot(redact_rect)
        
        # 应用编辑
        page.apply_redactions()
        
        # 复制清理后的页面
        new_page.show_pdf_page(new_page.rect, doc, page_num)
    
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)
    output_filename = os.path.basename(input_pdf_path).replace(".pdf", "_no_header_footer.pdf")
    output_pdf_path = os.path.join(output_dir, output_filename)

    # 保存输出
    new_doc.save(output_pdf_path)
    new_doc.close()
    doc.close()
    
    print(f"\nPDF处理成功。已保存到 {output_pdf_path}")
    return output_pdf_path

# 执行过滤页眉页脚方法 返回新的PDF文件路径
filter_pdf_path = remove_header_footer(pdf_path)

#### 1. unstructured 表格提取
**优化策略**
1. 使用cleaners过滤页脚信息
2. 通过coordinates去除页脚
3. 通过metadata识别并排除页脚

In [None]:
from unstructured.partition.pdf import partition_pdf

elements = partition_pdf(
    filter_pdf_path,
    strategy="hi_res",  # 保持高精度策略
    ocr_languages=["eng"],  # 明确指定英语OCR，因为表格主要是英文
    extract_images_in_pdf=False,  # 减少对图像的处理，专注于文本和表格
    infer_table_structure=True,  # 保持表格结构推断
    max_characters=100000,  # 增加处理字符数上限，确保完整捕获
    include_page_breaks=True,  # 添加页面分隔符，便于后续处理
    pdf_image_dpi=300,  # 提高DPI，增强特殊符号识别
    detection_model_name="llm",  # 使用基于LLM的表格检测模型可能提高准确性
    image_output_directory_path=None,  # 不输出中间图像
    ocr_kwargs={"preserve_interword_spaces": True}  # 保留词间空格，提高格式保留度
)  # 解析PDF文档

# 创建一个元素ID到元素的映射
element_map = {element.id: element for element in elements if hasattr(element, "id")}

# 创建一个元素索引到元素的映射
element_index_map = {i: element for i, element in enumerate(elements)}

# 处理表格数据
for i, element in enumerate(elements):
    if element.category == "Table":
        print("\n表格数据:")
        print("表格元数据:", vars(element.metadata))
        print("表格内容:")
        # 打印表格内容
        # 检查表格是否有HTML结构
        if hasattr(element, "metadata") and hasattr(element.metadata, "text_as_html"):
            print("表格HTML结构:")
            print(element.metadata.text_as_html)
        else:
            print("表格结构:")
            print(element.text)

        # 获取并打印父节点信息
        parent_id = getattr(element.metadata, "parent_id", None)
        if parent_id and parent_id in element_map:
            parent_element = element_map[parent_id]
            # 跳过页眉页脚的父节点
            if parent_element.category not in ["Header", "Footer"]:
                print("\n父节点信息:")
                print(f"类型: {parent_element.category}")
                print(f"内容: {parent_element.text}")
                if hasattr(parent_element, "metadata"):
                    print(f"父节点元数据: {vars(parent_element.metadata)}")
        else:
            print(f"未找到父节点 (ID: {parent_id})")
            # 如果未找到父节点,从前3个节点中寻找Title
            title_content = None
            for j in range(max(0, i - 3), i):
                prev_element = element_index_map.get(j)
                if prev_element and prev_element.category == "Title":
                    title_content = prev_element.text
                    break

            if title_content:
                print(f"\n找到最近的标题内容:")
                print(f"类型: {prev_element.category}")
                print(f"内容: {prev_element.text}")
                if hasattr(prev_element, "metadata"):
                    print(f"最近标题元数据: {vars(prev_element.metadata)}")
            else:
                print("\n未找到相关标题内容")
        print("-" * 100)

# 将表格数据和标题内容转换为Markdown格式
import re
import os

print("\n开始生成Markdown文件...")

# 创建一个列表来存储所有表格及其相关信息
tables_with_context = []

for i, element in enumerate(elements):
    if element.category == "Table":
        table_content = element.text
        table_html = getattr(element.metadata, "text_as_html", None)

        # 尝试获取标题信息
        title_content = None
        parent_id = getattr(element.metadata, "parent_id", None)

        # 方法1: 通过父节点获取标题
        if parent_id and parent_id in element_map:
            parent_element = element_map[parent_id]
            if parent_element.category not in ["Header", "Footer"]:
                title_content = parent_element.text

        # 方法2: 如果没有找到父节点标题，从前3个节点中寻找Title
        if not title_content:
            for j in range(max(0, i - 3), i):
                prev_element = element_index_map.get(j)
                if prev_element and prev_element.category == "Title":
                    title_content = prev_element.text
                    break

        # 如果仍然没有标题，使用默认标题
        if not title_content:
            title_content = f"表格 {len(tables_with_context) + 1}"

        # 将表格和标题添加到列表中
        tables_with_context.append(
            {
                "title": title_content.strip(),
                "content": table_content,
                "html": table_html,
            }
        )

# 生成Markdown文件
markdown_content = "# PDF表格提取结果\n\n"

for i, table_info in enumerate(tables_with_context, 1):
    # 添加标题
    markdown_content += f"## {table_info['title']}\n\n"

    # 将HTML表格转换为Markdown表格格式
    if table_info["html"]:
        # 提取表格内容
        html_content = table_info["html"]

        # 使用正则表达式提取表格行和单元格
        rows = re.findall(r"<tr.*?>(.*?)</tr>", html_content, re.DOTALL)

        markdown_table = ""
        header_processed = False

        for row in rows:
            # 提取单元格内容
            cells = re.findall(r"<t[hd].*?>(.*?)</t[hd]>", row, re.DOTALL)
            if cells:
                # 清理单元格内容中的HTML标签
                cleaned_cells = [re.sub(r"<.*?>", "", cell).strip() for cell in cells]

                # 添加表格行
                markdown_table += "| " + " | ".join(cleaned_cells) + " |\n"

                # 添加表头分隔符
                if not header_processed:
                    markdown_table += (
                        "| " + " | ".join(["---"] * len(cleaned_cells)) + " |\n"
                    )
                    header_processed = True

        markdown_content += markdown_table + "\n"
    else:
        # 如果没有HTML，使用原始文本内容
        markdown_content += f"```\n{table_info['content']}\n```\n\n"

# 保存Markdown文件
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
output_file = os.path.join(output_dir, "extracted_tables.md")

with open(output_file, "w", encoding="utf-8") as f:
    f.write(markdown_content)

print(f"Markdown文件已生成: {output_file}")
print(f"共提取了 {len(tables_with_context)} 个表格")

#### 2. 使用LlamaParse解析PDF

In [None]:
from llama_parse import LlamaParse
import time
import nest_asyncio

nest_asyncio.apply()

# 记录开始时间
start_time = time.time()

# 使用LlamaParse解析PDF
parser = LlamaParse(
    result_type="markdown",
)

# 加载并解析PDF文档
documents = parser.load_data(filter_pdf_path)

# 记录结束时间
end_time = time.time()
print(f"PDF解析耗时: {end_time - start_time:.2f}秒")

# 打印过滤后的解析结果
print("\n文档内容:")
for i, doc in enumerate(documents, 1):
    print(f"\n文档 {i} 内容:")
    print(doc.text)