# Chaganty and Liang (2016) - How Much is 131 Million Dollars? Putting Numbers in Perspective with Compositional Descriptions

Reproduction of the method for constructing a graph database of facts and generating perspectives from it, as described in Chaganty & Liang's 2016 paper.

The method and data provided by Chaganty and Liang are used. To experiment, run all cells. In the bottom cell, you can provide a number and unit (e.g. USD) that you would like to put into perspective.

In [None]:
import pandas as pd
import networkx as nx
import itertools
import matplotlib.pyplot as plt

## Creating the graph from the facts database

In [None]:
class Vertex:
    """
    A node of the unit graph.

    One vertex exists per (possibly compositional) unit and collects
    every tuple that shares that unit.

    Parameters
    ----------

    unit : list of str
        The parts that make up the unit of this vertex. The callers in
        this file pass lists of fundamental unit names.

    Attributes
    ----------

    tuples : list
        Tuples attached through ``add_tuple``; starts out empty.

    """

    def __init__(self, unit):
        self.tuples = []
        self.unit = unit

    def __str__(self):
        # the unit parts, separated by single spaces
        separator = ' '
        return separator.join(self.unit)

    def add_tuple(self, tup):
        """Attach ``tup`` to this vertex."""
        self.tuples.append(tup)
               
class Tuple:
    """
    A single fact from the database.

    Parameters
    ----------

    value : float
        Numeric value of the fact.

    unit : list of str
        Parts of the fact's unit. The first part is treated as the
        numerator, every later part as a denominator
        (see ``get_fundamentals``).

    description : str
        The description of the fact.

    Attributes
    ----------

    fundamental_value, fundamental_unit
        Both are ``None`` until ``get_fundamentals`` has been called.

    """

    def __init__(self, value, unit, description):
        self.description = description
        self.unit = unit
        self.value = value

        # filled in by get_fundamentals
        self.fundamental_unit = None
        self.fundamental_value = None

    def __str__(self):
        # the unit parts, separated by single spaces
        return ' '.join(self.unit)

    def get_fundamentals(self, conversions):
        """
        Express the fact in fundamental units.

        Parameters
        ----------

        conversions : dict
            Maps a unit name to a dict with the keys 'fundamental_unit'
            and 'conversion'.

        Sets ``fundamental_unit`` to the list of fundamental units of the
        parts of ``unit`` and ``fundamental_value`` to the converted value.
        """

        self.fundamental_value = self.value
        self.fundamental_unit = []

        for position, part in enumerate(self.unit):

            # a part missing from the table is already a fundamental unit
            if part not in conversions:
                self.fundamental_unit.append(part)
                continue

            entry = conversions[part]
            base_unit = entry['fundamental_unit']

            # numerator (first part) scales up, denominators scale down
            if position == 0:
                self.fundamental_value *= entry['conversion']
            else:
                self.fundamental_value /= entry['conversion']

            self.fundamental_unit.append(base_unit)

In [None]:
def read_data(facts_file, conversions_file):

    """
    Read the facts and unit conversions and convert them to dictionaries.

    Parameters
    ----------

    facts_file : str
        Locator for the tab-separated file that contains the facts that
        build up the database. Must have four columns, read as
        'description', 'is', 'value' and 'unit' - a header must not be
        present.

    conversions_file : str
        Locator for the tab-separated file that contains unit conversions.
        Must have a header with the columns 'unit', 'conversion' and
        'fundamental_unit'.

    Returns
    -------

    facts : list
        List of dictionaries, one per row of the facts file, with the
        keys 'description', 'is', 'value' and 'unit'.

    conversions : dict
        Dictionary to convert units to a 'root' representative
        unit (e.g. hours, days etc map to 'time'). Units as keys; each
        value is a dict holding the remaining columns of that row
        ('conversion' and 'fundamental_unit').

    Raises
    ------

    KeyError
        If the conversions file lacks one of the required columns.

    """

    # read data from files
    facts_df = pd.read_csv(facts_file, sep='\t', names=["description", "is", "value", "unit"])
    conv_df = pd.read_csv(conversions_file, sep='\t')

    # fail early, and with a clear message, when the header is not as expected
    required = ('unit', 'conversion', 'fundamental_unit')
    missing = [column for column in required if column not in conv_df.columns]
    if missing:
        raise KeyError(f"conversions file is missing column(s): {', '.join(missing)}")

    # save both as dictionary
    facts = facts_df.to_dict('records')
    conversions = conv_df.set_index('unit').to_dict('index')

    return facts, conversions

# locations of the tab-separated input files, relative to the working directory
facts_file = 'data/facts.tsv'
conversions_file = 'data/unit_conversions.tsv'

