In [None]:
import json
from collections import Counter

from SPARQLWrapper import SPARQLWrapper, JSON
import time
import re

from tqdm import tqdm

import torch
from transformers import AutoTokenizer, AutoModel
import numpy as np
from langdetect import detect

# import faiss
from tenacity import retry, wait_random_exponential, before_sleep_log

In [None]:
# Directory holding the ontology-mapping JSON files written and read by this notebook.
# File paths are built by plain string concatenation, so the trailing "/" is required.
ONTOLOGY_MAPPINGS_DIR = "../utils/ontology_mappings/"

## Collecting relation names and constraints

### Collecting labels and data types of relations

In [None]:
# property id (e.g. "P40") -> English label, and property id -> data-type name
PROP_2_LABEL = {}
PROP_2_DATA_TYPE = {}

sparql = SPARQLWrapper("https://query.wikidata.org/sparql")

# Select every property whose type is Item, Quantity or Time, together with
# its English label and a readable name for the data type.
query = """
SELECT ?property ?propertyLabel ?typeLabel WHERE {
  ?property a wikibase:Property .
  ?property wikibase:propertyType ?type .
  
  VALUES ?type { wikibase:WikibaseItem wikibase:Quantity wikibase:Time }
  
  BIND(
    IF(?type = wikibase:WikibaseItem, "Item",
      IF(?type = wikibase:Quantity, "Quantity",
        IF(?type = wikibase:Time, "Point in time", "Unknown")
      )
    ) AS ?typeLabel
  )
  
  SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
}
"""

sparql.setReturnFormat(JSON)
sparql.setQuery(query)

try:
    results = sparql.query().convert()

    for binding in results["results"]["bindings"]:
        # The property id is the last path segment of the property URI.
        prop = binding["property"]["value"].rsplit("/", 1)[-1]

        PROP_2_LABEL[prop] = binding.get("propertyLabel", {}).get("value", "No label")
        PROP_2_DATA_TYPE[prop] = binding.get("typeLabel", {}).get("value", "Unknown")

except Exception as err:
    # Any failure (request or parsing) is only reported; the mappings keep
    # whatever was collected before the error.
    print(f"Error executing SPARQL query: {err}")

In [None]:
# Both mappings are filled in the same loop, so the two sizes should match.
len(PROP_2_LABEL), len(PROP_2_DATA_TYPE)

In [None]:
# Distinct data-type names actually present in the mapping.
set(PROP_2_DATA_TYPE.values())

In [None]:
# Persist the property -> data type mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + "prop2data_type.json", mode="w") as out_file:
    json.dump(PROP_2_DATA_TYPE, out_file)

In [None]:
# Persist the property -> label mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + "prop2label.json", mode="w") as out_file:
    json.dump(PROP_2_LABEL, out_file)

### Collecting relation aliases

In [None]:
# Distinct property ids vs. distinct labels; a smaller second number means
# several properties share the same label.
len(set(PROP_2_LABEL.keys())), len(set(PROP_2_LABEL.values()))

In [None]:
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def get_property_aliases(property_id):
    """Return the English aliases (``skos:altLabel``) of a Wikidata property.

    Args:
        property_id: Wikidata id placed after the ``wd:`` prefix in the query, e.g. "P40".

    Returns:
        list[str]: alias strings in the order the endpoint returned them
        (empty when the query yields no rows).

    The call is wrapped in a tenacity ``retry`` with randomized exponential
    backoff (wait capped at 60); no stop condition is configured on the decorator.
    """
    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setReturnFormat(JSON)
    endpoint.setQuery(f"""
    SELECT ?alias WHERE {{
      wd:{property_id} skos:altLabel ?alias .
      FILTER (lang(?alias) = "en")
    }}
    """)

    bindings = endpoint.query().convert()["results"]["bindings"]
    return [binding["alias"]["value"] for binding in bindings]

# property id -> list of English aliases; one SPARQL request per property.
PROP2ALIASES = {}

for pid in tqdm(PROP_2_LABEL):
    PROP2ALIASES[pid] = get_property_aliases(pid)

In [None]:
# Compare the number of distinct aliases with the total number of aliases;
# a difference means some alias is attached to more than one property
# (or repeated within one).
alias_list = [alias for aliases in PROP2ALIASES.values() for alias in aliases]
alias_set = set(alias_list)
print(len(alias_set), len(alias_list))

