In [1]:
from pymongo.mongo_client import MongoClient
import json

## Utils

In [2]:
# Directory holding the ontology mapping files (the file names suggest
# property -> label and entity-type -> label lookups).
mappings_dir = "utils/ontology_mappings/"

# JSON is UTF-8 by specification; pass the encoding explicitly so the files
# decode the same way regardless of the platform's locale default.
with open(mappings_dir + 'prop2label.json', 'r', encoding='utf-8') as f:
    PROP_2_LABEL = json.load(f)

with open(mappings_dir + 'entity_type2label.json', 'r', encoding='utf-8') as f:
    ENTITY_2_LABEL = json.load(f)


In [None]:
def get_mongo_client(mongo_uri):
    """Build and return a ``MongoClient`` for the connection string ``mongo_uri``."""
    return MongoClient(mongo_uri)

# Connect to the MongoDB instance on localhost and open the ontology database.
mongo_client = get_mongo_client(
    "mongodb://localhost:27018/?directConnection=true"
)
db = mongo_client.get_database("wikidata_ontology")

# Cell output: the names of the collections in that database.
db.list_collection_names()

['entity_types',
 'entity_type_aliases',
 'properties',
 'property_aliases',
 'triplets',
 'entity_aliases']

In [4]:
# Print the field names of one sample document from each ontology collection.
for collection_name in (
    'entity_type_aliases',
    'entity_types',
    'properties',
    'property_aliases',
):
    sample_document = db.get_collection(collection_name).find_one()
    print(sample_document.keys())

dict_keys(['_id', 'alias_label', 'entity_type_id', 'alias_text_embedding'])
dict_keys(['_id', 'entity_type_id', 'label', 'parent_type_ids', 'valid_subject_property_ids', 'valid_object_property_ids'])
dict_keys(['_id', 'label', 'property_id', 'valid_subject_type_ids', 'valid_object_type_ids'])
dict_keys(['_id', 'alias_label', 'relation_id', 'alias_text_embedding'])


In [5]:
# Print the search-index definitions of the two alias collections.
for collection_name in ('entity_type_aliases', 'property_aliases'):
    search_indexes = db.get_collection(collection_name).list_search_indexes()
    print(list(search_indexes))

[{'id': '682744721ff4d706002c97b0', 'name': 'entity_type_aliases', 'type': 'search', 'status': 'READY', 'queryable': True, 'latestVersion': 0, 'latestDefinition': {'mappings': {'dynamic': True, 'fields': {'alias_text_embedding': {'type': 'knnVector', 'dimensions': 768, 'similarity': 'cosine'}}}}}]
[{'id': '682744861ff4d706002c97b1', 'name': 'property_aliases_ids', 'type': 'search', 'status': 'READY', 'queryable': True, 'latestVersion': 0, 'latestDefinition': {'mappings': {'dynamic': True, 'fields': {'alias_text_embedding': {'type': 'knnVector', 'dimensions': 768, 'similarity': 'cosine'}, 'relation_id': {'type': 'token'}}}}}]


## Functions for structural search

In [6]:
from utils.structured_dynamic_index_utils import Aligner as Aligner

  from .autonotebook import tqdm as notebook_tqdm


## Triple extraction tests

In [103]:
from utils.openai_utils import LLMTripletExtractor
from utils.structured_dynamic_index_utils_with_db import Aligner as DBAligner

# Aligner variant constructed from the MongoDB database handle `db`.
aligner = DBAligner(db)
# Model name handed to the triplet extractor (reused by a later setup cell).
model_name = 'gpt-4o'
# LLM-backed extractor; used as a notebook-level global by the functions below.
extractor = LLMTripletExtractor(model=model_name)

In [104]:
# Look up the label and type constraints stored for two sample property ids.
sample_property_ids = ['P291', 'P189']
aligner.retrieve_properties_labels_and_constraints(property_id_list=sample_property_ids)

