In [36]:
import re
import json
from unidecode import unidecode
import dateparser

def extract_full_dates(text):
    """
    Collect every complete French date phrase occurring in ``text``.

    A candidate is a day ("1er" or one/two digits), a month word and a
    four-digit year, e.g. "1er février 1877", "2 mars 1912", "01 avril 1855".
    Only candidates that ``dateparser`` accepts as French dates are kept.

    Returns the stripped phrases, as written in ``text``, in order of appearance.
    """
    date_regex = re.compile(r"\b(?:1er|\d{1,2})\s+[a-zéû]+(?:\s+\d{4})", re.IGNORECASE)
    candidates = (match.group(0) for match in date_regex.finditer(text))
    return [
        candidate.strip()
        for candidate in candidates
        # keep only what dateparser really understands as a date
        if dateparser.parse(candidate, languages=["fr"])
    ]

def replace_incomplete_dates(sent, triples):
    """
    Replace ISO-like date objects of ``triples`` by the date phrase found in ``sent``.

    The sentence is transliterated with ``unidecode`` and scanned by
    ``extract_full_dates``; each phrase found is indexed by (year, month).

    * An object of the form ``YYYY-MM`` is replaced by the last phrase of that
      month found in the sentence.
    * An object of the form ``YYYY-MM-DD`` is replaced by the phrase whose day
      number is exactly ``DD``. The day is compared as an integer, so day 5
      no longer matches "15 fevrier 1877", nor a digit of the year.

    Args:
        sent: Source sentence.
        triples: List of dicts; the "obj" entry of matching triples is
            overwritten in place. Non-string objects are left untouched.

    Returns:
        The same ``triples`` list.
    """
    # (year, month) -> [(day, phrase), ...] in order of appearance in the sentence
    dates_by_month = {}
    for phrase in extract_full_dates(unidecode(sent)):
        parsed = dateparser.parse(phrase, languages=["fr"])
        if not parsed:
            continue
        phrase = phrase.strip()
        # The extraction pattern makes every phrase start with "1er" or a 1-2 digit day.
        leading_day = re.match(r"\d{1,2}", phrase)
        day = int(leading_day.group()) if leading_day else parsed.day
        key = (str(parsed.year), str(parsed.month).zfill(2))
        dates_by_month.setdefault(key, []).append((day, phrase))

    for triple in triples:
        obj = triple.get("obj", "")
        if not isinstance(obj, str):
            continue

        month_only = re.fullmatch(r"(\d{4})-(\d{2})", obj)
        if month_only:
            known = dates_by_month.get(month_only.groups())
            if known:
                # swap in the real date phrase of the sentence (last one of that month)
                triple["obj"] = known[-1][1]
            continue

        full = re.fullmatch(r"(\d{4})-(\d{2})-(\d{2})", obj)
        if full:
            year, month, day = full.group(1), full.group(2), int(full.group(3))
            for known_day, phrase in dates_by_month.get((year, month), []):
                if known_day == day:
                    triple["obj"] = phrase
                    break
    return triples

# ====== Batch processing of the JSONL file ======

input_file = "C:/Users/jguo/Desktop/PURE-main/data/new_test.jsonl"    # JSONL file to read
output_file = "C:/Users/jguo/Desktop/PURE-main/data/test_fixed.jsonl"  # JSONL file to write

# One JSON object per line: fix its triples and write it straight back out.
with open(input_file, "r", encoding="utf-8") as fin:
    with open(output_file, "w", encoding="utf-8") as fout:
        for line in fin:
            record = json.loads(line)
            record["triples"] = replace_incomplete_dates(record["sent"], record["triples"])
            serialized = json.dumps(record, ensure_ascii=False)
            fout.write(serialized + "\n")

print("处理完成！")


处理完成！


In [37]:
import json
import dateparser
import re

def fr_date_to_iso(date_str):
    """Convert a French date phrase to ``YYYY-MM-DD``.

    The phrase is parsed by ``dateparser`` restricted to French; when parsing
    fails the input string is returned unchanged.
    """
    parsed = dateparser.parse(date_str, languages=['fr'])
    return parsed.strftime("%Y-%m-%d") if parsed else date_str