In [None]:
# Print every property label next to its alias list for visual inspection.
for prop, prop_label in PROP_2_LABEL.items():
    print(prop_label, PROP2ALIASES[prop])

In [None]:
# Write the label and alias mappings to disk (both files are overwritten).
with open(ONTOLOGY_MAPPINGS_DIR + "prop2label.json", mode="w") as labels_file:
    json.dump(PROP_2_LABEL, labels_file)

with open(ONTOLOGY_MAPPINGS_DIR + "prop2aliases.json", mode="w") as aliases_file:
    json.dump(PROP2ALIASES, aliases_file)

In [None]:
# Reload the label and alias mappings from disk, replacing the in-memory dicts.
with open(ONTOLOGY_MAPPINGS_DIR + "prop2label.json", mode="r") as labels_file:
    PROP_2_LABEL = json.load(labels_file)

with open(ONTOLOGY_MAPPINGS_DIR + "prop2aliases.json", mode="r") as aliases_file:
    PROP2ALIASES = json.load(aliases_file)

### Collecting subject and value constraints of relations

In [None]:
from SPARQLWrapper import SPARQLWrapper, JSON

@retry(wait=wait_random_exponential(multiplier=1, max=60))
def get_constraints(property_id):
    """Fetch the value-type and subject-type constraint classes of a Wikidata property.

    Args:
        property_id: Wikidata property id placed after the ``wd:`` prefix, e.g. "P40".

    Returns:
        dict with exactly two keys, "Value-type constraint" and
        "Subject type constraint", each mapping to a list of entity ids
        (last path segment of the returned entity URI). A list is empty when
        the query returns no row of that kind.

    The call is wrapped in a tenacity ``retry`` with randomized exponential
    backoff (wait capped at 60); no stop condition is configured on the decorator.
    """
    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setReturnFormat(JSON)

    # The query selects ?entityLabel but does not invoke the wikibase:label
    # service; only the entity URI is read from the result rows below.
    query = f"""
    SELECT ?constraintType ?entity ?entityLabel WHERE {{
      VALUES ?property {{ wd:{property_id} }}  

      ?property p:P2302 ?statement.  # Property constraints
      ?statement ps:P2302 ?constraintEntity.  # Constraint type

      VALUES ?constraintEntity {{ wd:Q21510865 wd:Q21503250 }}  # Value-type & Subject-type constraints

      ?statement pq:P2308 ?entity.  # The constrained entity type (allowed type)

      BIND(
        IF(?constraintEntity = wd:Q21510865, "Value-type constraint", "Subject type constraint")
        AS ?constraintType
      )
    }}
    """
    endpoint.setQuery(query)

    bindings = endpoint.query().convert()["results"]["bindings"]

    constraints = {"Value-type constraint": [], "Subject type constraint": []}
    for binding in bindings:
        constraint_kind = binding["constraintType"]["value"]
        allowed_type_id = binding["entity"]["value"].split("/")[-1]
        constraints[constraint_kind].append(allowed_type_id)

    return constraints

# Example usage: fetch and print the constraints of a single property.
# Calling get_constraints sends a request to the Wikidata SPARQL endpoint.
property_id = "P40"  # Replace with any Wikidata property ID
constraints = get_constraints(property_id)

print(constraints)

In [None]:
# property id -> {"Value-type constraint": [...], "Subject type constraint": [...]}
# One request per property, with a 0.1 s pause after each request.
constraint_dict = {}

for prop_id in tqdm(PROP_2_LABEL):
    constraint_dict[prop_id] = get_constraints(prop_id)
    time.sleep(0.1)

len(constraint_dict)

In [None]:
# Reload the property -> data type mapping from disk.
with open(ONTOLOGY_MAPPINGS_DIR + "prop2data_type.json", mode="r") as in_file:
    PROP_2_DATA_TYPE = json.load(in_file)

In [None]:
# Display the full property -> data type mapping.
# NOTE(review): this renders every entry; a small sample may be enough for inspection.
PROP_2_DATA_TYPE

In [None]:
# Spot-check the constraints collected for a single property.
constraint_dict['P2294']

In [None]:
# Properties for which neither a value-type nor a subject-type constraint was collected.
wo_constraint = [
    prop
    for prop, found in constraint_dict.items()
    if len(found["Value-type constraint"]) == 0 and len(found["Subject type constraint"]) == 0
]
len(wo_constraint)

In [None]:
# Split the unconstrained properties by data type: "Quantity", "Point in time",
# and everything else.
quantity_props, time_props, other_props = [], [], []
bucket_by_data_type = {"Quantity": quantity_props, "Point in time": time_props}

