# Target Entity Type Identification Evaluation

In this exercise, you'll need to implement lenient evaluation measures for the target entity type identification task.

As a reminder, _target entity type identification_ is the task of finding the target types of a given input query, from a type taxonomy, such that these types correspond to most specific types of entities that are relevant to the query.  Target types cannot lie on the same branch in the taxonomy.

Our final measure is normalized discounted cumulative gain (NDCG), but we need to compute the gain values of answer types based on their distance from ground truth types in the type taxonomy.

In [1]:
import ipytest
import math
import operator
import pytest
from typing import Callable, List, Optional, Set

ipytest.autoconfig()

## Type taxonomy

We use the DBpedia Ontology as our type taxonomy. It is given to you in a preprocessed format in `data/dbpedia_types.tsv`, where each line corresponds to a type, and the tab-separated columns, respectively, are: type identifier, depth in the hierarchy, and parent type.

In [2]:
class TypeTaxonomy:
    
    ROOT = "owl:Thing"
    
    def __init__(self, tsv_filename: str) -> None:
        """Initializes the type taxonomy by loading it from a TSV file.
        
        Args:
            tsv_filename: Name of TSV file, with type_id, depth, and parent_id columns.
        """
        self._types = {self.ROOT: {"parent": None, "depth": 0}}
        self._max_depth = 0
        with open(tsv_filename, "r") as tsv_file:
            next(tsv_file)  # Skip header row
            for line in tsv_file:
                fields = line.rstrip().split("\t")
                type_id, depth, parent_type = fields[0], int(fields[1]), fields[2]
                self._types[type_id] = {"parent": parent_type, "depth": depth}
                self._max_depth = max(depth, self._max_depth)
                
        # Once all types have been read in, we also populate each type with a list
        # of its children for convenience (if the taxonomy is to be traversed
        # downwards not just upwards).
        for type_id in self._types:
            if type_id == self.ROOT:
                continue
            parent_type = self._types[type_id]["parent"]            
            if "children" not in self._types[parent_type]:
                self._types[parent_type]["children"] = set()
            self._types[parent_type]["children"].add(type_id)
                        
    def max_depth(self) -> int:
        """Returns the maximum depth of the type taxonomy."""
        return self._max_depth
    
    def is_root(self, type_id: str) -> bool:
        """Returns true if the type is the root of the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            True if root.
        """
        return type_id == self.ROOT
    
    def depth(self, type_id: str) -> int:
        """Returns the depth of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            The depth of the type in the hierarchy (0 for root).
        """
        return self._types.get(type_id, {}).get("depth")

    def parent(self, type_id: str) -> Optional[str]:
        """Returns the parent type of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            Parent type ID, or None if the input type is root.
        """
        return self._types.get(type_id, {}).get("parent")

    def children(self, type_id: str) -> Set[str]:
        """Returns the set of children types of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            Set of type IDs (empty set if leaf type).
        """
        return self._types.get(type_id, {}).get("children", set())
    
    def dist(self, type_id1: str, type_id2: str) -> float:
        """Computes the distance between two types in the taxonomy.
        
        Args:
            type_id1: ID of first type.
            type_id2: ID of second type.
            
        Returns:
            The distance between the two types in the type taxonomy, which is
            the number of steps between them if they lie on the same branch,
            and otherwise `math.inf`.
        """
        # Find which type has higher depth and set if to type_a; the other is type_b.        
        type_a, type_b = (type_id2, type_id1) if self.depth(type_id1) < self.depth(type_id2) \
                         else (type_id1, type_id2)
        dist = self.depth(type_a) - self.depth(type_b)
        
        # If they lie on the same branch, then when traversing type_a for dist steps
        # would make us end up with type_b; otherwise, they're not on the same branch.
        for _ in range(dist):
            type_a = self.parent(type_a)
        
        return dist if type_a == type_b else math.inf

Tests.

In [3]:
%%run_pytest[clean]

@pytest.fixture
def dbpedia_types():
    return TypeTaxonomy("data/dbpedia_types.tsv")