def update_triples_date_obj(triples):
    """Rewrite, in place, every triple object that is a full French date phrase.

    An object qualifies when it consists entirely of a day ("1er" or one/two
    digits), a month word and a four-digit year (e.g. "1er fevrier 1877",
    "21 mai 1956"); it is then replaced by the result of ``fr_date_to_iso``.

    Returns the same ``triples`` list.
    """
    french_date = re.compile(r"\b(?:1er|\d{1,2})\s+[a-zéû]+(?:\s+\d{4})", re.IGNORECASE)
    for entry in triples:
        candidate = entry.get("obj", "")
        if not french_date.fullmatch(candidate):
            continue
        entry["obj"] = fr_date_to_iso(candidate)
    return triples

# ========== Batch processing of the JSONL file ==========
input_file = "C:/Users/jguo/Desktop/PURE-main/data/test_fixed.jsonl"
output_file = "C:/Users/jguo/Desktop/PURE-main/data/new_test.jsonl"

# Stream the records: convert the date objects of each one, then write it out.
with open(input_file, "r", encoding="utf-8") as fin:
    with open(output_file, "w", encoding="utf-8") as fout:
        for line in fin:
            record = json.loads(line)
            record["triples"] = update_triples_date_obj(record["triples"])
            serialized = json.dumps(record, ensure_ascii=False)
            fout.write(serialized + "\n")

print("全部处理完毕！")


全部处理完毕！


In [41]:
# Pre-processing of the dataset: remove the inferred triples.
# Examples of triples to remove:
#   "sub": "Boulevard de Sébastopol", "rel": "isLandmarkType", "obj": "thoroughfare"
#   "sub": "Boulevard de Sébastopol", "rel": "hasGeometryChangeOn", "obj": "noTime"
# A "hasNewName" / "hasOldName" triple whose subject and object are the same
# (ignoring case and surrounding whitespace) is removed as well, e.g.
#   {"sub": "Pont Alexandre III", "rel": "hasNewName", "obj": "pont Alexandre III"}
import json

# adjust the paths of the input and output files here
input_file = "C:/Users/jguo/Desktop/PURE-main/data/new_train.jsonl"
output_file = "C:/Users/jguo/Desktop/PURE-main/data/train_filtered.jsonl"


def _is_inferred_triple(triple):
    """Tell whether ``triple`` is one of the triples that must be dropped."""
    rel = triple.get("rel", "")
    obj = triple.get("obj", "")
    sub = triple.get("sub", "")
    # rules 1 and 2: landmark-type triples and "noTime" objects
    if rel == "isLandmarkType" or obj == "noTime":
        return True
    # rule 3: a renaming relation between two identical names
    return rel in ("hasNewName", "hasOldName") and sub.strip().lower() == obj.strip().lower()


with open(input_file, "r", encoding="utf-8") as fin, open(output_file, "w", encoding="utf-8") as fout:
    for line in fin:
        data = json.loads(line)
        if "triples" in data:
            # keep only the triples that survive every rule
            data["triples"] = [kept for kept in data["triples"] if not _is_inferred_triple(kept)]

        # every output line remains one JSON dictionary
        fout.write(json.dumps(data, ensure_ascii=False) + "\n")

print(" Tous les filtres ont été appliqués avec succès. Le fichier de sortie a été enregistré à l’emplacement suivant ：", output_file)

 Tous les filtres ont été appliqués avec succès. Le fichier de sortie a été enregistré à l’emplacement suivant ： C:/Users/jguo/Desktop/PURE-main/data/train_filtered.jsonl


