In [2]:
# Mount Google Drive inside the Colab runtime at /content/drive; the data
# files opened in the cells below are addressed through this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Data Loading

---



In [40]:
# English sentences, one list element per line of the file. Lines are kept
# verbatim, so every element still ends with its newline character.
data_pc = []
with open('/content/drive/MyDrive/Colab Notebooks/TFM/data/Full/prueba.short05.test.en', mode='r', encoding='utf-8') as file:
    # Iterating a text file yields its lines, so extend() copies them all.
    data_pc.extend(file)


In [41]:
# Spanish sentences, one list element per line of the file. Lines are kept
# verbatim, so every element still ends with its newline character.
data_pc_es = []
with open('/content/drive/MyDrive/Colab Notebooks/TFM/data/Full/prueba.short05.test.es', mode='r', encoding='utf-8') as file:
    # Iterating a text file yields its lines, so extend() copies them all.
    data_pc_es.extend(file)


In [5]:

# DBpedia ontology class names accepted by ner_english: an entity is kept only
# when the type returned for it is one of these.
wanted_labels = [
    'Place',
    'Person',
    'PopulatedPlace',
    'Artwork',
    'Work',
    'Company',
    'Infrastructure',
    'Agent',
    'City',
]

# NOTE(review): these look like spaCy NER label names; the list is not
# referenced by any of the visible cells - confirm whether it is still needed.
wanted_labels_non_traduction = [
    'PERSON',
    'PRODUCT',
    'WORK_OF_ART',
]

# Import spaCy Models + DBpedia Spotlight

---



In [None]:
!pip install spacy-transformers
!python -m spacy download en_core_web_trf
!pip install spacy-dbpedia-spotlight
!pip install https://huggingface.co/sdocio/es_spacy_ner_cds/resolve/main/es_spacy_ner_cds-any-py3-none-any.whl
print("The models has been downloaded")

In [7]:
import spacy
from spacy import displacy
from spacy.pipeline import merge_entities

# SPARQL Queries to DBpedia

---



In [None]:
!pip install sparqlwrapper

In [9]:
from posixpath import split
from SPARQLWrapper import SPARQLWrapper, JSON

def translate_entity(entity_uri):
    """Return the Spanish labels DBpedia stores for a resource.

    Queries https://dbpedia.org/sparql for every ``rdfs:label`` of
    ``entity_uri`` whose language tag is ``"es"``.

    Args:
        entity_uri: URI of the resource; it is placed between ``<`` and ``>``
            in the query text as-is.

    Returns:
        A list with the label strings in the order the endpoint returned them.
        The list is empty when there is no Spanish label or when
        ``entity_uri`` is empty/``None``. ``None`` is returned when the query
        raises; the error is printed, not propagated.
    """
    # Without a URI the pattern would be `<> rdfs:label ?label`, which cannot
    # name a resource; answer locally instead of making a network round trip.
    if not entity_uri:
        return []

    endpoint_url = "https://dbpedia.org/sparql"
    sparql = SPARQLWrapper(endpoint_url)

    query = f"""
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX dbo: <http://dbpedia.org/ontology/>
        SELECT ?label WHERE {{
            <{entity_uri}> rdfs:label ?label .
            FILTER(LANG(?label) = "es")
        }}
    """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    try:
        results = sparql.query().convert()
        # One binding per matching label; an empty binding list gives [].
        return [binding["label"]["value"] for binding in results["results"]["bindings"]]

    except Exception as e:
        # Best effort: a failed lookup is reported and signalled with None so
        # the caller can fall back to the untranslated entity text.
        print(f"Error occurred during SPARQL query: {str(e)}")

    return None

def type_entity(entity_uri):
    """Return the name of a DBpedia ontology class assigned to a resource.

    Queries https://dbpedia.org/sparql for the ``rdf:type`` values of
    ``entity_uri`` that start with the ``dbo:`` namespace.

    Args:
        entity_uri: URI of the resource; it is placed between ``<`` and ``>``
            in the query text as-is.

    Returns:
        The last ``/``-separated segment of the first type URI the endpoint
        returns (for example ``"City"``), or ``None`` when there is no such
        type, when ``entity_uri`` is empty/``None``, or when the query raises
        (the error is printed, not propagated).
    """
    # Without a URI the pattern would be `<> rdf:type ?type`, which cannot
    # name a resource; answer locally instead of making a network round trip.
    if not entity_uri:
        return None

    endpoint_url = "https://dbpedia.org/sparql"
    sparql = SPARQLWrapper(endpoint_url)

    query = f"""
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX dbo: <http://dbpedia.org/ontology/>
        SELECT ?type WHERE {{
            <{entity_uri}> rdf:type ?type .
            FILTER(STRSTARTS(STR(?type), str(dbo:)))
        }}
    """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    try:
        results = sparql.query().convert()

        bindings = results["results"]["bindings"]
        if bindings:
            # Only the first binding is used, even if the resource has
            # several dbo: types.
            entity_type_uri = bindings[0]["type"]["value"]
            return entity_type_uri.rsplit('/', 1)[-1]

    except Exception as e:
        # Best effort: a failed lookup is reported and treated as "no type".
        print(f"Error occurred during SPARQL query: {str(e)}")

    return None