def test_max_depth(dbpedia_types):
    assert dbpedia_types.max_depth() == 7

@pytest.mark.parametrize("type_id,depth", [
    ("owl:Thing", 0),
    ("dbo:Agent", 1),
    ("dbo:SportFacility", 4),
    ("dbo:RaceTrack", 5)
])
def test_depth(dbpedia_types, type_id, depth):
    assert dbpedia_types.depth(type_id) == depth
    
@pytest.mark.parametrize("type_id,parent", [
    ("owl:Thing", None),
    ("dbo:Agent", "owl:Thing"),
    ("dbo:SportFacility", "dbo:ArchitecturalStructure"),
    ("dbo:RaceTrack", "dbo:SportFacility")
])
def test_depth(dbpedia_types, type_id, parent):
    assert dbpedia_types.parent(type_id) == parent

@pytest.mark.parametrize("type_id1,type_id2,distance", [
    ("dbo:Agent", "dbo:Agent", 0),  # same type
    ("dbo:Agent", "dbo:Person", 1),  # type2 is more specific
    ("dbo:Artist", "dbo:Agent", 2),  # type2 is more generic
    ("dbo:Artist", "dbo:Broadcaster", math.inf)  # different branch
])  
def test_distance(dbpedia_types, type_id1, type_id2, distance):
    assert dbpedia_types.dist(type_id1, type_id2) == distance

.........                                                                          [100%]
9 passed in 0.07s


## Computing gain values

For simplicity, refer to this global variable in the gain computations.

In [4]:
type_taxonomy = TypeTaxonomy("data/dbpedia_types.tsv")

When defined in a _linear_ fashion, the gain of a type is computed as:

$$r(y) = \max_{\hat{y} \in \hat{\mathcal{T}}_q} \big( 1 - \frac{d(y,\hat{y})}{h} \big)$$

where $\hat{\mathcal{T}}_q$ is the set of ground truth types, $\hat{y}$ is a ground truth type, $y$ is an answer type, $d(y, \hat{y})$ is the distance between types in the taxonomy, and $h$ is the maximum depth of the type taxonomy.

In [5]:
def gain_linear(gt_types: Set[str], answer_type_id: str) -> float:
    """Computes the gain of an answer type in a linear fashion.
    
    Args:
        gt_types: Set of ground truth type IDs.
        answer_type_id: Answer type ID.
    
    Returns:
        Gain value.
    """
    # Note: if the distance between two types is inf, we set the linear gain to 0.
    return max([
        1 - type_taxonomy.dist(gt_type_id, answer_type_id) / type_taxonomy.max_depth() 
        if type_taxonomy.dist(gt_type_id, answer_type_id) < math.inf else 0
        for gt_type_id in gt_types
    ])

Alternatively, the gain of an answer type can be defined using an _exponential_ decay function:

$$r(y) = \max_{\hat{y} \in \hat{\mathcal{T}}_q} \big ( b^{-d(y,\hat{y})} \big )$$

where $b$ is the base of the exponential function (here: $b=2$).

In [6]:
def gain_exponential(gt_types: Set[str], answer_type_id: str) -> float:
    """Computes the gain of an answer type using exponential decay.
    
    Args:
        gt_types: Set of ground truth type IDs.
        answer_type_id: Answer type ID.
    
    Returns:
        Gain value.
    """
    # Note: if the distance between two types is inf, we set the exponential gain to 0.
    return max([
        2**(-type_taxonomy.dist(gt_type_id, answer_type_id))
        if type_taxonomy.dist(gt_type_id, answer_type_id) < math.inf else 0
        for gt_type_id in gt_types
    ])

Tests.

In [7]:
%%run_pytest[clean]

