## 对Google Patents进行解析

- 输入格式为 JSON Lines（每行一个 `JSON` 对象）
- 对于标量类型的字段，直接作为主表字段
- 对于一对多关系的复合类型字段，拆分成包含publication_number的分表
- 对于多对多关系的复合类型字段，先查询实体ID，然后输出关系表
- 部分冗余的脏字段可以抛弃

In [None]:
# Directory that holds the raw Google Patents export files.
RAW_DATA_PATH = r'/data/dataset/google_patents'

# Prefix (trailing slash included) shared by every CSV path defined below.
OUTPUT_BASE_PATH = r'/data/users/liangzhentao/Projects/google-patent-parser/data/output/'

# Entity tables that are read back to resolve names into IDs.
INVENTOR_HARMONIZED_PATH = f'{OUTPUT_BASE_PATH}inventor_harmonized.csv'
ASSIGNEE_HARMONIZED_PATH = f'{OUTPUT_BASE_PATH}assignee_harmonized.csv'
EXAMINER_PATH = f'{OUTPUT_BASE_PATH}examiner.csv'


def _table(file_name, headers, json_field):
    """Describe one output CSV: its path, its header row and its source JSON field."""
    return {
        'path': OUTPUT_BASE_PATH + file_name,
        'headers': list(headers),  # fresh list per table so entries never share state
        'json_field': json_field,
    }


# Column layouts shared by several tables.
_LOCALIZED_COLUMNS = ('publication_number', 'text', 'language', 'truncated')
_CLASSIFICATION_COLUMNS = ('publication_number', 'code', 'inventive', 'first', 'tree')

# Scalar fields: taken directly from the root object, hence no json_field.
OUTPUT_CONFIG = {
    'publication': _table(
        'publication.csv',
        [
            'publication_number', 'application_number', 'country_code', 'kind_code',
            'application_kind', 'pct_number', 'family_id', 'spif_publication_number',
            'spif_application_number', 'publication_date', 'filing_date', 'grant_date',
            'priority_date', 'entity_status', 'art_unit',
        ],
        None,
    ),
}

# One-to-many localized text fields (JSON field is '<name>_localized').
OUTPUT_CONFIG.update({
    name: _table(f'publication_{name}.csv', _LOCALIZED_COLUMNS, f'{name}_localized')
    for name in ('title', 'abstract', 'claims', 'description')
})

# Classification fields: identical structure, JSON field equals the table key.
OUTPUT_CONFIG.update({
    scheme: _table(f'publication_{scheme}.csv', _CLASSIFICATION_COLUMNS, scheme)
    for scheme in ('uspc', 'ipc', 'cpc', 'fi', 'fterm', 'locarno')
})

# Many-to-many / complex relations.
OUTPUT_CONFIG.update({
    # 'sequence' records the position of the entity within the record's list.
    'inventor_map': _table(
        'publication_inventor.csv',
        ['publication_number', 'inventor_id', 'sequence'],
        'inventor_harmonized',
    ),
    'assignee_map': _table(
        'publication_assignee.csv',
        ['publication_number', 'assignee_id', 'sequence'],
        'assignee_harmonized',
    ),
    'examiner_map': _table(
        'publication_examiner.csv',
        ['publication_number', 'examiner_id'],
        'examiner',
    ),
    # Patent and non-patent citations are both read from the 'citation' field.
    'patent_citation': _table(
        'publication_citation.csv',
        ['publication_number', 'cited_publication_number', 'type', 'category'],
        'citation',
    ),
    'non_patent_citation': _table(
        'publication_non_patent_reference.csv',
        ['publication_number', 'npl_text', 'type', 'category'],
        'citation',
    ),
    'priority_claim': _table(
        'publication_priority_claim.csv',
        ['publication_number', 'priority_application_number', 'filing_date'],
        'priority_claim',
    ),
    'child': _table(
        'publication_child.csv',
        ['publication_number', 'child_application_number', 'type', 'filing_date'],
        'child',
    ),
    'parent': _table(
        'publication_parent.csv',
        ['publication_number', 'parent_application_number', 'type', 'filing_date'],
        'parent',
    ),
})

In [10]:
import orjson as json
import csv
from tqdm import tqdm
from glob import glob

读取inventor、assignee、examiner的ID