for prop in wo_constraint:
    # Any data type without its own bucket falls into other_props.
    bucket_by_data_type.get(PROP_2_DATA_TYPE[prop], other_props).append(prop)

len(time_props), len(quantity_props), len(other_props)

In [None]:
# Total number of unconstrained properties across the three groups.
# Every property in wo_constraint lands in exactly one group, so this should
# equal len(wo_constraint). Computed from the lists rather than from
# hard-coded counts so the value stays correct when the data changes.
len(time_props) + len(quantity_props) + len(other_props)

In [None]:
# Properties with a "Point in time" or "Quantity" data type get a synthetic
# value-type constraint: Q186408 for time values, Q309314 for quantities.
# The membership check makes the cell idempotent: re-running it (for example
# after reloading constraint_dict from JSON) no longer appends duplicates.
for prop in constraint_dict:
    if PROP_2_DATA_TYPE[prop] == "Point in time":
        literal_type = 'Q186408'
    elif PROP_2_DATA_TYPE[prop] == 'Quantity':
        literal_type = 'Q309314'
    else:
        continue

    value_types = constraint_dict[prop]["Value-type constraint"]
    if literal_type not in value_types:
        value_types.append(literal_type)

In [None]:
# Recount the properties that still have no constraint of either kind.
wo_constraint = []
for prop, found in constraint_dict.items():
    has_no_value_type = len(found["Value-type constraint"]) == 0
    if has_no_value_type and len(found["Subject type constraint"]) == 0:
        wo_constraint.append(prop)
len(wo_constraint)

In [None]:
# Replace every empty constraint list with the placeholder ['ANY'].
# wo_constraint is reset here and nothing is appended to it in this cell,
# so the displayed length is 0.
wo_constraint = []
for prop_constraints in constraint_dict.values():
    for constraint_kind in ("Value-type constraint", "Subject type constraint"):
        if len(prop_constraints[constraint_kind]) == 0:
            prop_constraints[constraint_kind] = ['ANY']
len(wo_constraint)

In [None]:
# Final check: list the properties whose two constraint lists are both empty.
wo_constraint = [
    prop
    for prop in constraint_dict
    if len(constraint_dict[prop]["Value-type constraint"]) == 0
    and len(constraint_dict[prop]["Subject type constraint"]) == 0
]
len(wo_constraint)

In [None]:
# Persist the property -> constraints mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'prop2constraints.json', mode='w') as out_file:
    json.dump(constraint_dict, out_file)

In [None]:
# Reload the property -> constraints mapping from disk.
with open(ONTOLOGY_MAPPINGS_DIR + 'prop2constraints.json', mode='r') as in_file:
    constraint_dict = json.load(in_file)

In [None]:
# Reload the property -> data type mapping from disk.
with open(ONTOLOGY_MAPPINGS_DIR + "prop2data_type.json", mode="r") as in_file:
    PROP_2_DATA_TYPE = json.load(in_file)

## Collecting entities' metadata

In [None]:
# Every distinct id that appears in any constraint list (value-type or
# subject-type) of any property, as a list. Placeholder values stored in
# those lists (such as 'ANY', if present) are collected as well.
entities = list({
    entity
    for constraint in constraint_dict.values()
    for const_type in constraint
    for entity in constraint[const_type]
})

In [None]:
# Number of distinct entity types referenced by the constraints.
len(entities)

### Collecting entities' hierarchy of superclasses

In [None]:
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def get_subclass_hierarchy(entity_id):
    """Return the ids of all classes reachable from a Wikidata entity.

    Despite the ``?subclass`` variable name, the query starts at the entity and
    follows ``wdt:P31/wdt:P279*`` and ``wdt:P279*`` (P31 "instance of", P279
    "subclass of" in Wikidata), so the results are the entity's classes and
    their superclasses. Because ``P279*`` also matches a zero-length path, the
    entity itself is part of the result.

    Args:
        entity_id: Wikidata id placed after the ``wd:`` prefix in the query.

    Returns:
        list[str]: ids (last path segment of each result URI), in the order
        the endpoint returned them.

    The call is wrapped in a tenacity ``retry`` with randomized exponential
    backoff (wait capped at 60); no stop condition is configured on the decorator.
    """
    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setReturnFormat(JSON)

    # ?subclassLabel is selected but no label service is invoked; only the
    # ?subclass URI is read below.
    query = f"""
      SELECT DISTINCT ?subclass ?subclassLabel WHERE {{
          {{
              wd:{entity_id} wdt:P31/wdt:P279* ?subclass.
          }}
            UNION
          {{
              wd:{entity_id} wdt:P279* ?subclass.
          }}
      }}
      """
    endpoint.setQuery(query)

    bindings = endpoint.query().convert()["results"]["bindings"]
    return [binding["subclass"]["value"].split("/")[-1] for binding in bindings]

