# Benchmarks
In this notebook we will try to identify the potential bottlenecks of the synchronization system, and show a series of benchmarks with different RDF datasets.

## Setup

As always, we will begin by adding the hercules_sync library to our path, and setting up the logging system.

In [1]:
import logging
import os
import sys

# set up module paths for imports
module_path = os.path.abspath(os.path.join('..'))
hercules_sync_path = os.path.abspath(os.path.join('..', 'hercules_sync'))
sys.path.append(module_path)
sys.path.append(hercules_sync_path)

# start logging system and set logging level
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
logging.info("Starting logger")

%load_ext snakeviz

## Datasets used

In order to execute our benchmarks we are going to use two different datasets:
* Real dataset: A DBpedia dataset with information about people, used to measure the performance of our system on real data.
* Synthetic dataset: We are also going to use the Berlin SPARQL Benchmark (BSBM) tool to generate datasets of different sizes and observe how the performance of our system evolves as the amount of data to be synchronized increases.

We will begin by defining a set of functions that download the DBpedia dataset and obtain a subset of it:

In [2]:
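import bz2
import urllib.request

def read_zipped_dataset(url, decompressor=bz2):
    """Download a compressed file and return its decompressed content as bytes."""
    with urllib.request.urlopen(url) as response:
        # use the given decompressor module (bz2 by default) instead of hardcoding bz2
        content = decompressor.decompress(response.read())
    return content

def get_first_lines(string, num_lines):
    """Return the first num_lines lines of a bytes object."""
    return b'\n'.join(string.split(b'\n')[:num_lines])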


Now, we will use the functions defined above to load the person data from DBpedia. Three different datasets will be stored:
* dbpedia_dataset will contain the complete dataset
* dbpedia_dataset_preview will contain just the first 100 lines of the dataset. This subset will be used first to identify potential bottlenecks of the system.
* dbpedia_dataset_final will contain the first 4000 lines of the dataset. This subset will be used later on for our benchmarks with real data.

In [3]:
DBPEDIA_PERSONDATA_URL = 'http://downloads.dbpedia.org/3.4/en/persondata_en.nt.bz2'
NUM_TRIPLES_FINAL = 4000
NUM_TRIPLES_PREVIEW = 100

dbpedia_dataset = read_zipped_dataset(DBPEDIA_PERSONDATA_URL, bz2)
dbpedia_dataset_preview = get_first_lines(dbpedia_dataset, NUM_TRIPLES_PREVIEW)
dbpedia_dataset_final = get_first_lines(dbpedia_dataset, NUM_TRIPLES_FINAL)
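The cell above downloads and decompresses the whole dump even though the subsets only need its first few thousand lines. A possible alternative, sketched below and not part of the original notebook, is to decompress the stream lazily with bz2.open and stop reading after num_lines lines. read_first_lines_streaming is a hypothetical helper; it relies on bz2.open accepting any readable binary file object, which includes the response returned by urllib.request.urlopen.

```python
import bz2
import io
from itertools import islice


def read_first_lines_streaming(fileobj, num_lines):
    """Return the first num_lines lines of a bz2-compressed binary stream.

    Decompression is lazy: reading stops once enough lines have been
    produced, so the rest of the dump is never decompressed.
    """
    with bz2.open(fileobj, 'rb') as stream:
        # Lines yielded by the stream keep their trailing b'\n'; strip it so the
        # result has the same b'\n'.join(...) format as get_first_lines.
        lines = [line.rstrip(b'\n') for line in islice(stream, num_lines)]
    return b'\n'.join(lines)


# Usage with an in-memory stand-in for the downloaded .nt.bz2 file
sample = b'\n'.join(
    f'<http://example.org/person{i}> <http://xmlns.com/foaf/0.1/name> "Person {i}" .'.encode()
    for i in range(10)
)
preview = read_first_lines_streaming(io.BytesIO(bz2.compress(sample)), 3)
print(preview.decode())
```

With the real dump, the preview could be obtained with `with urllib.request.urlopen(DBPEDIA_PERSONDATA_URL) as response: preview = read_first_lines_streaming(response, NUM_TRIPLES_PREVIEW)`. The complete dbpedia_dataset still requires reading the whole file.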

Now, we will proceed to create the synthetic datasets. We will first create a function that calls the BSBM tool to generate a synthetic dataset with the given number of products:
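A minimal sketch of such a function is shown below; it is not the notebook's own code. It assumes a local copy of the BSBM tools whose generate script accepts -pc (number of products), -s (output serialization) and -fn (output file name without extension), as described in the BSBM data generator documentation; these flags should be checked against the BSBM release in use. build_bsbm_command, generate_bsbm_dataset and the bsbm_dir path are hypothetical names.

```python
import subprocess
from pathlib import Path


def build_bsbm_command(num_products, output_name='dataset', serialization='nt'):
    """Build the command line for the BSBM data generator.

    Assumed flags: -pc (number of products), -s (output serialization)
    and -fn (output file name, without extension).
    """
    return ['./generate', '-pc', str(num_products),
            '-s', serialization, '-fn', output_name]


def generate_bsbm_dataset(num_products, bsbm_dir, output_name='dataset'):
    """Run the generator inside bsbm_dir and return the N-Triples content as bytes.

    bsbm_dir is a hypothetical path to an unpacked BSBM tools distribution.
    """
    cmd = build_bsbm_command(num_products, output_name, serialization='nt')
    # check=True raises CalledProcessError if the generator exits with an error
    subprocess.run(cmd, cwd=bsbm_dir, check=True)
    # Assumption: the generator writes <output_name>.nt into bsbm_dir
    return (Path(bsbm_dir) / f'{output_name}.nt').read_bytes()
```

Returning bytes keeps the synthetic datasets in the same format as dbpedia_dataset_preview and dbpedia_dataset_final, so they can be passed to the same synchronization functions.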

## Analysing bottlenecks of the application

In this section we are going to execute an initial dump of the DBpedia preview dataset to identify which parts of the synchronization take the most time. First, we define the algorithm to use and reset the URI factory to its initial state:

In [4]:
from hercules_sync.external.uri_factory_mock import URIFactory
from hercules_sync.git import GitFile
from hercules_sync.synchronization import GraphDiffSyncAlgorithm, OntologySynchronizer

algorithm = GraphDiffSyncAlgorithm()
synchronizer = OntologySynchronizer(algorithm)
factory = URIFactory()
factory.reset_factory()

Next, we define a function that completely resets the state of the Wikibase instance. Since this notebook loads data into Wikibase several times, its state must be reset before each new load. The function connects over SSH to the machine where Wikibase runs (in Docker containers) and calls a script that resets the Docker volumes:

In [9]:
import paramiko
import time

from hercules_sync.triplestore import WikibaseAdapter
from secret import SSH_USER, SSH_PASS, USERNAME, PASSWORD

wikibase_host = '156.35.94.149'
ssh_port = 22  # paramiko expects the port as an int
mediawiki_api_url = f'http://{wikibase_host}:8181/w/api.php'
sparql_endpoint_url = f'http://{wikibase_host}:8282/proxy/wdqs/bigdata/namespace/wdq/sparql'

def reset_wb_state(factory):
    print("Resetting Wikibase state...")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(wikibase_host, port=ssh_port, username=SSH_USER, password=SSH_PASS)
    try:
        # clean_restart.sh resets the Docker volumes and restarts the containers
        _, stdout, stderr = ssh.exec_command('cd ~/wikibase-docker && sh clean_restart.sh')
        exit_status = stdout.channel.recv_exit_status()  # block until the script finishes
        if exit_status != 0:
            print(f"Reset script failed with status {exit_status}: {stderr.read().decode()}")
    finally:
        ssh.close()
    factory.reset_factory()
    time.sleep(15)  # wait some time for the Docker containers to go up again
    print("Wikibase state has been reset")
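reset_wb_state waits a fixed 15 seconds for the containers to come back, which may be too short on a loaded machine or wasteful on a fast one. One alternative, sketched here with hypothetical helpers that are not part of hercules_sync, is to poll the MediaWiki API until it answers. The probe requests action=query&meta=siteinfo, a standard MediaWiki API query; whether a responding MediaWiki also means the SPARQL query service is ready is an assumption, since the query service may start later.

```python
import time
import urllib.request


def mediawiki_is_up(api_url, timeout=5.0):
    """Return True if the MediaWiki API at api_url answers with HTTP 200."""
    url = f'{api_url}?action=query&meta=siteinfo&format=json'
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError, refused connections and timeouts are all OSError subclasses
        return False


def wait_until_ready(probe, timeout=120.0, interval=2.0,
                     sleep=time.sleep, clock=time.monotonic):
    """Call probe() every interval seconds until it returns True.

    Returns True if the probe succeeded before timeout seconds elapsed,
    False otherwise. sleep and clock can be replaced for testing.
    """
    deadline = clock() + timeout
    while True:
        if probe():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

Inside reset_wb_state, the fixed time.sleep(15) could then be replaced with wait_until_ready(lambda: mediawiki_is_up(mediawiki_api_url)), optionally followed by a short extra wait for the query service.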


Now we will define the function to be benchmarked. This function executes the synchronization of the given data to Wikibase:

In [6]:
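def execute_synchronization(source_content, target_content, synchronizer, adapter):
    """Synchronize the changes from source_content to target_content into Wikibase."""
    # wrap the old (source) and new (target) contents so the synchronizer can compare them
    gitfile = GitFile(None, source_content, target_content)
    ops = synchronizer.synchronize(gitfile)
    for op in ops:
        res = op.execute(adapter)  # write the change to Wikibase through the adapter
        if not res.successful:
            print(f"Error synchronizing triple: {res.message}")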

Finally, we run execute_synchronization on the preview dataset, starting from an empty source, and visualize its performance with the SnakeViz tool:

In [8]:
reset_wb_state(factory)
adapter = WikibaseAdapter(mediawiki_api_url, sparql_endpoint_url, USERNAME, PASSWORD)
%snakeviz execute_synchronization("", dbpedia_dataset_preview, synchronizer, adapter)

http://156.35.94.149:8181/w/api.php
Successfully logged in as WikibaseAdmin
 


The SnakeViz profile of this run shows that more than half of the synchronization time is spent in the write method of wdi_core, the core module of WikidataIntegrator. In the following sections we propose some solutions to alleviate this issue.
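The %snakeviz magic is built on Python's cProfile module. When the interactive visualization is not available, for example when the benchmark runs as a plain script, a similar textual summary can be produced with cProfile and pstats from the standard library. The helper below is a hypothetical sketch, not part of the original notebook. Passing a restriction such as 'write' limits the report to functions whose file:line(function) name matches that regular expression, which is one way to focus on the write calls.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, sort='cumulative', limit=15, restriction=None, **kwargs):
    """Profile one call of func and return (result, text report).

    The report lists the limit most expensive entries sorted by sort.
    If restriction is a regex string (e.g. 'write'), only functions whose
    'file:line(function)' name matches it are listed.
    """
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer).sort_stats(sort)
    if restriction is None:
        stats.print_stats(limit)
    else:
        # Restrictions are applied in order: first the regex filter, then the limit
        stats.print_stats(restriction, limit)
    return result, buffer.getvalue()
```

For example, `_, report = profile_call(execute_synchronization, "", dbpedia_dataset_preview, synchronizer, adapter, restriction='write')` followed by `print(report)` would list only the profiled entries matching 'write'.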

## Batch vs Basic operations

As we have seen in the previous section, writing triples to Wikibase is the main bottleneck of the system. To alleviate this problem, the 'optimize_ops' function is provided to convert a list of BasicOperations into BatchOperations.
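To make the idea behind batching concrete, the sketch below groups triple additions by subject, so that each entity is written once instead of once per triple. It is a simplified, hypothetical model written for illustration: TripleOp and group_by_subject are not hercules_sync classes or functions, and the actual optimize_ops may group operations differently.

```python
from typing import NamedTuple


class TripleOp(NamedTuple):
    """Hypothetical stand-in for a basic operation that adds one triple."""
    subject: str
    predicate: str
    obj: str


def group_by_subject(ops):
    """Group basic operations into batches that share the same subject.

    dicts preserve insertion order, so batches come out in the order in
    which their subject first appeared.
    """
    batches = {}
    for op in ops:
        batches.setdefault(op.subject, []).append(op)
    return list(batches.values())


ops = [
    TripleOp('ex:Alice', 'foaf:name', '"Alice"'),
    TripleOp('ex:Alice', 'foaf:birthday', '"1970-01-01"'),
    TripleOp('ex:Bob', 'foaf:name', '"Bob"'),
    TripleOp('ex:Alice', 'foaf:knows', 'ex:Bob'),
]
batches = group_by_subject(ops)
print(f'{len(ops)} basic writes -> {len(batches)} batched writes')
```

In this toy example four per-triple writes become two per-entity writes; the real saving depends on how many triples share a subject in the synchronized data.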

With this optimization the system tries to perform fewer write operations, using the 'update' methods provided by WikidataIntegrator instead. In this section we compare the performance of the basic operations against the 'optimized' version. First, we define the functions needed to execute both types of synchronization:

In [None]:
# NOTE: optimize_ops is provided by hercules_sync and must be imported before
# running this cell (its import is not shown in this notebook).

def _synchronize(source_content, target_content, synchronizer, adapter, ops_callback):
    """Obtain the operations with ops_callback and execute them against the adapter."""
    gitfile = GitFile(None, source_content, target_content)
    ops = ops_callback(gitfile)
    for op in ops:
        res = op.execute(adapter)
        if not res.successful:
            print(f"Error synchronizing triple: {res.message}")

def execute_basic_synchronization(source_content, target_content, synchronizer, adapter):
    # operations are used exactly as returned by the synchronizer
    ops_callback = synchronizer.synchronize
    return _synchronize(source_content, target_content, synchronizer, adapter, ops_callback)

def execute_batch_synchronization(source_content, target_content, synchronizer, adapter):
    # basic operations are converted into BatchOperations before being executed
    ops_callback = lambda f: optimize_ops(synchronizer.synchronize(f))
    return _synchronize(source_content, target_content, synchronizer, adapter, ops_callback)


The first auxiliary function, '_synchronize', is very similar to the synchronization function defined previously to profile the preview dataset. The difference is that it receives an additional callback that obtains the list of operations from the file.

After that we define two more functions. The first one corresponds to the basic approach used so far, where the operations are obtained directly from our OntologySynchronizer instance. The second one additionally passes the operations through the 'optimize_ops' function to optimize them.
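To compare both variants, each one should be run several times on a freshly reset Wikibase. The helpers below are a hypothetical sketch (time_runs and summarize are not part of hercules_sync) based on time.perf_counter: time_runs calls an optional setup function before every run, such as a reset of the Wikibase state, and excludes that setup from the measured time.

```python
import statistics
import time


def time_runs(func, *args, repeat=3, setup=None, **kwargs):
    """Run func(*args, **kwargs) repeat times and return the elapsed seconds of each run.

    If setup is given, it is called before every run and is not timed.
    """
    durations = []
    for _ in range(repeat):
        if setup is not None:
            setup()
        start = time.perf_counter()
        func(*args, **kwargs)
        durations.append(time.perf_counter() - start)
    return durations


def summarize(durations):
    """Return the mean and sample standard deviation of a list of durations."""
    mean = statistics.mean(durations)
    stdev = statistics.stdev(durations) if len(durations) > 1 else 0.0
    return mean, stdev
```

For example: `summarize(time_runs(execute_batch_synchronization, "", dbpedia_dataset_final, synchronizer, adapter, repeat=3, setup=lambda: reset_wb_state(factory)))`. This assumes the adapter stays logged in across resets; the notebook itself always creates a new WikibaseAdapter after calling reset_wb_state.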