# 40 - Coreference detection and entities creation : example of the addresses

In [1]:
import pandas as pd
import rdflib
from rdflib import Graph, URIRef, Literal, RDFS, Namespace, BNode
from rdflib.namespace import SKOS, RDF, RDFS, DCTERMS, XSD
import glob
import json
import time
import datetime
import re

In [2]:
import sys
import os
# Access to the utils directory
current_dir = os.getcwd()
utils_dir = os.path.join(current_dir, '..', 'utils')
sys.path.append(utils_dir)

In [3]:
from linking_utils import PrepareQueriesForEL
from string_utils import NormalizeText

  from .autonotebook import tqdm as notebook_tqdm
[nltk_data] Downloading package punkt to /home/STual/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/STual/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Retrieve the files and values

In [4]:
ROOT = "/home/STual/DAN-cadastre/"
FOLDER = "LHAY"
DEP = "94"
SAVE_FOLDER = f"/home/STual/DAN-cadastre/data/{FOLDER}"
JSONS = glob.glob(f'{ROOT}inference/{FOLDER}/*.json') #WHere JSONS produced in DAN format output are saved

In [5]:
plotaddresses = PrepareQueriesForEL.retrieve_mentions(JSONS, ROOT + 'inference/LHAY/', 'Ⓓ', False)
print(f"Number of documents : {len(plotaddresses)}")

Number of documents : 26


In [6]:
distinct_plotaddresses = PrepareQueriesForEL.distinct_mentions_without_ne(plotaddresses)
print(f"Number of distinct plotaddresses mentions : {len(distinct_plotaddresses)}")

for i in range(len(distinct_plotaddresses)):
    if distinct_plotaddresses[i] is None:
        distinct_plotaddresses[i] = "MISSING"
    else:
        distinct_plotaddresses[i] = distinct_plotaddresses[i]
distinct_plotaddresses[0:3]

Number of distinct plotaddresses mentions : 24


['la plaine', 'd', 'La plaine']

## 1. Data normalization

In [7]:
plotaddresses_mentions = []
remove_chars_regex = '→()↑↓×±.,!?;:-@#$%^&*'
replacement_char = ''

for d in distinct_plotaddresses:
    new_json = {}
    if len(d) > 0:
        new_json["address"] = NormalizeText.remove_accents(NormalizeText.replace_characters(NormalizeText.replace_characters(d.lower(), '→', ' '),remove_chars_regex,replacement_char))
    else:
        new_json["address"] = ""
    if new_json["address"] != "missing" or new_json["address"] != "ø":
        plotaddresses_mentions.append(new_json)
plotaddresses_mentions

[{'address': 'la plaine'},
 {'address': 'd'},
 {'address': 'la plaine'},
 {'address': 'lavoie des'},
 {'address': 'de lhay'},
 {'address': 'foptaine'},
 {'address': 'lapplaine'},
 {'address': 'les'},
 {'address': 'ø'},
 {'address': 'laplaine'},
 {'address': 'la plane'},
 {'address': 'la voie des'},
 {'address': 'la voie'},
 {'address': 'des postey'},
 {'address': 'laptaine'},
 {'address': 'de 2thay'},
 {'address': 'laptain de '},
 {'address': 'la voie des'},
 {'address': 'missing'},
 {'address': 'de phay'},
 {'address': 'voie des'},
 {'address': 'voue des postes'},
 {'address': 'les sablons'},
 {'address': 'la plane'}]

## 2. Create the clusters

In [8]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import Levenshtein as lev

# Load the embedding model
model = SentenceTransformer("all-MiniLM-L6-v2")

# Helper function to compute normalized Levenshtein distance
def normalized_levenshtein(str1, str2):
    """Returns a normalized Levenshtein distance (1 - distance)"""
    
    # Handle case where either string is empty
    if len(str1) == 0 and len(str2) == 0:
        return 1.0  # Both empty, consider them identical
    
    if len(str1) == 0 or len(str2) == 0:
        return 0.0  # One is empty, completely dissimilar

    # Normalized Levenshtein distance formula
    return 1 - lev.distance(str1, str2) / max(len(str1), len(str2))