# entity id -> list of class ids returned by get_subclass_hierarchy
# (one SPARQL request per entity).
ENTITY_2_HIERARCHY = {}
for type_id in tqdm(entities):
    ENTITY_2_HIERARCHY[type_id] = get_subclass_hierarchy(type_id)

len(ENTITY_2_HIERARCHY)

In [None]:
# Distinct ids appearing anywhere in the hierarchies vs. number of constraint entities.
ents = [class_id for hierarchy in ENTITY_2_HIERARCHY.values() for class_id in hierarchy]
len(set(ents)), len(entities)

In [None]:
# leaving only entity types that are used in constraints
# `entities` is a list, so testing membership against it for every hierarchy
# item is a linear scan each time; build a set once for constant-time lookups.
constraint_entity_ids = set(entities)
for entity in tqdm(ENTITY_2_HIERARCHY):
    ENTITY_2_HIERARCHY[entity] = [
        item for item in ENTITY_2_HIERARCHY[entity] if item in constraint_entity_ids
    ]

In [None]:
# Same statistic as before the filtering: distinct ids left in the
# hierarchies vs. number of constraint entities.
ents = [kept_id for kept_ids in ENTITY_2_HIERARCHY.values() for kept_id in kept_ids]
len(set(ents)), len(entities)

In [None]:
# Persist the filtered entity -> hierarchy mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'entity_hierarchy.json', mode='w') as out_file:
    json.dump(ENTITY_2_HIERARCHY, out_file)

### Collecting entity types' labels

In [None]:
# Number of entity ids sent to the label service per request.
BATCH_SIZE = 50

@retry(wait=wait_random_exponential(multiplier=1, max=60))
def fetch_labels(batch):
    """Fetch English labels for a batch of Wikidata entity ids in one query.

    Args:
        batch: sequence of entity ids; each is prefixed with ``wd:`` and put
            into a single ``VALUES`` clause.

    Returns:
        dict mapping entity id (last path segment of the returned URI) to its
        label, or to "No label" when a row carries no ``entityLabel`` binding.
        An empty batch returns an empty dict without sending a request.

    Raises:
        Whatever the request raises. The error is printed and re-raised so
        that the tenacity ``retry`` decorator can retry the batch. Catching
        the error and returning ``{}`` would bypass the retry and silently
        drop the labels of the whole batch.
    """
    if not batch:
        return {}

    entity_values = " ".join(f"wd:{entity}" for entity in batch)

    query = f"""
    SELECT ?entity ?entityLabel WHERE {{
      VALUES ?entity {{ {entity_values} }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """

    # Use a local client, like the other fetch helpers, so this function does
    # not depend on a `sparql` object created in a different cell.
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)

    try:
        results = sparql.query().convert()
    except Exception as e:
        print(f"Error with batch {batch[:5]}...: {e}")
        raise

    return {
        result["entity"]["value"].split("/")[-1]: result.get("entityLabel", {}).get("value", "No label")
        for result in results["results"]["bindings"]
    }

# entity id -> English label, filled batch by batch.
ENTITY_2_LABEL = {}

# Ceiling division: `len(entities) // BATCH_SIZE + 1` reports one batch too
# many whenever len(entities) is an exact multiple of BATCH_SIZE.
num_batches = (len(entities) + BATCH_SIZE - 1) // BATCH_SIZE

for i in range(0, len(entities), BATCH_SIZE):
    batch = entities[i:i + BATCH_SIZE]
    print(f"Processing batch {i // BATCH_SIZE + 1}/{num_batches}")

    labels = fetch_labels(batch)
    ENTITY_2_LABEL.update(labels)

In [None]:
# Number of entities for which a label was stored.
len(ENTITY_2_LABEL)

In [None]:
# Distinct entity ids vs. distinct labels; a smaller second number means
# several entities share the same label.
len(set(ENTITY_2_LABEL.keys())), len(set(ENTITY_2_LABEL.values()))

