In [1]:
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

import owlready2 as owl
from owlready2 import *
import types

import scipy
#from scipy.spatial import ConvexHull
#import cdd
#from cdd import RepType, Matrix, Polyhedron
#from fractions import Fraction

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from datetime import datetime
import random





device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

In [2]:
# dir = '/Users/victorlacerda/Documents/VSCode/ELH Implementation/NormalizedOntologies/goslimyeast.xml.owl'
# dir = '/Users/victorlacerda/Documents/VSCode/ELH Implementation/NormalizedOntologies/galennorm.xml.owl'
# dir = '/Users/victorlacerda/Documents/VSCode/ELH Implementation/NormalizedOntologies/gonorm.xml.owl'
# dir = '/Users/victorlacerda/Documents/VSCode/ELH Implementation/NormalizedOntologies/'
dir = '/Users/victorlacerda/Documents/VSCode/ELHFaithfulness/NormalizedOntologies/family_ontology.owl'

In [3]:
'''
Class for creating entities to
populate the creation of the
canonical models.

The .name attribute is used to
create a single representation
for concepts like A and B / 
B and A, as they are the same.
'''

class CanonicalModelElements:

    concept_names = {}
    concept_intersections = {}
    concept_restrictions = {}
    all_concepts = {}

    def __init__(self, concept):
        self.concept = concept
        self.name = self.get_name()
        self.get_element_dict()

    def get_name(self):

        # add \Top
        
        if type(self.concept) == ThingClass:
            return self.concept.name

        elif type(self.concept) == Restriction:
            return 'exists_' + self.concept.property.name + '.' + self.concept.value.name
        
        else:
            return 'And_' + ''.join(sorted(self.concept.Classes[0].name + self.concept.Classes[1].name)) # The name is sorted to avoid that (e.g) (A \and B) and (B \and A) are treated as different concepts
        
    def get_element_dict(self):

        if type(self.concept) == ThingClass:
            CanonicalModelElements.concept_names[self.name] = self
            CanonicalModelElements.all_concepts[self.name] = self

        elif type(self.concept) == Restriction:
            CanonicalModelElements.concept_restrictions[self.name] = self
            CanonicalModelElements.all_concepts[self.name] = self

        elif type(self.concept) == And:
            CanonicalModelElements.concept_intersections[self.name] = self
            CanonicalModelElements.all_concepts[self.name] = self

In [4]:
def get_canonical_model_elements(concept_names_iter, role_names_iter, ontology):
    
    onto = ontology
    top = owl.Thing
    bottom = owl.Nothing

    CanonicalModelElements(top)
    CanonicalModelElements(bottom)

    for concept_name in concept_names_iter:
        
        CanonicalModelElements(concept_name)
        for concept_name2 in concept_names_iter:
        
            with onto:
                gca = GeneralClassAxiom(concept_name & concept_name2)
                gca.is_a.append(concept_name & concept_name2)
            
            CanonicalModelElements(gca.left_side)

    print('')
    print('')
    print('All Concept Names and Concept Intersections have been preprocessed for the creation of the canonical model.')

    concept_names_iter.append(top)
    # concept_names_iter.append(bottom)

    for role_name in role_names_iter:
        for concept_name in concept_names_iter:
            with onto:
                gca = GeneralClassAxiom(role_name.some(concept_name))
                gca.is_a.append(role_name.some(concept_name))

            CanonicalModelElements(gca.left_side)

    print('')
    print('')
    print('All restrictions have been preprocessed for the creation of the canonical model.')

In [5]:
'''
The main class for creating the canonical model for the ontology.

The canonical model is stored in dictionaries available as class variables 'concept_canonical_interpretation'
and 'role_canonical_interpretation'. 

Args:
    concept_names_dict: a dictionary stored in the CanonicalModelElement class.
    concept_intersection_dict: a dictionary stored in the CanonicalModelElement class.
    concept_restrictions_dict: a dictionary stored in the CanonicalModelElement class.
    all_concepts_dict: a dictionary stored in the CanonicalModelElement class.
    role_names_iter (list): a list containing all role names in the loaded ontology.
'''

class CanonicalModel:

    concept_canonical_interpretation = {}
    role_canonical_interpretation = {}

    def __init__(self, concept_names_dict, concept_intersections_dict, concept_restrictions_dict, all_concepts_dict, role_names_iter):
        
        self.domain = all_concepts_dict
        self.concept_names_dict = concept_names_dict
        self.concept_restrictions_dict = concept_restrictions_dict
        self.concept_intersections_dict = concept_intersections_dict

        self.role_names_iter = role_names_iter

        self.concept_canonical_interp = self.get_concept_name_caninterp() # These are only used to build the concept_canonical_interpretation and role_canonical_interpretation class attributes
        self.role_canonical_interp = self.get_role_name_caninterp()       # The functions do not return anything, they just update the class variables

    def get_concept_name_caninterp(self):

        # The variable concept is a string containing the name of an element of the domain of the canonical model
        # The key to the concept_names_dict variable corresponds to concept.name
        # This name can be used to access the concept in owlready2's format

        for concept in self.concept_names_dict.keys():

            CanonicalModel.concept_canonical_interpretation[concept] = []
            superclasses = self.domain[concept].concept.ancestors(include_self=True, include_constructs=True) # The self.domain[concept] is used to access the CanonicalModelElements type of object,
                                                                                                               # and the attribute .concept is used to access the concept in owlready2 format
                                                                                                              
            for superclass in superclasses:

                if type(superclass) == ThingClass:
                    CanonicalModel.concept_canonical_interpretation[concept].append(superclass.name)

                elif type(superclass) == Restriction:
                    CanonicalModel.concept_canonical_interpretation[concept].append('exists_' + superclass.property.name + '.' + superclass.value.name)

                elif type(superclass) == And:
                    if 'And_' + ''.join(sorted(superclass.Classes[0].name + superclass.Classes[1].name)) in CanonicalModel.concept_canonical_interpretation[concept]:
                        pass
                    else:
                        CanonicalModel.concept_canonical_interpretation[concept].append('And_' + ''.join(sorted(superclass.Classes[0].name + superclass.Classes[1].name)))

    def get_role_name_caninterp(self):

        # First case from Definition 10

        for role_name in self.role_names_iter:

            role_name_str = role_name.name # Accesses the property type object's name as a string
            CanonicalModel.role_canonical_interpretation[role_name_str] = []

            for restriction_name in self.concept_restrictions_dict.keys(): # Where restriction_name denotes a \exists r.B type of concept 'exists_' + self.concept.property.name + '.' + self.concept.value.name
                c_B = self.concept_restrictions_dict[restriction_name].concept.value.name

                if role_name_str in restriction_name:
                    superclasses = self.domain[restriction_name].concept.ancestors(include_self=True, include_constructs=False)

                    for superclass in superclasses:
                        super_superclasses = superclass.ancestors(include_self=True, include_constructs=True)

                        for super_superclass in super_superclasses:

                            if type(super_superclass) == ThingClass:
                                c_D = super_superclass.name
                                CanonicalModel.role_canonical_interpretation[role_name_str].append(tuple(c_D, c_B))

                            elif type(super_superclass) == Restriction:
                                c_D = 'exists_' + super_superclass.property.name + '.' + super_superclass.value.name
                                CanonicalModel.role_canonical_interpretation[role_name_str].append(tuple(c_D, c_B))

                            elif type(super_superclass) == And:
                                c_D = 'And_' + ''.join(sorted(super_superclass.Classes[0].name + super_superclass.Classes[1].name))
                                CanonicalModel.role_canonical_interpretation[role_name_str].append(tuple(c_D, c_B))

        # Second case from Definition 10: r \sqsubset s 
            
        for role_name in self.role_names_iter:

            superroles = role_name.ancestors(include_self=True)
            role_name_str = role_name.name
            
            for superrole in superroles:
                for restriction_name in self.concept_restrictions_dict.keys():
                    if superrole.name in restriction_name:
                        pair = tuple((restriction_name, self.domain[restriction_name].concept.value.name))
                        CanonicalModel.role_canonical_interpretation[role_name_str].append(pair)

