In [1]:
# Generate triples parameter

#kgtk_path takes in the directory which contains the kgtk subgraph
kgtk_path = '/Users/rijulvohra/Documents/work/Novartis-ISI/kgtk_development/data/Q28885102'
output_filename = 'pharma_product_concat2.tsv.gz'
triple_filename = 'pharma_triple2.ttl'
triple_generation_log = 'pharma_log2.txt'
properties_file_path = './properties.tsv'

# Load triples to blazegraph
wikibase_ui_port = '10001'
wikibase_sparql_port = '10002'
wikibase_proxy_port = '10003'
wikibase_qs_port = '10005'
wikibase_volume = '.'
docker_name = 'blazegraphpipeline'
create_new = False
stop_docker = "No"
blazegraph_image = 'wikibase/wdqs:0.3.10'
ttl_path = ''
query_service_name = 'Novartis-ISI Query Service'

#Parameterize whether you want to run just the generate_wikidata_triples part or loading to blazegraph part
gen_triples = False
load_triples = False



In [2]:
# Parameters
kgtk_path = "/Users/rijulvohra/Documents/work/Novartis-ISI/blazegraph-load-Noartis-ISI-pipeline/blazegraph-load-pipeline/blazegraph_load_pipeline_2/data/rxnorm_converted/Qrx_qnode_rx_un.tsv.gz"
output_filename = "rxnorm_concat_full.tsv.gz"
triple_filename = "rxnorm_triple.ttl"
triple_generation_log = "rxnorm_log.txt"
properties_file_path = "./data/rxnorm/pnode_rx_un.tsv"
gen_triples = True
load_triples = True


In [3]:
import os
import re
import subprocess
import gzip
import subprocess
import socket
import sys
import shutil
import time
from IPython.display import display, Markdown, HTML
dirname = os.path.abspath('')
kgtk_path = os.path.join(dirname,kgtk_path)
wikibase_volume = os.path.join(dirname,'volume')
print(kgtk_path)
print(wikibase_volume)

/Users/rijulvohra/Documents/work/Novartis-ISI/blazegraph-load-Noartis-ISI-pipeline/blazegraph-load-pipeline/blazegraph_load_pipeline_2/data/rxnorm_converted/Qrx_qnode_rx_un.tsv.gz
/Users/rijulvohra/Documents/work/Novartis-ISI/blazegraph-load-Noartis-ISI-pipeline/blazegraph-load-pipeline/blazegraph_load_pipeline_2/volume


In [4]:
# Downloading the spacy en_core_web_sm model. It is a KGTK dependency
!python -m spacy download en_core_web_sm