In [None]:
# label -> list of entity ids carrying that label
label2entity = {}
for entity, label in ENTITY_2_LABEL.items():
    label2entity.setdefault(label, []).append(entity)

# Print the labels shared by more than one entity.
# The loop variable is named `entity_ids` rather than `entities`: reusing
# `entities` would overwrite the notebook-level list of constraint entities
# with the id list of the last label.
for label, entity_ids in label2entity.items():
    if len(entity_ids) > 1:
        print(label, entity_ids)

### Collecting descriptions for entity types with duplicated labels

In [None]:
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def get_entity_info(entity_id):
    """Look up the English label and description of a Wikidata entity.

    Args:
        entity_id: Wikidata id placed after the ``wd:`` prefix in the query.

    Returns:
        ``{"label": ..., "description": ...}`` built from the first result
        row, or ``None`` when the query returns no row (the query requires
        both an English label and an English description).

    The call is wrapped in a tenacity ``retry`` with randomized exponential
    backoff (wait capped at 60); no stop condition is configured on the decorator.
    """
    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setReturnFormat(JSON)
    endpoint.setQuery(f"""
    SELECT ?entityLabel ?entityDescription WHERE {{
      wd:{entity_id} rdfs:label ?entityLabel .
      wd:{entity_id} schema:description ?entityDescription .
      FILTER (lang(?entityLabel) = "en")
      FILTER (lang(?entityDescription) = "en")
    }}
    """)

    bindings = endpoint.query().convert()["results"]["bindings"]
    if not bindings:
        return None

    first_row = bindings[0]
    return {
        "label": first_row["entityLabel"]["value"],
        "description": first_row["entityDescription"]["value"],
    }

# For every label shared by several entities, replace each entity's label
# with "label (description)" so the labels become distinguishable.
# The loop variable is named `entity_ids` rather than `entities` so the
# notebook-level `entities` list is not overwritten.
for label, entity_ids in label2entity.items():
    if len(entity_ids) > 1:
        for entity_id in entity_ids:
            info = get_entity_info(entity_id)
            if info is None:
                # get_entity_info returns None when no row has both an English
                # label and an English description; keep the current label
                # instead of failing on info['label'].
                print(f"No English label/description for {entity_id}; keeping label {label!r}")
                continue
            ENTITY_2_LABEL[entity_id] = info['label'] + " (" + info['description'] +")"

In [None]:
# Re-check uniqueness after appending descriptions to the duplicated labels:
# distinct entity ids vs. distinct labels.
len(set(ENTITY_2_LABEL.keys())), len(set(ENTITY_2_LABEL.values()))

### Collecting entity types' aliases

In [None]:
@retry(wait=wait_random_exponential(multiplier=1, max=60))
def get_entity_aliases(entity_id):
    """Return the English aliases (``skos:altLabel``) of a Wikidata entity.

    Aliases tagged as English that nevertheless contain a character from the
    Chinese/Japanese ranges in the pattern below are dropped.

    Args:
        entity_id: Wikidata id placed after the ``wd:`` prefix in the query.

    Returns:
        list[str]: the remaining aliases, in the order the endpoint returned them.

    The call is wrapped in a tenacity ``retry`` with randomized exponential
    backoff (wait capped at 60); no stop condition is configured on the decorator.
    """
    cjk_pattern = re.compile(r"[\u4E00-\u9FFF\u3400-\u4DBF\uF900-\uFAFF\u3040-\u309F\u30A0-\u30FF\u31F0-\u31FF\uFF00-\uFFEF]")

    endpoint = SPARQLWrapper("https://query.wikidata.org/sparql")
    endpoint.setReturnFormat(JSON)
    endpoint.setQuery(f"""
    SELECT ?alias WHERE {{
      wd:{entity_id} skos:altLabel ?alias .
      FILTER (lang(?alias) = "en")
    }}
    """)

    bindings = endpoint.query().convert()["results"]["bindings"]
    candidates = (binding["alias"]["value"] for binding in bindings)
    return [alias for alias in candidates if not cjk_pattern.search(alias)]

# entity id -> list of English aliases; one SPARQL request per labelled entity.
ENTITY_2_ALIASES = {}
for type_id in tqdm(ENTITY_2_LABEL):
    ENTITY_2_ALIASES[type_id] = get_entity_aliases(type_id)

In [None]:
# [(ENTITY_2_LABEL[ent], aliases) for ent, aliases in ENTITY_2_ALIASES.items()]