# load the facts (list of records) and the unit-conversion lookup used further down
facts, conversions = read_data(facts_file, conversions_file)

In [None]:
# start from an empty directed graph; vertices and edges are added further down
graph = nx.DiGraph()

In [None]:
def create_tuples(facts, conversions):

    """
    Turn every fact into a Tuple object expressed in fundamental units.

    Parameters
    ----------

    facts : list
         List of dictionaries, where each list item has keys
         value, description and unit. Compositional units are written
         with ' per ' between their parts.

    conversions : dict
        Dictionary to convert units to a 'root' representative
        unit (e.g. hours, days etc map to 'time'). Units as keys,
        fundamental units and conversions as values.

    Returns
    -------

    all_tuples : list
        One Tuple object per fact, in the order of ``facts``.

    all_units : list
        The fundamental unit (a list of str) of each tuple, in the
        same order; duplicates are kept.

    """

    all_tuples, all_units = [], []

    for record in facts:

        # split a compositional unit into its parts and trim whitespace
        parts = [piece.strip() for piece in record['unit'].split(' per ')]

        # build the tuple and resolve its fundamental unit and value
        fact_tuple = Tuple(record['value'], parts, record['description'])
        fact_tuple.get_fundamentals(conversions)

        all_tuples.append(fact_tuple)
        all_units.append(fact_tuple.fundamental_unit)

    return all_tuples, all_units

# build one Tuple per fact, together with the fundamental unit of each
all_tuples, all_units = create_tuples(facts, conversions)

In [None]:
def create_vertices(all_units, graph=None, tuples=None):

    """
    Create initial vertices, which are collections of tuples with the
    same unit, and place them in the graph as unconnected nodes.

    Parameters
    ----------

    all_units : list
        A list of all fundamental units present in the facts; each
        item is itself a list of str. Duplicates are allowed.

    graph : networkx directed graph, optional
        Object that will store all relationships between the vertices.
        A fresh ``nx.DiGraph()`` is created for each call when omitted
        (a default instance in the signature would be shared by all calls).

    tuples : list, optional
        Tuple objects to distribute over the vertices. Defaults to the
        module-level ``all_tuples`` list.

    Returns
    -------

    graph : networkx directed graph
        Graph now with initial vertices included.

    all_vertices : list
        List of all vertex objects, one per distinct unit, in order of
        first appearance in ``all_units``.

    """

    if graph is None:
        graph = nx.DiGraph()

    if tuples is None:
        # fall back on the tuples built by the notebook
        tuples = all_tuples

    # group the tuples by unit once instead of rescanning them per vertex
    tuples_by_unit = {}
    for tup in tuples:
        if tup.fundamental_unit is None:
            # fundamentals were never computed: cannot match any unit
            continue
        tuples_by_unit.setdefault(tuple(tup.fundamental_unit), []).append(tup)

    # store all vertices in list
    all_vertices = []

    # one vertex per distinct unit; dict keys keep first-seen order, so the
    # result no longer depends on set iteration order
    for unit_key in dict.fromkeys(tuple(unit) for unit in all_units):

        vertex = Vertex(list(unit_key))

        for tup in tuples_by_unit.get(unit_key, []):
            vertex.add_tuple(tup)

        all_vertices.append(vertex)

    # add all vertices as independent nodes
    graph.add_nodes_from(all_vertices)

    return graph, all_vertices

# add one (still unconnected) vertex per distinct fundamental unit to the graph
graph, all_vertices = create_vertices(all_units, graph=graph)

In [None]:
def find_tuples(all_tuples, unit):

    """
    Select the tuples whose fundamental unit is exactly ``[unit]``.

    Used to collect the tuples an edge should be annotated with.

    Parameters
    ----------

    all_tuples : list
        List of all tuples/facts in the database.

    unit : str
        The single fundamental unit that the returned tuples
        should have.

    Returns
    -------

    tuples : list
        The items of all_tuples, in their original order, whose
        fundamental unit consists of ``unit`` alone.

    """

    # compositional units (more than one part) never match
    wanted = [unit]

    return [candidate for candidate in all_tuples
            if candidate.fundamental_unit == wanted]

