In [10]:
import pdfplumber
import json


def extract_tables_to_json(pdf_path, output_path):
    """Dump every table found in a PDF to a JSON file, one record per table.

    Each record has the keys ``page`` (1-based page number), ``table_index``
    (0-based position of the table on its page) and ``data`` (the raw
    list-of-rows returned by ``page.extract_tables()``).

    :param pdf_path: path of the PDF to read
    :param output_path: path of the JSON file to write (UTF-8, indent=2)
    """
    with pdfplumber.open(pdf_path) as pdf:
        # One flat list over all pages; pages without tables contribute nothing.
        records = [
            {'page': page_no, 'table_index': position, 'data': grid}
            for page_no, page in enumerate(pdf.pages, start=1)
            for position, grid in enumerate(page.extract_tables())
        ]

    # ensure_ascii=False keeps non-ASCII cell text readable in the output file.
    with open(output_path, 'w', encoding='utf-8') as out_file:
        json.dump(records, out_file, ensure_ascii=False, indent=2)


# Usage example: dump every table of the tender document to tables.json
extract_tables_to_json(
    pdf_path=r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf',
    output_path='tables.json',
)


In [11]:
import pdfplumber
import json


def extract_tables_with_headers(pdf_path, output_path):
    """Extract every table in a PDF as header-keyed rows and write them to JSON.

    The first row of each table is taken to be its header. Every following
    row whose length equals the header length becomes a ``{header: cell}``
    dict; rows of any other length are left out. Because the rows are built
    with ``dict(zip(...))``, header cells that repeat (including repeated
    ``None``) collapse into a single key.

    :param pdf_path: path of the PDF to read
    :param output_path: path of the JSON file to write (UTF-8, indent=2)
    """
    records = []

    with pdfplumber.open(pdf_path) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            for position, grid in enumerate(page.extract_tables()):
                # Empty tables are skipped but still consume a table_index.
                if not grid:
                    continue

                header_row, body = grid[0], grid[1:]
                width = len(header_row)

                records.append(
                    {
                        'page': page_no,
                        'table_index': position,
                        'headers': header_row,
                        'rows': [
                            dict(zip(header_row, cells))
                            for cells in body
                            if len(cells) == width
                        ],
                    }
                )

    with open(output_path, 'w', encoding='utf-8') as out_file:
        json.dump(records, out_file, ensure_ascii=False, indent=2)


# Usage example: export header-keyed tables of the tender document
extract_tables_with_headers(
    pdf_path=r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf',
    output_path='tables_with_headers.json',
)


In [14]:
import pdfplumber
import json