[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')


In [5]:
'''
Utility class to print stuff in Bold.
'''
class color:
   PURPLE = '\033[95m'
   CYAN = '\033[96m'
   DARKCYAN = '\033[36m'
   BLUE = '\033[94m'
   GREEN = '\033[92m'
   YELLOW = '\033[93m'
   RED = '\033[91m'
   BOLD = '\033[1m'
   UNDERLINE = '\033[4m'
   END = '\033[0m'

### Generate Wikidata triples

In [6]:
%%time
def find_files(path):
    '''
    Finds all the KGTK edge files in a directory
    
    parameter:
    path: The path of the directory which contains the KGTK edge files
    return:
    A list of files with entire paths of the KGTK edge files
    '''
    kgtk_files = []
    for file_name in glob.glob(kgtk_path + '/Q*.tsv.gz'):
        if re.search('.statistics.tsv.gz',file_name) or re.search('.P279star.tsv.gz',file_name):
            continue
        
        kgtk_files.append(os.path.join(kgtk_path,file_name))
    return kgtk_files

if not(kgtk_path.endswith('.tsv.gz')):
    kgtk_files = find_files(kgtk_path)
    print(json.dumps(kgtk_files,indent = 4))

else:
    kgtk_files = [kgtk_path]
    kgtk_path = os.path.dirname(kgtk_path)

CPU times: user 13 µs, sys: 7 µs, total: 20 µs
Wall time: 21.9 µs


In [7]:
%%time
def generate_concat_files(kgtk_files, kgtk_path, output_filename):
    '''
    Concatenates all the KGTK edge files present in the passed directory.
    
    parameters:
    kgtk_files: A list with the path of all the KGTK edge files
    kgtk_path: The path of the directory which contains the KGTK edge files
    output_filename: The name of the final concatenated tsv file
    '''
    concat_input = ' '.join(kgtk_files)
    output_concat = os.path.join(kgtk_path,output_filename)
    !kgtk cat -o $output_concat -i $concat_input
    
    return output_concat
    


CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 3.81 µs


In [8]:
##generate_wikidata_triples
#Run only generate triples
'''
1. This cell will run only if you just want to generate triples aligned to wikidata schema. 
It will first concatenate the KGTK edge files and then will use the KGTK generate_wikidata_triples 
command to generate triples for the concatenated file.

2. The generate_wikidata_triples takes in the properities file path as a parameter. 
The properties file should have the data_type mentioned for each of the property used in the KGTK edge file.

3. The generated triple file is then gzipped.
'''

if gen_triples:
    output_concat = generate_concat_files(kgtk_files, kgtk_path, output_filename) # Concatenate the kgtk edge files
    
    print()
    
    print(color.BOLD + 'Concatenated edge file is present at:' + color.END,end = ' ')
    print(output_concat)
    
    print()
    
    print(color.BOLD + '------------Head of the concatenated KGTK edge file-------------' + color.END)
    
    print()
    
    ! gzip -cd $output_concat | head -n 20

    gen_triple_input = os.path.join(kgtk_path,output_filename)
    triple_output_save_path = os.path.join(kgtk_path,triple_filename) # Name of the output triple file
    log_save_path = os.path.join(kgtk_path,triple_generation_log) # Name of the log file
    
    # generate the triples
    !time cat $gen_triple_input | kgtk generate_wikidata_triples -ap aliases,alias -lp label -dp description \
                                                                 -pf $properties_file_path \
                                                                 -n 100000 \
                                                                 --debug \
                                                                 -gt yes -gz yes -w yes \
                                                                 -log $log_save_path > $triple_output_save_path

    
    # gzip the triple file
    with open(triple_output_save_path, 'rb') as f_in:
        with gzip.open(triple_output_save_path + '.gz', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
            os.remove(f_in.name)
    
    print()
            
    print(color.BOLD + 'The triple file is generated and saved at:' + color.END,end = ' ')
    print(triple_output_save_path + '.gz')
    
    triple_path = triple_output_save_path + '.gz'
    
    print()
    
    print(color.BOLD + '------------Head of the triple file-------------' + color.END)
    
    print()
    
    ! gzip -cd $triple_path | head -n 20
    
    


[1mConcatenated edge file is present at:[0m /Users/rijulvohra/Documents/work/Novartis-ISI/blazegraph-load-Noartis-ISI-pipeline/blazegraph-load-pipeline/blazegraph_load_pipeline_2/data/rxnorm_converted/rxnorm_concat_full.tsv.gz

[1m------------Head of the concatenated KGTK edge file-------------[0m

id	node1	label	node2
QRX690258309-BN-196472	QRX690258309	BN	196472
QRX690258310-rxcui-317541	QRX690258310	rxcui	317541
QRX690258310-label-Oral Tablet	QRX690258310	label	Oral Tablet
QRX690258310-alias-	QRX690258310	alias	
QRX690258310-description-dose form	QRX690258310	description	dose form
QRX690258310-language-ENG	QRX690258310	language	ENG
QRX690258310-suppress-N	QRX690258310	suppress	N
QRX690258310-umlscui-C0993159	QRX690258310	umlscui	C0993159
QRX690258310-IN-29046	QRX690258310	IN	29046
QRX690258311-rxcui-206771	QRX690258311	rxcui	206771
QRX690258311-label-lisinopril 40 MG Oral Tablet [Zestril]	QRX690258311	label	lisinopril 40 MG Oral Tablet [Zestril]
QRX690258311-alias-Zestril 40 MG

### Load Triples

In [9]:
# Exception Functions
class PortInUseError(BaseException):
    """
    Exception class for generating error if the passed ports are already in use.
    
    """
    def __init__(self,value):
        self.value = value


class DockerNameInUse(BaseException):
    """
    Exception class for generating error if the passed Docker Name is already in use.
    
    """
    def __init__(self,value):
        self.value = value


In [10]:
class BlazegraphLoad():
    '''
    The class is used to create a new or use an existing wikibase-docker instance to load 
    a given gzipped ttl file to a blazegraph triple store.
    '''
    def __init__(self,ttl_path,wikibase_ui_port,wikibase_sparql,wikibase_proxy,wikibase_qs,wikibase_volume,
                 create_new,docker_name,stop_docker,blazegraph_image,query_service_name):
        '''
        Initializing the class variables and Setting the environment variables 
        that will be used by the docker-compose.pipeline.yml file.
        
        '''
        self.ttl_path = os.path.join(dirname,ttl_path)
        self.wikibase_ui_port = str(wikibase_ui_port)
        self.wikibase_sparql = str(wikibase_sparql)
        self.wikibase_proxy = str(wikibase_proxy)
        self.wikibase_qs = str(wikibase_qs)
        self.wikibase_volume = wikibase_volume
        self.create_new = create_new
        self.docker_name = docker_name
        self.stop_docker = stop_docker
        self.blazegraph_image = blazegraph_image
        self.query_service_name = query_service_name
        os.environ['WIKIBASE_UI'] = self.wikibase_ui_port
        os.environ['WIKIBASE_SPARQL'] = self.wikibase_sparql
        os.environ['WIKIBASE_PROXY'] = self.wikibase_proxy
        os.environ['WIKIBASE_QS'] = self.wikibase_qs
        os.environ['WIKIBASE_VOLUME'] = self.wikibase_volume
        os.environ['BLAZEGRAPH_IMAGE'] = self.blazegraph_image
        os.environ['QUERY_SERVICE_NAME'] = self.query_service_name

    @staticmethod
    def check_availability():
        '''
        1. The function checks whether the passed ports are available or not. If anyone of the passed port
        is not available, then it will generate an error.
        2. The functions also checks if the passed docker name is available or not. If the docker name is 
        already in use it will generate an error.
        '''

        wikibase_ui = os.getenv('WIKIBASE_UI')
        wikibase_sparql = os.getenv('WIKIBASE_SPARQL')
        wikibase_proxy = os.getenv('WIKIBASE_PROXY')
        wikibase_qs = os.getenv('WIKIBASE_QS')
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            wikibase_ui_usage = s.connect_ex(('localhost', int(wikibase_ui))) == 0
            wikibase_sparql_usage = s.connect_ex(('localhost', int(wikibase_sparql))) == 0
            wikibase_proxy_usage = s.connect_ex(('localhost', int(wikibase_proxy))) == 0
            wikibase_qs_usage = s.connect_ex(('localhost', int(wikibase_qs))) == 0
        docker_name_availability = subprocess.Popen(['docker', 'ps', '--filter', 'name={}'.format(docker_name)],
                                                    stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        try:
            if create_new:
                if wikibase_ui_usage:
                    raise PortInUseError('Wikibase UI Port is in use')
                if wikibase_sparql_usage:
                    raise PortInUseError('Wikibase Sparql Port is in use')
                if wikibase_proxy_usage:
                    raise PortInUseError('Wikibase Proxy Port is in use')
                if wikibase_qs_usage:
                    raise PortInUseError('Wikibase QS Port is in use')
            if len(docker_name_availability.communicate()[0]) > 126:
                raise DockerNameInUse('Try changing docker container name')
            print(docker_name_availability)
        except PortInUseError as Argument:
            raise ('Error Message:', Argument)
            sys.exit(1)

        except DockerNameInUse as Argument:
            raise ('Error Message:', Argument)
            sys.exit(1)
        return True

    @staticmethod
    def load_data():
        '''
        The function is used to load a gzipped ttl file to the Blazegraph triple store.
        '''
        l_data = subprocess.Popen(
            ['docker', 'exec', '{}_wdqs_1'.format(docker_name), '/wdqs/loadData.sh', '-n', 'wdq', '-d',
             '/instancestore/wikibase/mungeOut'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        print(l_data.communicate()[0])

    def driver_fn(self):
        '''
        This is the main driver function which first checks if the user wants to create a new docker instance.
        
        '''
        if self.create_new:
            all_parameters = self.check_availability() # checks the availability of the ports and the docker name
            if all_parameters:
                # creates a new docker container
                create_docker = subprocess.Popen(
                    ['docker-compose', '-f', 'docker-compose.pipeline.yml', '-p', docker_name, 'up', '-d'],
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE)
                create_docker.communicate()

        if self.stop_docker == 'Yes' or self.stop_docker == 'yes':
            docker_stop = subprocess.Popen(
                ['docker-compose', '-f', 'docker-compose.pipeline.yml', '-p', docker_name, 'down', '-v'],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            docker_stop.communicate()
            sys.exit(1)

        # checks if the directory which needs to be mounted on the docker container is present. 
        # If not recursively creates a directory. Also moves the triple file that needs to be loaded to this directory
        if os.path.isdir(os.getenv('WIKIBASE_VOLUME') + '/mungeOut'):
            shutil.copy(ttl_path, os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))
        else:
            os.makedirs(os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut'))
            shutil.copy(ttl_path, os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))

        time.sleep(40) # Wait time to let the docker containers start before the loading function is called

        self.load_data()
        os.remove(os.path.join(os.getenv('WIKIBASE_VOLUME'), 'mungeOut/wikidump-000000001.ttl.gz'))

In [11]:
# Run only load triples
'''
1. This cell is used to load a given triple file to blazegraph triple store.

2. It will run only if the parameter only_load_triples is set to True
'''
if (gen_triples and load_triples) or load_triples:
    if gen_triples:
        ttl_path = triple_path
    print(color.BOLD + '------------Log output of loading the triple file to Blazegraph-------------' + color.END)
    print()
    loader_obj = BlazegraphLoad(ttl_path,wikibase_ui_port,wikibase_sparql_port,wikibase_proxy_port,wikibase_qs_port,
                                    wikibase_volume,create_new,docker_name,stop_docker,blazegraph_image,query_service_name)
    loader_obj.driver_fn()

[1m------------Log output of loading the triple file to Blazegraph-------------[0m

b'Processing wikidump-000000001.ttl.gz\n<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"><html><head><meta http-equiv="Content-Type" content="text&#47;html;charset=UTF-8"><title>blazegraph&trade; by SYSTAP</title\n></head\n><body<p>totalElapsed=4822ms, elapsed=4822ms, connFlush=0ms, batchResolve=0, whereClause=0ms, deleteClause=0ms, insertClause=0ms</p\n><hr><p>COMMIT: totalElapsed=35369ms, commitTime=1603761859022, mutationCount=798980</p\n></html\n>File wikidump-000000002.ttl.gz not found, terminating\n'


In [12]:
# Generate a link to SPARQL ENDPOINT only if triples are loaded to Blazegraph

if load_triples:
    s = """<a href="http://localhost:{}">Sparql Endpoint</a>""".format(wikibase_sparql_port)
    display(HTML(s))
    