In [6]:
'''
Main function for creating the canonical model.

    Args:
        onto_dir (str): a string pointing to the directory where the ontology is stored.

    Returns:
        canmodel (CanonicalModel): returns a variable containing the canonical model. 
        
        Attention: the interpretations of concept names and role names can also be accessed via class variables
        from the CanonicalModel class.
'''

def create_canonical_model(onto_dir):

    onto = get_ontology(onto_dir)
    onto = onto.load()
    gcas_iter = list(onto.general_class_axioms()) # Attention: this will not work unless the generator is converted into a list
    concept_names_iter = list(onto.classes())
    role_names_iter = list(onto.properties())

    #for concept in concept_names_iter:
    #    concept.namespace = onto.ontology

    #for role in role_names_iter:
    #    role.namespace = onto.ontology

    get_canonical_model_elements(concept_names_iter, role_names_iter, onto)

    print('')
    print('============================================================================')
    print('')
    print('Starting to reason.')
    print('')

    with onto:
        sync_reasoner()
        
    # onto.save("inferences_goslimyeast.owl")

    gcas_iter = list(onto.general_class_axioms()) # Attention: this will not work unless the generator is converted into a list
    concept_names_iter = list(onto.classes())
    role_names_iter = list(onto.properties())

    print('')
    print('============================================================================')
    print('')
    print('Done reasoning. Creating the canonical model.')
    print('')
    canmodel = CanonicalModel(CanonicalModelElements.concept_names, CanonicalModelElements.concept_intersections, CanonicalModelElements.concept_restrictions, CanonicalModelElements.all_concepts, role_names_iter)
    print('============================================================================')
    print('')
    print('Concluded creating canonical model.')

    return canmodel

In [7]:
# Instantiates the canonical model

canmodel = create_canonical_model(dir)



All Concept Names and Concept Intersections have been preprocessed for the creation of the canonical model.


All restrictions have been preprocessed for the creation of the canonical model.


Starting to reason.



* Owlready2 * Running HermiT...
    java -Xmx2000M -cp /opt/homebrew/Caskroom/miniforge/base/envs/kgenv/lib/python3.11/site-packages/owlready2/hermit:/opt/homebrew/Caskroom/miniforge/base/envs/kgenv/lib/python3.11/site-packages/owlready2/hermit/HermiT.jar org.semanticweb.HermiT.cli.CommandLine -c -O -D -I file:////var/folders/wg/g5861gcs6k5d3rbq_rncztjw0000gn/T/tmphnw8cq4b




Done reasoning. Creating the canonical model.


Concluded creating canonical model.


* Owlready2 * HermiT took 0.8157849311828613 seconds
* Owlready * (NB: only changes on entities loaded in Python are shown, other changes are done but not listed)


In [8]:
'''
Utility functions for initializing
the class EntityEmbedding. They
allow us to access dictionaries
containing indexes and canonical
interpretation of concepts
and roles as class.
'''

def get_concept_names_idx_dict(canmodel):
   conceptnames_idx_dict = {concept_name: idx for idx, concept_name in enumerate(CanonicalModel.concept_canonical_interpretation.keys())}
   return conceptnames_idx_dict

def get_role_names_idx_dict(canmodel):
    rolenames_idx_dict = {role_name: idx for idx, role_name in enumerate(CanonicalModel.role_canonical_interpretation.keys())}
    return rolenames_idx_dict

def get_entities_idx_dict(canmodel):
    entities_idx_dict = {entity: idx for idx, entity in enumerate(canmodel.domain.keys())}
    return entities_idx_dict

def get_domain_dict(canmodel):
    return canmodel.domain

# Atenção: a função mu está com complexidade alta devido aos for loops

In [120]:
''' 
Class for obtaining the positional 
embedding for each entity in the domain
of the canonical interpretation.
It represents the Mu Function from the
paper.
'''

