In [48]:
import os
from glob import glob
import polars as pl

# ===== 路径配置（按你给的）=====
RAW_DIR = r"D:\BaiduNetdiskDownload\中国专利数据库1985-2025.2\分年份保存数据"
RAW_GLOB = os.path.join(RAW_DIR, "*.csv")

FILTERED_MAIN = r"../../artifacts_full/patent_quality_output.csv"
WORK_DIR = r"./"

# ===== 输出目录与文件 =====
EXTRA_PARTS_DIR = os.path.join(WORK_DIR, "extra_parts")
os.makedirs(EXTRA_PARTS_DIR, exist_ok=True)

TARGET_IDS_PARQUET = os.path.join(WORK_DIR, "target_ids.parquet")
EXTRA_ALL_PARQUET = os.path.join(WORK_DIR, "extra_all.parquet")
EXTRA_ALL_DEDUP_PARQUET = os.path.join(WORK_DIR, "extra_all_dedup.parquet")

MAIN_PARQUET = os.path.join(WORK_DIR, "main.parquet")
FINAL_PARQUET = os.path.join(WORK_DIR, "main_enriched.parquet")

# ===== 关键列名 =====
ID_COL = "申请号"

NEED_COLS = [
    ID_COL,
    "专利类型",
    "公开公告年份",
    "IPC主分类号",
    "专利权人类型",
    "统一社会信用代码",
    "引证次数",
    "被引证次数",
    "自引次数",
    "他引次数",
    "被自引次数",
    "被他引次数",
    "家族引证次数",
    "家族被引证次数",
]

# ===== 编码（如果报错/乱码，改这里）=====
CSV_ENCODING = "utf8"  # 常见：gb18030 / utf8 / utf8-lossy / gbk

In [49]:
# ===== 1) Build target ids =====
# ===== 1) 生成 target_ids.parquet：从主表抽取 “申请号” 并去重（只需做一次）=====
if not os.path.exists(TARGET_IDS_PARQUET):
    print("Building target ids parquet...")
    target = (
        pl.read_csv(
            FILTERED_MAIN,
            columns=[ID_COL],
            encoding=CSV_ENCODING,
            ignore_errors=True,
        )
        .select(pl.col(ID_COL).cast(pl.Utf8))
        .unique()
    )
    target.write_parquet(TARGET_IDS_PARQUET)
    print(f"Saved: {TARGET_IDS_PARQUET} | unique ids = {target.height}")
else:
    print(f"Found existing: {TARGET_IDS_PARQUET}")


Building target ids parquet...
Saved: ./target_ids.parquet | unique ids = 5092683


In [50]:
# ===== 2) Extract per-year parquet parts =====
# ===== 2) 逐年抽取：原始年度CSV -> extra_parts/*.parquet（保留全部多行，不去重）=====

files = sorted(glob(RAW_GLOB))
if not files:
    raise RuntimeError(f"No csv files found: {RAW_GLOB}")

target_lf = pl.scan_parquet(TARGET_IDS_PARQUET).select(pl.col(ID_COL).cast(pl.Utf8))

for csv_path in files:
    base = os.path.basename(csv_path)
    out_path = os.path.join(EXTRA_PARTS_DIR, base.replace(".csv", ".parquet"))

    if os.path.exists(out_path):
        print(f"[skip] {out_path}")
        continue

    print(f"[scan] {base}")

    raw_lf = (
        pl.scan_csv(
            csv_path,
            encoding=CSV_ENCODING,
            infer_schema_length=0,  # 避免 schema 推断不准
            ignore_errors=True,     # 遇到坏行尽量跳过
        )
        .select([pl.col(c) for c in NEED_COLS])      # 列裁剪（关键）
        .with_columns(pl.col(ID_COL).cast(pl.Utf8))  # join key 统一类型
        .join(target_lf, on=ID_COL, how="inner")     # 行过滤（关键）
    )

    df_part = raw_lf.collect(streaming=True)
    df_part.write_parquet(out_path)
    print(f"[wrote] {out_path} | rows={df_part.height}")


[scan] 中国专利数据库1985年.csv
[wrote] ./extra_parts\中国专利数据库1985年.parquet | rows=181
[scan] 中国专利数据库1986年.csv
[wrote] ./extra_parts\中国专利数据库1986年.parquet | rows=1352
[scan] 中国专利数据库1987年.csv
[wrote] ./extra_parts\中国专利数据库1987年.parquet | rows=1972
[scan] 中国专利数据库1988年.csv
[wrote] ./extra_parts\中国专利数据库1988年.parquet | rows=2509
[scan] 中国专利数据库1989年.csv


  df_part = raw_lf.collect(streaming=True)


[wrote] ./extra_parts\中国专利数据库1989年.parquet | rows=2835
[scan] 中国专利数据库1990年.csv
[wrote] ./extra_parts\中国专利数据库1990年.parquet | rows=3228
[scan] 中国专利数据库1991年.csv
[wrote] ./extra_parts\中国专利数据库1991年.parquet | rows=3244
[scan] 中国专利数据库1992年.csv
[wrote] ./extra_parts\中国专利数据库1992年.parquet | rows=3740
[scan] 中国专利数据库1993年.csv
[wrote] ./extra_parts\中国专利数据库1993年.parquet | rows=3768
[scan] 中国专利数据库1994年.csv
[wrote] ./extra_parts\中国专利数据库1994年.parquet | rows=4287
[scan] 中国专利数据库1995年.csv
[wrote] ./extra_parts\中国专利数据库1995年.parquet | rows=4272
[scan] 中国专利数据库1996年.csv
[wrote] ./extra_parts\中国专利数据库1996年.parquet | rows=4453
[scan] 中国专利数据库1997年.csv
[wrote] ./extra_parts\中国专利数据库1997年.parquet | rows=5220
[scan] 中国专利数据库1998年.csv
[wrote] ./extra_parts\中国专利数据库1998年.parquet | rows=6426
[scan] 中国专利数据库1999年.csv
[wrote] ./extra_parts\中国专利数据库1999年.parquet | rows=7437
[scan] 中国专利数据库2000年.csv
[wrote] ./extra_parts\中国专利数据库2000年.parquet | rows=12546
[scan] 中国专利数据库2001年.csv
[wrote] ./extra_parts\中国专利数据库2001年.parquet | rows=1