def extract_and_smart_merge_tables(pdf_path, output_path, table_groups):
    """Merge tables that span several pages, driven by explicit page groups.

    For each group, the first non-empty table found on the listed pages
    supplies the header row. Later tables with the same column count are
    appended to it; their first row is dropped only when it repeats that
    header (compared with all whitespace removed), so a continuation page
    that starts directly with data no longer loses its first row. Tables
    with a different column count are emitted as stand-alone entries.

    Page numbers are 1-based. Numbers outside ``1..len(pdf.pages)`` are
    skipped; in particular 0 and negative values no longer wrap around to
    pages at the end of the document.

    Rows are converted with ``dict(zip(headers, row))``: rows whose length
    differs from the header length are omitted, and repeated header cells
    collapse into one key.

    :param pdf_path: path of the PDF to read
    :param output_path: path of the JSON file to write (UTF-8, indent=2)
    :param table_groups: group definitions such as
        ``[{"pages": [4, 5, 6, 7], "name": "..."}, ...]``; ``name`` is
        optional and defaults to ``"Group <index>"``
    """

    def squash(cells):
        # Whitespace-insensitive form of a row, used to spot repeated headers.
        return [''.join(str(cell or '').split()) for cell in cells]

    def to_records(headers, rows):
        # Header-keyed dicts for the rows that have exactly one cell per header.
        return [dict(zip(headers, row)) for row in rows if len(row) == len(headers)]

    tables_data = []

    with pdfplumber.open(pdf_path) as pdf:
        page_total = len(pdf.pages)

        for group_index, group in enumerate(table_groups):
            group_name = group.get('name', f'Group {group_index}')
            first_headers = None
            merged_rows = []  # data rows only; the header is kept separately

            for page_num in group['pages']:
                if not 1 <= page_num <= page_total:
                    continue

                for table in pdf.pages[page_num - 1].extract_tables():
                    if not table:
                        continue

                    if first_headers is None:
                        # The first table of the group defines the header.
                        first_headers = table[0]
                        merged_rows.extend(table[1:])
                    elif len(table[0]) == len(first_headers):
                        if squash(table[0]) == squash(first_headers):
                            # Header repeated on the continuation page: skip it.
                            merged_rows.extend(table[1:])
                        else:
                            # First row is real data: keep every row.
                            merged_rows.extend(table)
                    else:
                        # Different structure: report as an independent table.
                        tables_data.append(
                            {
                                'page': page_num,
                                'group': group_name,
                                'headers': table[0],
                                'rows': to_records(table[0], table[1:]),
                            }
                        )

            if first_headers:
                tables_data.append(
                    {
                        'group': group_name,
                        'pages': group['pages'],
                        'headers': first_headers,
                        'rows': to_records(first_headers, merged_rows),
                    }
                )

    with open(output_path, 'w', encoding='utf-8') as out_file:
        json.dump(tables_data, out_file, ensure_ascii=False, indent=2)


# Usage example.
# Further page groups that can be enabled for this document:
# table_groups = [
#     {'pages': [4, 5, 6, 7, 8], 'name': '招标基本信息'},
#     {'pages': [14, 15, 16], 'name': '评标标准'},
#     {'pages': [27, 28], 'name': '设备清单'},
#     {'pages': [45, 46], 'name': '责任分工'},
# ]
# Only the group on pages 14-16 is merged in this run.
table_groups = [{'pages': [14, 15, 16], 'name': '评标标准'}]

extract_and_smart_merge_tables(
    pdf_path=r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf',
    output_path='grouped_merged_tables.json',
    table_groups=table_groups,
)


In [4]:
import pdfplumber
import json


def comprehensive_table_extraction(pdf_path, output_path, config):
    """Extract stand-alone and multi-page tables according to ``config``.

    ``config`` may contain two optional keys:

    * ``single_pages`` - list of 1-based page numbers; every non-empty table
      on those pages is written as a ``type == 'single_page'`` entry.
    * ``merged_ranges`` - list of ``{'name': ..., 'pages': [...]}`` dicts; the
      first table of each listed page is concatenated into one
      ``type == 'merged'`` entry whose header is the first row of the first
      table found.

    When merging, the first row of a continuation table is dropped only if it
    repeats the header (compared with all whitespace removed); otherwise it
    is kept as data. Page numbers outside ``1..len(pdf.pages)`` are skipped,
    so 0 and negative values no longer wrap around to the end of the document.

    Rows are converted with ``dict(zip(headers, row))``: rows whose length
    differs from the header length are omitted, and repeated header cells
    collapse into one key.

    :param pdf_path: path of the PDF to read
    :param output_path: path of the JSON file to write (UTF-8, indent=2)
    :param config: extraction rules as described above
    """

    def squash(cells):
        # Whitespace-insensitive form of a row, used to spot repeated headers.
        return [''.join(str(cell or '').split()) for cell in cells]

    def to_records(headers, rows):
        # Header-keyed dicts for the rows that have exactly one cell per header.
        return [dict(zip(headers, row)) for row in rows if len(row) == len(headers)]

    tables_data = []

    with pdfplumber.open(pdf_path) as pdf:
        page_total = len(pdf.pages)

        # 1. Tables that live on a single page
        for page_num in config.get('single_pages') or []:
            if not 1 <= page_num <= page_total:
                continue

            tables = pdf.pages[page_num - 1].extract_tables()
            for table_index, table in enumerate(tables):
                if not table:
                    continue
                tables_data.append(
                    {
                        'type': 'single_page',
                        'page': page_num,
                        'table_index': table_index,
                        'headers': table[0],
                        'rows': to_records(table[0], table[1:]),
                    }
                )

        # 2. Tables that continue across several pages
        for range_info in config.get('merged_ranges') or []:
            pages = range_info['pages']
            first_headers = None
            merged_rows = []  # data rows only; the header is kept separately

            for page_num in pages:
                if not 1 <= page_num <= page_total:
                    continue

                tables = pdf.pages[page_num - 1].extract_tables()
                # Only the first table of each page takes part in the merge.
                if not (tables and tables[0]):
                    continue
                table = tables[0]

                if first_headers is None:
                    first_headers = table[0]
                    merged_rows.extend(table[1:])
                elif squash(table[0]) == squash(first_headers):
                    # Header repeated on the continuation page: skip it.
                    merged_rows.extend(table[1:])
                else:
                    # First row is real data: keep every row.
                    merged_rows.extend(table)

            if first_headers:
                tables_data.append(
                    {
                        'type': 'merged',
                        'name': range_info.get('name', ''),
                        'pages': pages,
                        'headers': first_headers,
                        'rows': to_records(first_headers, merged_rows),
                    }
                )

    with open(output_path, 'w', encoding='utf-8') as out_file:
        json.dump(tables_data, out_file, ensure_ascii=False, indent=2)