@pytest.mark.parametrize("gt_types,answer_type_id,gain", [
    (["dbo:Agent"], "dbo:Agent", 1),  # same type
    (["dbo:Agent"], "dbo:Person", 1-1/7),  # type2 is more specific
    (["dbo:Artist"], "dbo:Agent", 1-2/7),  # type2 is more generic
    (["dbo:NationalSoccerClub"], "dbo:Organisation", 1-3/7),  # type2 is more generic
    (["dbo:Artist"], "dbo:Broadcaster", 0),  # different branch
    (["dbo:DisneyCharacter"], "dbo:MythologicalFigure", 0),  # sibling categories
])
def test_gain_linear(gt_types, answer_type_id, gain):
    assert gain_linear(gt_types, answer_type_id) == pytest.approx(gain)
    
@pytest.mark.parametrize("gt_types,answer_type_id,gain", [
    (["dbo:Agent"], "dbo:Agent", 1),  # same type
    (["dbo:Agent"], "dbo:Person", 1/2),  # type2 is more specific
    (["dbo:Artist"], "dbo:Agent", 1/4),  # type2 is more generic
    (["dbo:NationalSoccerClub"], "dbo:Organisation", 1/8),  # type2 is more generic
    (["dbo:Artist"], "dbo:Broadcaster", 0),  # different branch
    (["dbo:DisneyCharacter"], "dbo:MythologicalFigure", 0),  # sibling categories
])
def test_gain_exponential(gt_types, answer_type_id, gain):
    assert gain_exponential(gt_types, answer_type_id) == pytest.approx(gain)

............                                                                       [100%]
12 passed in 0.04s


## Putting everything together

Plug the gain values computed using either linear or exponential into the NDCG computation to get a final evaluation score.

The DCG and NDCG computation parts are given. The only part that needs completing is the construction of the ideal ranking.

In [8]:
def get_ideal_ranking(gt_types: Set[str]) -> List[str]:
    """Generates an ideal ranking corresponding to a set of ground truth types.
    
    Args:
        gt_types: Set of ground truth types.
    
    Returns:
        A ranked list of types that constitute an ideal ranking gain-wise.
    """
    gains = {}
    
    for gt_type in gt_types:
        # Ground truth type has max gain.
        gains[gt_type] = type_taxonomy.max_depth()
        
        # Traverse upwards to add parent types.
        parent_type = type_taxonomy.parent(gt_type)
        gain = type_taxonomy.max_depth() - 1
        while not type_taxonomy.is_root(parent_type):
            gains[parent_type] = max(gains.get(parent_type, 0), gain)
            gain -= 1
            parent_type = type_taxonomy.parent(parent_type)

        # Traverse downwards to add children types.
        children_types = type_taxonomy.children(gt_type)
        gain = type_taxonomy.max_depth() - 1
        while len(children_types) > 0:
            grandchildren_types = set()
            for t in children_types:
                gains[t] = max(gains.get(t, 0), gain)
                grandchildren_types.update(type_taxonomy.children(t))
            gain -= 1
            children_types = grandchildren_types
    
    # Return types ordered by gain desc.
    return [k for k, v in sorted(gains.items(), key=operator.itemgetter(1), reverse=True)]    

In [9]:
def dcg(relevances: List[float], k: int) -> float:
    """Computes DCG@k, given the corresponding relevance levels for a ranked list of types.
    
    Args:
        relevances: List with the relevance levels corresponding to a ranked list of types.
        k: Rank cut-off.
        
    Returns:
        DCG@k (float).
    """
    return relevances[0] + sum(
        [relevances[i] / math.log(i + 1, 2) 
         for i in range(1, min(k, len(relevances)))]
    )

In [10]:
def ndcg(system_ranking: List[str], gt_types: Set[str], gain_function: Callable, k:int = 10) -> float:
    """Computes NDCG@k for a given system ranking.
    
    Args:
        system_ranking: Ranked list of answer type IDs (from most to least relevant).
        gt_types: Set of ground truth types.
        gain_function: Function for computing the gain of an answer type.
        k: Rank cut-off.
    
    Returns:
        NDCG@k (float).
    """
    # Relevance (gain) levels for the ranked docs.
    relevances = [gain_function(gt_types, type_id) for type_id in system_ranking]

    # Relevance levels (gains) of the idealized ranking.
    relevances_ideal = [gain_function(gt_types, type_id) 
                        for type_id in get_ideal_ranking(gt_types)]
    
    return dcg(relevances, k) / dcg(relevances_ideal, k)        