In [51]:
# ===== 3) Merge parts into extra_all =====
# ===== 3) 合并所有年度 part -> extra_all.parquet（仍然保留全部多行）=====

if not os.path.exists(EXTRA_ALL_PARQUET):
    parts_glob = os.path.join(EXTRA_PARTS_DIR, "*.parquet")
    print("Merging:", parts_glob)

    extra_all_lf = pl.scan_parquet(parts_glob)
    extra_all_lf.collect(streaming=True).write_parquet(EXTRA_ALL_PARQUET)
    print(f"Saved: {EXTRA_ALL_PARQUET}")
else:
    print(f"Found existing: {EXTRA_ALL_PARQUET}")


Merging: ./extra_parts\*.parquet


  extra_all_lf.collect(streaming=True).write_parquet(EXTRA_ALL_PARQUET)


Saved: ./extra_all.parquet


In [52]:
# ===== 4) Check duplicates in extra_all =====
# ===== 4) 重复检查 + 类型验证：统计申请号重复情况；统计“专利类型”分布；抽 10 个重复申请号例子 =====

extra_all_lf = pl.scan_parquet(EXTRA_ALL_PARQUET).with_columns(pl.col(ID_COL).cast(pl.Utf8))

rows_total = extra_all_lf.select(pl.len()).collect().item()
unique_ids = extra_all_lf.select(pl.col(ID_COL).n_unique()).collect().item()
dup_rows = rows_total - unique_ids

dup_id_cnt = (
    extra_all_lf
    .group_by(ID_COL)
    .agg(pl.len().alias("cnt"))
    .filter(pl.col("cnt") > 1)
    .select(pl.len())
    .collect()
    .item()
)

print(f"rows_total={rows_total:,}")
print(f"unique_ids={unique_ids:,}")
print(f"duplicate_rows={dup_rows:,} (ratio={dup_rows/rows_total:.6f})")
print(f"duplicate_ids(count>1)={dup_id_cnt:,}")

# ---- (A) 统计专利类型分布（验证你的“申请 vs 授权”猜想）----

type_stats = (
    extra_all_lf
    .group_by("专利类型")
    .agg(pl.len().alias("rows"))
    .sort("rows", descending=True)
    .collect()
)
print("\n[专利类型 行数统计]")
print(type_stats)

# 统计“按申请号去重后”的类型分布（看每个申请号是否同时存在两种类型）
# 这里不是最终方案，只是帮助你理解结构
type_per_id = (
    extra_all_lf
    .group_by(ID_COL)
    .agg(pl.col("专利类型").n_unique().alias("type_n_unique"))
    .group_by("type_n_unique")
    .agg(pl.len().alias("id_cnt"))
    .sort("type_n_unique")
    .collect()
)
print("\n[每个申请号有多少种类型(type_n_unique) 的分布]")
print(type_per_id)

# ---- (B) 抽 10 个重复申请号例子，并把这些申请号的所有行打出来 ----
dup_ids_10 = (
    extra_all_lf
    .group_by(ID_COL)
    .agg(pl.len().alias("cnt"))
    .filter(pl.col("cnt") > 1)
    .sort("cnt", descending=True)
    .select(ID_COL, "cnt")
    .limit(10)
    .collect()
)

print("\n[重复申请号 Top10（按出现次数）]")
print(dup_ids_10)

ids_list = dup_ids_10[ID_COL].to_list()

examples = (
    extra_all_lf
    .filter(pl.col(ID_COL).is_in(ids_list))
    .collect()
)

# 为了更易读，按申请号排序（这只是展示排序，不影响你整体性能）
examples = examples.sort(ID_COL)

print("\n[Top10 重复申请号的所有行示例（用于肉眼验证）]")
examples.head(200)  # 如果太多可以调小/调大

rows_total=10,439,900
unique_ids=5,092,683
duplicate_rows=5,347,217 (ratio=0.512190)
duplicate_ids(count>1)=5,092,683

[专利类型 行数统计]
shape: (2, 2)
┌──────────┬─────────┐
│ 专利类型 ┆ rows    │
│ ---      ┆ ---     │
│ str      ┆ u32     │
╞══════════╪═════════╡
│ 发明申请 ┆ 5238482 │
│ 发明授权 ┆ 5201418 │
└──────────┴─────────┘

[每个申请号有多少种类型(type_n_unique) 的分布]
shape: (1, 2)
┌───────────────┬─────────┐
│ type_n_unique ┆ id_cnt  │
│ ---           ┆ ---     │
│ u32           ┆ u32     │
╞═══════════════╪═════════╡
│ 2             ┆ 5092683 │
└───────────────┴─────────┘

[重复申请号 Top10（按出现次数）]
shape: (10, 2)
┌──────────────────┬─────┐
│ 申请号           ┆ cnt │
│ ---              ┆ --- │
│ str              ┆ u32 │
╞══════════════════╪═════╡
│ CN201310247773.5 ┆ 9   │
│ CN201810082449.5 ┆ 9   │
│ CN201710906907.8 ┆ 9   │
│ CN201310245984.5 ┆ 9   │
│ CN201310244387.0 ┆ 9   │
│ CN202211450960.9 ┆ 8   │
│ CN201510881121.6 ┆ 8   │
│ CN201710916918.4 ┆ 7   │
│ CN201710939799.4 ┆ 7   │
│ CN201710920020.4 ┆ 7   │