class EntityEmbedding:

    # Dictionaries for storing the indices of concept names and role names, entities pairs, respectively
    # Keys are strings and values are integers
    
    concept_names_idx_dict = get_concept_names_idx_dict(canmodel)
    role_names_idx_dict = get_role_names_idx_dict(canmodel)
    entities_idx_dict = get_entities_idx_dict(canmodel)

    # Dictionaries accessing the canonical interpretation of concepts and roles
    # Keys and values are strings
    
    concept_canonical_interpretation_dict = CanonicalModel.concept_canonical_interpretation
    role_canonical_interpretation_dict = CanonicalModel.role_canonical_interpretation

    # Dictionary storing the domain of the canonical model being embedded
    # IMPORTANT: Keys are strings and values are CanonicalModelElements type objects
    
    domain_dict = get_domain_dict(canmodel)

    # Dictionary for easy access to entity embeddings
    # It is initialized with empty values, iteratively built by the .get_embedding_vector() method
    # Key (str): Domain Entity / Value (np.array): EntityEmbedding.embedding_vector

    entity_entityvector_dict = dict.fromkeys(domain_dict.keys())

    def __init__(self, entity_name, emb_dim):
        self.name = entity_name
        self.emb_dim = emb_dim
        self.in_interpretation_of = []
        self.embedding_vector = self.get_embedding_vector()

    def get_embedding_vector(self):
        
        embedding_vector = np.zeros((self.emb_dim,))
        EntityEmbedding.entity_entityvector_dict[self.name] = []

        # Applies the embedding function to the concept names portion of the definition

        for concept_name in EntityEmbedding.concept_canonical_interpretation_dict:
            concept_name_idx = EntityEmbedding.concept_names_idx_dict[concept_name]
        
            if self.name in EntityEmbedding.concept_canonical_interpretation_dict[concept_name]:
                embedding_vector[concept_name_idx] = 1 * 10
                self.in_interpretation_of.append(concept_name)

        # Applies the embedding function to the role names portion of the definition

        for role_name in EntityEmbedding.role_canonical_interpretation_dict:
            
            role_name_idx = len(EntityEmbedding.concept_names_idx_dict) + (EntityEmbedding.role_names_idx_dict[role_name] * len(EntityEmbedding.entities_idx_dict))
            role_name_caninterp = EntityEmbedding.role_canonical_interpretation_dict[role_name]

            for pair in role_name_caninterp:

                entity_2 = pair[1]

                if (self.name, entity_2) == pair:
                    entity_2_idx = EntityEmbedding.entities_idx_dict[entity_2]
                    final_role_entity_pair_idx = role_name_idx + entity_2_idx
                    embedding_vector[final_role_entity_pair_idx] = 1 * 10

        # EntityEmbedding.entity_entityvector_dict[self.name].append(embedding_vector)
        EntityEmbedding.entity_entityvector_dict[self.name] = embedding_vector

        return embedding_vector

In [121]:
'''
Function for creating the binary vectors representing
each element of the domain of the canonical interpretation.

    Args:
        emb_dim (int/float): the number of dimensions of the embedding space.

    Returns:
        embedded_entities (list): a list containing all embeddings of the entities
                                  in the domain. 
    
    The embedded_entities are also available in the dictionary EntityEmbeddings.entity_entityvector_dict
'''

def get_domain_embeddings(emb_dim):

    embedded_entities = []
    counter = 0

   # The entities in the domain are strings
    
    for entity_name in EntityEmbedding.domain_dict:
       embedded_entity = EntityEmbedding(entity_name, emb_dim)
       embedded_entities.append(embedded_entity)
       counter += 1
       
       if counter % 1000 == 0:
           print(counter)
       
    return embedded_entities

In [122]:
'''
Final class for creating the dataset.

Inputs: concept or role names, generated
embeddings for entities in the domain of
the canonical model.

Outputs: geometrical interpretation of
concepts and role names, represented
by vertices defining a region.

One can access the GeometricInterpretation
objects either as elements in a list, or as
values in a class variable dictionary.
'''

class GeometricInterpretation:

    concept_geointerps_dict = dict.fromkeys(CanonicalModel.concept_canonical_interpretation.keys())
    role_geointerps_dict = dict.fromkeys(CanonicalModel.role_canonical_interpretation.keys())

    def __init__(self, name, emb_dim):
        self.name = name
        self.emb_dim = emb_dim
        self.vertices = []
        self.centroid = None
        self.bounding_box_vertices = None

    def get_centroid_naive(self):
        if len(self.vertices) == 0 and self.name in self.concept_geointerps_dict.keys():
            centroid = np.zeros((self.emb_dim,))
            return centroid
        
        elif len(self.vertices) == 0 and self.name in self.role_geointerps_dict.keys():
            centroid = np.zeros((self.emb_dim * 2,)) # The centroid for the regions needs to be doubled due to the concat operation
            return centroid
        
        elif len(self.vertices) > 0 and self.name in self.concept_geointerps_dict.keys():
            n = len(self.vertices)
            centroid = np.zeros((self.emb_dim,))
            matrix = np.vstack(self.vertices)
            centroid = 1/n * np.sum(matrix, axis=0)
            return centroid
        
        elif len(self.vertices) > 0 and self.name in self.role_geointerps_dict.keys():
            n = len(self.vertices)
            centroid = np.zeros((self.emb_dim,))
            matrix = np.vstack(self.vertices)
            centroid = 1/n * np.sum(matrix, axis=0)
            return centroid

    def get_bounding_box_vertices(self):
        pass
        

# There has to be a more efficient way of doing the creating of geometric interpretations for concepts and roles

In [123]:
def index_finder(emb_dim, concept_name_idx_dict, role_name_idx_dict, domain_idx_dict):

    index_dict = {k: None for k in range(emb_dim)}

    for k,v in concept_name_idx_dict.items():

        index_dict[v] = k

    for role in role_name_idx_dict:
        role_init_idx = len(concept_name_idx_dict) + (role_name_idx_dict[role] * len(domain_idx_dict))

        for entity in domain_idx_dict:
            entity_init_idx = domain_idx_dict[entity]
            final_role_entity_pair_idx = role_init_idx + entity_init_idx
            index_dict[final_role_entity_pair_idx] = (role, entity)
        
    return index_dict

In [124]:
def get_faithful_concept_geometric_interps(concept_names_interps, domain_embeddings_list, entity_dims_index_dict, emb_dim, canmodel: CanonicalModel):

    faithful_concept_geometric_interps = []

    for concept_name in concept_names_interps.keys():
        concept_name = GeometricInterpretation(concept_name, emb_dim)

        for embedding in domain_embeddings_list:
            if concept_name.name in embedding.in_interpretation_of:
                concept_name.vertices.append(embedding.embedding_vector)
            
        GeometricInterpretation.concept_geointerps_dict[concept_name.name] = concept_name
        concept_name.centroid = concept_name.get_centroid_naive()
        
        faithful_concept_geometric_interps.append(concept_name)

    return faithful_concept_geometric_interps

In [125]:
def get_faithful_role_geometric_interps(role_names_interps, entity_embeddings_list, entity_dims_index_dict, emb_dim, canmodel: CanonicalModel):
    
    faithful_role_geometric_interps = []
    idx_entity_dict = entity_dims_index_dict
    #entity_idx_dict = {v: k for k,v in entity_dims_index_dict}

    relevant_idxs = len(canmodel.concept_names_dict)-1

    for role_name in role_names_interps.keys():
        role_name_str = role_name
        role_name = GeometricInterpretation(role_name_str, emb_dim)

        for entity in entity_embeddings_list:

            onehot_idx_list = np.where(entity.embedding_vector == 1)[0]
            #print(f'This is the entity: {entity} and this is the onehot_idx_list: {onehot_idx_list}')

            for idx in onehot_idx_list: # I could just look at the TRULY relevant indexes
                if idx > relevant_idxs:
                    role_entity_pair = idx_entity_dict[idx]
                    r_name_str = role_entity_pair[0]
                    e_name_str = role_entity_pair[1]

                    if r_name_str == role_name_str:
                        e_embedding = EntityEmbedding.entity_entityvector_dict[e_name_str]
                        role_name.vertices.append(np.concatenate((entity.embedding_vector, e_embedding)))

        GeometricInterpretation.role_geointerps_dict[role_name_str] = role_name
        role_name.centroid = role_name.get_centroid_naive()
        faithful_role_geometric_interps.append(role_name)

    return faithful_role_geometric_interps