In [None]:
# Spot-check Q186408, the entity used as value type for "Point in time"
# properties: stored label plus freshly fetched aliases (sends a request).
ENTITY_2_LABEL["Q186408"], get_entity_aliases("Q186408")

In [None]:
# Spot-check Q309314, the entity used as value type for "Quantity"
# properties: stored label plus freshly fetched aliases (sends a request).
ENTITY_2_LABEL["Q309314"], get_entity_aliases("Q309314")

## Building inverse mapping - object/subject type constraint to relation

### Checking that relations with 'point in time' and 'quantity' data types don't have other constraints

In [None]:
# for prop, data_type in PROP_2_DATA_TYPE.items():
#     if data_type == "Quantity":    
#         val_constraints = [ENTITY_2_LABEL[ent] for ent in constraint_dict[prop]["Value-type constraint"]]
#         subj_constraints = [ENTITY_2_LABEL[ent] for ent in constraint_dict[prop]["Subject type constraint"]]            
#         # print(PROP_2_LABEL[prop], subj_constraints, val_constraints)
#         assert len(val_constraints) == 0
#         # no value constraints for data type quantity

In [None]:
# for prop, data_type in PROP_2_DATA_TYPE.items():
#     if data_type == "Point in time":    
#         val_constraints = [ENTITY_2_LABEL[ent] for ent in constraint_dict[prop]["Value-type constraint"]]
#         subj_constraints = [ENTITY_2_LABEL[ent] for ent in constraint_dict[prop]["Subject type constraint"]]            
#         # print(PROP_2_LABEL[prop], subj_constraints, val_constraints)
#         assert len(val_constraints) == 0
#         # no value constraints for data type point in time

In [None]:
# Display the full property -> constraints mapping.
# NOTE(review): this renders every entry; a small sample may be enough for inspection.
constraint_dict

In [None]:
# Inverse indexes: constraint entity type -> properties that reference it,
# once for subject types and once for value (object) types.
subj2prop_constraints = {"<ANY SUBJECT>": []}
# Q309314 - quantity, Q186408 - point in time
obj2prop_constraint = {"<ANY OBJECT>": [], "Q309314": [], 'Q186408': []}

for prop, constraint in constraint_dict.items():
    prop_data_type = PROP_2_DATA_TYPE[prop]

    # Object side: the data type decides first; only item-valued properties
    # are indexed by their individual value-type constraints.
    if prop_data_type == "Point in time":
        object_keys = ['Q186408']
    elif prop_data_type == 'Quantity':
        object_keys = ['Q309314']
    elif constraint["Value-type constraint"] == ['ANY']:
        object_keys = ["<ANY OBJECT>"]
    else:
        object_keys = constraint["Value-type constraint"]

    for object_key in object_keys:
        obj2prop_constraint.setdefault(object_key, []).append(prop)

    # Subject side: the 'ANY' placeholder maps to the catch-all bucket.
    if constraint["Subject type constraint"] == ['ANY']:
        subject_keys = ["<ANY SUBJECT>"]
    else:
        subject_keys = constraint["Subject type constraint"]

    for subject_key in subject_keys:
        subj2prop_constraints.setdefault(subject_key, []).append(prop)

len(subj2prop_constraints), len(obj2prop_constraint)

In [None]:
# Persist both inverse indexes as JSON (both files are overwritten).
with open(ONTOLOGY_MAPPINGS_DIR + "subj_constraint2prop.json", mode="w") as subj_file:
    json.dump(subj2prop_constraints, subj_file)

with open(ONTOLOGY_MAPPINGS_DIR + "obj_constraint2prop.json", mode="w") as obj_file:
    json.dump(obj2prop_constraint, obj_file)

In [None]:
# Persist the entity type -> label mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'entity_type2label.json', mode='w') as out_file:
    json.dump(ENTITY_2_LABEL, out_file)

In [None]:
# Persist the property -> aliases mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'prop2aliases.json', mode='w') as out_file:
    json.dump(PROP2ALIASES, out_file)

In [None]:
# Persist the entity type -> aliases mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'entity_type2aliases.json', mode='w') as out_file:
    json.dump(ENTITY_2_ALIASES, out_file)

In [None]:
# Persist the entity type -> hierarchy mapping as JSON (overwrites the file).
with open(ONTOLOGY_MAPPINGS_DIR + 'entity_type2hierarchy.json', mode='w') as out_file:
    json.dump(ENTITY_2_HIERARCHY, out_file)