申请号,专利类型,公开公告年份,IPC主分类号,专利权人类型,统一社会信用代码,引证次数,被引证次数,自引次数,他引次数,被自引次数,被他引次数,家族引证次数,家族被引证次数
str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""CN201310244387.0""","""发明申请""","""2013""","""F16K17/30""",,,"""8.0""","""1.0""","""2.0""","""6.0""",,"""1.0""","""8.0""","""1.0"""
"""CN201310244387.0""","""发明申请""","""2013""","""F16K17/30""",,,"""8.0""","""1.0""","""2.0""","""6.0""",,"""1.0""","""8.0""","""1.0"""
"""CN201310244387.0""","""发明申请""","""2013""","""F16K17/30""",,,"""8.0""","""1.0""","""2.0""","""6.0""",,"""1.0""","""8.0""","""1.0"""
"""CN201310244387.0""","""发明授权""","""2015""","""F16K17/30""","""有限责任公司""","""91350182337596233K""","""8.0""",,,"""8.0""",,,"""8.0""","""1.0"""
"""CN201310244387.0""","""发明授权""","""2015""","""F16K17/30""","""有限责任公司""","""91350182337596233K""","""8.0""",,,"""8.0""",,,"""8.0""","""1.0"""
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""CN202211450960.9""","""发明申请""","""2024""","""A61B10/00""","""其他有限责任公司""","""912301103009300153""","""21.0""",,,"""21.0""",,,"""21.0""",
"""CN202211450960.9""","""发明申请""","""2024""","""A61B10/00""","""其他有限责任公司""","""912301103009300153""","""21.0""",,,"""21.0""",,,"""21.0""",
"""CN202211450960.9""","""发明授权""","""2024""","""A61B10/00""","""其他有限责任公司""","""912301103009300153""","""21.0""",,,"""21.0""",,,"""21.0""",
"""CN202211450960.9""","""发明授权""","""2024""","""A61B10/00""","""其他有限责任公司""","""912301103009300153""","""21.0""",,,"""21.0""",,,"""21.0""",


In [53]:
import polars as pl

# ===== 4.1) 在 extra_all 上，对每种专利类型单独算重复指标 =====
extra_all_lf = pl.scan_parquet(EXTRA_ALL_PARQUET).with_columns(pl.col(ID_COL).cast(pl.Utf8))

def dup_stats(lf: pl.LazyFrame, name: str):
    rows_total = lf.select(pl.len()).collect().item()
    unique_ids = lf.select(pl.col(ID_COL).n_unique()).collect().item()
    dup_rows = rows_total - unique_ids
    dup_id_cnt = (
        lf.group_by(ID_COL)
          .agg(pl.len().alias("cnt"))
          .filter(pl.col("cnt") > 1)
          .select(pl.len())
          .collect()
          .item()
    )
    print(f"\n[{name}]")
    print(f"rows_total={rows_total:,}")
    print(f"unique_ids={unique_ids:,}")
    print(f"duplicate_rows={dup_rows:,} (ratio={dup_rows/rows_total if rows_total else 0:.6f})")
    print(f"duplicate_ids(count>1)={dup_id_cnt:,}")

# 发明授权子集
auth_lf = extra_all_lf.filter(pl.col("专利类型") == "发明授权")
dup_stats(auth_lf, "发明授权 子集重复统计")

# 发明申请子集
app_lf = extra_all_lf.filter(pl.col("专利类型") == "发明申请")
dup_stats(app_lf, "发明申请 子集重复统计")



[发明授权 子集重复统计]
rows_total=5,201,418
unique_ids=5,092,683
duplicate_rows=108,735 (ratio=0.020905)
duplicate_ids(count>1)=104,244

[发明申请 子集重复统计]
rows_total=5,238,482
unique_ids=5,092,683
duplicate_rows=145,799 (ratio=0.027832)
duplicate_ids(count>1)=139,835


In [57]:
# ===== 4.2) 找“发明授权”内部重复的申请号 Top10，并打印这些申请号的授权行明细 =====

auth_dup_top10 = (
    auth_lf
    .group_by(ID_COL)
    .agg(pl.len().alias("cnt"))
    .filter(pl.col("cnt") > 1)
    .sort("cnt", descending=True)
    .limit(10)
    .collect()
)

print("\n[发明授权内部重复 Top10]")
print(auth_dup_top10)

auth_ids = auth_dup_top10[ID_COL].to_list()

auth_examples = (
    auth_lf
    .filter(pl.col(ID_COL).is_in(auth_ids))
    .collect()
    .sort([ID_COL])  # 只为展示更清楚
)

print("\n[发明授权内部重复 Top10 的行明细（用于肉眼看重复原因）]")
for pid, sub in auth_examples.group_by("申请号"):
    print("\n" + "=" * 80)
    print("申请号:", pid)
    for row in sub.iter_rows(named=True):
        print(row)


[发明授权内部重复 Top10]
shape: (10, 2)
┌──────────────────┬─────┐
│ 申请号           ┆ cnt │
│ ---              ┆ --- │
│ str              ┆ u32 │
╞══════════════════╪═════╡
│ CN201310567446.8 ┆ 4   │
│ CN202210835009.9 ┆ 4   │
│ CN201710940197.0 ┆ 4   │
│ CN201310558445.7 ┆ 4   │
│ CN202210855896.6 ┆ 4   │
│ CN202210778028.2 ┆ 4   │
│ CN201811385756.7 ┆ 4   │
│ CN202210876308.7 ┆ 4   │
│ CN201310577266.8 ┆ 4   │
│ CN201310217255.9 ┆ 4   │
└──────────────────┴─────┘