ex. 待处理的数据集: {"id": "11001_ouverture", "sent": "rue georges berger || Ouverture || Décret du 10 avril 1867 (UP)", "triples": [{"sub": "Rue Georges Berger", "rel": "hasGeometryChangeOn", "obj": "1867-04-10"}]} 
我需要做成的数据集是和bert pure的格式一致的
* "doc_key": 原先数据集对应的id内容
* "dataset": "évolutions d'événements"
* "sentences": "sent" 经 camembert-base AutoTokenizer 分词后的内容
* "ner": bert识别出来的每个实体的token起始位置以及他们的标签
关于ner部分, 实体的features:
  * 在给定的原始句子中 (rue georges berger || Ouverture || ... )，第一个 || 之前的地点实体统一被标注为 LandmarkType
  * "1867-04-10" 或 "du 25 avril 1994" 这类时间表达统一被标注成 Time
  * 位于两个 || 之间的事件类型（如 || Classement ||）统一标注成 EventType
* "relations": 两个实体各自的起始位置以及他们的关系类型(以下是我的数据集的所有关系类型:"hasNameChangeOn","isNumberedOn","hasNewName", "isClassifiedOn", "hasOldName", "appearsOn", "hasGeometryChangeOn")
以上就是我对于输出的文件的要求

In [None]:
!pip install transformers

In [2]:
!pip install unidecode

Collecting unidecode
  Using cached Unidecode-1.4.0-py3-none-any.whl.metadata (13 kB)
Using cached Unidecode-1.4.0-py3-none-any.whl (235 kB)
Installing collected packages: unidecode
Successfully installed unidecode-1.4.0


In [4]:
!pip install dateparser


Collecting dateparser
  Downloading dateparser-1.2.2-py3-none-any.whl.metadata (29 kB)
Collecting pytz>=2024.2 (from dateparser)
  Using cached pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzlocal>=0.2 (from dateparser)
  Downloading tzlocal-5.3.1-py3-none-any.whl.metadata (7.6 kB)
Collecting tzdata (from tzlocal>=0.2->dateparser)
  Using cached tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading dateparser-1.2.2-py3-none-any.whl (315 kB)
Using cached pytz-2025.2-py2.py3-none-any.whl (509 kB)
Downloading tzlocal-5.3.1-py3-none-any.whl (18 kB)
Using cached tzdata-2025.2-py2.py3-none-any.whl (347 kB)
Installing collected packages: pytz, tzdata, tzlocal, dateparser

   ---------------------------------------- 0/4 [pytz]
   ---------------------------------------- 0/4 [pytz]
   ---------------------------------------- 0/4 [pytz]
   ---------------------------------------- 0/4 [pytz]
   ---------------------------------------- 0/4 [pytz]
   ---------------------

In [42]:
import json
from transformers import AutoTokenizer
from unidecode import unidecode
import dateparser
import difflib
import re
# 你用的camembert-base tokenizer
tokenizer = AutoTokenizer.from_pretrained("C:/Users/jguo/Desktop/PURE-main/camembert-base")

def normalize(text):
    """Loosen ``text`` for tolerant matching.

    The text is transliterated with ``unidecode`` and lower-cased, then
    spaces and apostrophes (curly and straight) are removed.
    """
    loosened = unidecode(text).lower()
    for unwanted in (" ", "’", "'"):
        loosened = loosened.replace(unwanted, "")
    return loosened

