# HetioNet Updated Notebook

## Overview
------

[HetioNet_update.ipynb](HetioNet_update.ipynb) is a Jupyter notebook that updates the G-BP (Gene to Biological Process) relations from the original [HetioNet network](https://github.com/hetio/hetionet).
It is useful for testing, training and demos.
It provides an interactive dashboard to visualise CPU and disk usage while the workflow runs.

Run this Jupyter notebook to construct the knowledge graph from the outputs of the processing files mentioned above.

This Jupyter notebook produces a `jsonl.gzip` version of the **HetioNet Updated Network**.
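
To inspect the exported network, the file can be read back line by line. The following is a minimal sketch, assuming the export is a single gzip-compressed file holding one JSON object per line (as the `.jsonl.gzip` file name used later in the notebook suggests). The helper `read_jsonl_gzip`, the sample record, and the file name `sample_edges.jsonl.gzip` are illustrative and not part of the notebook.

```python
import gzip
import json
import os
import tempfile


def read_jsonl_gzip(path):
    """Yield one dict per non-empty line of a gzip-compressed JSON Lines file."""
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:  # skip blank lines
                yield json.loads(line)


# Illustrative record, shaped like the G-BP edge table shown later in this document.
sample = [
    {"source_type": "Gene", "source_id": 9021,
     "target_type": "Biological Process", "target_id": "GO:0071357"},
]

# Write the sample to a temporary file so the sketch runs on its own.
sample_path = os.path.join(tempfile.mkdtemp(), "sample_edges.jsonl.gzip")
with gzip.open(sample_path, "wt", encoding="utf-8") as handle:
    for record in sample:
        handle.write(json.dumps(record) + "\n")

records = list(read_jsonl_gzip(sample_path))
print(records[0]["target_id"])
```

If the export turns out to be a directory of part files rather than a single file, the same helper can be applied to each part.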

In [1]:
# Import custom functions
from tools.utils import *

In [2]:
# Import required libraries
import os
import sys
import bz2
import json
import dask
import requests
import warnings

import logging
import argparse
from pprint import pprint

import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client

from datetime import datetime

In [3]:
# Define program global Variables
_current_wd = os.getcwd()
_out_path = _current_wd + '/out'
_log_out_path = _out_path + '/running_output'
_program_output = _out_path + '/network_outputs'
_download_output = _out_path + '/download_outputs'
_jsonl_path = _program_output + '/jsonl'

_run_date = datetime.now().strftime("%Y%m%d")
_run_time = datetime.now().strftime("%H.%M")
warnings.filterwarnings("ignore")

In [4]:
# Initialise loggers
logger, oh, eh = logger_outputs(_log_out_path, _run_date, _run_time)
logger.info('Environment set, program starts running. Current working directory is {}'.format(_current_wd))


2022-06-27 08:48:53,230 - <ipython-input-4-0895e5bee33b> - INFO: Environment set, program starts running. Current working directory is /home/llopez/git/hetnet_project


In [5]:
# Download the HetioNet JSON
download_url = 'https://github.com/hetio/hetionet/raw/master/hetnet/json/hetionet-v1.0.json.bz2'
download_file = os.path.join(_download_output, 'hetionet-v1.0.json.bz2')
logger.info('Start download hetionet compressed json from its origin URL: {} using HTTP'.format(download_url))
os.makedirs(_download_output, exist_ok=True)
response = requests.get(download_url, stream=True)
response.raise_for_status()  # fail early on HTTP errors
with open(download_file, 'wb') as f:
    # Stream the response body to disk in chunks
    for data in response.iter_content(chunk_size=8192):
        f.write(data)

# Sanity check: the file is fully written once the with-block exits,
# so a single check is enough (a polling loop could spin forever)
if not os.path.isfile(download_file):
    raise FileNotFoundError('Download failed: {} not found.'.format(download_file))

2022-06-24 11:16:43,598 - <ipython-input-5-36723c832226> - INFO: Start download helionet compressed json from its origin URL: https://github.com/hetio/hetionet/raw/master/hetnet/json/hetionet-v1.0.json.bz2 using HTTP
2022-06-24 11:16:43,639 - connectionpool - DEBUG: Starting new HTTPS connection (1): github.com:443
2022-06-24 11:16:43,975 - connectionpool - DEBUG: https://github.com:443 "GET /hetio/hetionet/raw/master/hetnet/json/hetionet-v1.0.json.bz2 HTTP/1.1" 302 0
2022-06-24 11:16:43,979 - connectionpool - DEBUG: Starting new HTTPS connection (1): media.githubusercontent.com:443
2022-06-24 11:16:44,953 - connectionpool - DEBUG: https://media.githubusercontent.com:443 "GET /media/hetio/hetionet/master/hetnet/json/hetionet-v1.0.json.bz2 HTTP/1.1" 200 16112094
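
The download above writes the response body to disk chunk by chunk. A minimal standard-library sketch of that pattern, with a size check afterwards, might look like the following. `write_stream` and `check_download` are hypothetical helpers, and an in-memory buffer stands in for the HTTP response so the example runs offline.

```python
import io
import os
import tempfile


def write_stream(chunks, dest_path):
    """Write an iterable of byte chunks to dest_path and return the byte count."""
    written = 0
    with open(dest_path, "wb") as handle:
        for chunk in chunks:
            handle.write(chunk)
            written += len(chunk)
    return written


def check_download(dest_path, expected_size=None):
    """Raise if the file is missing, empty, or not the expected size."""
    if not os.path.isfile(dest_path):
        raise FileNotFoundError(dest_path)
    size = os.path.getsize(dest_path)
    if size == 0 or (expected_size is not None and size != expected_size):
        raise IOError("Incomplete download: {} bytes at {}".format(size, dest_path))
    return size


# Stand-in for the HTTP response body: any iterable of byte chunks works.
fake_body = io.BytesIO(b"x" * 1000)
chunks = iter(lambda: fake_body.read(128), b"")

dest = os.path.join(tempfile.mkdtemp(), "example.bin")
n_written = write_stream(chunks, dest)
size_on_disk = check_download(dest, expected_size=1000)
print(n_written, size_on_disk)
```

In the notebook, the expected size could be taken from the `Content-Length` response header when the server provides one; the log above reports 16112094 for the final response, which appears to be the body length in bytes.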


In [None]:
# Read the json.bz2 file and convert it to jsonl (one file per top-level key)
logger.info('DOWNLOAD COMPLETE: Read json.bz2 downloaded file in streaming mode and convert it to jsonl')
if not os.path.exists(_jsonl_path):
    logger.debug('Creating new folder at {}'.format(_jsonl_path))
    os.makedirs(_jsonl_path)
with bz2.open(_download_output + '/hetionet-v1.0.json.bz2', 'rb') as f:
    # bz2 decompresses as a stream, but json.load reads the whole document into memory
    json_data = json.load(f)
for key in json_data:
    logger.debug('Generating jsonl: output_{}.jsonl.'.format(key))
    with open(_jsonl_path + '/output_{}.jsonl'.format(key), 'w') as outfile:
        # Write one JSON document per line
        for element in json_data[key]:
            outfile.write(json.dumps(element) + "\n")

# Sanity check: an explicit test is needed, a bare comparison inside try/except never raises
logger.info('Conversion complete. Performing sanity check before starting the Extraction Pipeline')
jsonl_files = os.listdir(_jsonl_path)
if len(jsonl_files) != 5:
    raise FileNotFoundError('Not all jsonl files needed for downstream Dask pipeline found. Path {} contains: {}. '
                            'Try to rerun the program'.format(_jsonl_path, ','.join(jsonl_files)))
logger.info('All jsonl files needed for the downstream Dask pipeline have been generated at {}:{}.'.format(
    _jsonl_path, str(jsonl_files)))

2022-06-23 12:29:49,988 - 1560022088 - INFO: DOWNLOAD COMPLETE: Read json.bz2 downloaded file in streaming mode and convert it to jsonl
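
The conversion step can be pictured on a toy file. The sketch below assumes every top-level key of the JSON document holds a list of records; `json_bz2_to_jsonl` and the sample data are hypothetical. Note that `bz2.open` decompresses as a stream, while `json.load` still reads the whole document into memory.

```python
import bz2
import json
import os
import tempfile


def json_bz2_to_jsonl(src_path, out_dir):
    """Write one jsonl file per top-level key and return the record counts."""
    os.makedirs(out_dir, exist_ok=True)
    with bz2.open(src_path, "rt", encoding="utf-8") as handle:
        data = json.load(handle)  # whole document is held in memory
    counts = {}
    for key, elements in data.items():
        out_path = os.path.join(out_dir, "output_{}.jsonl".format(key))
        with open(out_path, "w", encoding="utf-8") as outfile:
            for element in elements:
                outfile.write(json.dumps(element) + "\n")
        counts[key] = len(elements)
    return counts


# Toy document with two list-valued keys (made-up records).
sample = {
    "nodes": [{"kind": "Gene", "identifier": 19},
              {"kind": "Biological Process", "identifier": "GO:0055088"}],
    "edges": [{"kind": "participates",
               "source_id": ["Gene", 19],
               "target_id": ["Biological Process", "GO:0055088"]}],
}

work_dir = tempfile.mkdtemp()
src = os.path.join(work_dir, "sample.json.bz2")
with bz2.open(src, "wt", encoding="utf-8") as handle:
    json.dump(sample, handle)

out_dir = os.path.join(work_dir, "jsonl")
counts = json_bz2_to_jsonl(src, out_dir)
print(counts)
```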


In [7]:
# Start the Dask Client
n_workers = 4
threads_per_worker = 1
memory_limit = '8GB'
logger.info(
    'Initiating Dask Client with the next parameters: {} workers, {} threads x worker:, memory_limit of {}.'.format(
        n_workers, threads_per_worker, memory_limit))
client = initiate_dask_client(n_workers, threads_per_worker, memory_limit)




2022-06-24 11:23:48,935 - <ipython-input-7-2719fd567484> - INFO: "Genes" to "Biological Process" Edges (G-BP) Extraction Pipeline begins.
2022-06-24 11:23:48,947 - <ipython-input-7-2719fd567484> - INFO: Initiating Dask Client with the next parameters: 4 workers, 1 threads x worker:, memory_limit of 8GB.
2022-06-24 11:23:48,948 - selector_events - DEBUG: Using selector: EpollSelector
2022-06-24 11:23:48,950 - selector_events - DEBUG: Using selector: EpollSelector
2022-06-24 11:23:49,061 - <ipython-input-7-2719fd567484> - INFO: Dask client status is running. INFO: <Client: 'tcp://127.0.0.1:42923' processes=4 threads=4, memory=32.00 GB>


In [None]:
# Edge extraction starts
logger.info('"Genes" to "Biological Process" Edges (G-BP) Extraction Pipeline begins.')

# Sanity check: test the status explicitly so a failed client actually stops the run
if client.status != 'running':
    raise EnvironmentError('Dask client could not initiate.')
logger.info('Dask client status is {}. INFO: {}'.format(client.status, client))

In [8]:
# Convert the jsonl files into Dask bags of nodes (N) and edges (E); the other jsonl files are not used here
N, E = convert_jsonl_to_bags(logger, _jsonl_path, print_example=True)

# Number of Nodes of each type in the network
logger.info('Computing the number of nodes of each type in the network')
n_nodes = dict(N.map(lambda record: record['kind']).frequencies(sort=True).compute())
logger.info('Number of nodes in the network: \n----- NODE TYPES')
pprint(n_nodes)
print('-----')


2022-06-24 11:23:52,616 - utils - INFO: Nodes and Edges bags created from /home/llopez/git/hetnet_project/out/network_outputs/jsonl.
2022-06-24 11:23:52,617 - utils - INFO: Example of Node Record:
-------
({'data': {'license': 'CC BY 4.0',
           'source': 'Gene Ontology',
           'url': 'http://purl.obolibrary.org/obo/GO_0031753'},
  'identifier': 'GO:0031753',
  'kind': 'Molecular Function',
  'name': 'endothelial differentiation G-protein coupled receptor binding'},)
-------
2022-06-24 11:23:52,651 - utils - INFO: Example of Edge Record:
-------
({'data': {'source': 'Bgee', 'unbiased': True},
  'direction': 'both',
  'kind': 'upregulates',
  'source_id': ['Anatomy', 'UBERON:0000178'],
  'target_id': ['Gene', 9489]},)
-------
2022-06-24 11:23:52,671 - <ipython-input-8-320747605a47> - INFO: Computing the number of nodes of each type in the network
2022-06-24 11:23:52,921 - <ipython-input-8-320747605a47> - INFO: Number of nodes in the network: 
----- NODE TYPES
{'Anatomy': 402,
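
For readers unfamiliar with Dask bags, the node count above amounts to a frequency table over the `kind` field. A plain-Python sketch of the same idea on three made-up records (the names are placeholders, not HetioNet data):

```python
from collections import Counter

# Made-up node records carrying the same 'kind' field as the HetioNet nodes.
nodes = [
    {"kind": "Gene", "identifier": 9021, "name": "example gene A"},
    {"kind": "Gene", "identifier": 19, "name": "example gene B"},
    {"kind": "Biological Process", "identifier": "GO:0071357", "name": "example process"},
]

# Count records per kind, most frequent first.
kind_counts = dict(Counter(record["kind"] for record in nodes).most_common())
print(kind_counts)  # {'Gene': 2, 'Biological Process': 1}
```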


In [9]:
# Extract only Gene and Biological Process Nodes
logger.info('Extracting nodes of kind:"Gene" and kind:"Biological Process" from the network')
selected_nodes = select_gene_BP_nodes(N)
n_selected_nodes = dict(selected_nodes.map(lambda record: record['kind']).frequencies(sort=True).compute())
logger.info('Nodes extracted, new records contain: \n----- SELECTED NETWORK NODE TYPES')
pprint(n_selected_nodes)
print('-----')

2022-06-24 11:24:56,092 - <ipython-input-9-4b7e99af849a> - INFO: Extracting nodes of kind:"Gene" and kind:"Biological Process" from the network
2022-06-24 11:24:56,509 - <ipython-input-9-4b7e99af849a> - INFO: Nodes extracted, new records contain: 
----- SELECTED NETWORK NODE TYPES
{'Biological Process': 11381, 'Gene': 20945}
-----


In [10]:
# Filter to select Gene to Biological Process edges
logger.info('Extracting G-BP Edges from the network')
selected_edges = select_gene_BP_edges(E)
n_selected_edges = dict(selected_edges.map(lambda record: record['kind']).frequencies(sort=True).compute())
# Take the first 10 edges as a sample
sample_edges = selected_edges.take(10)
logger.info('G-BP Edges extracted successfully! New records contain: \n----- SELECTED EDGES')
print('Type of edge and frequency of each type:')
pprint(n_selected_edges)
print('---\nSample of {} Edges:'.format(len(sample_edges)))
# Print each sampled edge as: target ------ kind ------ source
for edge in sample_edges:
    print(edge['target_id'], '------', [edge['kind']], '------', edge['source_id'])
print('-----')

2022-06-24 11:25:18,704 - <ipython-input-10-28149d5220fb> - INFO: Extracting G-BP Edges from the network
2022-06-24 11:25:37,850 - <ipython-input-10-28149d5220fb> - INFO: G-BP Edges extracted successfully! New records contain: 
----- SELECTED EDGES
Type of edge and frequency of each type:
{'participates': 559504}
---
Sample of 20 Edges:
['Biological Process', 'GO:0071357'] ------ ['participates'] ------ ['Gene', 9021]
['Biological Process', 'GO:0098780'] ------ ['participates'] ------ ['Gene', 51676]
['Biological Process', 'GO:0055088'] ------ ['participates'] ------ ['Gene', 19]
['Biological Process', 'GO:0010243'] ------ ['participates'] ------ ['Gene', 3176]
['Biological Process', 'GO:0006898'] ------ ['participates'] ------ ['Gene', 3039]
['Biological Process', 'GO:0051346'] ------ ['participates'] ------ ['Gene', 5962]
['Biological Process', 'GO:0043207'] ------ ['participates'] ------ ['Gene', 841]
['Biological Process', 'GO:0006354'] ------ ['participates'] ------ ['Gene', 6924]
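
`select_gene_BP_edges` lives in `tools.utils` and is not shown here. Judging from the records printed above, a filter of this kind could be written as below; `is_gene_bp_edge` is a hypothetical predicate, not the notebook's implementation, and the sample records only carry the fields the predicate needs.

```python
def is_gene_bp_edge(record):
    """Return True when an edge goes from a Gene to a Biological Process."""
    source_kind = record["source_id"][0]
    target_kind = record["target_id"][0]
    return source_kind == "Gene" and target_kind == "Biological Process"


# Two sample edges built from identifiers that appear in the outputs above.
edges = [
    {"kind": "participates",
     "source_id": ["Gene", 9021],
     "target_id": ["Biological Process", "GO:0071357"]},
    {"kind": "upregulates",
     "source_id": ["Anatomy", "UBERON:0000178"],
     "target_id": ["Gene", 9489]},
]

gbp_edges = [edge for edge in edges if is_gene_bp_edge(edge)]
print(len(gbp_edges), gbp_edges[0]["kind"])
```

With a Dask bag, the same predicate could be passed to `E.filter(is_gene_bp_edge)`.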

In [11]:
# Convert to df
logger.info('Converting the records of the extracted G-BP Edges into a Dask DataFrame '
            'and save it as a jsonl output file.')
if not os.path.exists(_program_output):
    logger.debug('Creating new folder at {}'.format(_program_output))
    os.makedirs(_program_output)
# Define the output file name
df_jsonl_name = 'G-BP_edges_formated_{}.jsonl.gzip'.format(_run_date)

# Build the absolute path of the jsonl.gzip output
output_absolute_path = _program_output + '/' + df_jsonl_name

# Execute the function
convert_to_dd(logger, selected_edges, print_head=True, save_jsonl=True, jsonl_path=output_absolute_path)
# Sanity check: raise explicitly if the export is missing
if df_jsonl_name not in os.listdir(_program_output):
    raise FileNotFoundError('{} not found in {}. Something must have gone wrong during the export'.format(
        df_jsonl_name, _program_output))
logger.info('Export of {} complete at {}!'.format(df_jsonl_name, _program_output))
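
`convert_to_dd` is also defined in `tools.utils`. Based on the column layout in the DataFrame output of this cell (`source_type`, `source_id`, `target_type`, `target_id`, `kind`, `direction`, `data`), each edge record is presumably flattened along these lines. `flatten_edge` is a hypothetical helper; the sample record is the edge example printed earlier in this document.

```python
import json


def flatten_edge(record):
    """Split the [type, id] pairs of an edge record into separate columns."""
    source_type, source_id = record["source_id"]
    target_type, target_id = record["target_id"]
    return {
        "source_type": source_type,
        "source_id": source_id,
        "target_type": target_type,
        "target_id": target_id,
        "kind": record.get("kind"),
        "direction": record.get("direction"),
        "data": record.get("data"),
    }


edge = {
    "data": {"source": "Bgee", "unbiased": True},
    "direction": "both",
    "kind": "upregulates",
    "source_id": ["Anatomy", "UBERON:0000178"],
    "target_id": ["Gene", 9489],
}

row = flatten_edge(edge)
line = json.dumps(row)  # one line of a jsonl export
print(line)
```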

2022-06-24 11:27:59,978 - <ipython-input-11-e3f27e9092a7> - INFO: Converting the records of the extracted G-BP Edges into a Dask DataFrame and save it as a jsonl output file.
2022-06-24 11:28:00,001 - utils - INFO: Selected G-BP Edges Dask DataFrane successfully created.
2022-06-24 11:28:00,002 - utils - INFO: Showing first rows of the DataFrame:
----- G-BP Edges DataFrame
Dask DataFrame Structure:
              source_type source_id target_type target_id    kind direction    data
npartitions=1                                                                      
                   object    object      object    object  object    object  object
                      ...       ...         ...       ...     ...       ...     ...
Dask Name: describe, 81 tasks
  source_type source_id         target_type   target_id
0        Gene      9021  Biological Process  GO:0071357
1        Gene     51676  Biological Process  GO:0098780
2        Gene        19  Biological Process  GO:0055088
3       