[发明授权内部重复 Top10 的行明细（用于肉眼看重复原因）]

申请号: ('CN201310217255.9',)
{'申请号': 'CN201310217255.9', '专利类型': '发明授权', '公开公告年份': '2016', 'IPC主分类号': 'G05B19/418', '专利权人类型': '有限责任公司', '统一社会信用代码': '91110102580865018T', '引证次数': None, '被引证次数': None, '自引次数': None, '他引次数': None, '被自引次数': None, '被他引次数': None, '家族引证次数': '7.0', '家族被引证次数': '21.0'}
{'申请号': 'CN201310217255.9', '专利类型': '发明授权', '公开公告年份': '2016', 'IPC主分类号': 'G05B19/418', '专利权人类型': None, '统一社会信用代码': None, '引证次数': None, '被引证次数': None, '自引次数': None, '他引次数': None, '被自引次数': None, '被他引次数': None, '家族引

In [58]:
# ===== 4.3) 判断：发明授权内部的重复，是“维度展开”还是“纯重复脏数据” =====
# 思路：
# 对发明授权子集，构造一个“关键列组合键”（除申请号以外的列）
# 如果同一申请号下，这些关键列组合也重复很多次 -> 可能存在完全重复行（脏重复）
# 如果关键列组合基本不重复 -> 更多是维度展开（比如权利人/信用代码/IPC不同）

KEY_COLS = [c for c in NEED_COLS if c != ID_COL]  # 你抽出来的所有字段，除了申请号
# 可选：如果你觉得某些列本来就会缺失/噪声，可以从 KEY_COLS 里去掉

auth_rep = (
    auth_lf
    .group_by(ID_COL)
    .agg([
        pl.len().alias("rows"),
        pl.struct(KEY_COLS).n_unique().alias("unique_rows_by_key")  # 关键列组合的不同取值数量
    ])
    .with_columns((pl.col("rows") - pl.col("unique_rows_by_key")).alias("exact_dup_rows_by_key"))
)

summary = (
    auth_rep
    .select([
        pl.len().alias("id_total"),
        (pl.col("rows") > 1).sum().alias("id_with_multi_rows"),
        (pl.col("exact_dup_rows_by_key") > 0).sum().alias("id_with_exact_dup"),
        pl.sum("exact_dup_rows_by_key").alias("exact_dup_rows_total"),
    ])
    .collect()
)

print("\n[发明授权内部：是否存在“完全重复行”的总体统计]")
print(summary)

# 再抽几个“完全重复行很多”的申请号看看
top_exact_dup = (
    auth_rep
    .filter(pl.col("exact_dup_rows_by_key") > 0)
    .sort("exact_dup_rows_by_key", descending=True)
    .limit(10)
    .collect()
)
print("\n[发明授权内部：完全重复行最多的申请号 Top10]")
print(top_exact_dup)



[发明授权内部：是否存在“完全重复行”的总体统计]
shape: (1, 4)
┌──────────┬────────────────────┬───────────────────┬──────────────────────┐
│ id_total ┆ id_with_multi_rows ┆ id_with_exact_dup ┆ exact_dup_rows_total │
│ ---      ┆ ---                ┆ ---               ┆ ---                  │
│ u32      ┆ u32                ┆ u32               ┆ u32                  │
╞══════════╪════════════════════╪═══════════════════╪══════════════════════╡
│ 5092683  ┆ 104244             ┆ 89395             ┆ 93208                │
└──────────┴────────────────────┴───────────────────┴──────────────────────┘

[发明授权内部：完全重复行最多的申请号 Top10]
shape: (10, 4)
┌──────────────────┬──────┬────────────────────┬───────────────────────┐
│ 申请号           ┆ rows ┆ unique_rows_by_key ┆ exact_dup_rows_by_key │
│ ---              ┆ ---  ┆ ---                ┆ ---                   │
│ str              ┆ u32  ┆ u32                ┆ u32                   │
╞══════════════════╪══════╪════════════════════╪═══════════════════════╡
│ CN20221077802

In [65]:
# ===== 5) Deduplicate extra_all by 申请号（最终版）=====
# 目标：
# 1) 只保留【发明授权】记录
# 2) 引用相关列：统一转 Int64；如果同一申请号仍有多个不同值 -> 取 max（消解冲突）
# 3) 其他列：逐列“非空覆盖合并”（优先取非空）；若出现多个不同非空值 -> 记录到 conf 文件
# 4) 引用类的冲突仍输出统计（可选导出清单），但 dedup 结果里用 max 解决

import os
import polars as pl

EXTRA_ALL_DEDUP_PARQUET = os.path.join(WORK_DIR, "extra_all_dedup.parquet")
# 只记录“非引用列仍冲突”的明细
CONFLICT_OTHER_DETAIL_PARQUET = os.path.join(WORK_DIR, "conflicts_other_detail.parquet")
CONFLICT_OTHER_SUMMARY_PARQUET = os.path.join(WORK_DIR, "conflicts_other_summary.parquet")
# 引用列冲突的申请号清单（可选）
CONFLICT_CITATION_IDS_PARQUET = os.path.join(WORK_DIR, "conflicts_citation_ids.parquet")

ID_COL = "申请号"
TYPE_COL = "专利类型"
AUTH_VALUE = "发明授权"

# ===== 引用类列（全部转 Int64，并在聚合时取 max）=====
CITATION_COLS = [
    "引证次数",
    "被引证次数",
    "自引次数",
    "他引次数",
    "被自引次数",
    "被他引次数",
    "家族引证次数",
    "家族被引证次数",
]

# 非引用列：仍然做“非空覆盖合并”，但如果出现多个不同非空值 -> 记录到conf
OTHER_COLS = [c for c in NEED_COLS if c not in CITATION_COLS and c != ID_COL]

def cast_citation_to_int(colname: str) -> pl.Expr:
    """
    把 "5" / "5.0" / 5.0 / None → Int64
    非法值自动变 null
    """
    return (
        pl.col(colname)
        .cast(pl.Float64, strict=False)
        .cast(pl.Int64, strict=False)
        .alias(colname)
    )

def pick_non_null(colname: str) -> pl.Expr:
    """
    非引用列的合并策略：取任意一个非空值（通常同一申请号应一致）
    - 全空 -> null
    - 有非空 -> 取第一个非空值
    """
    return pl.col(colname).drop_nulls().first().alias(colname)

if not os.path.exists(EXTRA_ALL_DEDUP_PARQUET):
    print("Stage 5 (final): 发明授权 + 引用列max + 其他列非空覆盖 + 其他冲突落盘 ...")

    # ===== (1) 读 extra_all，只保留需要列，并筛选发明授权 =====
    lf = (
        pl.scan_parquet(EXTRA_ALL_PARQUET)
        .select([pl.col(c) for c in NEED_COLS])
        .with_columns(pl.col(ID_COL).cast(pl.Utf8))
        .filter(pl.col(TYPE_COL) == AUTH_VALUE)
    )

    # ===== (2) 引用列统一转 Int64（先规范类型，避免 5 vs 5.0 伪冲突）=====
    lf = lf.with_columns([cast_citation_to_int(c) for c in CITATION_COLS])

    # ===== (3) 统计“引用列仍冲突”的申请号（但 dedup 会用 max 消解）=====
    # 这里的冲突定义：同一申请号某引用列存在多个不同的非空值
    citation_conf_summary = (
        lf.group_by(ID_COL)
          .agg([
              pl.col(c).drop_nulls().n_unique().alias(f"{c}__nn_nunique")
              for c in CITATION_COLS
          ])
          .with_columns([
              pl.sum_horizontal([
                  (pl.col(f"{c}__nn_nunique") > 1).cast(pl.Int32)
                  for c in CITATION_COLS
              ]).alias("citation_conflict_col_cnt")
          ])
    )

    citation_conf_ids = (
        citation_conf_summary
        .filter(pl.col("citation_conflict_col_cnt") > 0)
        .select(ID_COL)
    )

    citation_conf_cnt = citation_conf_ids.select(pl.len()).collect().item()
    print(f"引用列仍冲突的申请号数量（会用max消解）: {citation_conf_cnt:,}")

    # 可选：导出“引用冲突申请号清单”
    if citation_conf_cnt > 0 and not os.path.exists(CONFLICT_CITATION_IDS_PARQUET):
        citation_conf_ids.collect(engine="streaming").write_parquet(CONFLICT_CITATION_IDS_PARQUET)
        print("Wrote:", CONFLICT_CITATION_IDS_PARQUET)

    # ===== (4) 检测“非引用列”冲突：若同一申请号某非引用列出现多个不同非空值 -> 记录conf =====
    other_conf_summary = (
        lf.group_by(ID_COL)
          .agg([
              pl.col(c).drop_nulls().n_unique().alias(f"{c}__nn_nunique")
              for c in OTHER_COLS
          ])
          .with_columns([
              pl.sum_horizontal([
                  (pl.col(f"{c}__nn_nunique") > 1).cast(pl.Int32)
                  for c in OTHER_COLS
              ]).alias("other_conflict_col_cnt")
          ])
          .filter(pl.col("other_conflict_col_cnt") > 0)
    )

    other_conf_cnt = other_conf_summary.select(pl.len()).collect().item()
    print(f"非引用列仍冲突的申请号数量（将落盘供你检查）: {other_conf_cnt:,}")

    # 导出冲突摘要（每个申请号哪些列 nunique>1）
    if other_conf_cnt > 0 and not os.path.exists(CONFLICT_OTHER_SUMMARY_PARQUET):
        other_conf_summary.collect(engine="streaming").write_parquet(CONFLICT_OTHER_SUMMARY_PARQUET)
        print("Wrote:", CONFLICT_OTHER_SUMMARY_PARQUET)

    # 导出冲突明细（原始行级数据，便于你肉眼看）
    if other_conf_cnt > 0 and not os.path.exists(CONFLICT_OTHER_DETAIL_PARQUET):
        other_conf_ids = other_conf_summary.select(ID_COL)
        other_conf_detail = (
            lf.join(other_conf_ids, on=ID_COL, how="inner")
              .collect(engine="streaming")
        )
        other_conf_detail.write_parquet(CONFLICT_OTHER_DETAIL_PARQUET)
        print("Wrote:", CONFLICT_OTHER_DETAIL_PARQUET, "| rows=", other_conf_detail.height)

    # ===== (5) 真正的 dedup 聚合：一申请号一行 =====
    # - 非引用列：取第一个非空（覆盖空值）
    # - 引用列：取 max（消解冲突）
    agg_exprs = []
    agg_exprs += [pick_non_null(c) for c in OTHER_COLS]
    agg_exprs += [pl.col(c).max().alias(c) for c in CITATION_COLS]  # 关键：引用列用 max

    extra_dedup = (
        lf.group_by(ID_COL)
          .agg(agg_exprs)
          .collect(engine="streaming")
    )

    extra_dedup.write_parquet(EXTRA_ALL_DEDUP_PARQUET)
    print(f"Saved: {EXTRA_ALL_DEDUP_PARQUET} | rows={extra_dedup.height:,}")

else:
    print(f"Found existing: {EXTRA_ALL_DEDUP_PARQUET}")
    if os.path.exists(CONFLICT_OTHER_SUMMARY_PARQUET):
        print(f"Found existing: {CONFLICT_OTHER_SUMMARY_PARQUET}")
    if os.path.exists(CONFLICT_OTHER_DETAIL_PARQUET):
        print(f"Found existing: {CONFLICT_OTHER_DETAIL_PARQUET}")
    if os.path.exists(CONFLICT_CITATION_IDS_PARQUET):
        print(f"Found existing: {CONFLICT_CITATION_IDS_PARQUET}")


Stage 5 (final): 发明授权 + 引用列max + 其他列非空覆盖 + 其他冲突落盘 ...
引用列仍冲突的申请号数量（会用max消解）: 555
Wrote: ./conflicts_citation_ids.parquet
非引用列仍冲突的申请号数量（将落盘供你检查）: 347
Wrote: ./conflicts_other_summary.parquet
Wrote: ./conflicts_other_detail.parquet | rows= 713
Saved: ./extra_all_dedup.parquet | rows=5,092,683


In [69]:
CITATION_COLS = [
    "引证次数",
    "被引证次数",
    "自引次数",
    "他引次数",
    "被自引次数",
    "被他引次数",
    "家族引证次数",
    "家族被引证次数",
]

for c in CITATION_COLS:
    print(c, "->", dedup.schema.get(c))
dedup.select([
    pl.col("引证次数").min().alias("min"),
    pl.col("引证次数").max().alias("max"),
    pl.col("引证次数").null_count().alias("nulls"),
])

引证次数 -> Int64
被引证次数 -> Int64
自引次数 -> Int64
他引次数 -> Int64
被自引次数 -> Int64
被他引次数 -> Int64
家族引证次数 -> Int64
家族被引证次数 -> Int64


min,max,nulls
i64,i64,u32
1,588,1139287


In [73]:
import polars as pl

conf = pl.read_parquet("./conflicts_other_summary.parquet")

MAX_IDS = 20        # 最多看多少个申请号
MAX_ROWS_PER_ID = 20  # 每个申请号最多打印多少行（防止炸屏）

cnt = 0
for pid, sub in conf.group_by("申请号"):
    cnt += 1
    if cnt > MAX_IDS:
        break

    print("\n" + "=" * 80)
    print("申请号:", pid)

    for i, row in enumerate(sub.iter_rows(named=True)):
        if i >= MAX_ROWS_PER_ID:
            print(f"... ({sub.height - MAX_ROWS_PER_ID} more rows)")
            break
        print(row)



申请号: ('CN201510129718.5',)
{'申请号': 'CN201510129718.5', '专利类型__nn_nunique': 1, '公开公告年份__nn_nunique': 2, 'IPC主分类号__nn_nunique': 1, '专利权人类型__nn_nunique': 1, '统一社会信用代码__nn_nunique': 1, 'other_conflict_col_cnt': 1}

申请号: ('CN201910771390.5',)
{'申请号': 'CN201910771390.5', '专利类型__nn_nunique': 1, '公开公告年份__nn_nunique': 1, 'IPC主分类号__nn_nunique': 1, '专利权人类型__nn_nunique': 2, '统一社会信用代码__nn_nunique': 2, 'other_conflict_col_cnt': 2}

申请号: ('CN202011436796.7',)
{'申请号': 'CN202011436796.7', '专利类型__nn_nunique': 1, '公开公告年份__nn_nunique': 2, 'IPC主分类号__nn_nunique': 1, '专利权人类型__nn_nunique': 1, '统一社会信用代码__nn_nunique': 1, 'other_conflict_col_cnt': 1}

申请号: ('CN201410457843.4',)
{'申请号': 'CN201410457843.4', '专利类型__nn_nunique': 1, '公开公告年份__nn_nunique': 2, 'IPC主分类号__nn_nunique': 1, '专利权人类型__nn_nunique': 1, '统一社会信用代码__nn_nunique': 1, 'other_conflict_col_cnt': 1}

申请号: ('CN201610221791.X',)
{'申请号': 'CN201610221791.X', '专利类型__nn_nunique': 1, '公开公告年份__nn_nunique': 2, 'IPC主分类号__nn_nunique': 1, '专利权人类型__nn_nunique': 0, '

In [74]:
import polars as pl

conf = pl.read_parquet("./conflicts_other_detail.parquet")

MAX_IDS = 50        # 最多看多少个申请号
MAX_ROWS_PER_ID = 20  # 每个申请号最多打印多少行（防止炸屏）

cnt = 0
for pid, sub in conf.group_by("申请号"):
    cnt += 1
    if cnt > MAX_IDS:
        break

    print("\n" + "=" * 80)
    print("申请号:", pid)

    for i, row in enumerate(sub.iter_rows(named=True)):
        if i >= MAX_ROWS_PER_ID:
            print(f"... ({sub.height - MAX_ROWS_PER_ID} more rows)")
            break
        print(row)



申请号: ('CN202010848938.4',)
{'申请号': 'CN202010848938.4', '专利类型': '发明授权', '公开公告年份': '2022', 'IPC主分类号': 'G01F15/00', '专利权人类型': '有限责任公司(自然人独资)', '统一社会信用代码': '913713027892566276', '引证次数': 6, '被引证次数': None, '自引次数': None, '他引次数': 6, '被自引次数': None, '被他引次数': None, '家族引证次数': 6, '家族被引证次数': None}
{'申请号': 'CN202010848938.4', '专利类型': '发明授权', '公开公告年份': '2023', 'IPC主分类号': 'G01F15/00', '专利权人类型': '有限责任公司', '统一社会信用代码': '91440300326621851M', '引证次数': 5, '被引证次数': None, '自引次数': None, '他引次数': 5, '被自引次数': None, '被他引次数': None, '家族引证次数': 6, '家族被引证次数': None}

申请号: ('CN201610336278.5',)
{'申请号': 'CN201610336278.5', '专利类型': '发明授权', '公开公告年份': '2019', 'IPC主分类号': 'A01N47/36', '专利权人类型': '有限责任公司(自然人投资或控股的法人独资)', '统一社会信用代码': '916101247669689556', '引证次数': 4, '被引证次数': None, '自引次数': None, '他引次数': 4, '被自引次数': None, '被他引次数': None, '家族引证次数': 4, '家族被引证次数': 1}
{'申请号': 'CN201610336278.5', '专利类型': '发明授权', '公开公告年份': '2018', 'IPC主分类号': 'A01N47/36', '专利权人类型': '有限责任公司(自然人投资或控股的法人独资)', '统一社会信用代码': '916101247669689556', '引证次数': 4, '被引证次数

In [76]:
import polars as pl

conf = pl.read_parquet("./conflicts_citation_ids.parquet")

MAX_IDS = 10        # 最多看多少个申请号
MAX_ROWS_PER_ID = 20  # 每个申请号最多打印多少行（防止炸屏）

cnt = 0
for pid, sub in conf.group_by("申请号"):
    cnt += 1
    if cnt > MAX_IDS:
        break

    print("\n" + "=" * 80)
    print("申请号:", pid)

    for i, row in enumerate(sub.iter_rows(named=True)):
        if i >= MAX_ROWS_PER_ID:
            print(f"... ({sub.height - MAX_ROWS_PER_ID} more rows)")
            break
        print(row)



申请号: ('CN201910844347.7',)
{'申请号': 'CN201910844347.7'}

申请号: ('CN201910882353.1',)
{'申请号': 'CN201910882353.1'}

申请号: ('CN201910876843.0',)
{'申请号': 'CN201910876843.0'}

申请号: ('CN201910844122.1',)
{'申请号': 'CN201910844122.1'}

申请号: ('CN201510877720.0',)
{'申请号': 'CN201510877720.0'}

申请号: ('CN201910823447.1',)
{'申请号': 'CN201910823447.1'}

申请号: ('CN201110080906.5',)
{'申请号': 'CN201110080906.5'}

申请号: ('CN201810458508.4',)
{'申请号': 'CN201810458508.4'}

申请号: ('CN201910899083.5',)
{'申请号': 'CN201910899083.5'}

申请号: ('CN201910877332.0',)
{'申请号': 'CN201910877332.0'}


In [77]:
# ===== 6) Convert main CSV to Parquet =====
if not os.path.exists(MAIN_PARQUET):
    print("Converting main CSV -> Parquet ...")
    main_df = pl.read_csv(FILTERED_MAIN, encoding=CSV_ENCODING, ignore_errors=True)
    # 确保 join key 类型一致
    main_df = main_df.with_columns(pl.col(ID_COL).cast(pl.Utf8))
    main_df.write_parquet(MAIN_PARQUET)
    print(f"Saved: {MAIN_PARQUET} | rows={main_df.height:,} cols={len(main_df.columns)}")
else:
    print(f"Found existing: {MAIN_PARQUET}")


Converting main CSV -> Parquet ...
Saved: ./main.parquet | rows=5,092,683 cols=10


In [86]:
# ===== 7) Join and output final (avoid column-name collisions) =====
if not os.path.exists(FINAL_PARQUET):
    print("Joining main + extra ...")

    main_lf = pl.scan_parquet(MAIN_PARQUET).with_columns(pl.col(ID_COL).cast(pl.Utf8))
    extra_lf = pl.scan_parquet(EXTRA_ALL_DEDUP_PARQUET).with_columns(pl.col(ID_COL).cast(pl.Utf8))

    # 读取 main 的列名（很小，collect schema 不会贵）
    main_cols = pl.read_parquet(MAIN_PARQUET, n_rows=0).columns

    # extra 只保留 join key + main 中没有的列
    extra_cols = [c for c in pl.read_parquet(EXTRA_ALL_DEDUP_PARQUET, n_rows=0).columns
                  if (c == ID_COL) or (c not in main_cols)]
    extra_lf = extra_lf.select(extra_cols)

    final_df = (
        main_lf
        .join(extra_lf, on=ID_COL, how="left")
        .sort("申请年份")
        .collect(engine="streaming")
    )

    final_df.write_parquet(FINAL_PARQUET)
    print(f"Saved: {FINAL_PARQUET} | rows={final_df.height:,} cols={len(final_df.columns)}")
else:
    print(f"Found existing: {FINAL_PARQUET}")


Joining main + extra ...


Saved: ./main_enriched.parquet | rows=5,092,683 cols=23


In [87]:
import polars as pl

# df = pl.read_parquet("./extra_all.parquet")
df = pl.read_parquet("./main_enriched.parquet")

df.head(10)


申请号,申请年份,专利名称,BS,FS,Quality_q,申请人,申请人类型,申请人地址,申请人城市,专利类型,公开公告年份,IPC主分类号,专利权人类型,统一社会信用代码,引证次数,被引证次数,自引次数,他引次数,被自引次数,被他引次数,家族引证次数,家族被引证次数
str,i64,str,f64,f64,f64,str,str,str,str,str,str,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64
"""CN85106332.2""",1985,"""一种氯化亚锡的制备方法""",0.0,0.0,0.0,"""个旧市化工一厂""","""企业""","""云南省个旧市鸡街""","""红河哈尼族彝族自治州""","""发明授权""","""1986""","""C01G19/06""",,,,18.0,,,,18.0,,26.0
"""CN85100985.9""",1985,"""同步电动机失步保护装置""",0.0,0.0,0.0,"""核工业部电机运行技术开发公司""","""企业""","""北京市海淀区阜成路桥西""","""北京市""","""发明授权""","""1986""","""H02H7/06""",,,,,,,,,1.0,2.0
"""CN85103449.7""",1985,"""一种冷硬铸造凸轮轴的工艺方法""",0.0,0.0,0.0,"""清华大学""","""学校""","""北京市海淀区清华园""","""北京市""","""发明授权""","""1986""","""B22C9/22""","""公立""","""12100000400000624D""",,2.0,,,,2.0,1.0,4.0
"""CN85101005.9""",1985,"""高压旋转干喷法和双管旋转干喷器""",0.0,0.0,0.0,"""冶金工业部建筑研究总院""","""科研单位""","""北京市学院路43号""","""北京市""","""发明授权""","""1986""","""E02D3/12""",,,,1.0,,,,1.0,1.0,7.0
"""CN85101438.0""",1985,"""框架式弹性桥机构""",0.0,0.0,0.0,"""唐锦生""","""个人""","""北京市甘家口景王坟乙13西""","""北京市""","""发明授权""","""1986""","""B60G25/00""",,,,1.0,,,1.0,,22.0,19.0
"""CN85101885.8""",1985,"""混合稀土元素萃取分离""",0.0,0.0,0.0,"""中国科学院长春应用化学研究所""","""科研单位""","""吉林省长春市斯大林大街109号""","""长春市""","""发明授权""","""1986""","""C01F17/00""",,"""121000006051000987""",,,,,,,3.0,6.0
"""CN85103584.1""",1985,"""红曲色素多糖片的制造方法""",0.0,0.0,0.0,"""宁夏轻工业设计研究所""","""科研单位""","""宁夏回族自治区银川市育新巷10号""","""银川市""","""发明授权""","""1986""","""C12P17/18""",,,,4.0,,,,4.0,1.0,10.0
"""CN85103140.4""",1985,"""铁路车辆减速顶""",0.0,0.0,0.0,"""哈尔滨铁路局减速顶调速系统研究中心""","""企业""","""黑龙江省哈尔滨市邮政街141号""","""哈尔滨市""","""发明授权""","""1986""","""B61K7/08""","""国有企业""","""912301007028386685""",,7.0,,,2.0,5.0,1.0,9.0
"""CN85100686.8""",1985,"""新钙试剂的合成方法""",0.0,0.0,0.0,"""华东师范大学""","""学校""","""上海市中山北路3663号""","""上海市""","""发明授权""","""1987""","""C07D231/46""",,"""12100000425006133D""",,,,,,,,
"""CN85100899.2""",1985,"""明矾石低碱度水泥及其用途""",0.0,0.0,0.0,"""国家建筑材料工业局建筑材料科学研究院""","""科研单位""","""北京市朝阳区管庄""","""北京市""","""发明授权""","""1987""","""C04B7/345""",,,,,,,,,,


In [88]:
print(df.select([
    pl.col("申请年份").min().alias("min_year"),
    pl.col("申请年份").max().alias("max_year"),
]))

shape: (1, 2)
┌──────────┬──────────┐
│ min_year ┆ max_year │
│ ---      ┆ ---      │
│ i64      ┆ i64      │
╞══════════╪══════════╡
│ 1985     ┆ 2024     │
└──────────┴──────────┘


In [89]:
df.filter(pl.col("申请年份") == 2010).head(5)


申请号,申请年份,专利名称,BS,FS,Quality_q,申请人,申请人类型,申请人地址,申请人城市,专利类型,公开公告年份,IPC主分类号,专利权人类型,统一社会信用代码,引证次数,被引证次数,自引次数,他引次数,被自引次数,被他引次数,家族引证次数,家族被引证次数
str,i64,str,f64,f64,f64,str,str,str,str,str,str,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64
"""CN201010211885.1""",2010,"""基于地表菲涅耳反射的多因素大气偏振建模方法""",258.437403,962.054489,3.722582,"""合肥工业大学""","""学校""","""230009 安徽省合肥市包河区屯溪路193号""","""合肥市""","""发明授权""","""2013""","""G06F17/50""","""公立""","""12100000400016984P""",2.0,,2.0,,,,4,5
"""CN201010560351.X""",2010,"""一种液体型烟草专用复合叶面肥""",43.249207,143.634743,3.321095,"""合肥工业大学""","""学校""","""230009 安徽省合肥市屯溪路193号""","""合肥市""","""发明授权""","""2013""","""C05G3/00""","""公立""","""12100000400016984P""",,,,,,,6,15
"""CN201010209312.5""",2010,"""一种花生杀虫专用肥及其生产方法""",123.341562,384.338497,3.11605,"""安徽省农业科学院土壤肥料研究所""","""科研单位""","""230031 安徽省合肥市农科南路39号土壤肥料研究所""","""合肥市""","""发明授权""","""2013""","""C05G3/02""",,"""12340000485002695K""",6.0,1.0,,6.0,,1.0,6,13
"""CN201010588406.8""",2010,"""一种将柴油发动机改造成汽油发动机的方法""",296.034988,1086.743179,3.670996,"""合肥宝发动力技术有限公司""","""企业""","""230022 安徽省合肥经济技术开发区始信路128号""","""合肥市""","""发明授权""","""2013""","""F02F3/26""","""有限责任公司(自然人投资或控股)""","""91340100051483305C""",3.0,,,3.0,,,3,7
"""CN201010235355.0""",2010,"""一种无糖蜂蜜绿茶""",101.171509,253.805862,2.508669,"""安徽鸿汇食品(集团)有限公司; 纪鸿; 吴其才""","""个人,企业""","""231600 安徽省肥东县龙塘工业聚集区""","""合肥市""","""发明授权""","""2013""","""A23F3/14""","""有限责任公司""","""91340122711728161B""",,,,,,,6,9