In [None]:
def find_child_vertices(graph, vertex_list, sub_vertex_list, edge_idx, unit_idxs):

    """
    Link every vertex in ``vertex_list`` to its child vertex, creating
    the child when it does not exist yet.

    Parameters
    ----------

    graph : networkx object
        Partially completed graph.

    vertex_list : list
        List of vertices whose children should be
        connected/found.

    sub_vertex_list : list
        The list of candidate children (of length 1
        shorter than the vertices in vertex_list). New children
        are appended to this list in place.

    edge_idx : int
        Index of the unit part that the edges should be
        annotated with.

    unit_idxs : list
        The indexes of the unit parts that make up the unit of
        the child vertex.

    Returns
    -------

    graph : networkx object
        Updated graph, with new connections added.

    sub_vertex_list : list
        The original list, plus the newly created (empty)
        vertices.

    Notes
    -----

    The tuples on each edge are looked up in the module-level
    ``all_tuples`` list.

    """

    for vertex in vertex_list:

        # the unit the child of this vertex must have
        child_unit = [vertex.unit[idx] for idx in unit_idxs]

        # look for an existing child; a later match replaces an earlier one
        child = None
        for candidate in sub_vertex_list:
            if candidate.unit == child_unit:
                child = candidate

        # no such vertex yet: create an empty one and register it
        if child is None:
            child = Vertex(child_unit)
            graph.add_node(child)
            sub_vertex_list.append(child)

        # connect parent to child and annotate the edge with the removed unit part
        graph.add_edge(vertex, child)
        edge_attrs = graph[vertex][child]
        edge_attrs['unit'] = vertex.unit[edge_idx]
        edge_attrs['tuples'] = find_tuples(all_tuples, vertex.unit[edge_idx])

    return graph, sub_vertex_list

In [None]:
def create_full_graph(graph, all_vertices):

    """
    Complete the graph: connect the existing vertices to their
    children and create the children that are missing.

    Parameters
    ----------

    graph : networkx object
        Graph containing only non-connected vertices.

    all_vertices : list
        List of all existing vertices.

    Returns
    -------

    graph : networkx object
        Completed graph.

    """

    # bucket the vertices by the number of parts in their unit;
    # vertices with any other number of parts are left untouched
    by_length = {1: [], 2: [], 3: []}
    for vertex in all_vertices:
        size = len(vertex.unit)
        if size in by_length:
            by_length[size].append(vertex)

    # 3-part vertices drop their middle part and link to 2-part vertices ...
    graph, pairs = find_child_vertices(graph, by_length[3], by_length[2], 1, [0, 2])

    # ... and 2-part vertices (including new ones) drop their second part
    graph, _ = find_child_vertices(graph, pairs, by_length[1], 1, [0])

    return graph

# connect the vertices, creating missing child vertices along the way
graph = create_full_graph(graph, all_vertices)

In [None]:
# tabular (pandas) view of the graph's edges, handy for inspection
edgelist = nx.to_pandas_edgelist(graph)
# optional export of the edge list; uncomment to write it to disk
#edgelist.to_csv('edgelist.csv')

## Generating perspectives from the graph

In [None]:
def has_successor(node):

    """
    Tell whether ``node`` has at least one successor.

    The lookup is done in the module-level ``graph``.

    Parameters
    ----------

    node : Vertex
        Vertex object that is part of a networkx graph.

    Returns
    -------

    bool
        True if node has a successor, False if not.

    """

    successors = list(graph.successors(node))

    return bool(successors)

In [None]:
def perspectives_tuples(graph):

    """
    Generate all possible perspectives by performing
    walks over the graph.

    Every node that holds at least one tuple is a starting point. From
    there the walk follows successors until a node without successors
    is reached, collecting the tuples stored on each edge it crosses.
    All successor lookups use the ``graph`` argument, so any graph can
    be passed in.

    Parameters
    ----------

    graph : networkx object
        Complete networkx object. Nodes must have a ``tuples``
        attribute and edges a 'tuples' attribute.

    Returns
    -------

    perspectives : list
        List of tuples (as produced by ``itertools.product``), where
        each item is a sequence of facts which together form a
        perspective: one fact from the starting node followed by one
        fact from every edge along the walk.
    """

    # store all perspectives in list
    perspectives = []

    for start in list(graph.nodes):

        # only nodes that hold facts can start a perspective
        if not start.tuples:
            continue

        # the path starts with the facts of the starting node itself
        fact_sets = [start.tuples]

        # walk with a separate cursor so the loop variable stays untouched
        current = start

        while True:

            # only the first successor is followed (the graph built in
            # this file gives each node at most one)
            successor = next(iter(graph.successors(current)), None)
            if successor is None:
                break

            # add the facts found on the edge to the path
            fact_sets.append(graph[current][successor]['tuples'])

            # take a step down
            current = successor

        # create combinations of all facts found along path
        perspectives.extend(itertools.product(*fact_sets))

    return perspectives

# NOTE(review): this rebinds the name `perspectives_tuples` from the function
# defined above to its result, so the function is no longer reachable under
# that name and this statement cannot be re-run without redefining it first.
perspectives_tuples = perspectives_tuples(graph)