def find_token_span(tokens, text, entity, entity_type=None):
    """Locate ``entity`` inside ``text`` and return its inclusive token span.

    Date entities (``entity_type == "date"``):
        ``entity`` may be ``YYYY-MM-DD``, ``YYYY-MM`` or ``YYYY``. The text is
        transliterated with ``unidecode`` and lower-cased, then searched for
        "<day> <month> <year>" ("1er" accepted for day 1), "<month> <year>"
        or the bare year. A day-precise hit wins over a month-year hit; among
        equals the earliest wins. Any other date string, or an invalid month
        number, is searched literally.

    Other entities, in this order:
        1. literal search in ``text``;
        2. search after ``normalize`` (case, accents, spaces and apostrophes ignored);
        3. fuzzy search: the fragment whose normalised form has the best
           ``difflib`` ratio with the normalised entity, if above 0.82.

    Args:
        tokens: Tokens of ``text``; only forwarded to ``char_to_token_span``.
        text: Sentence in which the entity is searched.
        entity: Entity string to locate.
        entity_type: ``"date"`` selects the date strategy.

    Returns:
        The ``(start_token, end_token)`` pair computed by ``char_to_token_span``,
        or ``(None, None)`` when the entity cannot be located.
    """
    if entity_type == "date":
        # Unaccented names are enough: the text is searched after unidecode().
        mois_fr = ["janvier", "fevrier", "mars", "avril", "mai", "juin", "juillet",
                   "aout", "septembre", "octobre", "novembre", "decembre"]
        # accepted forms: YYYY-MM-DD / YYYY-MM / YYYY
        m_full = re.fullmatch(r"(\d{4})-(\d{2})-(\d{2})", entity)
        m_month = re.fullmatch(r"(\d{4})-(\d{2})", entity)
        m_year = re.fullmatch(r"(\d{4})", entity)
        year, month_idx, day = None, None, None

        if m_full:
            year = m_full.group(1)
            month_idx = int(m_full.group(2)) - 1
            day = int(m_full.group(3))
        elif m_month:
            year = m_month.group(1)
            month_idx = int(m_month.group(2)) - 1
        elif m_year:
            year = m_year.group(1)

        if month_idx is not None and not 0 <= month_idx < len(mois_fr):
            # "00" would silently index December and 13+ would raise IndexError:
            # treat the entity as a plain string instead.
            year, month_idx, day = None, None, None

        txt_norm = unidecode(text).lower()
        precise, loose = [], []

        if year and month_idx is not None:
            mois = mois_fr[month_idx]
            if day:
                # "3 fevrier 1877", "03 fevrier 1877" and, for day 1, "1er fevrier 1877"
                day_forms = [str(day), str(day).zfill(2)]
                if day == 1:
                    day_forms.append("1er")
                for form in day_forms:
                    mobj = re.search(rf"\b{form}\s+{mois}\s+{year}\b", txt_norm)
                    if mobj:
                        precise.append((mobj.start(), mobj.end()))
            # loose month-year form, e.g. "fevrier 1877"
            mobj = re.search(rf"\b{mois}\s+{year}\b", txt_norm)
            if mobj:
                loose.append((mobj.start(), mobj.end()))
        elif year:
            # year only
            mobj = re.search(rf"\b{year}\b", txt_norm)
            if mobj:
                loose.append((mobj.start(), mobj.end()))

        # A hit that includes the day must not lose against an earlier month-year mention.
        candidates = precise or loose
        if candidates:
            char_start, char_end = min(candidates)
            # NOTE(review): offsets refer to unidecode(text); they line up with the
            # tokens only if transliteration keeps the text length - confirm.
            return char_to_token_span(tokens, unidecode(text), char_start, char_end)

        # fallback: the date string appears literally in the text
        if entity in text:
            char_start = text.index(entity)
            return char_to_token_span(tokens, text, char_start, char_start + len(entity))

        print(f"[WARN] 日期实体 {entity} 无法在文本 [{text}] 匹配")
        return None, None

    # 1. literal search in the raw text
    idx_raw = text.find(entity)
    if idx_raw != -1:
        return char_to_token_span(tokens, text, idx_raw, idx_raw + len(entity))

    # 2. search after loose normalisation
    text_norm = normalize(text)
    entity_norm = normalize(entity)
    if entity_norm in text_norm:
        # Slide a window over the raw text until its normalised form equals the entity.
        for start in range(len(text)):
            if not normalize(text[start]):
                # A span must not begin on a character that normalisation deletes
                # (space, apostrophe): such a start cannot be mapped to a token.
                continue
            for end in range(start + 1, min(len(text), start + len(entity) + 8) + 1):
                if normalize(text[start:end]) == entity_norm:
                    return char_to_token_span(tokens, text, start, end)
        print(f"[WARN] 归一化后未能匹配到实体 '{entity}' in 原文 '{text}'")
        # fall through to the fuzzy search

    # 3. fuzzy sliding window: best fragment with a similarity ratio above 0.82
    matcher = difflib.SequenceMatcher(None)
    matcher.set_seq2(entity_norm)  # the entity is the fixed side of every comparison
    max_ratio = 0
    best_start, best_end = None, None
    for start in range(len(text)):
        if not normalize(text[start]):
            continue
        shortest = start + max(2, len(entity) - 4)
        # "+ 1" so that fragments reaching the very end of the text are examined too
        longest = min(len(text), start + len(entity) + 8) + 1
        for end in range(shortest, longest):
            frag_norm = normalize(text[start:end])
            if len(frag_norm) < 3:  # too short to be meaningful
                continue
            matcher.set_seq1(frag_norm)
            ratio = matcher.ratio()
            if ratio > 0.82 and ratio > max_ratio:
                max_ratio = ratio
                best_start, best_end = start, end
    if best_start is not None and best_end is not None:
        print(f"[INFO] 模糊匹配 '{entity}' ≈ '{text[best_start:best_end]}'，相似度={max_ratio:.2f}")
        return char_to_token_span(tokens, text, best_start, best_end)

    print(f"Entity [{entity}] not found in [{text}]")
    return None, None