In [10]:
def es_dbpediaQuery(entity_uri):
    """Return the name of an ontology class assigned to a resource in es.dbpedia.

    Queries https://es.dbpedia.org/sparql for the ``rdf:type`` values of
    ``entity_uri`` that start with the ``dbo:`` namespace.

    Args:
        entity_uri: URI of the resource; it is placed between ``<`` and ``>``
            in the query text as-is.

    Returns:
        The last ``/``-separated segment of the first type URI the endpoint
        returns, or ``None`` when there is no such type, when ``entity_uri``
        is empty/``None``, or when the query raises (the error is printed,
        not propagated).
    """
    # Without a URI the pattern would be `<> rdf:type ?type`, which cannot
    # name a resource; answer locally instead of making a network round trip.
    if not entity_uri:
        return None

    endpoint_url = "https://es.dbpedia.org/sparql"
    sparql = SPARQLWrapper(endpoint_url)

    query = f"""
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX dbo: <http://dbpedia.org/ontology/>
        SELECT ?type WHERE {{
            <{entity_uri}> rdf:type ?type .
            FILTER(STRSTARTS(STR(?type), str(dbo:)))
        }}
    """

    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    try:
        results = sparql.query().convert()
        bindings = results["results"]["bindings"]
        if bindings:
            # Only the first binding is used, even if several types match.
            entity_type_uri = bindings[0]["type"]["value"]
            return entity_type_uri.rsplit('/', 1)[-1]

    except Exception as e:
        # Best effort: a failed lookup is reported and treated as "no type".
        print(f"Error occurred during SPARQL query: {str(e)}")

    return None

# Create Pipeline for English NER

---



In [11]:
# English pipeline: the en_core_web_trf model with the DBpedia Spotlight
# component appended. ner_english reads `kb_id_` from this pipeline's entities
# (presumably filled in by the Spotlight component - confirm).
nlp = spacy.load('en_core_web_trf')
nlp.add_pipe('dbpedia_spotlight')

# Spanish pipeline: the es_spacy_ner_cds NER model with a rule-based
# sentencizer appended. Also used by calculate_similarity to build the
# documents it compares.
nlp_es= spacy.load("es_spacy_ner_cds")
nlp_es.add_pipe('sentencizer')



<spacy.pipeline.sentencizer.Sentencizer at 0x7b4a59a47300>

# NER with DBpedia and Substitution

In [12]:
# Minimum similarity a Spanish DBpedia label must exceed to replace the
# English entity text.
NER_SIMILARITY_THRESHOLD = 0.4
# Legacy module-level name, kept so code that still reads `threshold` works.
threshold = NER_SIMILARITY_THRESHOLD


def ner_english(sentence, threshold=NER_SIMILARITY_THRESHOLD):
    """Extract the wanted entities of an English sentence, translated when possible.

    The sentence is run through ``nlp``. Every entity that carries a
    ``kb_id_`` and whose DBpedia type (``type_entity``) is listed in
    ``wanted_labels`` contributes one string to the result: the Spanish label
    (``translate_entity``) most similar to the entity text, provided that
    similarity is above ``threshold``; otherwise the original entity text.

    Args:
        sentence: English sentence to analyse.
        threshold: Similarity a label must exceed to be used. The default is
            bound when the function is defined, so rebinding the module-level
            ``threshold`` in a later cell no longer changes this function.

    Returns:
        List of strings, one per kept entity, in document order.
    """
    doc = nlp(sentence)
    ent_find = []
    for ent in doc.ents:
        uri = ent.kb_id_
        if not uri:
            # Nothing to look up for an entity without a knowledge-base id.
            continue

        # Filter on the type first so labels are only fetched for entities
        # that will actually be kept.
        ent_type = type_entity(uri)
        if ent_type is None or ent_type not in wanted_labels:
            continue

        translation = ''
        best_similarity = 0
        # translate_entity returns None on failure; treat it as "no labels".
        for label in translate_entity(uri) or []:
            similarity = calculate_similarity(ent.text, label)
            if similarity > best_similarity and similarity > threshold:
                translation = label
                best_similarity = similarity

        # Fall back to the source text when no label was similar enough.
        ent_find.append(translation if translation != '' else ent.text)
    return ent_find