{'P189': {'label': 'location of discovery',
  'valid_subject_type_ids': [],
  'valid_object_type_ids': ['Q3895768', 'Q17334923', 'Q27096213']},
 'P291': {'label': 'place of publication',
  'valid_subject_type_ids': ['Q11424',
   'Q386724',
   'Q2031291',
   'Q3331189',
   'Q3523102',
   'Q17489659'],
  'valid_object_type_ids': ['Q220505', 'Q15796005', 'Q27096213', 'Q27787439']}}

In [105]:
from unidecode import unidecode
import re

def extract_triplets(text, sample_id):
    """Extract knowledge-graph triplets from ``text`` and align them to the ontology.

    For every triplet returned by ``extractor.extract_triplets_from_text``:

    1. retrieve candidate entity type ids for its subject and object,
    2. retrieve candidate properties, each tagged with a direction,
    3. build one candidate "backbone" per property, restricted to the entity
       types that property allows,
    4. let the LLM choose a backbone (``refine_relation_and_entity_types``),
    5. refine the entity names with ``refine_entities``.

    The refined triplets are finally stored via ``aligner.add_triplets``.
    Relies on the notebook-level ``extractor`` and ``aligner`` objects.

    Args:
        text (str): Input text to extract triplets from.
        sample_id: Identifier of the sample the text belongs to; forwarded to
            ``refine_entities`` and ``aligner.add_triplets``.

    Returns:
        tuple: ``(final_triplets, filtered_triplets)`` where
            - final_triplets: list of refined triplets,
            - filtered_triplets: currently always an empty list (nothing in
              this function rejects a triplet); kept so callers can keep
              unpacking two values.
    """

    def pick_types(own_type_ids, allowed_type_ids):
        """Return the entity's candidate types that the property allows.

        The order of ``own_type_ids`` is preserved; a plain set intersection
        iterates in hash order, which made the type lists sent to the LLM
        vary between interpreter runs. Falls back to all of the entity's own
        candidates when none of them is allowed.
        """
        allowed = set(allowed_type_ids)
        matched = [t for t in dict.fromkeys(own_type_ids) if t in allowed]
        return matched if matched else own_type_ids

    # Extract initial triplets using LLM
    extracted_triplets = extractor.extract_triplets_from_text(text)

    final_triplets = []
    filtered_triplets = []

    for triplet in extracted_triplets['triplets']:
        # Get candidate entity types
        subj_type_ids, obj_type_ids = aligner.retrieve_similar_entity_types(triplet=triplet)

        # Get candidate properties/relations
        properties = aligner.retrieve_properties_for_entity_type(
            target_relation=triplet['relation'],
            object_types=obj_type_ids,
            subject_types=subj_type_ids,
            k=5
        )

        prop_2_label_and_constraint = aligner.retrieve_properties_labels_and_constraints(
            property_id_list=[prop[0] for prop in properties]
        )
        entity_type_id_2_label = aligner.retrieve_entity_type_labels(subj_type_ids + obj_type_ids)

        # Build candidate triplet backbones
        candidates = []
        for prop_id, prop_direction in properties:
            constraint = prop_2_label_and_constraint[prop_id]

            if prop_direction == 'direct':
                subject_name, subject_ids = triplet['subject'], subj_type_ids
                object_name, object_ids = triplet['object'], obj_type_ids
            else:
                # Inverse property: the extracted object plays the subject role
                # (and vice versa), so each role must carry the type
                # candidates of the entity that actually fills it, including
                # in the no-constraint-matched fallback.
                subject_name, subject_ids = triplet['object'], obj_type_ids
                object_name, object_ids = triplet['subject'], subj_type_ids

            subject_types = pick_types(subject_ids, constraint['valid_subject_type_ids'])
            object_types = pick_types(object_ids, constraint['valid_object_type_ids'])

            candidates.append({
                "subject": subject_name,
                "relation": constraint['label'],
                'object': object_name,
                "subject_types": [entity_type_id_2_label[t]['label'] for t in subject_types],
                "object_types": [entity_type_id_2_label[t]['label'] for t in object_types]
            })

        # Refine relation and entity types using LLM - choose among valid backbones for triplet
        backbone_triplet = extractor.refine_relation_and_entity_types(
            text=text,
            triplet=triplet,
            candidate_triplets=candidates,
        )
        # The LLM output may omit qualifiers; treat that as "no qualifiers".
        backbone_triplet['qualifiers'] = triplet.get('qualifiers', [])

        # Refine entity names
        final_triplets.append(refine_entities(text, backbone_triplet, aligner, sample_id))

    aligner.add_triplets(final_triplets, sample_id=sample_id)
    return final_triplets, filtered_triplets