# Usage example.
# Entries that are commented out are alternative settings for this document.
config = {
    # 'single_pages': [51, 72, 73, 79, 80, 82, 83],  # pages holding stand-alone tables
    'merged_ranges': [
        # {'name': '招标要求', 'pages': [4, 5, 6, 7, 8]},
        {'name': '评标标准', 'pages': [14, 15, 16]},
        # {'name': '设备清单', 'pages': [27, 28]},
        # {'name': '责任分工', 'pages': [45, 46]},
    ],
}

comprehensive_table_extraction(
    pdf_path=r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf',
    output_path='comprehensive_tables.json',
    config=config,
)


In [5]:
import pdfplumber
import json


class PDFTextLocator:
    """Find where a piece of text occurs in a PDF and report its coordinates.

    The document is opened with pdfplumber in the constructor and stays open
    until :meth:`close` is called. The class can also be used as a context
    manager, which closes the document on exit.
    """

    def __init__(self, pdf_path):
        self.pdf_path = pdf_path
        self.pdf = pdfplumber.open(pdf_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never swallow exceptions from the with-body

    def find_text(self, search_text, method='chars', case_sensitive=True):
        """Search every page for ``search_text``.

        :param search_text: text to look for
        :param method: ``'chars'`` (match consecutive character objects),
            ``'words'`` (substring match inside extracted words) or
            ``'text'`` (page-level hit without coordinates)
        :param case_sensitive: compare with or without case folding
        :return: list of position dicts, in page order
        :raises ValueError: if ``method`` is not one of the three names above
        """
        finders = {
            'chars': self._find_with_chars,
            'words': self._find_with_words,
            'text': self._find_with_text,
        }
        if method not in finders:
            # An unknown method used to yield an empty result, which looked
            # exactly like "text not found".
            raise ValueError(
                f"unknown method {method!r}; expected 'chars', 'words' or 'text'"
            )

        finder = finders[method]
        positions = []
        for page_num, page in enumerate(self.pdf.pages, 1):
            positions.extend(finder(page, page_num, search_text, case_sensitive))
        return positions

    def _find_with_chars(self, page, page_num, search_text, case_sensitive):
        """Match ``search_text`` against runs of consecutive ``page.chars``.

        Each hit carries the bounding box enclosing all matched characters,
        its centre point and the page size.
        """
        positions = []
        chars = page.chars

        # Fold case one character at a time, exactly as done for the page
        # characters below, so both sides of the comparison agree.
        wanted = [ch if case_sensitive else ch.lower() for ch in search_text]
        span = len(wanted)
        if span == 0:
            return positions

        for start in range(len(chars) - span + 1):
            window = chars[start:start + span]

            for char, target in zip(window, wanted):
                char_text = char.get('text', '')
                if not case_sensitive:
                    char_text = char_text.lower()
                if char_text != target:
                    break
            else:
                # Loop finished without a mismatch: the whole window matches.
                x0 = min(char['x0'] for char in window)
                y0 = min(char['top'] for char in window)
                x1 = max(char['x1'] for char in window)
                y1 = max(char['bottom'] for char in window)

                positions.append(
                    {
                        'page': page_num,
                        'method': 'chars',
                        'text': search_text,
                        'bbox': {'x0': x0, 'y0': y0, 'x1': x1, 'y1': y1},
                        'center': {'x': (x0 + x1) / 2, 'y': (y0 + y1) / 2},
                        'page_size': {'width': page.width, 'height': page.height},
                    }
                )

        return positions

    def _find_with_words(self, page, page_num, search_text, case_sensitive):
        """Report every extracted word that contains ``search_text``.

        The bounding box is that of the whole word, not of the matched part.
        """
        positions = []
        search = search_text if case_sensitive else search_text.lower()

        for word in page.extract_words():
            word_text = word['text'] if case_sensitive else word['text'].lower()
            if search not in word_text:
                continue

            positions.append(
                {
                    'page': page_num,
                    'method': 'words',
                    'text': search_text,
                    'matched_text': word['text'],
                    'bbox': {
                        'x0': word['x0'],
                        'y0': word['top'],
                        'x1': word['x1'],
                        'y1': word['bottom'],
                    },
                    'center': {
                        'x': (word['x0'] + word['x1']) / 2,
                        'y': (word['top'] + word['bottom']) / 2,
                    },
                    'page_size': {'width': page.width, 'height': page.height},
                }
            )

        return positions

    def _find_with_text(self, page, page_num, search_text, case_sensitive):
        """Check whether the page text contains ``search_text``.

        Only page-level information is available here, so a hit has no
        coordinates and is flagged with ``'approximate': True``.
        """
        positions = []
        # Guard against extract_text() returning None for a page.
        text = page.extract_text() or ''
        search = search_text if case_sensitive else search_text.lower()
        page_text = text if case_sensitive else text.lower()

        if search in page_text:
            positions.append(
                {
                    'page': page_num,
                    'method': 'text',
                    'text': search_text,
                    'approximate': True,
                    'page_size': {'width': page.width, 'height': page.height},
                }
            )

        return positions

    def save_positions(self, positions, output_path):
        """Write the position list to ``output_path`` as UTF-8 JSON."""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(positions, f, ensure_ascii=False, indent=2)

    def close(self):
        """Close the underlying PDF document."""
        self.pdf.close()


# Usage example
locator = PDFTextLocator(
    r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf'
)

try:
    # Look the text up with one of the available methods
    positions_chars = locator.find_text('评标办法', method='chars')
    print(positions_chars)
    # positions_words = locator.find_text('投标保证金', method='words')
    # print(positions_words)
    # Save the results
    # locator.save_positions(positions_chars, '招标人_positions.json')
    # locator.save_positions(positions_words, '投标保证金_positions.json')
finally:
    # Release the PDF handle even if the search or the print above raises
    locator.close()


[{'page': 5, 'method': 'chars', 'text': '评标办法', 'bbox': {'x0': 256.67865, 'y0': 525.3029999999999, 'x1': 298.68729999999994, 'y1': 535.7529999999999}, 'center': {'x': 277.68297499999994, 'y': 530.5279999999999}, 'page_size': {'width': 595.25, 'height': 841.9}}, {'page': 9, 'method': 'chars', 'text': '评标办法', 'bbox': {'x0': 115.55985000000001, 'y0': 446.34299999999996, 'x1': 157.5685, 'y1': 456.793}, 'center': {'x': 136.564175, 'y': 451.568}, 'page_size': {'width': 595.25, 'height': 841.9}}, {'page': 13, 'method': 'chars', 'text': '评标办法', 'bbox': {'x0': 202.08, 'y0': 282.903, 'x1': 244.80970000000002, 'y1': 293.35299999999995}, 'center': {'x': 223.44485000000003, 'y': 288.128}, 'page_size': {'width': 595.25, 'height': 841.9}}, {'page': 13, 'method': 'chars', 'text': '评标办法', 'bbox': {'x0': 96.0, 'y0': 303.3030000000001, 'x1': 138.6094, 'y1': 313.75300000000004}, 'center': {'x': 117.3047, 'y': 308.5280000000001}, 'page_size': {'width': 595.25, 'height': 841.9}}, {'page': 14, 'method': 'cha

In [None]:
import pdfplumber
import json
from typing import List, Dict, Any


class AdvancedTableMerger:
    """Detect tables that continue across consecutive PDF pages and merge them.

    Every table on every page is extracted together with simple content
    statistics. Tables on directly consecutive pages are merged when they
    have the same column count and a combined similarity score above 0.6.
    The document is opened with pdfplumber in the constructor and stays open
    until :meth:`close` is called; the class also works as a context manager.
    """

    def __init__(self, pdf_path: str):
        self.pdf_path = pdf_path
        self.pdf = pdfplumber.open(pdf_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never swallow exceptions from the with-body

    def analyze_table_content(self, table: List[List]) -> Dict:
        """Return size and cell-type statistics for one table.

        An empty table yields ``{}``. Otherwise the result has the keys
        ``row_count``, ``col_count`` (widest row), ``cell_count``,
        ``non_empty_cells`` and ``content_types``.
        """
        if not table:
            return {}

        return {
            'row_count': len(table),
            'col_count': max(len(row) for row in table),
            'cell_count': sum(len(row) for row in table),
            'non_empty_cells': sum(1 for row in table for cell in row if cell),
            'content_types': self._analyze_content_types(table),
        }

    def _analyze_content_types(self, table: List[List]) -> Dict:
        """Count the cells of a table by kind: numeric, text, mixed or empty."""
        types = {'numeric': 0, 'text': 0, 'mixed': 0, 'empty': 0}

        for row in table:
            for cell in row:
                if not cell:
                    types['empty'] += 1
                elif self._is_numeric(str(cell)):
                    types['numeric'] += 1
                elif self._is_text(str(cell)):
                    types['text'] += 1
                else:
                    types['mixed'] += 1

        return types

    def _is_numeric(self, text: str) -> bool:
        """Tell whether ``text`` is a finite number (commas and spaces ignored)."""
        try:
            value = float(text.replace(',', '').replace(' ', ''))
        except ValueError:
            return False
        # float() also parses 'nan', 'inf' and 'infinity'; in a table cell
        # those are words, not numbers.
        return value == value and abs(value) != float('inf')

    def _is_text(self, text: str) -> bool:
        """Tell whether ``text`` is non-blank and not numeric."""
        return bool(text and not self._is_numeric(text) and text.strip())

    def calculate_table_similarity(self, table1: Dict, table2: Dict) -> float:
        """Score how alike two table-info dicts are.

        The score is ``0.3 * column match + 0.4 * content-type similarity
        + 0.3 * header similarity``.
        """
        # 1. Column count: all or nothing
        col_match = 1.0 if table1['cols'] == table2['cols'] else 0.0

        # 2. Distribution of cell kinds
        types1 = table1.get('content_analysis', {}).get('content_types', {})
        types2 = table2.get('content_analysis', {}).get('content_types', {})
        type_similarity = self._calculate_type_similarity(types1, types2)

        # 3. First-row (header) similarity
        header_similarity = self.calculate_header_similarity(
            table1['headers'], table2['headers']
        )

        return 0.3 * col_match + 0.4 * type_similarity + 0.3 * header_similarity

    def _calculate_type_similarity(self, types1: Dict, types2: Dict) -> float:
        """Compare two cell-kind histograms; 1.0 means identical proportions.

        Returns 0.0 when either histogram is missing or sums to zero.
        """
        if not types1 or not types2:
            return 0.0

        total1 = sum(types1.values())
        total2 = sum(types2.values())
        if total1 == 0 or total2 == 0:
            return 0.0

        # Average, over all kinds, of (1 - difference between the two shares).
        kinds = set(types1.keys()) | set(types2.keys())
        similarity = 0.0
        for key in kinds:
            ratio1 = types1.get(key, 0) / total1
            ratio2 = types2.get(key, 0) / total2
            similarity += 1.0 - abs(ratio1 - ratio2)

        return similarity / len(kinds)

    def extract_enhanced_tables(self) -> List[Dict]:
        """Extract every non-empty table of the document with its metadata.

        Each entry holds ``page`` (1-based), ``table_index``, ``rows``,
        ``cols``, ``headers`` (the table's first row), ``content_length``,
        ``data`` and ``content_analysis``.
        """
        tables_info = []

        for page_num, page in enumerate(self.pdf.pages, 1):
            for table_index, table in enumerate(page.extract_tables()):
                if not table:
                    continue

                tables_info.append(
                    {
                        'page': page_num,
                        'table_index': table_index,
                        'rows': len(table),
                        'cols': max(len(row) for row in table),
                        'headers': table[0],
                        'content_length': sum(
                            len(str(cell)) for row in table for cell in row if cell
                        ),
                        'data': table,
                        'content_analysis': self.analyze_table_content(table),
                    }
                )

        return tables_info

    def smart_merge_tables(self, tables_info: List[Dict]) -> List[Dict]:
        """Merge runs of table fragments that continue one another.

        Starting from each table not yet consumed, following tables are
        appended for as long as :meth:`_should_merge_tables` accepts them.
        The first row of an appended fragment is dropped only when it is
        similar to the header of the table that started the run, i.e. when
        the header was repeated on the continuation page.

        ``cols``, ``headers`` and ``content_analysis`` of a merged entry are
        those of the run's first fragment.
        """
        if not tables_info:
            return []

        merged_tables = []
        i = 0

        while i < len(tables_info):
            base_table = tables_info[i]
            # Header of the merged table. Repeated-header detection must use
            # this row: the first row of an intermediate fragment may be plain
            # data, and two data rows can look alike.
            base_headers = base_table['headers']

            current_table = base_table
            merged_pages = [base_table['page']]
            final_data = list(base_table['data'])

            # Look ahead for fragments that continue this table
            j = i + 1
            while j < len(tables_info):
                next_table = tables_info[j]

                if not self._should_merge_tables(
                    current_table, next_table, merged_pages
                ):
                    break

                next_data = next_table['data']
                if len(next_data) > 1 and self._are_headers_similar(
                    base_headers, next_table['headers']
                ):
                    # Header repeated on this page: skip that row
                    final_data.extend(next_data[1:])
                else:
                    final_data.extend(next_data)

                merged_pages.append(next_table['page'])
                current_table = next_table
                j += 1

            merged_tables.append(
                {
                    'start_page': base_table['page'],
                    'end_page': current_table['page'],
                    'pages': merged_pages,
                    'page_count': len(merged_pages),
                    'total_rows': len(final_data),
                    'cols': base_table['cols'],
                    'headers': base_headers,
                    'data': final_data,
                    'content_analysis': base_table.get('content_analysis', {}),
                }
            )
            i = j

        return merged_tables

    def _should_merge_tables(
        self, table1: Dict, table2: Dict, merged_pages: List[int]
    ) -> bool:
        """Decide whether ``table2`` continues ``table1``."""
        # 1. table2 must sit on the page directly after table1
        if table2['page'] != table1['page'] + 1:
            return False

        # 2. A page that is already part of the run is not merged again
        if table2['page'] in merged_pages:
            return False

        # 3. Column counts must agree
        if table1['cols'] != table2['cols']:
            return False

        # 4. The combined similarity score must exceed the merge threshold
        return self.calculate_table_similarity(table1, table2) > 0.6

    def _are_headers_similar(self, headers1: List, headers2: List) -> bool:
        """Tell whether two header rows are similar (score above 0.7)."""
        if not headers1 or not headers2:
            return False

        return self.calculate_header_similarity(headers1, headers2) > 0.7

    def calculate_header_similarity(self, headers1: List, headers2: List) -> float:
        """Return the share of header positions that match.

        Two cells match when both are blank, or when both are non-blank and
        one stripped text contains the other. The number of matches is
        divided by the length of the longer row.
        """
        if not headers1 or not headers2:
            return 0.0

        max_len = max(len(headers1), len(headers2))
        matches = 0

        for cell1, cell2 in zip(headers1, headers2):
            h1 = str(cell1).strip() if cell1 else ''
            h2 = str(cell2).strip() if cell2 else ''

            if h1 and h2:
                # Equal, or one text contained in the other
                if h1 == h2 or h1 in h2 or h2 in h1:
                    matches += 1
            elif h1 == h2:  # both blank
                matches += 1

        return matches / max_len if max_len > 0 else 0.0

    def process_tables(self, output_path: str = None) -> List[Dict]:
        """Extract, merge and structure all tables; optionally save as JSON.

        :param output_path: when given, the result is also written to this
            path as UTF-8 JSON
        :return: one dict per merged table with ``metadata``, ``headers``,
            ``rows`` (header-keyed dicts; rows whose length differs from the
            header length are omitted) and ``content_analysis``
        """
        # 1. Extract tables together with their statistics
        tables_info = self.extract_enhanced_tables()

        # 2. Merge fragments that belong together
        merged_tables = self.smart_merge_tables(tables_info)

        # 3. Convert to the structured output format
        structured_tables = []
        for table_info in merged_tables:
            headers = table_info['headers']
            data = table_info['data']

            # data[0] is the header row; everything after it is data
            rows = [
                dict(zip(headers, row))
                for row in data[1:]
                if len(row) == len(headers)
            ]

            structured_tables.append(
                {
                    'metadata': {
                        'start_page': table_info['start_page'],
                        'end_page': table_info['end_page'],
                        'pages': table_info['pages'],
                        'page_count': table_info['page_count'],
                        'total_rows': table_info['total_rows'],
                        'cols': table_info['cols'],
                    },
                    'headers': headers,
                    'rows': rows,
                    'content_analysis': table_info['content_analysis'],
                }
            )

        # 4. Save the result if requested
        if output_path:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(structured_tables, f, ensure_ascii=False, indent=2)

        return structured_tables

    def close(self):
        """Close the underlying PDF document."""
        self.pdf.close()


# 使用示例
def advanced_main():
    """Run the merger on the sample tender PDF and print a summary per table.

    The structured result is also written to ``advanced_merged_tables.json``.
    The merger is closed even if processing or printing fails.
    """
    merger = AdvancedTableMerger(
        r'D:\user\PythonProject\AI_env2\uploads\1_tender_招标文件正文.pdf'
    )

    try:
        result = merger.process_tables('advanced_merged_tables.json')

        print(f'智能识别到 {len(result)} 个跨页表格:')
        for i, table in enumerate(result, 1):
            meta = table['metadata']
            print(f'表格 {i}:')
            print(
                f'  - 跨页范围: 第{meta["start_page"]}页到第{meta["end_page"]}页 (共{meta["page_count"]}页)'
            )
            print(f'  - 页面列表: {meta["pages"]}')
            print(f'  - 总行数: {meta["total_rows"]}')
            print(f'  - 列数: {meta["cols"]}')
            # Preview: first 20 characters of the first 5 header cells. Header
            # cells can be None (empty cell), which cannot be sliced, so such
            # cells are shown as ''.
            header_preview = [(h or '')[:20] for h in table['headers'][:5]]
            print(f'  - 表头: {header_preview}')
            print(f'  - 数据行数: {len(table["rows"])}')
            print()

    finally:
        merger.close()


# 运行高级示例
# advanced_main()