In [13]:
def calculate_similarity(entity1, entity2):
    """Return the similarity of two texts as computed by the ``nlp_es`` pipeline.

    Both strings are processed with ``nlp_es`` and the document built from
    ``entity1`` is compared with the one built from ``entity2`` through its
    ``similarity`` method.
    """
    return nlp_es(entity1).similarity(nlp_es(entity2))

In [14]:
def calculate_entity_best_similarity(ent_or, en_ent, threshold=0.3):
    """Pick the entity-initial token most similar to ``en_ent``.

    Args:
        ent_or: Iterable of ``(text, label)`` pairs; only pairs whose label is
            ``'B'`` are considered.
        en_ent: Text every candidate is compared against with
            ``calculate_similarity``.
        threshold: Minimum similarity (inclusive) a candidate needs to be
            accepted. Defaults to 0.3, the value that used to be hard-coded.

    Returns:
        The text of the candidate with the highest similarity that is at least
        ``threshold`` and greater than zero, or ``None`` when no candidate
        qualifies. On ties the earliest candidate wins.
    """
    best_match = None
    best_similarity = 0.0
    for text, label in ent_or:
        if label != 'B':
            continue
        similarity = calculate_similarity(en_ent, text)
        # Strict '>' keeps the first of several equally similar candidates.
        if similarity > best_similarity and similarity >= threshold:
            best_match, best_similarity = text, similarity
    return best_match

In [None]:
# NOTE: this cell used to start with `threshold = 0.3`. Nothing in the cell
# read that value, but it rebound the module-level name that ner_english looks
# up, silently replacing the 0.4 cutoff declared next to that function. The
# assignment was removed.


def _substitute_entities(es_sentence, entidades_originales, en_entity):
    """Replace Spanish entity mentions with the entities found on the English side.

    Args:
        es_sentence: Spanish sentence in which replacements are made.
        entidades_originales: List of ``(token text, IOB tag)`` pairs of the
            Spanish sentence; tokens tagged ``'B'`` are the entity mentions.
        en_entity: List of entity strings returned by ``ner_english`` for the
            aligned English sentence.

    Returns:
        ``(sentence, best_matches)``: the sentence after substitution and the
        Spanish-text -> English-side-entity map built when there are several
        English-side entities (empty otherwise).
    """
    entity_tokens = [text for text, label in entidades_originales if label == 'B']
    # Each distinct mention is replaced once: str.replace already rewrites
    # every occurrence, and a second pass could hit the inserted text again.
    spanish_entities = list(dict.fromkeys(entity_tokens))
    oracion = es_sentence
    best_matches = {}

    if not spanish_entities or not en_entity:
        return oracion, best_matches

    if len(en_entity) == 1:
        if len(entity_tokens) == 1:
            # One mention on each side: pair them without a similarity check.
            ent_rep = entity_tokens[0]
        else:
            ent_rep = calculate_entity_best_similarity(entidades_originales, en_entity[0])
        if ent_rep is not None:
            # Done once, not once per Spanish mention, so a replacement that
            # contains the original text (York -> Nueva York) is not reapplied.
            oracion = oracion.replace(ent_rep, en_entity[0])
        return oracion, best_matches

    # Several English-side entities: map each to its best Spanish mention
    # (a later entity overrides an earlier one that chose the same mention).
    for entity in en_entity:
        ent_rep = calculate_entity_best_similarity(entidades_originales, entity)
        if ent_rep is not None:
            best_matches[ent_rep] = entity

    for ent_original in spanish_entities:
        if ent_original in best_matches:
            oracion = oracion.replace(ent_original, best_matches[ent_original])
    return oracion, best_matches


# Fail before the output file is truncated instead of part-way through it.
if len(data_pc) < len(data_pc_es):
    raise ValueError(
        f"Expected one English sentence per Spanish sentence: "
        f"{len(data_pc)} English vs {len(data_pc_es)} Spanish"
    )

with open('/content/drive/MyDrive/Colab Notebooks/TFM/data/pruebaEjemplos.es', 'w', encoding='utf-8') as file:
    # Sentences are paired by position in the two lists.
    for es_sentence, en_sentence in zip(data_pc_es, data_pc):
        doc_es = nlp_es(es_sentence)
        en_entity = ner_english(en_sentence)

        # merge_entities collapses each entity span into a single token, so an
        # entity shows up as one token tagged 'B'.
        entidades_originales = [(token.text, token.ent_iob_) for token in merge_entities(doc_es)]
        print(f"Entidades detectadas en español --> {entidades_originales}")
        print(f"Entidades detectadas en inglés (traducidas)--> {en_entity}")

        oracion, best_matches = _substitute_entities(es_sentence, entidades_originales, en_entity)

        print(best_matches)
        print(f"Oracion inglés --> {en_sentence}")
        print(f"Oracion español --> {es_sentence}")
        print(f"Oración final --> {oracion}")
        # es_sentence keeps the newline it was read with, so no separator is added.
        file.write(oracion)