In [126]:
def create_tbox_embeddings(canonical_model: CanonicalModel):

    domain = canonical_model.domain # Keys are strings and values are CanonicalModelElements type objects
    concept_names_interps = canonical_model.concept_canonical_interpretation # Keys are strings and values are lists of strings.
    role_names_interps = canonical_model.role_canonical_interpretation # Keys are strings and values are lists of tuples. Tuples are of form ('C', 'D'), with C and D strings.

    EMB_DIM = len(concept_names_interps) + len(role_names_interps) * len(domain)

    print('================EMBEDDING DIMENSION================')
    print(f'Concept Name dimensions: {len(concept_names_interps)}')
    print(f'The number of role names is: {len(role_names_interps)}')
    print(f'The size of the domain is: {len(domain)}')
    print(f'Role names dimensions: {len(role_names_interps) * len(domain)}')
    print('===================================================')
    print('')
    print(f'Final embedding dimension: {EMB_DIM}')
    print(f'The final dimension for role regions is: {EMB_DIM * 2}')

    domain_embeddings_list = get_domain_embeddings(EMB_DIM)
    
    concept_names_ordering = EntityEmbedding.concept_names_idx_dict
    role_names_ordering = EntityEmbedding.role_names_idx_dict
    entities_ordering = EntityEmbedding.entities_idx_dict
    
    print('')
    print('===============FINISHED EMBEDDINGS===============')
    print(f'There are {len(domain_embeddings_list)} vector embeddings.')
    print('')

    index_finder_dict = index_finder(EMB_DIM, concept_names_ordering, role_names_ordering, entities_ordering)

    faithful_concept_geometric_interps = get_faithful_concept_geometric_interps(concept_names_interps, domain_embeddings_list, index_finder_dict, EMB_DIM, canonical_model)

    print('============FINISHED INTERPS CONCEPT=============')
    print(f'There are {len(faithful_concept_geometric_interps)} regions for concept names.')
    print('')

    faithful_role_geometric_interps = get_faithful_role_geometric_interps(role_names_interps, domain_embeddings_list, index_finder_dict, EMB_DIM, canonical_model)

    print('=============FINISHED INTERPS ROLES==============')
    print(f'There are {len(faithful_role_geometric_interps)} regions for role names.')
    print('')

    return domain_embeddings_list, faithful_concept_geometric_interps, faithful_role_geometric_interps, index_finder_dict # Returns the faithful geometric interpretations for concepts and roles as lists

In [127]:
domain_embeddings, concept_geointerps, role_geointerps, idx_finder_dict = create_tbox_embeddings(canmodel)

Concept Name dimensions: 10
The number of role names is: 5
The size of the domain is: 91
Role names dimensions: 455

Final embedding dimension: 465
The final dimension for role regions is: 930

There are 91 vector embeddings.

There are 10 regions for concept names.

There are 5 regions for role names.



In [129]:
'''
Function for creating the pre-split dataset containing facts from the ontology.
Distinguishes between concept assertions and role assertions.


    Args: ontology_dir (str): the directory from the ontology
          concept_geointerps_dict (dict): the geometrical interpretations for concepts generated by create_tbox_embeddings()
          role_geointerps_dict (dict): the geometrical interpretations for roles generated by create_tbox_embeddings()

    Returns:
          X_concepts (np.array): A dataframe with columns 'Concept', 'Entity', 'y_true' (equivalent to concept.centroid())
          X_roles (np.array): A dataframe with columns 'SubjectEntity', 'Role', 'ObjectEntity', 'y_true' (equivalent to role.centroid())
          y_concepts (np.array):
          y_roles (np.array):
          vocabulary_dict (dict): A vocabulary with key (int): value (str) for entities in the domain.
'''

def get_abox_dataset(ontology_dir: str, concept_geointerps_dict: dict, role_geointerps_dict: dict, concept_to_idx: dict, role_to_idx: dict):
    
    ontology = get_ontology(ontology_dir)
    ontology = ontology.load()
    
    X_concepts = []
    X_roles = []
    y_concepts = []
    y_roles = []

    entities = [entity.name for entity in list(ontology.individuals())]
    
    concept_to_idx_vocab = concept_to_idx
    idx_to_concept_vocab = {value: key for key, value in concept_to_idx_vocab.items()}

    role_to_idx_vocab = role_to_idx
    idx_to_role_vocab = {value: key for key, value in role_to_idx_vocab.items()}
    
    entity_to_idx_vocab = {value: index for index, value in enumerate(entities)}
    idx_to_entity_vocab = {value: key for key, value in entity_to_idx_vocab.items()}

    for individual in list(ontology.individuals()):

        all_facts = individual.is_a # Is this actually getting all assertions?

        for concept in all_facts:
            if type(concept) == ThingClass:
                concept = concept_geointerps_dict[concept.name]
                fact = np.array([concept_to_idx_vocab[concept.name], entity_to_idx_vocab[individual.name]])
                y_label = np.array(concept.centroid)
                X_concepts.append(fact)
                y_concepts.append(y_label)

            # There are actually no assertions of the type A and B(a)
            # Check later how to get them
                
            elif type(concept) == And:
                print('And') # remove print statement
                concept1 = concept_geointerps_dict[concept.Classes[0].name]
                concept2 = concept_geointerps_dict[concept.Classes[1].name]
                concept_name = 'And_' + ''.join(sorted(concept1.name + concept2.name))
                fact = np.array([concept_to_idx_vocab[concept_name], entity_to_idx_vocab[individual.name]])
                y_label = np.array((concept1.centroid + concept2.centroid)/2) # The golden label for an intersection is just the average of the centroid of the two regions
                X_concepts.append(fact)
                y_concepts.append(y_label)

            # There are actually no assertions of the type exists r.B
            # Check later how to get them
            
            elif type(concept) == Restriction:
                print('restriction') # remove print statement
                concept = concept_geointerps_dict['exists_' + concept.property.name + '.' + concept.value.name]
                fact = np.array([concept_to_idx_vocab[concept.name], entity_to_idx_vocab[individual.name]])
                y_label = np.array(concept.centroid)
                X_concepts.append(fact)
                y_concepts.append(y_label)

        relevant_roles = individual.get_properties()
        individual_name = individual.name

        for role in relevant_roles:
            role_geo = role_geointerps_dict[role.name]
            subject_list = role[individual] # This syntax is from the owlready2 library
            for subject in subject_list:
                fact = np.array([entity_to_idx_vocab[individual.name], role_to_idx_vocab[role.name], entity_to_idx_vocab[subject.name]])
                X_roles.append(fact)
                y_label = y_roles.append(np.array(role_geo.centroid))

    return np.array(X_concepts), np.array(X_roles), np.array(y_concepts), np.array(y_roles), entity_to_idx_vocab, idx_to_entity_vocab, concept_to_idx_vocab, idx_to_concept_vocab, role_to_idx_vocab, idx_to_role_vocab