def char_to_token_span(tokens, text, char_start, char_end):
    """Convert the character range ``[char_start, char_end)`` of ``text`` to a token range.

    Tokens are aligned on ``text`` by walking it from left to right: the
    "▁" word marker and surrounding blanks are removed from each token,
    whitespace of ``text`` is skipped, and the token is assumed to occupy
    the next ``len(token)`` characters.

    Whitespace at either edge of the requested range is ignored, so a range
    such as " rue X" or "rue X " maps to the tokens of "rue X".

    Args:
        tokens: Tokens of ``text``, in order.
        text: The text the character offsets refer to.
        char_start: Inclusive start offset.
        char_end: Exclusive end offset.

    Returns:
        ``(start_token, end_token)``, both inclusive, or ``(None, None)``
        when either edge cannot be placed inside a token.
    """
    # Tokens never cover whitespace, so an edge lying on a blank could not be placed.
    limit = len(text)
    while char_start < char_end and char_start < limit and text[char_start].isspace():
        char_start += 1
    while char_end > char_start and char_end <= limit and text[char_end - 1].isspace():
        char_end -= 1

    curr_char = 0
    start_token_idx = None
    end_token_idx = None
    for idx, token in enumerate(tokens):
        # recover the surface form of a CamemBERT token
        token_str = token.replace("▁", " ").strip()
        # tokens carry no whitespace: skip it in the text before placing the token
        while curr_char < len(text) and text[curr_char].isspace():
            curr_char += 1
        token_begin = curr_char
        token_end = curr_char + len(token_str)
        if start_token_idx is None and token_begin <= char_start < token_end:
            start_token_idx = idx
        if token_begin < char_end <= token_end:
            end_token_idx = idx
        curr_char = token_end
    if start_token_idx is not None and end_token_idx is not None:
        return start_token_idx, end_token_idx
    return None, None

def tag_event_type(sent):
    """Find the event marker ("|| Ouverture ||", "|| Classement ||", ...) of ``sent``.

    The known event names are tried in a fixed order and the first one
    present wins.

    Returns:
        ``(event, text_before_marker, marker_start, marker_end)``, or
        ``(None, None, -1, -1)`` when the sentence has no known marker.
    """
    for label in ("Ouverture", "Classement", "Dénomination", "Numérotation", "Historique"):
        marker = f"|| {label} ||"
        head, found, _tail = sent.partition(marker)
        if found:
            begin = len(head)
            return label, head, begin, begin + len(marker)
    return None, None, -1, -1

def extract_time(sent):
    """Return the first date mentioned in ``sent``.

    Recognised forms are a French date (day, month word, four-digit year,
    e.g. "10 avril 1867", "1er février 1877", "3 FÉVRIER 1911") and an ISO
    date ``YYYY-MM-DD``. The day may be written "1er" and letter case is
    ignored, in line with the other date patterns of this notebook.

    Returns:
        ``(matched_text, start, end)`` with character offsets into ``sent``,
        or ``(None, -1, -1)`` when no date is found.
    """
    import re
    m = re.search(
        r"(?:1er|\d{1,2})\s+[a-zéû]+\s+\d{4}|\d{4}-\d{2}-\d{2}",
        sent,
        re.IGNORECASE,
    )
    if m:
        return m.group(), m.start(), m.end()
    return None, -1, -1