# Type ids whose entities are never renamed; per the original inline comment
# these stand for time and quantity values.
_LITERAL_TYPE_IDS = ('Q186408', 'Q309314')


def _lookup_canonical_name(text, triplet, aligner, sample_id, role):
    """Find the stored name that should replace ``triplet[role]``.

    ``role`` is ``'subject'`` or ``'object'``. Returns the sentinel string
    ``'None'`` when the aligner knows no similar entity of the same type
    (``extractor.refine_entity`` presumably returns it too when no candidate
    fits; confirm in utils.openai_utils).
    """
    surface_name = triplet[role]
    similar = aligner.retrieve_entity_by_type(
        entity_name=surface_name,
        entity_type=triplet[role + '_type'],
        sample_id=sample_id,
    )
    if len(similar) == 0:
        return 'None'
    if surface_name in similar:
        # Exact hit on a known name: no LLM call needed.
        return similar[surface_name]['label']
    chosen = extractor.refine_entity(
        text=text,
        triplet=triplet,
        candidates=list(similar.values()),
        is_object=(role == 'object'),
    )
    return unidecode(chosen)


def _register_entity_name(triplet, aligner, sample_id, role, canonical_name):
    """Write ``canonical_name`` into ``triplet[role]`` and record it in the aligner."""
    surface_name = triplet[role]
    entity_type = triplet[role + '_type']

    # Punctuation is stripped before the comparison so that decorated variants
    # of the sentinel (quotes, trailing period) are still recognised.
    if re.sub(r'[^\w\s]', '', canonical_name) != 'None':
        if surface_name != canonical_name:
            # Remember the surface form as an alias of the canonical entity.
            aligner.add_entity(entity_name=canonical_name, alias=surface_name,
                               entity_type=entity_type, sample_id=sample_id)
        triplet[role] = canonical_name
    else:
        # Nothing to merge with: register the entity under its own name.
        aligner.add_entity(entity_name=surface_name, alias=surface_name,
                           entity_type=entity_type, sample_id=sample_id)


def refine_entities(text, triplet, aligner, sample_id):
    """Refine the subject and object names of ``triplet`` using type constraints.

    Both names are first transliterated with ``unidecode``. The object is
    handled before the subject, so the subject lookup already sees the refined
    object. Objects whose type hierarchy contains a time or quantity type are
    never renamed.

    Relies on the notebook-level ``extractor`` object.

    Args:
        text (str): Source text the triplet was extracted from.
        triplet (dict): Must contain ``subject``, ``object``, ``subject_type``
            and ``object_type``; modified in place.
        aligner: Object providing ``retrieve_entity_type_hirerarchy``,
            ``retrieve_entity_by_type`` and ``add_entity``.
        sample_id: Sample identifier forwarded to every aligner lookup/insert.

    Returns:
        dict: The same ``triplet`` object with refined names.
    """
    triplet['subject'] = unidecode(triplet['subject'])
    triplet['object'] = unidecode(triplet['object'])

    # ---- object ----
    object_hierarchy = aligner.retrieve_entity_type_hirerarchy(triplet['object_type'])
    canonical_object = 'None'
    # do not change time or quantity entities
    if not any(type_id in _LITERAL_TYPE_IDS for type_id in object_hierarchy):
        canonical_object = _lookup_canonical_name(text, triplet, aligner, sample_id, 'object')
    _register_entity_name(triplet, aligner, sample_id, 'object', canonical_object)

    # ---- subject ----
    # Uses its own candidate set and the real sample_id (the previous code
    # passed the object type as sample_id and indexed the object's candidates).
    canonical_subject = _lookup_canonical_name(text, triplet, aligner, sample_id, 'subject')
    _register_entity_name(triplet, aligner, sample_id, 'subject', canonical_subject)

    return triplet