In [None]:
class Perspective:

    """

    Object that will store a perspective.

    Parameters
    ----------

    tuples : list
        Sequence of the Tuple objects that form part of the perspective.

    Attributes
    ----------

    phrase : str
        The descriptions of the tuples joined with ' x '.

    value : number
        Product of the tuples' fundamental values once ``set_value``
        has been called; 1 before that.

    root_unit : str or None
        Set by ``get_root_unit``.

    multiplier : number or None
        Set by ``set_multiplier``.

    """

    def __init__(self, tuples):

        self.value = 1
        self.tuples = tuples
        self.phrase = ' x '.join([cp.description for cp in tuples])

        # root and multiplier are determined later
        self.root_unit = None
        self.multiplier = None

    def __str__(self):
        return self.phrase

    def get_root_unit(self):

        """
        Determine the root unit: the first part of the fundamental unit
        of the longest tuple (the first such tuple on ties).

        ``root_unit`` is left unchanged when there are no tuples or none
        of them has a non-empty fundamental unit.

        Returns
        -------

        str or None
            The (possibly unchanged) ``root_unit``.
        """

        longest = max(self.tuples,
                      key=lambda tup: len(tup.fundamental_unit),
                      default=None)

        if longest is not None and longest.fundamental_unit:
            self.root_unit = longest.fundamental_unit[0]

        return self.root_unit

    def set_value(self):

        """
        Set ``value`` to the product of the tuples' fundamental values.

        The product is rebuilt from 1 on every call, so calling this
        method repeatedly does not compound the value.
        """

        total = 1
        for tup in self.tuples:
            total = tup.fundamental_value * total

        self.value = total

    def set_multiplier(self, total_value):

        """
        Set ``multiplier`` to how often the perspective has to be
        repeated to reach ``total_value``, i.e. ``total_value / value``.
        """

        self.multiplier = total_value / self.value
        

In [None]:
def get_perspectives(perspectives_tuples):

    """
    Build a Perspective object for every sequence of tuples.

    Parameters
    ----------

    perspectives_tuples : list
        List of sequences, where each sequence contains the
        tuples that represent one perspective.

    Returns
    -------

    perspectives : list
        List of perspective objects, in input order, each with its
        root unit and value already determined.

    """

    built = []

    for tuple_sequence in perspectives_tuples:

        candidate = Perspective(tuple_sequence)

        # derive the unit the perspective is expressed in, then its total value
        candidate.get_root_unit()
        candidate.set_value()

        built.append(candidate)

    return built

# wrap each tuple sequence in a Perspective with its root unit and value set
perspectives = get_perspectives(perspectives_tuples)

In [None]:
def put_in_perspective(number, unit, perspectives, conversion_table=None):

    """

    Function that takes a number and its unit and puts
    it into perspective.

    Parameters
    ----------

    number : float
        Number to be put into perspective.

    unit : str
        Unit of number to be put in perspective. A unit that is absent
        from the conversion table is treated as a fundamental unit with
        conversion factor 1, mirroring ``Tuple.get_fundamentals``.

    perspectives : list
        All perspectives that are possible on the graph.

    conversion_table : dict, optional
        Maps a unit to a dict with the keys 'fundamental_unit' and
        'conversion'. Defaults to the module-level ``conversions``.

    Returns
    -------

    possible_perspectives : list
        List of perspectives that have the desired unit,
        and with the appropriate multiplier added. The multiplier is
        stored on the perspective objects themselves, so a later call
        overwrites it.

    Raises
    ------

    KeyError
        If ``unit`` is not in the conversion table and no perspective
        has it as root unit either (most likely a misspelled unit).

    """

    if conversion_table is None:
        # fall back on the table loaded by the notebook
        conversion_table = conversions

    entry = conversion_table.get(unit)

    if entry is None:
        # not convertible: assume the unit is itself fundamental
        fund_unit = unit
        fund_value = number
    else:
        fund_unit = entry['fundamental_unit']
        fund_value = number * entry['conversion']

    possible_perspectives = []

    for persp in perspectives:

        if fund_unit == persp.root_unit:

            persp.set_multiplier(fund_value)
            possible_perspectives.append(persp)

    # an unknown unit that matches nothing is still reported as an error
    if entry is None and not possible_perspectives:
        raise KeyError(unit)

    return possible_perspectives

# the quantity to put into perspective: change these two values to experiment
p_unit = 'USD'
p_number = 1000000000

# keep the perspectives whose root unit matches and attach their multipliers
poss_persp = put_in_perspective(p_number, p_unit, perspectives)

# one line per perspective: how many times it fits into the given quantity
print(f"{p_number} {p_unit} IS EQUIVALENT TO: \n")
for p in poss_persp:
    print(f"{format(p.multiplier, '.2f')} times the {p}")