# Assignment 3, Indexing

In this notebook you will index DBpedia (see the sub-collections listed under `https://github.com/uis-dat640-fall2019/admin/tree/master/assignments/assignment-3#data`). 

Make sure you specify the index settings, analyzer, and fields appropriately to support the models to be implemented in subsequent notebooks.

Note: you'll need to build a positional index. Use a single shard to make sure you're getting the right term statistics.

Be sure to use both markdown cells with section headings and explanations, as well as writing readable code, to make it clear what your intention is each step of the way through the code. 

In [79]:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
import bz2
from rdflib.plugins.parsers.ntriples import NTriplesParser
from rdflib.plugins.parsers.ntriples import ParseError
from rdflib.term import URIRef
import re
import operator
import pickle

In [80]:
import logging
logging.disable(logging.WARNING);

## Define data file list to index and prefix mapping

Define the list of data files to index, along with the direction of each triple (reversed or not). Because indexing is resource intensive, only 3 files are indexed.

In [81]:
files = [
#     {'name': "anchor_text_en.ttl.bz2"},
#     {'name': "article_categories_en.ttl.bz2"},
#     {'name': "disambiguations_en.ttl.bz2", 'reverse': True}, 
#     {'name': "infobox_properties_en.ttl.bz2"},
#     {'name': "instance_types_transitive_en.ttl.bz2"},
    {'name': "labels_en.ttl.bz2"},
    {'name': "long_abstracts_en.ttl.bz2"},
#     {'name': "mappingbased_literals_en.ttl.bz2"},
#     {'name': "mappingbased_objects_en.ttl.bz2"},
    {'name': "page_links_en.ttl.bz2"},
#     {'name': "persondata_en.ttl.bz2"},
#     {'name': "short_abstracts_en.ttl.bz2"},
#     {'name': "transitive_redirects_en.ttl.bz2", 'reverse': True}
]

Define the namespace prefixes, taken from the book https://link.springer.com/content/pdf/10.1007%2F978-3-319-93935-3.pdf

In [82]:
prefixes = {
    'http://dbpedia.org/resource/': "dbpedia",
    'http://dbpedia.org/property/': "dbp",
    'http://dbpedia.org/ontology/': "dbo",
    'http://purl.org/dc/elements/1.1/': "dc",
    'http://xmlns.com/foaf/0.1/': "foaf",
    'http://www.georss.org/georss/': "georss",
    'http://www.w3.org/2002/07/owl#': "owl",
    'http://www.w3.org/1999/02/22-rdf-syntax-ns#': "rdf",
    'http://www.w3.org/2000/01/rdf-schema#': "rdfs",
    'http://purl.org/dc/terms/': "dcterms"
}

## Parse the Knowledge base files and load data

Class to hold triple values:

In [83]:
class SPO():
    def __init__(self):
        self.s = None
        self.p = None
        self.o = None
        
    def triple(self, s, p, o):
        self.s = s
        self.p = p
        self.o = o

Util functions:

In [84]:
# Function to convert a URL into a prefixed entity, e.g. <dbpedia:Rome>
def url_to_prefixed(url):
    # find the last # or /
    k = url.rfind('#')
    key = ''
    if k >= 0:
        key = url[:k+1]
    else:
        k = url.rfind('/')
        if k >= 0:
            key = url[:k+1]
    prefixed = url
    if key in prefixes:
        prefixed = url.replace(key, prefixes[key] + ":")
    
    if prefixed[0] != '<':
        prefixed = '<' + prefixed
    if prefixed[-1] != '>':
        prefixed += '>'
    
    return prefixed

# Function to parse a file and return a set of subjects
def get_set_subjects(file):
    filepath = os.path.join(DATA_PATH, DBPEDIA_PATH, file['name'])
    reverse = file.get('reverse', False)
    spo = SPO()
    parser = NTriplesParser(spo)
    count = 0
    i = 0
    subjects = set()
    with bz2.open(filepath, "r") as f:
        for line in f:
            count += 1
            if count == 1000000:
                i += 1
                count = 0
                print("File {}: {} million lines processed".format(file['name'], i))
            try:
                # parse the triple
                parser.parsestring(line.decode("utf-8"))
            except ParseError:
                continue
                
            if spo.s is None or spo.o is None:
                continue
                
            subject = url_to_prefixed(spo.s)
            object = spo.o
            # if object is a URI
            if type(object) is URIRef:
                object = url_to_prefixed(object)
            
            # reverse direction
            if reverse:
                temp = subject
                subject = object
                object = temp
                
            # only keep subject that is dbpedia entity
            if not subject.startswith('<dbpedia:'):
                continue
                
            subjects.add(subject)
    
    return subjects
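
As a quick illustration of the prefixing step, the sketch below applies the same idea to a reduced prefix table. The names `DEMO_PREFIXES` and `shorten` are hypothetical and exist only for this example; it is a simplified stand-in for `url_to_prefixed`, not a replacement for it.

```python
# Hypothetical, reduced prefix table used only for this illustration.
DEMO_PREFIXES = {
    'http://dbpedia.org/resource/': "dbpedia",
    'http://www.w3.org/2000/01/rdf-schema#': "rdfs",
}


def shorten(url):
    """Replace a known namespace with its prefix and wrap the result in <>."""
    # The namespace ends at the last '#', or at the last '/' if there is no '#'.
    cut = url.rfind('#')
    if cut < 0:
        cut = url.rfind('/')
    namespace = url[:cut + 1]
    if namespace in DEMO_PREFIXES:
        url = DEMO_PREFIXES[namespace] + ":" + url[cut + 1:]
    return "<" + url + ">"


print(shorten('http://dbpedia.org/resource/Rome'))
print(shorten('http://www.w3.org/2000/01/rdf-schema#label'))
print(shorten('http://example.org/unknown/Thing'))
```

URLs whose namespace is not in the table are kept in full and only wrapped in angle brackets, which is why the later `startswith('<dbpedia:')` check is enough to filter out non-DBpedia subjects.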

Define data path:

In [85]:
DATA_PATH = "data"
DBPEDIA_PATH = "dbpedia"

ENTITIES = set()
PREDICATES_COUNT = {}

Parse the files labels_en.ttl.bz2 and short_abstracts_en.ttl.bz2 to get the set of entities that have both <rdfs:label> and <rdfs:comment>. Entities from the other files that are not in this set are ignored. The set is dumped with pickle so that it can be reused later without parsing the files again:

In [20]:
ENTITIES = get_set_subjects({'name': 'labels_en.ttl.bz2'})
ENTITIES = ENTITIES.intersection(get_set_subjects({'name': 'short_abstracts_en.ttl.bz2'}))

# The with-statement closes the file even if pickling fails
with open('data/entities.pkl', 'wb') as f:
    pickle.dump(ENTITIES, f)

File labels_en.ttl.bz2: 1 million lines processed
File labels_en.ttl.bz2: 2 million lines processed
File labels_en.ttl.bz2: 3 million lines processed
File labels_en.ttl.bz2: 4 million lines processed
File labels_en.ttl.bz2: 5 million lines processed
File labels_en.ttl.bz2: 6 million lines processed
File labels_en.ttl.bz2: 7 million lines processed
File labels_en.ttl.bz2: 8 million lines processed
File labels_en.ttl.bz2: 9 million lines processed
File labels_en.ttl.bz2: 10 million lines processed
File labels_en.ttl.bz2: 11 million lines processed
File short_abstracts_en.ttl.bz2: 1 million lines processed
File short_abstracts_en.ttl.bz2: 2 million lines processed
File short_abstracts_en.ttl.bz2: 3 million lines processed
File short_abstracts_en.ttl.bz2: 4 million lines processed
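
The filtering step above boils down to a set intersection followed by a pickle round trip. A minimal sketch on toy data is shown here; the entity sets and the temporary file are assumptions made for the example, not the real DBpedia subjects or the `data/entities.pkl` path.

```python
import os
import pickle
import tempfile

# Hypothetical toy data standing in for the subjects found in the two files.
with_label = {'<dbpedia:Rome>', '<dbpedia:Oslo>', '<dbpedia:Paris>'}
with_comment = {'<dbpedia:Rome>', '<dbpedia:Paris>', '<dbpedia:Berlin>'}

# Keep only the entities that appear in both sets.
entities = with_label & with_comment

# Round-trip through pickle, using a temporary directory instead of the data folder.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'entities.pkl')
    with open(path, 'wb') as f:
        pickle.dump(entities, f)
    with open(path, 'rb') as f:
        restored = pickle.load(f)

print(sorted(restored))
```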


In [86]:
# Reload entities if needed
with open('data/entities.bk.pkl', 'rb') as f:
    ENTITIES = pickle.load(f)
len(ENTITIES)

4630608

## Define index name and settings for term-based index

In [103]:
INDEX_NAME = "dbpedia_index2"  

INDEX_SETTINGS = {
    'settings' : {
        'index' : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0,
            "blocks": {
                "read_only_allow_delete": "false"
            }
        },
        'analysis': {
            'analyzer': {
                'my_english_analyzer': {
                    'type': "custom",
                    'tokenizer': "standard",
                    'stopwords': "_english_",
                    'filter': [
                        "lowercase",
                        "english_stop",
                        "filter_english_minimal"
                    ]                
                }
            },
            'filter' : {
                'filter_english_minimal' : {
                    'type': "stemmer",
                    'name': "minimal_english"
                },
                'english_stop': {
                    'type': "stop",
                    'stopwords': "_english_"
                }
            },
        }
    },
    'mappings': {
        'properties': {
            'catch-all': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'abstract': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'names': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'categories': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'attributes': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'similar_entity_names': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'related_entity_names': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            }
        }
    }
}
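
Before creating an index it can help to check the request body against the two requirements of the assignment: a single shard, and position information in the term vectors of every field. The helper below is a hypothetical sanity check written for this purpose; `check_index_body` and `DEMO_BODY` are not part of the notebook or of the Elasticsearch client.

```python
def check_index_body(body):
    """Return a list of problems found in an index-creation body (empty if none)."""
    problems = []
    shards = body.get('settings', {}).get('index', {}).get('number_of_shards')
    if shards != 1:
        problems.append("number_of_shards should be 1, got {!r}".format(shards))
    properties = body.get('mappings', {}).get('properties', {})
    for name, field in properties.items():
        # Every term_vector option that keeps positions starts with "with_positions".
        if not str(field.get('term_vector', '')).startswith('with_positions'):
            problems.append("field {!r} does not store positions".format(name))
    return problems


# Hypothetical body: one field configured correctly, one without positions.
DEMO_BODY = {
    'settings': {'index': {'number_of_shards': 1, 'number_of_replicas': 0}},
    'mappings': {
        'properties': {
            'names': {'type': "text", 'term_vector': "with_positions"},
            'notes': {'type': "text", 'term_vector': "yes"},
        }
    },
}

for problem in check_index_body(DEMO_BODY):
    print(problem)
```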

## Create term-based index

In [88]:
es = Elasticsearch(timeout=30, max_retries=10, retry_on_timeout=True)

In [89]:
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)
    
es.indices.create(index=INDEX_NAME, body=INDEX_SETTINGS)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'dbpedia_index2'}

In [90]:
# Predicates constraints for the fields
names_predicates = ["<foaf:name>", "<dbp:name>", "<foaf:givenName>", "<foaf:surname>", "<dbp:officialName>", 
                    "<dbp:fullname>", "<dbp:nativeName>", "<dbp:birthName>", "<dbo:birthName>", "<dbp:nickname>",
                    "<dbp:showName>", "<dbp:shipName>", "<dbp:clubname>", "<dbp:unitName>", "<dbp:otherName>",
                    "<dbo:formerName>", "<dbp:birthname>", "<dbp:alternativeNames>", "<dbp:otherNames>", "<dbp:names>",
                    "<rdfs:label>"]
categories_predicates = ["<dcterms:subject>"]
similar_entity_names_predicates = ["!<dbo:wikiPageRedirects>", "!<dbo:wikiPageDisambiguates>", 
                                   "<dbo:wikiPageWikiLinkText>"]

Functions used to resolve the URIs:

In [91]:
# function to process uri to text: removing prefix, convert camelCase into "camel case"
def URI_to_text(uri):
    text = str(uri)
    
    # remove brackets
    text = text.replace('<', '').replace('>', '')
    if text.startswith('dbpedia:Category') or text.startswith('dbpedia:File'): # resolve Category URIs
        k = text.rfind(':')
        text = text[k + 1:]
    elif text.startswith('http'):
        k = text.rfind('/')
        text = text[k + 1:]
    else:
        k = text.find(':')
        text = text[k + 1:]
    
    # break camelCase
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', text)
    text = re.sub('([a-z0-9])([A-Z])', r'\1 \2', s1)
    
    return text.strip()
    

# Function to do the URI resolution, given a predicate and a list of objects
def URI_resolution(predicate, object_list):
    resolved = []
    for obj in object_list:
        obj = str(obj)
        if obj.startswith('<') and obj.endswith('>'): # URI object
            obj = URI_to_text(obj)
        # if p matches <dbp:.*> both p and o are stored (i.e. "p o" is indexed).
        if predicate.startswith('<dbp:'):
            obj = "{} {}".format(URI_to_text(predicate), obj)
        
        # replace specific characters with space
        chars = "'.,!?()\"$#;`~@%^&*_+=[]{}|\\-"
        for ch in chars:
            if ch in obj:
                obj = obj.replace(ch, ' ')
        obj = re.sub(' +', ' ', obj)
        resolved.append(obj)
    return resolved
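
The camelCase splitting in `URI_to_text` relies on two regular expressions applied in sequence. The standalone sketch below isolates just that step so its behaviour can be checked on a few names; `split_camel_case` is a hypothetical helper introduced for the example.

```python
import re


def split_camel_case(text):
    """Insert spaces at camelCase boundaries, using the same two regexes as URI_to_text."""
    # First pass: put a space before a capital letter that starts a lowercase run.
    step1 = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', text)
    # Second pass: catch the remaining lowercase-or-digit to uppercase transitions.
    return re.sub('([a-z0-9])([A-Z])', r'\1 \2', step1)


for name in ['birthName', 'wikiPageWikiLink', 'HTTPResponseCode', 'label']:
    print(name, '->', split_camel_case(name))
```

The second pass is needed because the first one consumes the character before each match, so consecutive capitalised words such as `PageWiki` are not all split in a single pass.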

Parse the files defined above and build the index. The bulk helper from the Elasticsearch helpers module is used to improve performance. Predicate occurrences are also counted so that the top predicates can be used later for the entity-based index:

In [92]:
actions = []
# hold a set of indexed entities. For the entities already in the index, doing the update action using _op_type=update
indexed_entities = set()
catchalls = {}
for file in files:
    filepath = os.path.join(DATA_PATH, DBPEDIA_PATH, file['name'])
    reverse = file.get('reverse', False)
    spo = SPO()
    parser = NTriplesParser(spo)
    count, j = 0, 0
    
    with bz2.open(filepath, "r") as f:
        for line in f:
            count += 1
            if count == 1000000:
                j += 1
                count = 0
                print("File {}: {} million lines processed".format(file['name'], j))
            try:
                # parse the triple
                parser.parsestring(line.decode("utf-8"))
            except ParseError:
                continue
            
            if spo.s is None or spo.o is None:
                continue

            subject = url_to_prefixed(spo.s)
                
            predicate = url_to_prefixed(spo.p)
            if reverse:
                predicate = "!" + predicate
            
            object = spo.o
            # if object is a URI
            if type(object) is URIRef:
                object = url_to_prefixed(object)
            
            # reverse direction
            if reverse:
                temp = subject
                subject = object
                object = temp
                
            # only keep subject that is dbpedia entity
            if not subject.startswith('<dbpedia:'):
                continue
            
            # only keep entities that have both <rdfs:label> and <rdfs:comment>
            if subject not in ENTITIES:
                continue
                    
            # Count predicates then later we can find the top predicates to consider as fields
            if predicate not in PREDICATES_COUNT:
                PREDICATES_COUNT[predicate] = 1
            else:
                PREDICATES_COUNT[predicate] += 1
                
            source = {}

            uri_resolved = URI_resolution(predicate, [object])
            if predicate in names_predicates: # Field Names
                source['names'] = uri_resolved

            elif predicate in categories_predicates: # Field Categories
                source['categories'] = uri_resolved

            elif predicate in similar_entity_names_predicates: # Field Similar entity names
                source['similar_entity_names'] = uri_resolved

            else:
                if object.startswith('<') and object.endswith('>'): # object is a URI => Field Related entity names
                    source['related_entity_names'] = uri_resolved
                else:
                    # Field Attributes
                    source['attributes'] = uri_resolved
                    
            # Field catch-all
            if subject in catchalls:
                catchalls[subject] += uri_resolved
            else:
                catchalls[subject] = uri_resolved
                
            # Field abstract
            if predicate == '<dbo:abstract>':
                source['abstract'] = object
            
            if subject in indexed_entities: # update the entity index
                actions.append({
                    '_index': INDEX_NAME,
                    '_id': subject,
                    'doc': source,
                    '_op_type': 'update',
                })
            else:                           # add entity into index
                actions.append({
                    '_index': INDEX_NAME,
                    '_id': subject,
                    '_source': source
                })
                indexed_entities.add(subject)

            # Add entities into the index in a bulk of 5000 to improve the performance
            if len(actions) > 5000:
                helpers.bulk(es, actions)
                actions = []
                
# process the last bulk
helpers.bulk(es, actions)
print('Indexing completed')

File labels_en.ttl.bz2: 1 million lines processed
File labels_en.ttl.bz2: 2 million lines processed
File labels_en.ttl.bz2: 3 million lines processed
File labels_en.ttl.bz2: 4 million lines processed
File labels_en.ttl.bz2: 5 million lines processed
File labels_en.ttl.bz2: 6 million lines processed
File labels_en.ttl.bz2: 7 million lines processed
File labels_en.ttl.bz2: 8 million lines processed
File labels_en.ttl.bz2: 9 million lines processed
File labels_en.ttl.bz2: 10 million lines processed
File labels_en.ttl.bz2: 11 million lines processed
File long_abstracts_en.ttl.bz2: 1 million lines processed
File long_abstracts_en.ttl.bz2: 2 million lines processed
File long_abstracts_en.ttl.bz2: 3 million lines processed
File long_abstracts_en.ttl.bz2: 4 million lines processed
File page_links_en.ttl.bz2: 1 million lines processed
File page_links_en.ttl.bz2: 2 million lines processed
File page_links_en.ttl.bz2: 3 million lines processed
File page_links_en.ttl.bz2: 4 million lines processed
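
The indexing loop combines two ideas: the first triple of an entity creates the document and later triples send partial updates, and actions are sent in batches rather than one by one. The sketch below shows both with a plain Python list standing in for Elasticsearch. `make_action`, `flush_in_batches`, and the `send` callback are hypothetical names; in the notebook the role of `send` is played by `helpers.bulk(es, actions)`.

```python
def make_action(index_name, doc_id, fields, seen):
    """Build an index action for a new id, or a partial-update action for a seen id."""
    if doc_id in seen:
        return {'_op_type': 'update', '_index': index_name, '_id': doc_id, 'doc': fields}
    seen.add(doc_id)
    return {'_index': index_name, '_id': doc_id, '_source': fields}


def flush_in_batches(actions, send, batch_size):
    """Call send(batch) whenever batch_size actions have accumulated, then once for the rest."""
    batch = []
    for action in actions:
        batch.append(action)
        if len(batch) >= batch_size:
            send(batch)
            batch = []
    if batch:
        send(batch)


# Hypothetical toy triples, already reduced to (subject, fields) pairs.
triples = [
    ('<dbpedia:Rome>', {'names': ['Rome']}),
    ('<dbpedia:Oslo>', {'names': ['Oslo']}),
    ('<dbpedia:Rome>', {'abstract': 'Rome is the capital of Italy'}),
]

sent = []      # collects the batches instead of sending them to a cluster
seen = set()   # ids that already have a document
actions = (make_action('demo_index', s, f, seen) for s, f in triples)
flush_in_batches(actions, sent.append, batch_size=2)

print([len(batch) for batch in sent])
print(sent[1][0].get('_op_type'))
```

Keeping the batch small bounds memory use, at the cost of more round trips; the notebook settles on batches of roughly 5000 actions.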


Index catch-all field:

In [104]:
actions = []
for subject in catchalls:
    actions.append({
        '_index': INDEX_NAME,
        '_id': subject,
        'doc': {'catch-all': catchalls[subject]},
        '_op_type': 'update',
    })
    
    # Add entities into the index in a bulk of 5000 to improve the performance
    if len(actions) > 5000:
        helpers.bulk(es, actions)
        actions = []
        
# process the last bulk
helpers.bulk(es, actions)
print('Indexing completed')

Indexing completed


## Find the top-1000 most frequent predicates to make fields for entity-based index

In [93]:
TOP_PREDICATES = sorted(PREDICATES_COUNT.items(), key=operator.itemgetter(1), reverse=True)

In [94]:
TOP_PREDICATES = TOP_PREDICATES[:1000]
TOP_PREDICATES

[('<dbo:wikiPageWikiLink>', 158426950),
 ('<dbo:abstract>', 4630609),
 ('<rdfs:label>', 4630608)]
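
Counting and ranking predicates can also be expressed with `collections.Counter` from the standard library, which gives the same kind of `(predicate, count)` list as the sort above. This is an alternative sketch on toy data, not what the notebook ran; the predicate list is made up for the example.

```python
from collections import Counter

# Hypothetical stream of predicates, one per parsed triple.
seen_predicates = [
    '<rdfs:label>',
    '<dbo:abstract>',
    '<dbo:wikiPageWikiLink>',
    '<dbo:wikiPageWikiLink>',
    '<dbo:wikiPageWikiLink>',
    '<dbo:abstract>',
]

predicate_counts = Counter()
for predicate in seen_predicates:
    predicate_counts[predicate] += 1

# most_common(n) returns the n most frequent items, highest count first.
top = predicate_counts.most_common(2)
print(top)
```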

In [95]:
# The with-statement closes the file even if pickling fails
with open('data/top_predicates.pkl', 'wb') as f:
    pickle.dump(TOP_PREDICATES, f)

In [None]:
with open('data/top_predicates.bk.pkl', 'rb') as f:
    TOP_PREDICATES = pickle.load(f)
TOP_PREDICATES = [field[0] for field in TOP_PREDICATES]

Update index settings for top-predicates fields:

In [42]:
INDEX_URI_SETTINGS = {
    'settings' : {
        'index' : {
            "number_of_shards" : 1,
            "number_of_replicas" : 0
        }
    },
    'mappings': {
        'properties': {}
    }
}

In [20]:
for predicate in TOP_PREDICATES:
    INDEX_URI_SETTINGS['mappings']['properties'][predicate] = {
        'type': "text",
        'term_vector': "yes",
        'analyzer': "keyword"
    }

In [21]:
INDEX_URI_SETTINGS

{'settings': {'index': {'number_of_shards': 1, 'number_of_replicas': 0}},
 'mappings': {'properties': {'<dbo:wikiPageWikiLink>': {'type': 'text',
    'term_vector': 'yes',
    'analyzer': 'keyword'},
   '<dbo:abstract>': {'type': 'text',
    'term_vector': 'yes',
    'analyzer': 'keyword'},
   '<rdfs:label>': {'type': 'text',
    'term_vector': 'yes',
    'analyzer': 'keyword'}}}}

## Build the entity-based index

In [43]:
INDEX_URI_NAME = 'dbpedia_uri_index'
if es.indices.exists(INDEX_URI_NAME):
    es.indices.delete(index=INDEX_URI_NAME)
    
es.indices.create(index=INDEX_URI_NAME, body=INDEX_URI_SETTINGS)

{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'dbpedia_uri_index'}

Parse the files defined above and build the entity-based index with the top predicates as fields. The bulk helper from the Elasticsearch helpers module is used to improve performance:

In [23]:
actions = []
# hold a set of indexed entities. For the entities already in the index, doing the update action using _op_type=update
indexed_entities = set()
for file in files:
    filepath = os.path.join(DATA_PATH, DBPEDIA_PATH, file['name'])
    reverse = file.get('reverse', False)
    spo = SPO()
    parser = NTriplesParser(spo)
    count, j = 0, 0
    with bz2.open(filepath, "r") as f:
        for line in f:
            count += 1
            if count == 1000000:
                j += 1
                count = 0
                print("File {}: {} million lines processed".format(file['name'], j))
            try:
                # parse the triple
                parser.parsestring(line.decode("utf-8"))
            except ParseError:
                continue
            
            if spo.s is None or spo.o is None:
                continue

            subject = url_to_prefixed(spo.s)
                
            predicate = url_to_prefixed(spo.p)
            if reverse:
                predicate = "!" + predicate
            
            object = spo.o
            # if object is a URI
            if type(object) is URIRef:
                object = url_to_prefixed(object)
            
            # reverse direction
            if reverse:
                temp = subject
                subject = object
                object = temp
                
            # only keep subject that is dbpedia entity
            if not subject.startswith('<dbpedia:'):
                continue
            
            # only keep entities that have both <rdfs:label> and <rdfs:comment>
            if subject not in ENTITIES:
                continue
            
            # Add top-1000 predicates fields into index
            if predicate in TOP_PREDICATES:
                if object.startswith('<') and object.endswith('>'):
                    if subject in indexed_entities:
                        actions.append({
                            '_index': INDEX_URI_NAME,
                            '_id': subject,
                            'doc': {predicate: [object]},
                            '_op_type': 'update',
                        })
                    else:
                        actions.append({
                            '_index': INDEX_URI_NAME,
                            '_id': subject,
                            '_source': {predicate: [object]}
                        })
                        indexed_entities.add(subject)

            # Add entities into the index in a bulk of 5000 to improve the performance
            if len(actions) > 5000:
                helpers.bulk(es, actions)
                actions = []
                
# process the last bulk
helpers.bulk(es, actions)
print('Indexing completed')

File labels_en.ttl.bz2: 1 million lines processed
File labels_en.ttl.bz2: 2 million lines processed
File labels_en.ttl.bz2: 3 million lines processed
File labels_en.ttl.bz2: 4 million lines processed
File labels_en.ttl.bz2: 5 million lines processed
File labels_en.ttl.bz2: 6 million lines processed
File labels_en.ttl.bz2: 7 million lines processed
File labels_en.ttl.bz2: 8 million lines processed
File labels_en.ttl.bz2: 9 million lines processed
File labels_en.ttl.bz2: 10 million lines processed
File labels_en.ttl.bz2: 11 million lines processed
File long_abstracts_en.ttl.bz2: 1 million lines processed
File long_abstracts_en.ttl.bz2: 2 million lines processed
File long_abstracts_en.ttl.bz2: 3 million lines processed
File long_abstracts_en.ttl.bz2: 4 million lines processed
File page_links_en.ttl.bz2: 1 million lines processed
File page_links_en.ttl.bz2: 2 million lines processed
File page_links_en.ttl.bz2: 3 million lines processed
File page_links_en.ttl.bz2: 4 million lines processed


Test the indexing:

In [24]:
tv = es.termvectors(index=INDEX_NAME, id='<dbpedia:Rome>', fields="catch-all")
tv

{'_index': 'dbpedia_index',
 '_type': '_doc',
 '_id': '<dbpedia:Rome>',
 '_version': 844,
 'found': True,
 'took': 0,
 'term_vectors': {'catch-all': {'field_statistics': {'sum_doc_freq': 77805365,
    'doc_count': 6564468,
    'sum_ttf': 189950900},
   'terms': {'heritage': {'term_freq': 2,
     'tokens': [{'position': 1}, {'position': 106}]},
    'italy': {'term_freq': 2, 'tokens': [{'position': 4}, {'position': 109}]},
    'site': {'term_freq': 2, 'tokens': [{'position': 2}, {'position': 107}]},
    'world': {'term_freq': 2,
     'tokens': [{'position': 0}, {'position': 105}]}}}}}
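
A term vector response like the one above can be turned back into an ordered token sequence, which is a handy way to confirm that positions were really stored. The sketch below works on a dictionary copied from that output, so it runs without a cluster; `term_positions` is a hypothetical helper, not a client API.

```python
# Part of the response shown above, reduced to the keys the helper reads.
response = {
    'term_vectors': {'catch-all': {'terms': {
        'heritage': {'term_freq': 2, 'tokens': [{'position': 1}, {'position': 106}]},
        'italy': {'term_freq': 2, 'tokens': [{'position': 4}, {'position': 109}]},
        'site': {'term_freq': 2, 'tokens': [{'position': 2}, {'position': 107}]},
        'world': {'term_freq': 2, 'tokens': [{'position': 0}, {'position': 105}]},
    }}}
}


def term_positions(response, field):
    """Map each term of a field to its list of positions (empty dict if the field is absent)."""
    terms = response.get('term_vectors', {}).get(field, {}).get('terms', {})
    return {term: [tok['position'] for tok in info.get('tokens', [])]
            for term, info in terms.items()}


positions = term_positions(response, 'catch-all')
# Sort (position, term) pairs to recover the order in which the terms were indexed.
ordered = sorted((pos, term) for term, plist in positions.items() for pos in plist)
print(' '.join(term for _, term in ordered[:4]))
```

The helper returns an empty dictionary when the requested field has no term vectors in the response, for example when `term_vectors` comes back as `{}`.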

In [25]:
for p in TOP_PREDICATES:
    print(p)
    tv = es.termvectors(index=INDEX_NAME, id='<dbpedia:Outline_of_cuisines>', fields=p)
    print(tv)

<dbo:wikiPageWikiLink>
{'_index': 'dbpedia_index', '_type': '_doc', '_id': '<dbpedia:Outline_of_cuisines>', '_version': 83, 'found': True, 'took': 0, 'term_vectors': {}}
<dbo:abstract>
{'_index': 'dbpedia_index', '_type': '_doc', '_id': '<dbpedia:Outline_of_cuisines>', '_version': 83, 'found': True, 'took': 0, 'term_vectors': {}}
<rdfs:label>
{'_index': 'dbpedia_index', '_type': '_doc', '_id': '<dbpedia:Outline_of_cuisines>', '_version': 83, 'found': True, 'took': 0, 'term_vectors': {}}