def build_ner(sent, tokens):
    """Build the NER annotations ``[start_token, end_token, label]`` of ``sent``.

    * LandmarkType: the text before the event marker, assumed to start at token 0.
    * EventType: the event name inside the "|| ... ||" marker.
    * Time: the first date found by ``extract_time``.

    Token positions are obtained by tokenising the text that precedes the
    entity with the module-level ``tokenizer`` and counting its tokens.
    NOTE(review): this assumes tokenising a prefix yields the same tokens
    as the start of the full tokenisation - confirm for the tokenizer in use.

    Args:
        sent: Raw sentence.
        tokens: Tokens of ``sent`` (currently unused).

    Returns:
        List of ``[start_token, end_token, label]`` entries, both ends inclusive.
    """
    ner = []
    event, before_event, event_start, event_end = tag_event_type(sent)

    # landmark: everything before the event marker
    if before_event is not None and before_event.strip():
        loc_tokens = tokenizer.tokenize(before_event.strip())
        ner.append([0, len(loc_tokens) - 1, "LandmarkType"])

    # event type
    if event and event_start > 0:
        event_tokens = tokenizer.tokenize(event)
        # The start index must be counted in tokens, not in whitespace-separated
        # words: earlier words may be split into several subword tokens.
        event_char = sent.index(event, event_start)
        event_idx = len(tokenizer.tokenize(sent[:event_char]))
        ner.append([event_idx, event_idx + len(event_tokens) - 1, "EventType"])

    # time
    time_str, t_start, t_end = extract_time(sent)
    if time_str:
        before_tokens = tokenizer.tokenize(sent[:t_start])
        time_tokens = tokenizer.tokenize(time_str)
        ner.append([len(before_tokens), len(before_tokens) + len(time_tokens) - 1, "Time"])
    return ner

def looks_like_date(s):
    """
    Tell whether ``s`` (surrounding whitespace ignored) is written as a date.

    Accepted forms:
      * ISO-like: ``YYYY``, ``YYYY-MM`` or ``YYYY-MM-DD`` - the three forms
        handled by the date branch of ``find_token_span``;
      * French: one/two-digit day, month word, four-digit year
        (e.g. "13 février 1911"), case-insensitive.
    """
    s = s.strip()
    # ISO-like date: year, optionally followed by month and by day
    if re.fullmatch(r"\d{4}(?:-\d{2}){0,2}", s):
        return True
    # French date, e.g. "13 février 1911"
    if re.fullmatch(r"\d{1,2} [a-zéû]+ \d{4}", s.lower()):
        return True
    return False

def build_relations(sent, tokens, ner, triples):
    """Turn ``triples`` into relations between token spans.

    The subject is always searched as a landmark; the object is searched as
    a date when ``looks_like_date`` accepts it, as a landmark otherwise.
    A triple yields a relation only when both spans are found.

    Args:
        sent: Raw sentence.
        tokens: Tokens of ``sent``.
        ner: NER annotations of the sentence (not used here).
        triples: Dicts with the keys "sub", "rel" and "obj".

    Returns:
        List of ``[sub_start, sub_end, obj_start, obj_end, rel]``.
    """
    def _found(span):
        return bool(span) and span[0] is not None

    relations = []
    for triple in triples:
        object_kind = "date" if looks_like_date(triple["obj"]) else "landmark"
        subject_span = find_token_span(tokens, sent, triple["sub"], entity_type="landmark")
        object_span = find_token_span(tokens, sent, triple["obj"], entity_type=object_kind)
        if not (_found(subject_span) and _found(object_span)):
            continue
        relations.append(
            [subject_span[0], subject_span[1], object_span[0], object_span[1], triple["rel"]]
        )
    return relations