In [106]:
# Sample sentence used as input for the extraction runs in the cells below.
text = "Borisov is up for Best Supporting Actor for his role in Sean Baker's “Anora” at the Academy Awards on March 2, making him the first Russian to be nominated in an acting category since the fall of the Soviet Union."

In [107]:
import time

# Time one extraction run for sample id 0. perf_counter() is monotonic and
# high-resolution, so the measured duration cannot be skewed by wall-clock
# adjustments the way time.time() differences can.
start_time = time.perf_counter()
result = extract_triplets(text, 0)
end_time = time.perf_counter()
print(f"\nExecution time: {end_time - start_time:.2f} seconds")


Text: "Borisov is up for Best Supporting Actor for his role in Sean Baker's “Anora” at the Academy Awards on March 2, making him the first Russian to be nominated in an acting category since the fall of the Soviet Union.
Extracted Triplet: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_type": "human", "object_type": "award"}
Candidate Triplets: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_types": ["group of humans", "human"], "object_types": ["literary award", "award ceremony", "award", "class of award"]}
{"subject": "Borisov", "relation": "award received", "object": "Best Supporting Actor", "subject_types": ["human", "group of humans", "human race", "language (particular system of communication, often named for the region or peoples that use it)", "human language", "human population", "human biblical figure", "sex of humans", "imaginary human", "hypothetical person"], "object_types": ["

In [108]:
# Show the value reported by the extractor's cost tracker
# (presumably the accumulated LLM API cost; confirm in utils.openai_utils).
extractor.calculate_cost()

0.01436

In [109]:
# Cell output: every document currently stored in the `triplets` collection.
triplets_collection = db.get_collection('triplets')
list(triplets_collection.find({}))

[{'_id': ObjectId('68279f1430e7643cea0e9db9'),
  'subject': 'Borisov',
  'relation': 'nominated for',
  'object': 'Best Supporting Actor',
  'subject_type': 'human',
  'object_type': 'class of award',
  'qualifiers': [{'relation': 'for work', 'object': 'Anora'},
   {'relation': 'award received', 'object': 'Academy Awards'},
   {'relation': 'point in time', 'object': 'March 2'}],
  'sample_id': 0},
 {'_id': ObjectId('68279f1430e7643cea0e9dba'),
  'subject': 'Borisov',
  'relation': 'nominee',
  'object': 'acting category',
  'subject_type': 'human',
  'object_type': 'class of award',
  'qualifiers': [{'relation': 'since', 'object': 'fall of the Soviet Union'}],
  'sample_id': 0},
 {'_id': ObjectId('68279f1430e7643cea0e9dbb'),
  'subject': 'Anora',
  'relation': 'director',
  'object': 'Sean Baker',
  'subject_type': 'film',
  'object_type': 'human',
  'qualifiers': [],
  'sample_id': 0},
 {'_id': ObjectId('68279f3130e7643cea0e9dc2'),
  'subject': 'Borisov',
  'relation': 'nominated for'

In [101]:
import time

# Time one extraction run for sample id 0. perf_counter() is monotonic and
# high-resolution, so the measured duration cannot be skewed by wall-clock
# adjustments the way time.time() differences can.
start_time = time.perf_counter()
result = extract_triplets(text, 0)
end_time = time.perf_counter()
print(f"\nExecution time: {end_time - start_time:.2f} seconds")


Text: "Borisov is up for Best Supporting Actor for his role in Sean Baker's “Anora” at the Academy Awards on March 2, making him the first Russian to be nominated in an acting category since the fall of the Soviet Union.
Extracted Triplet: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_type": "human", "object_type": "award category"}
Candidate Triplets: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_types": ["group of humans", "human"], "object_types": ["award ceremony", "award", "class of award"]}
{"subject": "Borisov", "relation": "award received", "object": "Best Supporting Actor", "subject_types": ["human", "group of humans", "human race", "language (particular system of communication, often named for the region or peoples that use it)", "human language", "human population", "human biblical figure", "sex of humans", "imaginary human", "hypothetical person"], "object_types": ["award cer

In [102]:
# Show the value reported by the extractor's cost tracker
# (presumably the accumulated LLM API cost; confirm in utils.openai_utils).
extractor.calculate_cost()

0.0433075

In [83]:
from utils.structured_dynamic_index_utils import Aligner as Aligner
# Rebind the notebook-level globals: an Aligner from the non-DB module
# (constructed without arguments) and a new extractor instance.
aligner = Aligner()
extractor = LLMTripletExtractor(model=model_name)

In [84]:
def extract_triplets(text):
    """Extract knowledge-graph triplets from ``text`` and align them to the ontology.

    For every triplet returned by ``extractor.extract_triplets_from_text``:
    retrieve candidate entity types and candidate properties, build one
    candidate "backbone" per property restricted to the types that property
    allows (``aligner.prop2constraints``), let the LLM choose a backbone, then
    refine the entity names with ``refine_entities``.

    Relies on the notebook-level ``extractor`` and ``aligner`` objects.

    Args:
        text (str): Input text to extract triplets from.

    Returns:
        tuple: ``(final_triplets, filtered_triplets)`` where
            - final_triplets: list of refined triplets,
            - filtered_triplets: currently always an empty list (nothing in
              this function rejects a triplet); kept so callers can keep
              unpacking two values.
    """

    def pick_types(own_type_ids, allowed_type_ids):
        """Return the entity's candidate types that the property allows.

        The order of ``own_type_ids`` is preserved; a plain set intersection
        iterates in hash order, which made the type lists sent to the LLM
        vary between interpreter runs. Falls back to all of the entity's own
        candidates when none of them is allowed.
        """
        allowed = set(allowed_type_ids)
        matched = [t for t in dict.fromkeys(own_type_ids) if t in allowed]
        return matched if matched else own_type_ids

    # Extract initial triplets using LLM
    extracted_triplets = extractor.extract_triplets_from_text(text)

    final_triplets = []
    filtered_triplets = []

    for triplet in extracted_triplets['triplets']:
        # Get candidate entity types
        subj_type_ids, obj_type_ids = aligner.retrieve_similar_entity_types(triplet=triplet)

        # Get candidate properties/relations
        properties = aligner.retrieve_properties_for_entity_type(
            target_relation=triplet['relation'],
            object_types=obj_type_ids,
            subject_types=subj_type_ids,
            k=5
        )

        # Build candidate triplet backbones
        candidates = []
        for prop_id, prop_label, prop_direction in properties:
            constraints = aligner.prop2constraints[prop_id]

            if prop_direction == 'direct':
                subject_name, subject_ids = triplet['subject'], subj_type_ids
                object_name, object_ids = triplet['object'], obj_type_ids
            else:
                # Inverse property: the extracted object plays the subject role
                # (and vice versa), so each role must carry the type
                # candidates of the entity that actually fills it, including
                # in the no-constraint-matched fallback.
                subject_name, subject_ids = triplet['object'], obj_type_ids
                object_name, object_ids = triplet['subject'], subj_type_ids

            subject_types = pick_types(subject_ids, constraints['Subject type constraint'])
            object_types = pick_types(object_ids, constraints['Value-type constraint'])

            candidates.append({
                "subject": subject_name,
                "relation": prop_label,
                'object': object_name,
                "subject_types": [aligner.entity_type2label[t] for t in subject_types],
                "object_types": [aligner.entity_type2label[t] for t in object_types]
            })

        # Refine relation and entity types using LLM - choose among valid backbones for triplet
        backbone_triplet = extractor.refine_relation_and_entity_types(
            text=text,
            triplet=triplet,
            candidate_triplets=candidates
        )
        # The LLM output may omit qualifiers; treat that as "no qualifiers".
        backbone_triplet['qualifiers'] = triplet.get('qualifiers', [])

        # Refine entity names
        final_triplets.append(refine_entities(text, backbone_triplet, aligner))

    return final_triplets, filtered_triplets


def refine_entities(text, triplet, aligner):
    """Normalise the subject/object names of ``triplet`` against known entities.

    Names are transliterated with ``unidecode``; then, object first and subject
    second, the aligner is asked for similar entities of the same type and the
    notebook-level ``extractor`` chooses among them. A name for which nothing
    is chosen is registered with ``aligner.add_entity``. The triplet is
    modified in place and returned.
    """
    for side in ('subject', 'object'):
        triplet[side] = unidecode(triplet[side])

    # ---- object ----
    object_type_label = triplet['object_type']
    object_type_id = aligner.label2entity_type.get(object_type_label, '')
    type_chain = [object_type_id]
    type_chain.extend(aligner.entity2hierarchy.get(object_type_id, []))
    # do not change time or quantity entities
    is_literal = 'Q186408' in type_chain or 'Q309314' in type_chain

    refined_object = 'None'
    if not is_literal:
        object_candidates = aligner.retrieve_entity_by_type(
            entity=triplet['object'],
            entity_type=object_type_label
        )
        if object_candidates:
            refined_object = unidecode(
                extractor.refine_entity(
                    text=text,
                    triplet=triplet,
                    candidates=object_candidates,
                    is_object=True
                )
            )

    if re.sub(r'[^\w\s]', '', refined_object) == 'None':
        aligner.add_entity(entity=triplet['object'], entity_type=object_type_label)
    else:
        triplet['object'] = refined_object

    # ---- subject ----
    refined_subject = 'None'
    subject_candidates = aligner.retrieve_entity_by_type(
        entity=triplet['subject'],
        entity_type=triplet['subject_type']
    )
    if subject_candidates:
        refined_subject = unidecode(
            extractor.refine_entity(
                text=text,
                triplet=triplet,
                candidates=subject_candidates,
                is_object=False
            )
        )

    if re.sub(r'[^\w\s]', '', refined_subject) == 'None':
        aligner.add_entity(entity=triplet['subject'], entity_type=triplet['subject_type'])
    else:
        triplet['subject'] = refined_subject

    return triplet

In [87]:
import time
# aligner = Aligner()
# Time one extraction run. perf_counter() is monotonic and high-resolution,
# so the measured duration cannot be skewed by wall-clock adjustments the way
# time.time() differences can.
start_time = time.perf_counter()
result = extract_triplets(text)
end_time = time.perf_counter()
print(f"\nExecution time: {end_time - start_time:.2f} seconds")


Text: "Borisov is up for Best Supporting Actor for his role in Sean Baker's “Anora” at the Academy Awards on March 2, making him the first Russian to be nominated in an acting category since the fall of the Soviet Union.
Extracted Triplet: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_type": "human", "object_type": "award category"}
Candidate Triplets: {"subject": "Borisov", "relation": "nominated for", "object": "Best Supporting Actor", "subject_types": ["group of humans", "human"], "object_types": ["award", "class of award"]}
{"subject": "Borisov", "relation": "award received", "object": "Best Supporting Actor", "subject_types": ["human", "human biblical figure", "hypothetical person", "group of humans", "fictional human", "language (particular system of communication, often named for the region or peoples that use it)", "human language", "posture", "imaginary human", "legendary figure"], "object_types": ["award", "class of award"]}
{

In [88]:
# Show the value reported by the extractor's cost tracker
# (presumably the accumulated LLM API cost; confirm in utils.openai_utils).
extractor.calculate_cost()

0.04104