In [130]:
X_concepts, X_roles, y_concepts, y_roles, entity_to_idx_vocab, idx_to_entity_vocab, concept_to_idx_vocab, idx_to_concept_vocab, role_to_idx_vocab, idx_to_role_vocab = get_abox_dataset(dir,
                                                                                                                                                                                        GeometricInterpretation.concept_geointerps_dict,
                                                                                                                                                                                        GeometricInterpretation.role_geointerps_dict,
                                                                                                                                                                                        EntityEmbedding.concept_names_idx_dict,
                                                                                                                                                                                        EntityEmbedding.role_names_idx_dict)

In [131]:
from torch.utils.data import DataLoader, Dataset, random_split
from torch.utils.data import Subset

In [144]:
class OntologyDataset(Dataset):
    def __init__(self, data, labels):
        self.X = torch.tensor(data, dtype=torch.long)
        self.y = torch.tensor(labels, dtype=torch.float32)
        
    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx].long(), self.y[idx]

In [145]:
TRAIN_SIZE_PROPORTION = 0.8

In [146]:
ConceptDataset = OntologyDataset(X_concepts, y_concepts)

dataset_size = len(ConceptDataset)
train_size = int(TRAIN_SIZE_PROPORTION * dataset_size)
test_size = dataset_size - train_size

trainConceptDataset, testConceptDataset = random_split(ConceptDataset, [train_size, test_size])

In [147]:
RoleDataset = OntologyDataset(X_roles, y_roles)

dataset_size = len(RoleDataset)
train_size = int(TRAIN_SIZE_PROPORTION * dataset_size)
test_size = dataset_size - train_size

trainRoleDataset, testRoleDataset = torch.utils.data.random_split(RoleDataset, [train_size, test_size])

In [203]:
torch.manual_seed(269)
BATCH_SIZE = 16

train_ConceptDataLoader = DataLoader(trainConceptDataset, batch_size = BATCH_SIZE, shuffle=False)
test_ConceptDataLoader = DataLoader(testConceptDataset, batch_size = BATCH_SIZE, shuffle=False)

train_RoleDataLoader = DataLoader(trainRoleDataset, batch_size = BATCH_SIZE, shuffle=False)
test_RoleDataLoader = DataLoader(testRoleDataset, batch_size = BATCH_SIZE, shuffle=False)

In [218]:
class FaithEL(nn.Module):
    def __init__(self, emb_dim, phi, individual_vocabulary, concept_vocabulary, role_vocabulary):
        super(FaithEL, self).__init__()
        self.emb_dim = emb_dim
        self.phi = phi
        
        self.individual_embedding_dict = nn.Embedding(len(individual_vocabulary),
                                                      emb_dim,
                                                      max_norm=1 * 100
                                                      )
        
        self.concept_embedding_dict = nn.Embedding(len(concept_vocabulary),
                                                   emb_dim,
                                                   max_norm=1 * 100
                                                   )

        self.role_embedding_dict = nn.Embedding(len(role_vocabulary),
                                                emb_dim * 2,
                                                max_norm=1 * 100)
    
    def forward(self, data):
    
        # Concept assertions are of the form ['Concept', 'Entity']
        # Role assertions are of the form ['SubjectEntity', 'Role', 'ObjectEntity']
        
        subj_entity_idx = 1 if len(data[0]) == 2 else 0 # Performs a check to see whether the model has received a C assert or R assert

        if subj_entity_idx == 1:
            concept_idx = 0

            subj_entity = data[:, subj_entity_idx]
            concept = data[:, concept_idx]

            c_assertion_out1 = self.concept_embedding_dict(concept) # Outputs the moving parameter for the concept
            c_assertion_out2 = self.individual_embedding_dict(subj_entity) # Outputs the embedding for the individual
            
            out1 = c_assertion_out1
            out2 = c_assertion_out2
            
            return out1, out2

        elif subj_entity_idx == 0:
            role_idx = 1
            obj_entity_idx = 2
        
            subject_entity = self.individual_embedding_dict(data[:, subj_entity_idx])
            object_entity = self.individual_embedding_dict(data[:, obj_entity_idx])
            role = self.role_embedding_dict(data[:, role_idx])

            r_assertion_out1 = role # Role parameter embedding
            r_assertion_out2 = torch.cat((subject_entity, object_entity), 1) # Concatenation of subject and object

            out1 = r_assertion_out1
            out2 = r_assertion_out2
            
            return out1, out2

# Training params

In [300]:
torch.manual_seed(269)
PHI = 0
EMB_DIM = 465
model = FaithEL(EMB_DIM, PHI, entity_to_idx_vocab, concept_to_idx_vocab, role_to_idx_vocab)

