In [1]:
import sys
import time
import json
from pprint import pprint

import pandas as pd

from neo4j import GraphDatabase
from neo4j.exceptions import ClientError
from dask import delayed, compute
import multiprocessing.popen_spawn_posix
from dask.distributed import Client

import helpers.graph_services as gs
import helpers.mongo_services as ms
import helpers.helper as helper

In [3]:
def get_algos(list_of_algorithms):
    """Group every algorithm's ``writeProperty`` by algorithm class.

    Parameters
    ----------
    list_of_algorithms : dict
        Mapping ``class name -> {algorithm name -> settings dict}`` where each
        settings dict has a ``'writeProperty'`` key.

    Returns
    -------
    dict
        Mapping ``class name -> list of writeProperty values`` in the
        iteration order of the input. A class without algorithms maps to an
        empty list.

    Raises
    ------
    KeyError
        If an algorithm's settings dict has no ``'writeProperty'`` entry.
    """
    return {
        class_of_algorithm: [settings['writeProperty'] for settings in algorithms.values()]
        for class_of_algorithm, algorithms in list_of_algorithms.items()
    }


def get_write_props():
    """Build the ``n.<prop> AS <prop>,`` fragments for a Cypher RETURN clause.

    This logic used to sit after ``get_algos``'s ``return`` statement, where it
    could never execute; it is now a function of its own so it can be called.

    Returns
    -------
    list of str
        One fragment per algorithm write property (all classes flattened, in
        ``get_algos`` order) followed by fragments for the fixed extra
        properties ``language``, ``mature``, ``partner``, ``views`` and
        ``days``. Every fragment ends with a comma.
    """
    props_by_class = get_algos(helper.get_list_of_algorithms())
    props = [prop for class_props in props_by_class.values() for prop in class_props]
    props += ['language', 'mature', 'partner', 'views', 'days']
    return ["n." + prop + " AS " + prop + "," for prop in props]

['n.pageRank AS pageRank,', 'n.betweenness AS betweenness,', 'n.articleRank AS articleRank,', 'n.closeness AS closeness,', 'n.closeness_harmonic AS closeness_harmonic,', 'n.degree AS degree,', 'n.eigenvector AS eigenvector,', 'n.louvain AS louvain,', 'n.labelPropagation AS labelPropagation,', 'n.wcc AS wcc,', 'n.triangleCount AS triangleCount,', 'n.localClusteringCoefficient AS localClusteringCoefficient,', 'n.k1coloring AS k1coloring,', 'n.modularityOptimization AS modularityOptimization,', 'n.scc AS scc,', 'n.fastRP AS fastRP,', 'n.node2vec AS node2vec,', 'n.language AS language,', 'n.mature AS mature,', 'n.partner AS partner,', 'n.views AS views,', 'n.days AS days,']


In [4]:
def get_number_of_nodes():
    """Return the ``'nodes'`` value reported by the graph driver's connection test.

    Opens a connection through ``gs.graph_driver``, calls ``test_connection()``
    and reads the ``'nodes'`` field of the first record it returns.

    NOTE(review): the connection settings, including the password, are
    hardcoded here; consider loading them from configuration or the
    environment.
    """
    connection = gs.graph_driver(
        uri_scheme='bolt',
        host='localhost',
        port='8687',
        username='neo4j',
        password='vaibhav123',
    )
    first_record = connection.test_connection()[0]
    return dict(first_record)['nodes']

In [5]:
def read_batch_from_neo4j(driver, writeProps, start_index=0, batch_size=1, ):
    """Read one page of nodes from Neo4j and group each node's columns by algorithm class.

    Parameters
    ----------
    driver
        Graph driver exposing ``run_single_query(query)`` and
        ``format_raw_res(raw_result)``.
    writeProps : list of str
        RETURN-clause fragments such as ``"n.pageRank AS pageRank,"``. The
        fragments are joined with spaces and the final character of the joined
        string is dropped, so every fragment is expected to end with a comma.
    start_index : int
        Number of nodes to skip before the page starts.
    batch_size : int
        Maximum number of nodes in the page.

    Returns
    -------
    list of dict
        One dict per returned record. Each dict has a ``'non-graph'`` key plus
        one key per algorithm class from ``get_algos``; every value is a dict
        ``column name -> value`` holding the columns that belong to that class.
        Columns that match no class go under ``'non-graph'``.
    """
    # The joined fragments end with one surplus comma; strip it.
    return_clause = " ".join(writeProps)[:-1]

    # ORDER BY makes paging stable: without it Cypher guarantees no ordering,
    # so pages fetched by independent queries could overlap or skip nodes.
    query = """
    MATCH (n)
    RETURN {props}
    ORDER BY id(n)
    SKIP {start_index}
    LIMIT {batch_size}
    """.format(start_index=start_index, batch_size=batch_size, props=return_clause)
    records = driver.format_raw_res(driver.run_single_query(query))

    DEFAULT = 'non-graph'
    props_by_class = get_algos(helper.get_list_of_algorithms())

    # Reverse index property -> class. setdefault keeps the first class that
    # lists a property, matching a first-match scan over the classes in order.
    class_of_prop = {}
    for class_name, class_props in props_by_class.items():
        for prop in class_props:
            class_of_prop.setdefault(prop, class_name)

    formatted_results = []
    for record in records:
        grouped = {DEFAULT: {}}
        for class_name in props_by_class:
            grouped[class_name] = {}
        for key, value in dict(record).items():
            grouped[class_of_prop.get(key, DEFAULT)][key] = value
        formatted_results.append(grouped)
    return formatted_results