# Helper function to cluster texts based on Levenshtein distance threshold
def cluster_by_levenshtein(texts, threshold):
    if len(texts) == 0:
        return []

    # Create a similarity matrix based on Levenshtein distance
    sim_matrix = np.zeros((len(texts), len(texts)))

    for i in range(len(texts)):
        for j in range(i + 1, len(texts)):
            sim = normalized_levenshtein(texts[i], texts[j])
            sim_matrix[i][j] = sim_matrix[j][i] = sim

    visited = set()
    groups = []

    for i in range(len(texts)):
        if i in visited:
            continue
        group = [i]
        visited.add(i)
        for j in range(i + 1, len(texts)):
            if j not in visited and sim_matrix[i][j] >= threshold:
                group.append(j)
                visited.add(j)
        groups.append(group)
    
    return groups

# Helper function to cluster texts based on cosine similarity threshold
def cluster_by_embeddings_similarity(texts, threshold):
    if len(texts) == 0:
        return []

    embeddings = model.encode(texts, convert_to_numpy=True)
    sim_matrix = cosine_similarity(embeddings)

    visited = set()
    groups = []

    for i in range(len(texts)):
        if i in visited:
            continue
        group = [i]
        visited.add(i)
        for j in range(i + 1, len(texts)):
            if j not in visited and sim_matrix[i][j] >= threshold:
                group.append(j)
                visited.add(j)
        groups.append(group)
    return groups

# Main function to group mentions
def group_mentions_addresses(
    mentions,
    threshold=0.85,
    mesure="normalizedlevenshtein"
):
    """
    mentions: list of dicts, each with 'ADDRESS'
    returns: list of list of indices, each list represents a group
    """
    indices = list(range(len(mentions)))
    values = [m['address'] for m in mentions]

    # Step 1: Group by NAME similarity
    if mesure == "embeddingcosinus":
        final_groups = cluster_by_embeddings_similarity(values, threshold)
    elif "normalizedlevenshtein":
        final_groups = cluster_by_levenshtein(values, threshold)

    return final_groups

In [9]:
groups = group_mentions_addresses(
    plotaddresses_mentions,
    threshold=0.80,
    mesure="normalizedlevenshtein"#embeddingcosinus normalizedlevenshtein
)

In [10]:
counter = 0
for li in groups:
    for i in li:
        counter += 1
counter

24

In [11]:
assert counter == len(distinct_plotaddresses)

In [12]:
from embedding_similarity import merge_lists_with_common_elements

fusion_unique_groups = merge_lists_with_common_elements(groups)
print(f"Number of distinct groups : {len(fusion_unique_groups)}. Number of mentions : {len(distinct_plotaddresses)}")

Number of distinct groups : 15. Number of mentions : 24


In [13]:
DISPLAY_GROUPS = True
if DISPLAY_GROUPS : 
    for l in fusion_unique_groups:
        print("####################")
        for i in l:
            print(distinct_plotaddresses[i])

####################
la plaine
La plaine
lapplaine
laplaine
la plane
La plane
####################
d
####################
Lavoie des
La voie des
La Voie des
Voie des
####################
de Lhay
de Phay
####################
foptaine
####################
Les
####################
Ø
####################
La Voie
####################
des postey
####################
laptaine
####################
de 2thay
####################
Laptain de→
####################
MISSING
####################
Voue des→Postes
####################
Les sablons


## 3. Create RDF Resource

In [14]:
import Levenshtein as lev
from collections import Counter
import uuid

# Helper function to calculate the representative name (minimal Levenshtein distance)
def get_representative_name(names):
    min_distance = float('inf')
    representative_name = None
    
    for name in names:
        avg_distance = sum(normalized_levenshtein(name, other_name) for other_name in names) / len(names)
        if avg_distance < min_distance:
            min_distance = avg_distance
            representative_name = name
    
    return representative_name

# Helper function to calculate the longest string
def get_longest_string(strings):
    return max(strings, key=len)

# Helper function to calculate the most appropriate family status
def get_representative_familystatus(familystatuses):
    # Filter out "id" and "idem" if they are not the only values
    filtered_statuses = [status for status in familystatuses if status.lower() not in ["id", "idem"]]
    
    if filtered_statuses:
        return get_longest_string(filtered_statuses)  # Choose the longest one among filtered values
    else:
        return get_longest_string(familystatuses)  # If "id" or "idem" are the only options, choose the longest one

# Function to create RDF entities based on the most representative values
def create_rdf_entities(groups, mentions):
    rdf_entities = []

    for group in groups:
        group_mentions = [mentions[i] for i in group]
        
        # Step 1: Choose the representative name
        names = [m['address'] for m in group_mentions]
        representative_name = get_representative_name(names)

        # Create RDF entity for this group
        rdf_entity = {
            'uuid': str(uuid.uuid4()),
            'address': representative_name.title(),
            'mentions': group  # Link this entity to the list of mentions in the group
        }

        rdf_entities.append(rdf_entity)
    
    return rdf_entities