In [301]:
def train(model, concept_dataloader, role_dataloader, loss_fn, optimizer):
    model.train()
    total_loss = 0.0
    num_batches = len(concept_dataloader)

    for i, data in enumerate(concept_dataloader):
        inputs, labels = data
        optimizer.zero_grad()
        outputs1, outputs2 = model(inputs) # Outputs 1 = Concept Parameter, Outputs 2 = Entity Parameter
        loss = loss_fn(outputs2, labels) + loss_fn(outputs1, outputs2) + model.phi * loss_fn(outputs1, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    for i, data in enumerate(role_dataloader):
        inputs, labels = data
        optimizer.zero_grad()
        outputs1, outputs2 = model(inputs) # Outputs 1 = Role Parameter, Outputs 2 = Entity concat parameter
        loss = loss_fn(outputs2, labels) + loss_fn(outputs1, outputs2) + model.phi * loss_fn(outputs1, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    for i, data in enumerate(role_dataloader):
        inputs, labels = data
        optimizer.zero_grad()
        outputs1, outputs2 = model(inputs) # Outputs 1 = Role Parameter, Outputs 2 = Entity concat parameter
        loss = loss_fn(outputs2, labels) + loss_fn(outputs1, outputs2) + model.phi * loss_fn(outputs1, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / num_batches

In [302]:
def test(model, concept_dataloader, role_dataloader, loss_fn):
    model.eval()
    total_loss = 0.0
    num_batches = len(concept_dataloader)

    with torch.no_grad():
        for i, data in enumerate(concept_dataloader):
            inputs, labels = data
            outputs1, outputs2 = model(inputs)
            loss = loss_fn(outputs2, labels) + loss_fn(outputs1, outputs2) + model.phi * loss_fn(outputs1, labels)
            total_loss += loss.item()

        for i, data in enumerate(role_dataloader):
            inputs, labels = data
            outputs1, outputs2 = model(inputs)
            loss = loss_fn(outputs2, labels) + loss_fn(outputs1, outputs2) + model.phi * loss_fn(outputs1, labels)
            total_loss += loss.item()

    return total_loss / num_batches

In [303]:
EPOCHS = 500

In [304]:
log_epoch = 10
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [305]:
for epoch in range(1, EPOCHS + 1):
    train_loss = train(model, train_ConceptDataLoader, train_RoleDataLoader, loss_fn, optimizer)
    test_loss = test(model, test_ConceptDataLoader, test_RoleDataLoader, loss_fn)

    if epoch % log_epoch == 0:
        print(f'Epoch {epoch}/{EPOCHS} -> Train Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f}')

Epoch 10/500 -> Train Loss: 12.8823 | Test Loss: 7.9096
Epoch 20/500 -> Train Loss: 11.9268 | Test Loss: 7.4275
Epoch 30/500 -> Train Loss: 11.0544 | Test Loss: 6.9835
Epoch 40/500 -> Train Loss: 10.2552 | Test Loss: 6.5734
Epoch 50/500 -> Train Loss: 9.5227 | Test Loss: 6.1942
Epoch 60/500 -> Train Loss: 8.8515 | Test Loss: 5.8435
Epoch 70/500 -> Train Loss: 8.2373 | Test Loss: 5.5193
Epoch 80/500 -> Train Loss: 7.6757 | Test Loss: 5.2199
Epoch 90/500 -> Train Loss: 7.1627 | Test Loss: 4.9434
Epoch 100/500 -> Train Loss: 6.6944 | Test Loss: 4.6880
Epoch 110/500 -> Train Loss: 6.2670 | Test Loss: 4.4523
Epoch 120/500 -> Train Loss: 5.8769 | Test Loss: 4.2346
Epoch 130/500 -> Train Loss: 5.5207 | Test Loss: 4.0333
Epoch 140/500 -> Train Loss: 5.1953 | Test Loss: 3.8472
Epoch 150/500 -> Train Loss: 4.8976 | Test Loss: 3.6748
Epoch 160/500 -> Train Loss: 4.6249 | Test Loss: 3.5150
Epoch 170/500 -> Train Loss: 4.3748 | Test Loss: 3.3666
Epoch 180/500 -> Train Loss: 4.1449 | Test Loss: 3.22

### Evaluation metrics

In [306]:
'''
Outputs: ranked list of predictions, with indication of whether the original fact is in the topk ones.

For this I need the full list of entities, I need to choose whether to corrput the tail or the head, I also need the golden label.

For example, lets say I have C(a), then the golden label is just y. Then I create several (how many?) corrupted triples + the correct one.
I then pass them through the model, get the final embedding, take the distance of all of them from the golden label, rank them, and check whether
the correct one is on the top. This is quite straightforward.
'''

'\nOutputs: ranked list of predictions, with indication of whether the original fact is in the topk ones.\n\nFor this I need the full list of entities, I need to choose whether to corrput the tail or the head, I also need the golden label.\n\nFor example, lets say I have C(a), then the golden label is just y. Then I create several (how many?) corrupted triples + the correct one.\nI then pass them through the model, get the final embedding, take the distance of all of them from the golden label, rank them, and check whether\nthe correct one is on the top. This is quite straightforward.\n'

In [307]:
def print_results(hit_ratios, hit_list):

    k_values = [1, 3, 10, 100, 469]
    
    for i in range(len(hit_ratios)):
            print(f'Top{str(k_values[i])} hits: {hit_list[i]}')
            print(f'Hits@{str(k_values[i])}: {hit_ratios[i]}')
            print('')

In [308]:
k_values = [1, 3, 10, 100, 469]
CENTROID_SCORE = False # When set to True, model scores assertion w.r.t distance to the centroid instead of to the moving parameter for concepts/roles

In [309]:
def get_hits_at_k_concept_assertions(model,
                  test_concept_assertions=Dataset, test_role_assertions=Dataset,
                  entity_to_idx_vocab=dict, idx_to_entity_vocab=dict,
                  idx_to_concept_vocab=dict, idx_to_role_vocab=dict,
                  centroid_score = False
                  ):
    
    top1 = 0
    top3 = 0
    top10 = 0
    top100 = 0
    top_all = 0

    model.eval()

    hits = []

    relevant_concept_idx = []

    # Gathers only concepts appearing in the test set (it is not guaranteed that if a concept appears in the dataset, then it appears here)

    for assertion in test_concept_assertions:
        inputs, _ = assertion
        if inputs[0] not in relevant_concept_idx:
            relevant_concept_idx.append(inputs[0])
        else:
            pass

    with torch.no_grad():

        # iterate over concepts appearing on test set
        # rank scores by individual name
        # incrementar os counters se tiver um hit@k
        # dividir os counters de hits pelo total dos sets
        
        print(f'Number of concepts appearing on the test set: {len(relevant_concept_idx)}')
        print('')

        for concept_idx in relevant_concept_idx:
            assertion_scores = []

            for _, entity_idx in entity_to_idx_vocab.items():
                eval_sample = torch.tensor([concept_idx, entity_idx]).unsqueeze(0)
                outputs1, outputs2 = model(eval_sample) # out1 = Concept parameter, out2 = Individual parameter

                if centroid_score == False:
                    assertion_score = torch.dist(outputs1, outputs2, p=2)
                else:
                    assertion_score = torch.dist(GeometricInterpretation.concept_geointerps_dict[idx_to_entity_vocab[entity_idx]].centroid, outputs2, p=2) # Spaghetti, Italians should be proud of my code BUGGED DO NOT USE

                assertion_scores.append((torch.tensor([concept_idx, entity_idx]), assertion_score.item()))
            
            sorted_scores = sorted(assertion_scores, key=lambda x: x[1])

            k_list = [1, 3, 10, 100, len(assertion_scores)]
            hit_k_values = []

            true_samples = [inputs for inputs, _ in test_concept_assertions if inputs[0] == concept_idx] # This is problematic when dealing with big datasets

            #print(f'Dealing with concept idx: {concept_idx}')
            #print(f'True samples: {true_samples}')
            #print(f'Len true_samples: {len(true_samples)}')
            #print(f'Sorted scores: {sorted_scores}')
            #print(f'Len sorted scores: {len(sorted_scores)}')
            #print('')

            for k in k_list:
                hit_k = any(torch.equal(scored_sample[0], true_sample) for true_sample in true_samples for scored_sample in sorted_scores[:k])
                hit_k_values.append(hit_k)
            
            hits.append(hit_k_values)

            top1 += int(hit_k_values[0])
            top3 += int(hit_k_values[1])
            top10 += int(hit_k_values[2])
            top100 += int(hit_k_values[3])
            top_all += int(hit_k_values[4])

            #if hit_k_values[0]:  # Check if top1 hit
            #    top1 += 1
            #if hit_k_values[1]:  # Check if top3 hit
            #    top3 += 1
            #if hit_k_values[2]:  # Check if top10 hit
            #    top10 += 1
            #if hit_k_values[3]:  # Check if top100 hit
            #    top100 += 1
            #if hit_k_values[4]:
            #    top130 += 1

    hits_at_k = [sum(hit_values) / len(hit_values) for hit_values in zip(*hits)]  # Calculate hits_at_k for each k

    return hits_at_k, [top1, top3, top10, top100, top_all]

In [310]:
hits_at_k_concept, hit_list_concept = get_hits_at_k_concept_assertions(model,
                                             testConceptDataset, testRoleDataset,
                                             entity_to_idx_vocab, idx_to_entity_vocab,
                                             idx_to_concept_vocab, role_to_idx_vocab,
                                             centroid_score = CENTROID_SCORE)

Number of concepts appearing on the test set: 6



In [311]:
print_results(hits_at_k_concept, hit_list_concept)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 0
Hits@3: 0.0

Top10 hits: 3
Hits@10: 0.5

Top100 hits: 5
Hits@100: 0.8333333333333334

Top469 hits: 6
Hits@469: 1.0



In [211]:
print_results(hits_at_k_concept, hit_list_concept)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 1
Hits@3: 0.16666666666666666

Top10 hits: 4
Hits@10: 0.6666666666666666

Top100 hits: 5
Hits@100: 0.8333333333333334

Top469 hits: 6
Hits@469: 1.0



In [277]:
print_results(hits_at_k_concept, hit_list_concept)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 2
Hits@3: 0.3333333333333333

Top10 hits: 2
Hits@10: 0.3333333333333333

Top100 hits: 5
Hits@100: 0.8333333333333334

Top469 hits: 6
Hits@469: 1.0



In [327]:
def get_hits_at_k_role_assertions(model,
                  test_concept_assertions=Dataset, test_role_assertions=Dataset,
                  entity_to_idx_vocab=dict, idx_to_entity_vocab=dict,
                  idx_to_concept_vocab=dict, idx_to_role_vocab=dict,
                  centroid_score = False
                  ):
    
    top1 = 0
    top3 = 0
    top10 = 0
    top100 = 0
    top_all = 0

    model.eval()

    hits = []
    
    relevant_head_role_queries = []
    relevant_role_tail_queries = []

    with torch.no_grad():

        for assertion in test_role_assertions:
            inputs, _ = assertion
            head_entity_idx = inputs[0]
            role_entity_idx = inputs[1]
            assertion_scores = []

            for _, tail_entity_idx in entity_to_idx_vocab.items():
                eval_sample = torch.tensor([head_entity_idx, role_entity_idx, tail_entity_idx]).unsqueeze(0)
                outputs1, outputs2 = model(eval_sample)
                if centroid_score == False:
                    assertion_score = torch.dist(outputs1, outputs2, p=2)
                else:
                    pass

            assertion_scores.append((torch.tensor([head_entity_idx, role_entity_idx, tail_entity_idx]), assertion_score.item()))

            sorted_scores = sorted(assertion_scores, key=lambda x: x[1])

            k_list = [1, 3, 10, 100, len(assertion_scores)]
            hit_k_values = []

            for k in k_list:
                hit_k = any(torch.equal(scored_sample[0], inputs) for scored_sample in sorted_scores[:k])
                hit_k_values.append(hit_k)
            
            hits.append(hit_k_values)

            top1 += int(hit_k_values[0])
            top3 += int(hit_k_values[1])
            top10 += int(hit_k_values[2])
            top100 += int(hit_k_values[3])
            top_all += int(hit_k_values[4])


    hits_at_k = [sum(hit_values) / len(hit_values) for hit_values in zip(*hits)]  # Calculate hits_at_k for each k

    return hits_at_k, [top1, top3, top10, top100, top_all]

In [328]:
hits_at_k_role, hit_list_role = get_hits_at_k_role_assertions(model,
                                             testConceptDataset, testRoleDataset,
                                             entity_to_idx_vocab, idx_to_entity_vocab,
                                             idx_to_concept_vocab, role_to_idx_vocab,
                                             centroid_score = False)

In [329]:
# This is with PHI = 0
print_results(hits_at_k_role, hit_list_role)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 0
Hits@3: 0.0

Top10 hits: 0
Hits@10: 0.0

Top100 hits: 0
Hits@100: 0.0

Top469 hits: 0
Hits@469: 0.0



# Old things

In [312]:
def get_hits_at_k_concept_assertions(model,
                  test_concept_assertions=Dataset, test_role_assertions=Dataset,
                  entity_to_idx_vocab=dict, idx_to_entity_vocab=dict,
                  idx_to_concept_vocab=dict, idx_to_role_vocab=dict,
                  centroid_score = False
                  ):
    
    top1 = 0
    top3 = 0
    top10 = 0
    top100 = 0
    top469 = 0

    model.eval()

    hits = []

    with torch.no_grad():

        # iterate over concepts appearing on test set
        # rank scores by individual name
        # incrementar os counters se tiver um hit@k
        # dividir os counters de hits pelo total dos sets

        for assertion in test_concept_assertions:
            inputs, labels = assertion
            true_idx = inputs[1] # Inputs are of the form ['Concept', 'Individual']
            assertion_scores = [] # Stores the scores for all individuals

            for _, idx in entity_to_idx_vocab.items():
                eval_sample = torch.tensor([inputs[0], idx]).unsqueeze(0)
                outputs1, outputs2 = model(eval_sample) # out1 = Concept parameter, out2 = Individual parameter
                
                if centroid_score == False:
                    assertion_score = torch.dist(outputs1, outputs2, p=2) # The scoring function is just the euclidean distance
                else:
                    assertion_score = torch.dist(labels, outputs2, p=2)
    
                assertion_scores.append((idx, assertion_score.item()))
                
            sorted_scores = sorted(assertion_scores, key=lambda x: x[1])

            k_list = [1, 3, 10, 100, 469]
            hit_k_values = []  # Store hit_k values for each k

            for k in k_list:
                hit_k = any(idx == true_idx for idx, _ in sorted_scores[:k])
                hit_k_values.append(hit_k)

            hits.append(hit_k_values)

            if hit_k_values[0]:  # Check if top1 hit
                top1 += 1
            if hit_k_values[1]:  # Check if top3 hit
                top3 += 1
            if hit_k_values[2]:  # Check if top10 hit
                top10 += 1
            if hit_k_values[3]:  # Check if top100 hit
                top100 += 1
            if hit_k_values[4]:
                top469 += 1

    hits_at_k = [sum(hit_values) / len(hit_values) for hit_values in zip(*hits)]  # Calculate hits_at_k for each k

    return hits_at_k, [top1, top3, top10, top100, top469]

In [313]:
hits_at_k_concept, hit_list_concept = get_hits_at_k_concept_assertions(model,
                                             testConceptDataset, testRoleDataset,
                                             entity_to_idx_vocab, idx_to_entity_vocab,
                                             idx_to_concept_vocab, role_to_idx_vocab,
                                             centroid_score = CENTROID_SCORE)

In [314]:
print(hits_at_k_concept, hit_list_concept)

[0.0, 0.0, 0.038461538461538464, 0.4, 1.0] [0, 0, 5, 52, 130]


In [318]:
def get_hits_at_k_role_assertions(model,
                  test_concept_assertions=Dataset, test_role_assertions=Dataset,
                  entity_to_idx_vocab=dict, idx_to_entity_vocab=dict,
                  idx_to_concept_vocab=dict, idx_to_role_vocab=dict,
                  centroid_score = False
                  ):
    
    top1 = 0
    top3 = 0
    top10 = 0
    top100 = 0
    top469 = 0

    model.eval()

    hits = []

    with torch.no_grad():
        for assertion in test_role_assertions:
            inputs, labels = assertion
            true_idx = inputs[2] # Inputs are of the form ['Head', 'Role', 'Tail']
            assertion_scores = [] # Stores the scores for all individuals

            for _, idx in entity_to_idx_vocab.items():
                eval_sample = torch.tensor([inputs[0], inputs[1], idx]).unsqueeze(0)
                outputs1, outputs2 = model(eval_sample) # out1 = Role parameter, out2 = Tail parameter
                if centroid_score == False:
                    assertion_score = torch.dist(outputs1, outputs2, p=2) # The scoring function is just the euclidean distance
                else:
                    assertion_score = torch.dist(labels, outputs2, p=2)
                assertion_scores.append((idx, assertion_score.item()))
                
            sorted_scores = sorted(assertion_scores, key=lambda x: x[1])

            k_list = [1, 3, 10, 100, 469]
            hit_k_values = []  # Store hit_k values for each k
            for k in k_list:
                hit_k = any(idx == true_idx for idx, _ in sorted_scores[:k])
                hit_k_values.append(hit_k)

            hits.append(hit_k_values)

            if hit_k_values[0]:  # Check if top1 hit
                top1 += 1
            if hit_k_values[1]:  # Check if top3 hit
                top3 += 1
            if hit_k_values[2]:  # Check if top10 hit
                top10 += 1
            if hit_k_values[3]:  # Check if top100 hit
                top100 += 1
            if hit_k_values[4]:
                top469 += 1

    hits_at_k = [sum(hit_values) / len(hit_values) for hit_values in zip(*hits)]  # Calculate hits_at_k for each k

    return hits_at_k, [top1, top3, top10, top100, top469]

In [319]:
hits_at_k_role, hit_list_role = get_hits_at_k_role_assertions(model,
                                             testConceptDataset, testRoleDataset,
                                             entity_to_idx_vocab, idx_to_entity_vocab,
                                             idx_to_concept_vocab, role_to_idx_vocab,
                                             centroid_score = True)

In [317]:
print_results(hits_at_k_role, hit_list_role)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 11
Hits@3: 0.049107142857142856

Top10 hits: 35
Hits@10: 0.15625

Top100 hits: 107
Hits@100: 0.47767857142857145

Top469 hits: 224
Hits@469: 1.0



In [320]:
print_results(hits_at_k_role, hit_list_role)

Top1 hits: 0
Hits@1: 0.0

Top3 hits: 11
Hits@3: 0.049107142857142856

Top10 hits: 35
Hits@10: 0.15625

Top100 hits: 107
Hits@100: 0.47767857142857145

Top469 hits: 224
Hits@469: 1.0



In [114]:
print_results(hits_at_k_role, hit_list_role)

Top1 hits: 2
Hits@1: 0.008928571428571428

Top3 hits: 3
Hits@3: 0.013392857142857142

Top10 hits: 11
Hits@10: 0.049107142857142856

Top100 hits: 38
Hits@100: 0.16964285714285715

Top469 hits: 224
Hits@469: 1.0



In [None]:
def corrupt_concept_assertions(test_data_concept_assertions,
                               num_corrupt = int,
                               entity_to_idx_vocab = dict):

    candidate_entities = list(entity_to_idx_vocab.keys())
    
    original_assertions = torch.tensor([sample[0] for sample, label in list(test_data_concept_assertions)]) # Gets rid of the individual

    num_samples = len(original_assertions)

    sampled_entities = torch.tensor([torch.tensor(entity_to_idx_vocab[random.choice(candidate_entities)], dtype=torch.long) for _ in range(num_samples)])

    corrupted_assertions = torch.zeros((num_samples, 2), dtype=torch.long)
    corrupted_assertions[:, 0] = original_assertions
    corrupted_assertions[:, 1] = sampled_entities

    return corrupted_assertions

In [None]:
def corrupt_role_assertions(test_data_role_assertions,
                            num_corrupt = int,
                            entity_to_idx_vocab=dict):

    candidate_entities = list(entity_to_idx_vocab.keys())

    original_head_entities = torch.tensor([sample[0] for sample, label in list(test_data_role_assertions)])
    original_roles = torch.tensor([sample[1] for sample, label in list(test_data_role_assertions)])
    original_tail_entities = torch.tensor([sample[2] for sample, label in list(test_data_role_assertions)])

    num_samples = len(original_head_entities)

    sampled_entities = torch.tensor([torch.tensor(entity_to_idx_vocab[random.choice(candidate_entities)], dtype=torch.long) for _ in range(num_samples)])

    corrupted_assertions = torch.zeros((num_samples, 3), dtype=torch.long)
    corrupted_assertions[:, 0] = original_head_entities # The original head entities
    corrupted_assertions[:, 1] = original_roles # The original roles
    corrupted_assertions[:, 2] = sampled_entities

    return corrupted_assertions