In [3]:
def _load_id_lookup(path, key_fields, id_field, desc):
    """Build a composite-key -> ID lookup from a CSV file with a header row.

    Args:
        path: CSV file to read.
        key_fields: Column names whose values are joined with '@@' to form the key.
        id_field: Column whose value is stored for that key.
        desc: Label shown on the tqdm progress bar.

    Returns:
        dict mapping 'value1@@value2...' to the ID string. When the same key
        occurs on several rows, the last row wins.
    """
    lookup = {}
    # newline='' is the csv module's documented requirement; without it quoted
    # fields that contain line breaks are not parsed reliably.
    with open(path, 'r', newline='', encoding='utf-8') as f:
        for row in tqdm(csv.DictReader(f), desc=desc):
            # str() renders a missing column (None) as 'None', exactly as
            # f-string formatting of the same value would.
            composite_key = '@@'.join([str(row[field]) for field in key_fields])
            lookup[composite_key] = row[id_field]
    return lookup


# name@@country_code -> inventor_id
inventor_id_dict = _load_id_lookup(
    INVENTOR_HARMONIZED_PATH, ('name', 'country_code'), 'inventor_id',
    desc='Reading inventor ids...')

# name@@country_code -> assignee_id
assignee_id_dict = _load_id_lookup(
    ASSIGNEE_HARMONIZED_PATH, ('name', 'country_code'), 'assignee_id',
    desc='Reading assignee ids...')

# name@@department@@level -> examiner_id
examiner_id_dict = _load_id_lookup(
    EXAMINER_PATH, ('name', 'department', 'level'), 'examiner_id',
    desc='Reading examiner ids...')

Reading inventor ids...: 29452934it [00:59, 497972.26it/s]
Reading assignee ids...: 17542829it [00:34, 508054.06it/s]
Reading examiner ids...: 238057it [00:00, 390505.62it/s]


准备输入文件

In [4]:
# List the direct children of RAW_DATA_PATH in sorted order so runs are
# reproducible. The pattern contains no '**', so glob's recursive flag would
# have no effect and is omitted.
raw_jsonl_file_list = sorted(glob(RAW_DATA_PATH + '/*'))

# Fail with an explicit message instead of an IndexError on the prints below.
if not raw_jsonl_file_list:
    raise FileNotFoundError(f'No input files found under {RAW_DATA_PATH}')

print(f'共有 {len(raw_jsonl_file_list)} 个文件需要处理')
print(f'首个文件名称 {raw_jsonl_file_list[0]}')
print(f'最后文件名称 {raw_jsonl_file_list[-1]}')

共有 13110 个文件需要处理
首个文件名称 /data/dataset/google_patents/patents-000000000000
最后文件名称 /data/dataset/google_patents/patents-000000013109


准备输出函数

In [11]:
def write_batches(batches, initial_write=False, output_config=None):
    """Write every buffered batch of rows to its CSV file.

    Args:
        batches: dict mapping a table key to a list of rows (each row a list
            of cell values).
        initial_write: When True, each file is opened in 'w' mode and its
            header row is written first; otherwise rows are appended.
        output_config: Optional dict mapping each table key to a dict with
            'path' and 'headers'. Defaults to the module-level OUTPUT_CONFIG.

    On the initial write every table in `batches` gets its file created (or
    truncated) and its header written even when it has no rows yet. Otherwise
    a table that is empty in the first batch would never receive a header and
    later appends would land in a headerless, possibly stale, file.
    """
    config_by_key = OUTPUT_CONFIG if output_config is None else output_config
    mode = 'w' if initial_write else 'a'

    for key, data in batches.items():
        # Nothing to append; only the initial write still has work to do.
        if not data and not initial_write:
            continue

        config = config_by_key[key]

        # newline='' lets the csv module control line endings itself.
        with open(config['path'], mode, newline='', encoding='utf-8') as f:
            writer = csv.writer(f, quoting=csv.QUOTE_ALL)
            if initial_write:
                writer.writerow(config['headers'])
            writer.writerows(data)

### 解析JSON文件并分批写入到结果CSV

In [None]:
BATCH_SIZE = 1000  # flush buffered rows to disk after this many parsed records

# Tables whose rows are a direct projection of each element of a repeated
# JSON field (localized text and classifications).
_ONE_TO_MANY_KEYS = ['title', 'abstract', 'claims', 'description',
                     'uspc', 'ipc', 'cpc', 'fi', 'fterm', 'locarno']


def _new_batches():
    """Return an empty row buffer for every table in OUTPUT_CONFIG."""
    return {key: [] for key in OUTPUT_CONFIG.keys()}