In [15]:
rdf_entities = create_rdf_entities(fusion_unique_groups, plotaddresses_mentions)

In [16]:
len(rdf_entities)

15

In [17]:
rdf_entities[2]

{'uuid': '8faa03de-adbd-44e0-b936-eed5f0a916c9',
 'address': 'Voie Des',
 'mentions': [3, 11, 17, 20]}

In [18]:
def generate_rdf_resource(rdf_entities, distinct_mentions):
    """
    Create a simple RDF resource for each group with additional properties (address, activity, title).
    """
    g = Graph()
    ADDR = Namespace("http://rdf.geohistoricaldata.org/def/address#")
    CAD = Namespace("http://rdf.geohistoricaldata.org/def/cadastre#")
    LTYPE = Namespace("http://rdf.geohistoricaldata.org/id/codes/address/landmarkType/")
    LANDMARK = Namespace("http://rdf.geohistoricaldata.org/id/landmark/")
    CAD_LTYPE = Namespace("http://rdf.geohistoricaldata.org/id/codes/cadastre/landmarkType/")
    LRTYPE = Namespace("http://rdf.geohistoricaldata.org/id/codes/address/landmarkRelationType/")
    LR = Namespace("http://rdf.geohistoricaldata.org/id/landmarkRelation/")
    g.bind("addr", ADDR)
    g.bind("cad", CAD)
    g.bind("landmark", LANDMARK)
    g.bind("cad_ltype", CAD_LTYPE)
    g.bind("lrtype", LRTYPE)
    g.bind("landmarkRelation", LR)
    uris_dict = {}
    
    for rdf_entity in rdf_entities:
        if rdf_entity['address'] != 'Ø':
            uri = URIRef(LANDMARK + rdf_entity['uuid'])
            g.add((uri, RDF.type, ADDR.Landmark))
            g.add((uri, ADDR.isLandmarkType, LTYPE.Undefined))

            #uri_lr = URIRef(BNode().n3())
            p1 = uri.replace("http://rdf.geohistoricaldata.org/id/landmark/",uri)
            uri_lr = URIRef(LR+p1 + '_' + DEP + '_' + FOLDER)
            g.add((uri_lr, RDF.type, ADDR.LandmarkRelation))
            g.add((uri_lr, ADDR.isLandmarkRelationType, LRTYPE.Within))
            g.add((uri_lr, ADDR.locatum, uri))
            g.add((uri_lr, ADDR.relatum, URIRef(LANDMARK + DEP + '_' + FOLDER)))
    
            mentions = rdf_entity["mentions"]
            for m in mentions:
                key = distinct_mentions[m]
                uris_dict[key] = str(uri)
        
            # Address name
            if len(rdf_entity['address']) > 0:
                label = rdf_entity['address']
            
            label = re.sub('→',' ',label)
            label = re.sub('↑',' ',label)
            label = re.sub('↓',' ',label)
            label = re.sub('  ',' ',label)
            label = re.sub('[ ]+$','',label)
            g.add((uri, RDFS.label, Literal(label)))

    return g, uris_dict

In [19]:
# Generate RDF resource
graph, uris_dict = generate_rdf_resource(rdf_entities, distinct_plotaddresses)

# Print the RDF graph in Turtle format
graph.serialize(destination=f"{SAVE_FOLDER}/rdf/lieu-dit.ttl", format="turtle")

<Graph identifier=Nb1426a20fa1740b0a1153ea3f35eac3b (<class 'rdflib.graph.Graph'>)>

## 4. Annotate the table

In [20]:
for JSON in JSONS:
    with open(JSON) as f:
        page = json.load(f)
    page_uuid = JSON.replace(ROOT + "inference/LHAY","").replace('.json','')

    for line in page["entities"]:
        if "Ⓓ" in list(line.keys()):
            if line["Ⓓ"]["interpreted_text"] != None:
                if line["Ⓓ"]["interpreted_text"] != 'Ø':
                    line["Ⓓ"]['uris'] = uris_dict[line["Ⓓ"]["interpreted_text"]]

    with open(JSON,'w', encoding='utf-8') as f:
        json.dump(page, f, ensure_ascii=False, indent=4)