Tests.

In [11]:
%%run_pytest[clean]

def test_ideal_ranking_single_gt():
    ideal_rankings = get_ideal_ranking({"dbo:Person"})
    # Ideal ranking starts with ground truth type.
    assert "dbo:Person" in ideal_rankings 
    assert ideal_rankings.index("dbo:Person") == 0
    # Types that are not parent or children types are not present.
    assert "dbo:Organisation" not in ideal_rankings
    # Parent types are present and ranked lower than GT type.
    assert "dbo:Agent" in ideal_rankings 
    assert ideal_rankings.index("dbo:Agent") > 0
    # Children types are present and ranked lower than GT type.    
    assert "dbo:Politician" in ideal_rankings 
    assert ideal_rankings.index("dbo:Politician") > 0
    assert "dbo:President" in ideal_rankings
    # Relative ranking of children types is correct.
    assert ideal_rankings.index("dbo:President") > ideal_rankings.index("dbo:Politician")    
    # Relative ranking of parent vs. children types is correct.
    assert ideal_rankings.index("dbo:Agent") < ideal_rankings.index("dbo:President")    

def test_ideal_ranking_multi_gt():
    # Ground truth types are subtypes of person but on different branches
    # and at different depths:
    # Skier -> WinterSportPlayer -> Athlete -> Person
    # DisneyCharacter -> FictionalCharacter -> Person
    ideal_rankings = get_ideal_ranking({"dbo:Skier", "dbo:DisneyCharacter"})
    # Ground truth type at the top of the ideal ranking in any order
    assert set(ideal_rankings[:2]) == {"dbo:Skier", "dbo:DisneyCharacter"}
    # Path tho Skier.
    assert "dbo:WinterSportPlayer" in ideal_rankings 
    assert "dbo:Athlete" in ideal_rankings 
    assert "dbo:Person" in ideal_rankings 
    assert "dbo:Agent" in ideal_rankings 
    assert ideal_rankings.index("dbo:WinterSportPlayer") < ideal_rankings.index("dbo:Athlete")
    assert ideal_rankings.index("dbo:Person") < ideal_rankings.index("dbo:Agent")
    # Path to DisneyCharacter.
    assert "dbo:FictionalCharacter" in ideal_rankings 
    assert ideal_rankings.index("dbo:FictionalCharacter") < ideal_rankings.index("dbo:Person")
    assert ideal_rankings.index("dbo:FictionalCharacter") < ideal_rankings.index("dbo:Athlete")    
    # Sibling types to ground truth types.
    assert "dbo:Ski_jumper" not in ideal_rankings
    assert "dbo:SoapCharacter" not in ideal_rankings
    # Types on other branches.
    assert "dbo:Gymnast" not in ideal_rankings
    assert "dbo:ShoppingMall" not in ideal_rankings


def test_ndcg():
    # Perfect ranking.
    assert ndcg(["dbo:Person", "dbo:Agent", "dbo:Politician"], 
                {"dbo:Person"}, gain_linear, k=3) == 1.0
    assert ndcg(["dbo:Organisation", "dbo:Agent", "dbo:Politician"], 
                {"dbo:Person"}, gain_linear, k=3) == pytest.approx(0.583, rel=1e-3)
    assert ndcg(["dbo:Athlete", "dbo:Agent", "dbo:FictionalCharacter"], 
                {"dbo:Skier", "dbo:DisneyCharacter"}, 
                gain_linear, k=3) == pytest.approx(0.719, rel=1e-3)
    assert ndcg(["dbo:Athlete", "dbo:Agent", "dbo:DisneyCharacter"], 
                {"dbo:Skier", "dbo:DisneyCharacter"}, 
                gain_linear, k=3) == pytest.approx(0.754, rel=1e-3)    

...                                                                                [100%]
3 passed in 0.02s