def _extract_rows(patent):
    """Turn one parsed patent record into rows for every output table.

    Args:
        patent: dict decoded from one input line.

    Returns:
        dict mapping table key -> list of rows for this record, or None when
        the record has no publication_number. Repeated fields that are absent
        or null are treated as empty lists.
    """
    pub_num = patent.get('publication_number')
    if not pub_num:
        return None

    rows = _new_batches()

    # --- 1. Scalar fields: one row in the main table ---
    rows['publication'].append(
        [patent.get(h) for h in OUTPUT_CONFIG['publication']['headers']])

    # --- 2. Generic one-to-many fields (localized text & classifications) ---
    for key in _ONE_TO_MANY_KEYS:
        config = OUTPUT_CONFIG[key]
        columns = config['headers'][1:]  # first header is publication_number
        for item in patent.get(config['json_field']) or []:
            row = [pub_num]
            for column in columns:
                value = item.get(column)
                # A list-valued 'tree' is flattened to a ';'-separated string.
                if column == 'tree' and isinstance(value, list):
                    value = ';'.join(value)
                row.append(value)
            rows[key].append(row)

    # --- 3. Many-to-many relations resolved through the ID lookups ---
    # Inventors and assignees share the same 'name@@country_code' key format.
    for map_key, id_lookup in (('inventor_map', inventor_id_dict),
                               ('assignee_map', assignee_id_dict)):
        entities = patent.get(OUTPUT_CONFIG[map_key]['json_field']) or []
        # sequence is the 1-based position in the source list; entries whose
        # ID cannot be resolved are skipped without renumbering the others.
        for sequence, item in enumerate(entities, start=1):
            lookup_key = f"{item.get('name')}@@{item.get('country_code')}"
            entity_id = id_lookup.get(lookup_key)
            if entity_id:
                rows[map_key].append([pub_num, entity_id, sequence])

    # Examiner
    for item in patent.get('examiner') or []:
        lookup_key = f"{item.get('name')}@@{item.get('department')}@@{item.get('level')}"
        examiner_id = examiner_id_dict.get(lookup_key)
        if examiner_id:
            rows['examiner_map'].append([pub_num, examiner_id])

    # Citation: an entry with a publication_number is a patent citation;
    # otherwise, one with npl_text is a non-patent citation.
    for item in patent.get('citation') or []:
        cited_pub_num = item.get('publication_number')
        if cited_pub_num:
            rows['patent_citation'].append(
                [pub_num, cited_pub_num, item.get('type'), item.get('category')])
        elif item.get('npl_text'):
            rows['non_patent_citation'].append(
                [pub_num, item.get('npl_text'), item.get('type'), item.get('category')])

    # Priority claim
    for item in patent.get('priority_claim') or []:
        app_num = item.get('application_number')
        if app_num:
            rows['priority_claim'].append([pub_num, app_num, item.get('filing_date')])

    # Child / parent applications have the same shape.
    for relation in ('child', 'parent'):
        for item in patent.get(relation) or []:
            app_num = item.get('application_number')
            if app_num:
                rows[relation].append(
                    [pub_num, app_num, item.get('type'), item.get('filing_date')])

    return rows


# One row buffer per output table.
batches = _new_batches()
is_first_write = True
line_counter = 0  # number of records parsed successfully so far

for file in tqdm(raw_jsonl_file_list, desc='Processing files'):
    with open(file, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # blank line, nothing to parse

            # Only parsing is guarded: a malformed line is reported and
            # skipped, while write errors below are allowed to propagate
            # instead of being mistaken for a bad line.
            try:
                record_rows = _extract_rows(json.loads(line))
            except Exception as e:
                print(f"Error processing {file} line {line_no} "
                      f"(records parsed so far: {line_counter}). Error: {e}")
                continue

            if record_rows is None:
                continue

            # Commit the record's rows only after it parsed completely, so a
            # failing record never leaves partial rows in the buffers.
            for key, new_rows in record_rows.items():
                if new_rows:
                    batches[key].extend(new_rows)

            line_counter += 1

            # --- 4. Flush a full batch ---
            if line_counter % BATCH_SIZE == 0:
                write_batches(batches, initial_write=is_first_write)
                batches = _new_batches()
                is_first_write = False

# ==============================================================================
# 5. Flush whatever remains after the last full batch
# ==============================================================================
write_batches(batches, initial_write=is_first_write)
print("All files processed.")

Processing files:   0%|          | 6/13110 [00:05<3:34:04,  1.02it/s]


KeyboardInterrupt: 