# Extract DWPCs for a select set of metapaths but for all compound-disease pairs

In [1]:
import json
import threading
import concurrent.futures
import bz2
import csv
import time

import pandas
import py2neo

In [2]:
from tqdm import tqdm

## Connect to Neo4j

In [3]:
# Override the default py2neo HTTP socket timeout with a very large value (1e8),
# presumably so that long-running Cypher queries are not cut off client-side.
# NOTE(review): units are assumed to be seconds -- confirm against py2neo.
py2neo.packages.httpstream.http.socket_timeout = 1e8

In [4]:
# Load the list of configured Neo4j server instances
with open('../all-features/servers.json') as read_file:
    instances = json.load(read_file)

# Connect to the first instance named 'rephetio-v2.0' on its configured port
for instance in instances:
    if instance['name'] == 'rephetio-v2.0':
        uri = 'http://localhost:{}/db/data/'.format(instance['port'])
        neo = py2neo.Graph(uri)
        break
else:
    # Fail loudly instead of leaving `neo` undefined (or, in a notebook,
    # silently reusing a stale connection from a previous run).
    raise LookupError(
        "no instance named 'rephetio-v2.0' in ../all-features/servers.json"
    )
neo

<Graph uri='http://localhost:7500/db/data/'>

## Read metapaths

In [5]:
# Load the selected metapaths, ordered by the first entry of each metapath's
# 'join_complexities' list (ascending).
with open('features/metapaths.json') as read_file:
    metapaths = sorted(
        json.load(read_file),
        key=lambda metapath: metapath['join_complexities'][0],
    )
len(metapaths)

6

In [6]:
# Load the compound-disease pairs table (tab-separated, bz2 file) that the
# DWPC queries will be run against.
pair_df = pandas.read_table('features/compound-disease-pairs.tsv.bz2')
# Preview the last two rows
pair_df.tail(2)

Unnamed: 0,compound_id,compound_name,disease_id,disease_name,category,status
186661,DB01624,Zuclopenthixol,DOID:1245,vulva cancer,,0
186662,DB00258,,DOID:784,,SYM,0


In [7]:
# Total number of queries: one per (metapath, compound-disease pair) combination
total_queries = len(metapaths) * len(pair_df)
print('{:,} total queries'.format(total_queries))

1,119,978 total queries


## Query setup

In [8]:
# Materialize the pair rows once so they can be re-iterated for every metapath
pairs = list(pair_df.itertuples())

def generate_parameters(max_elems=None):
    """
    Yield one keyword-argument dict per (metapath, compound-disease pair) query.

    Metapaths vary slowest: every pair is emitted for one metapath before
    moving on to the next. Each dict carries the `neo` connection, the hetnet
    label, the compound and disease ids, the metapath abbreviation, its DWPC
    query and the damping exponent `w`.

    If `max_elems` is not None, generation stops as soon as that many dicts
    have been yielded.
    """
    combinations = (
        (metapath_dict, pair_info)
        for metapath_dict in metapaths
        for pair_info in pairs
    )
    for emitted, (metapath_dict, pair_info) in enumerate(combinations):
        if max_elems is not None and emitted == max_elems:
            return
        yield dict(
            neo=neo,
            hetnet='hetio-ind',
            compound_id=pair_info.compound_id,
            disease_id=pair_info.disease_id,
            metapath=metapath_dict['abbreviation'],
            query=metapath_dict['dwpc_query'],
            w=0.4,
        )

In [9]:
def compute_dwpc(neo, hetnet, query, metapath, compound_id, disease_id, w):
    """
    Run one DWPC Cypher query and append its result as a row to the shared writer.

    The query is executed on `neo` with `compound_id` as `source`,
    `disease_id` as `target` and `w` as parameters. The row written is
    (hetnet, compound_id, disease_id, metapath, PC, w, DWPC, seconds), where
    DWPC is formatted to 6 significant digits and seconds (wall time of the
    query) to 4. Writing goes through the module-level `writer`, guarded by
    `writer_lock` because this function is called from several threads.
    """
    began = time.time()
    record = neo.cypher.execute(query, source=compound_id, target=disease_id, w=w).one
    elapsed = format(time.time() - began, '.4g')
    row = (
        hetnet,
        compound_id,
        disease_id,
        metapath,
        record['PC'],
        w,
        format(record['DWPC'], '.6g'),
        elapsed,
    )
    with writer_lock:
        writer.writerow(row)

## Execute queries

In [10]:
%%time

# Parameters
workers = 2          # number of threads querying Neo4j concurrently
max_elems = None     # cap on the number of queries (None = run them all)
max_pending = 10000  # max queries submitted but not yet finished

# Shared state used by the worker threads
writer_lock = threading.Lock()
# Throttle submission with a semaphore rather than polling the executor's
# private work queue: a slot is taken per submit and freed when the job ends.
pending_slots = threading.BoundedSemaphore(max_pending)
failures = []

def _on_query_done(future):
    """Free a submission slot and remember any exception raised by the query."""
    pending_slots.release()
    if not future.cancelled() and future.exception() is not None:
        failures.append(future.exception())

# The progress bar total must account for a max_elems cap
if max_elems is None:
    progress_total = total_queries
else:
    progress_total = min(max_elems, total_queries)

# Prepare writer. `newline=''` is what the csv module expects for files it
# writes to; the `with` blocks guarantee the executor is drained and the file
# is closed even if submission is interrupted or raises.
path = 'features/dwpc.tsv.bz2'
n_queries = 0
with bz2.open(path, 'wt', newline='') as write_file:
    writer = csv.writer(write_file, delimiter='\t')
    writer.writerow(['hetnet', 'compound_id', 'disease_id', 'metapath', 'PC', 'w', 'DWPC', 'seconds'])

    # Leaving the executor block waits for all submitted queries to finish,
    # which happens before the output file is closed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for params in tqdm(generate_parameters(max_elems), total=progress_total):
            pending_slots.acquire()
            future = executor.submit(compute_dwpc, **params)
            future.add_done_callback(_on_query_done)
            n_queries += 1

# Exceptions raised inside worker threads are otherwise lost with the
# discarded futures; report them so missing rows do not go unnoticed.
if failures:
    print('{:,} of {:,} queries failed; first error: {!r}'.format(
        len(failures), n_queries, failures[0]))

100%|██████████| 1119978/1119978 [49:18<00:00, 378.60it/s]


CPU times: user 16min 30s, sys: 52.5 s, total: 17min 23s
Wall time: 50min 14s


In [11]:
# Number of queries that were submitted to the executor
n_queries

1119978