def process_one_line(d):
    """Convert one source record (keys "id", "sent", optional "triples") to a PURE record.

    The sentence is tokenised with the module-level ``tokenizer``; NER spans
    and relations are computed on those tokens. The document holds a single
    sentence, hence the one-element outer lists.
    """
    sentence = d["sent"]
    record = {"doc_key": d["id"], "dataset": "évolutions d'événements"}
    sentence_tokens = tokenizer.tokenize(sentence)
    entities = build_ner(sentence, sentence_tokens)
    links = build_relations(sentence, sentence_tokens, entities, d.get("triples", []))
    record["sentences"] = [sentence_tokens]
    record["ner"] = [entities]
    record["relations"] = [links]
    return record
input_file = "C:/Users/jguo/Desktop/PURE-main/data/train_filtered.jsonl"
output_file = "C:/Users/jguo/Desktop/PURE-main/data/train_pure.jsonl"

# Load the whole dataset first, then convert and write it record by record.
with open(input_file, "r", encoding="utf-8") as f:
    dataset = list(map(json.loads, f))
with open(output_file, "w", encoding="utf-8") as fout:
    for data in dataset:
        ex = process_one_line(data)
        serialized = json.dumps(ex, ensure_ascii=False)
        fout.write(serialized + "\n")


Entity [rue des Rondonneaux] not found in [rue du cange || Historique || Précédemment, rue des Trois Soeurs]
Entity [voie U/20] not found in [rue fernand raynaud || Historique || Elle avait été provisoirement dénommée U/20]
Entity [C13] not found in [rue du fouarre || Ouverture || Ouverte au commencement du XIIIe siècle]
Entity [voie DE/20] not found in [rue francis picabia || Historique || Elle avait été provisoirement dénommée DE/20]
Entity [voie AE/15] not found in [rue gaston de caillavet || Historique || Elle avait été provisoirement dénommée AE/15]
Entity [voie Q/10] not found in [rue georg friedrich haendel || Historique || Elle avait été provisoirement dénommée Q/10]
Entity [port de l'Hôtel de Ville] not found in [rue goethe || Historique || Précédemment, rue de Cadix]
Entity [port de l'Hôtel de Ville] not found in [rue goethe || Historique || Précédemment, rue de Cadix]
Entity [Impasse de Constantine] not found in [villa de guelma || Historique || Précédemment, impasse de Guel

In [None]:
import json
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("C:/Users/jguo/Desktop/PURE-main/camembert-base")

def load_dataset(path):
    """Read a UTF-8 JSON Lines file and return its records as a list.

    Blank lines (for instance a stray empty line at the end of the file)
    are skipped instead of raising ``json.JSONDecodeError``.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]

def get_span_with_tokenizer(sentence, phrase, tokens, offsets):
    """
    Find the token span of ``phrase`` in a tokenised ``sentence``.

    Every pair of tokens (first, last) is tried in order; the text between
    the start offset of the first and the end offset of the last is
    lower-cased, stripped and compared with the lower-cased, stripped phrase.

    Returns ``(first_index, last_index)`` for the first match, else ``None``.
    ``tokens`` is accepted for signature compatibility and is not used.
    """
    wanted = phrase.strip().lower()
    token_count = len(offsets)
    for first, (begin, _first_end) in enumerate(offsets):
        for last in range(first, token_count):
            _last_begin, finish = offsets[last]
            covered = sentence[begin:finish].lower().strip()
            if covered == wanted:
                return first, last
    return None

def convert_entry(entry):
    """Convert one source record to a PURE-format record.

    The sentence is tokenised with the module-level ``tokenizer`` (offsets
    requested, no special tokens). Subject and object of each triple are
    looked up by ``get_span_with_tokenizer``; every span found becomes a
    "LANDMARK" entity and every triple with both spans becomes a relation.

    The document holds a single sentence, so "sentences", "ner" and
    "relations" are all one-element lists (one entry per sentence).
    Entities are sorted so that the output is deterministic.

    Args:
        entry: Dict with the keys "id" and "sent" and an optional "triples"
            list whose items are dicts ("sub", "rel", "obj") or 3-item sequences.

    Returns:
        Dict with the keys "doc_key", "dataset", "sentences", "ner", "relations".
    """
    doc_key = entry["id"]
    sentence = entry["sent"]
    encoding = tokenizer(sentence, return_offsets_mapping=True, add_special_tokens=False)
    tokens = tokenizer.convert_ids_to_tokens(encoding["input_ids"])
    offsets = encoding["offset_mapping"]

    ner = set()  # a set, because the same entity may appear in several triples
    relations = []

    for triple in entry.get("triples", []):
        if isinstance(triple, dict):
            sub, rel, obj = triple["sub"], triple["rel"], triple["obj"]
        else:
            sub, rel, obj = triple

        sub_span = get_span_with_tokenizer(sentence, sub, tokens, offsets)
        obj_span = get_span_with_tokenizer(sentence, obj, tokens, offsets)

        if sub_span is not None:
            ner.add((sub_span[0], sub_span[1], "LANDMARK"))
        if obj_span is not None:
            ner.add((obj_span[0], obj_span[1], "LANDMARK"))
        if sub_span is not None and obj_span is not None:
            relations.append([sub_span[0], sub_span[1], obj_span[0], obj_span[1], rel])

    return {
        "doc_key": doc_key,
        "dataset": "voies_paris",
        "sentences": [tokens],
        # one list per sentence, like "sentences"
        "ner": [[list(span) for span in sorted(ner)]],
        "relations": [relations],
    }

def save_to_jsonl(data, out_path):
    """Write ``data`` to ``out_path`` as UTF-8 JSON Lines, one item per line.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    with open(out_path, "w", encoding="utf-8") as f:
        for item in data:
            # "\n", not "/n": each record must end with a real line break
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

if __name__ == "__main__":
    # Source: the filtered dataset. Destination: the PURE-format file.
    input_path, output_path = (
        "C:/Users/jguo/Desktop/PURE-main/data/train_filtered.jsonl",
        "C:/Users/jguo/Desktop/PURE-main/data/train_pure.jsonl",
    )

    data = load_dataset(input_path)

    # Convert the records one by one, reporting progress as we go.
    pure_data = []
    for i, entry in enumerate(data):
        print(f"🔄 正在处理第 {i+1} 条 / {len(data)}：{entry['id']}")
        converted = convert_entry(entry)
        pure_data.append(converted)

    save_to_jsonl(pure_data, output_path)

    print(f"✅ CamemBERT 分词 + PURE 格式转换完成！共 {len(pure_data)} 条")


🔄 正在处理第 1 条 / 97：10063_historique_1
🔄 正在处理第 2 条 / 97：10328_dénomination
🔄 正在处理第 3 条 / 97：10343_historique_5
🔄 正在处理第 4 条 / 97：10474_historique_2
🔄 正在处理第 5 条 / 97：10474_ouverture
🔄 正在处理第 6 条 / 97：10602_ouverture
🔄 正在处理第 7 条 / 97：10605_historique
🔄 正在处理第 8 条 / 97：10908_dénomination
🔄 正在处理第 9 条 / 97：11056_dénomination
🔄 正在处理第 10 条 / 97：11076_historique_1
🔄 正在处理第 11 条 / 97：11141_dénomination
🔄 正在处理第 12 条 / 97：11150_dénomination
🔄 正在处理第 13 条 / 97：11179_classement
🔄 正在处理第 14 条 / 97：11179_historique
🔄 正在处理第 15 条 / 97：11250_classement
🔄 正在处理第 16 条 / 97：11270_ouverture
🔄 正在处理第 17 条 / 97：11307_ouverture
🔄 正在处理第 18 条 / 97：11592_dénomination
🔄 正在处理第 19 条 / 97：11592_historique_3
🔄 正在处理第 20 条 / 97：11598_numérotation
🔄 正在处理第 21 条 / 97：11800_dénomination
🔄 正在处理第 22 条 / 97：11800_historique
🔄 正在处理第 23 条 / 97：11905_historique_2
🔄 正在处理第 24 条 / 97：12075_dénomination_1
🔄 正在处理第 25 条 / 97：12101_historique
🔄 正在处理第 26 条 / 97：12122_ouverture
🔄 正在处理第 27 条 / 97：12223_numérotation_1
🔄 正在处理第 28 条 / 97：12374_historiqu