def get_mongo_driver():
    """Return the MongoDB driver built by ``ms.get_MongoDB_driver``.

    The helper is called with a fixed local configuration (host, port and log
    file path), the database name ``"TWITCH"`` and the service name
    ``"FEATURES"``. The return value is passed through unchanged.
    """
    return ms.get_MongoDB_driver(
        {
            'MONGODB_HOST': 'localhost',
            'MONGODB_PORT': '27017',
            'LOG_FILE': '../mongo/logs/db.log',
        },
        "TWITCH",
        "FEATURES",
    )

def write_batch_to_mongo(driver, data):
    """Insert a batch of documents through ``driver.collection.insert_many``.

    Parameters
    ----------
    driver
        Object whose ``collection`` attribute provides ``insert_many``.
    data : list of dict
        Documents to insert.

    Returns
    -------
    The value returned by ``insert_many``, or ``None`` when ``data`` is empty
    and nothing was written.
    """
    # An empty page (e.g. a start offset at or past the last node) is not an
    # error; skip the call because insert_many rejects an empty document list
    # (assumes a PyMongo collection -- TODO confirm).
    if not data:
        return None
    return driver.collection.insert_many(data)
    
def run_single_batch(writeProps, start_index, step):
    """Copy one page of nodes from Neo4j into MongoDB.

    Opens a new graph connection, reads ``step`` nodes starting at
    ``start_index`` with ``read_batch_from_neo4j``, then obtains a Mongo
    driver and writes the page with ``write_batch_to_mongo``.

    Returns the result of ``write_batch_to_mongo``, or ``None`` when
    ``get_mongo_driver()`` yields a falsy value (the page is then read but
    not written).

    NOTE(review): the Neo4j connection settings, including the password, are
    hardcoded here.
    """
    graph = gs.graph_driver(
        uri_scheme='bolt',
        host='localhost',
        port='8687',
        username='neo4j',
        password='vaibhav123',
    )
    batch = read_batch_from_neo4j(graph, writeProps, start_index, step)

    mongo = get_mongo_driver()
    if not mongo:
        return None
    return write_batch_to_mongo(mongo, batch)

In [6]:
# run_single_batch(writeProps, 0, 2)

In [7]:
# Start a Dask distributed client with 6 workers of 6 threads each.
client = Client(
    n_workers=6,
    threads_per_worker=6,
)

In [8]:
# Page size: number of nodes each delayed task reads and writes.
STEP = 6000
NUMBER_OF_NODES = get_number_of_nodes()
print(NUMBER_OF_NODES)

# Build the RETURN-clause fragments here: no module-level assignment of
# `writeProps` is visible in this notebook, so on a fresh kernel the loop
# below would otherwise fail with a NameError.
algo_props = get_algos(helper.get_list_of_algorithms())
writeProps = [prop for class_of_algorithm in algo_props for prop in algo_props[class_of_algorithm]]
writeProps += ['language', 'mature', 'partner', 'views', 'days']
writeProps = ["n." + prop + " AS " + prop + "," for prop in writeProps]

# One lazy task per page; nothing runs until compute() is called.
res = [
    delayed(run_single_batch)(writeProps, index, STEP)
    for index in range(0, NUMBER_OF_NODES, STEP)
]

34120


In [9]:
# Inspect the scheduled tasks; these are still lazy Delayed objects, nothing has run yet.
pprint(res)

[Delayed('run_single_batch-ed9e4f99-d059-4c52-b492-2cb9f9ec64aa'),
 Delayed('run_single_batch-1960a17c-1ac5-46c8-a07a-05ac3e09b827'),
 Delayed('run_single_batch-72491df4-ec3c-468b-8e5c-b5acd0494b55'),
 Delayed('run_single_batch-c0ea1421-1820-488a-97b8-6c3ad64d0273'),
 Delayed('run_single_batch-e47c9e59-1786-4b21-a449-fe76f8065aeb'),
 Delayed('run_single_batch-b9c8b3ea-fee7-4b33-bf5c-fb56b295b25d')]


In [10]:
%%time
# Execute all delayed batches. `res` is passed as one list argument, so
# `actual_res` is a 1-tuple wrapping the list of per-batch results.
actual_res = compute(res)

CPU times: user 170 ms, sys: 18.1 ms, total: 189 ms
Wall time: 1.75 s
