# Run fullchain tests

See: finnet-pipeline/docker-tests/fullchain/run_tests.py

To add packages, append to `dags/requirements_py3.txt` and run `!pip3 install -r /usr/local/dags/requirements_py3.txt`

In [1]:
# !pip3 install -r /usr/local/dags/requirements_py3.txt

## Init Spark

In [2]:
# Initialise findspark with the Spark installation at /usr/local/spark before
# pyspark is imported in the next cell.
import findspark
findspark.init("/usr/local/spark")

In [3]:
from pyspark import SparkContext

# Stop whatever SparkContext getOrCreate() hands back (the existing one, or a
# fresh one if none exists): the tests below assume no SparkContext is running.
sc = SparkContext.getOrCreate()
sc.stop()

## Imports and Env

In [4]:
import os

# Neo4j connection settings, exported as environment variables.
# NEO4J_SSH_PORT / NEO4J_SSH_USERNAME are read back in the "Run tests" cell.
os.environ.update({
    "GRAPH_DB": "bolt://neo4j:test@neo4j:7687",
    "NEO4J_SSH_PORT": "22",
    "NEO4J_SSH_USERNAME": "root",
})

In [5]:
# Pipeline data location and storage format, exported as environment variables
os.environ.update({
    'PIPELINE_DATA_PATH': "/datasets/finnet",
    'PIPELINE_DATA_FORMAT': "parquet",
})

In [6]:
import sys
# Put /usr/local/dags at the front of sys.path so run_tests is importable
sys.path.insert(0, "/usr/local/dags")

# NOTE(review): the star import is presumably what supplies the names used
# unqualified in later cells (e.g. SparkConf, SparkConfFactory, GraphSpec,
# LOCAL_DATA_PATH, DATA_PATH, re, json, logging, get_neo4j_context); none of
# them is imported anywhere else in this notebook.
from run_tests import *

import pandas as pd

## Run tests

In [7]:
# Set the list of tasks to test
dolist = [
    'build_lists', 'resolve_entities',
    'neo4j_purger', 'neo4j_writer',
    'graph_tools'
]

# Get neo4j ssh username and port (defaults apply when the variables are unset)
neo4j_ssh_username = os.environ.get('NEO4J_SSH_USERNAME', 'neo4j')
neo4j_ssh_port = int(os.environ.get('NEO4J_SSH_PORT', 9000))

# Setup the spark configuration
config = {
    'SparkConfiguration': (SparkConf()
                           .setMaster('local[*]')
                           .setAppName("test create data")
                           .set("spark.executor.memory", "1024m")),
}

# Get the graph specs. os.listdir returns entries in arbitrary order, so the
# listing is sorted: the next cell takes jsonlist[0], and that choice should
# not depend on directory order.
datalist = sorted(os.listdir(LOCAL_DATA_PATH))
jsonlist = [k for k in datalist if re.match(r'.*\.json$', k)]

In [8]:
# The data directory is expected to hold a single .json graph spec, so take the
# first match. An empty jsonlist raises IndexError here.
gspec = jsonlist[0]

### Load Graph Spec

In [9]:
# Parse the selected spec file into a GraphSpec; `spec` is its plain-dict form
spec_file = os.path.join(LOCAL_DATA_PATH, gspec)
with open(spec_file, 'r') as f:
    graph_spec = GraphSpec.from_dict(json.load(f))
    spec = graph_spec.to_dict()

# Tables for this graph sit under <DATA_PATH>/<graph name>/tables
tables_path = os.path.join(DATA_PATH, graph_spec.name, 'tables')
# The node/edge list paths are only referenced by the disabled cells further down
# n_path = os.path.join(DATA_PATH, graph_spec.name, 'node_list')
# e_path = os.path.join(DATA_PATH, graph_spec.name, 'edge_list')
# n_path_res = os.path.join(DATA_PATH, graph_spec.name, 'node_list_resolved')
# e_path_res = os.path.join(DATA_PATH, graph_spec.name, 'edge_list_resolved')

logging.info("Processing " + gspec)

# Neo4j settings: the URI comes from the graph spec, the limits from `config`
# (falling back to the defaults given here when the keys are absent)
neo_config = {
    'uri': spec['graph_uri'],
    'max_retries': config.get('neo4j.max_retries', 5),
    'max_batchsize': config.get('neo4j.max_batchsize', 10000),
}

In [10]:
# spec

# """
# Note:
# test_data_chocolate_node_list: index_column: hidden: True is ignored as intended
# """

In [11]:
# Display the graph spec as a plain dict (the result of graph_spec.to_dict())
spec

{'node_lists': [{'index_column': {'name': 'id',
    'safe_name': '`id`',
    'variable_definition': 'String',
    'resolution_alias': 'chocolate'},
   'name': 'chocolate nodes',
   'safe_name': '`chocolate nodes`',
   'labels': ['chocolate'],
   'metadata_columns': [],
   'table_name': 'test_data_chocolate_node_list',
   'safe_table_name': 'test_data_chocolate_node_list'},
  {'index_column': {'name': 'id',
    'safe_name': '`id`',
    'variable_definition': 'String',
    'resolution_alias': 'sweets'},
   'name': 'sweets nodes',
   'safe_name': '`sweets nodes`',
   'labels': ['sweets'],
   'metadata_columns': [{'name': 'prop',
     'safe_name': '`prop`',
     'hidden': False,
     'variable_definition': 'String',
     'friendly_name': 'sweetness number',
     'use_as_label': False}],
   'table_name': 'test_data_sweets_node_list',
   'safe_table_name': 'test_data_sweets_node_list'},
  {'index_column': {'name': 'id',
    'safe_name': '`id`',
    'variable_definition': 'String',
    'resol

In [12]:
# Inspect the edge_category attribute of the first edge list in the spec
graph_spec.edge_lists[0].edge_category

'6a614fcb-fe36-470c-98f1-07c6b9d836fe'

In [13]:
from fncore_py3.tasks.graph_to_neo4j import get_combined_edges

In [14]:
# Spark settings passed to get_combined_edges in the next cell
spark_config = (
    SparkConfFactory()
    .set_master('local[*]')
    .set_app_name('write neo4j nodes')
    .set("spark.driver.maxResultSize", "1g")
    .set('spark.executor.memory', '1g')
)

In [15]:
# Call get_combined_edges with the tables directory as both the edge and the
# group-edge input. As the recorded output shows, the call returns a generator,
# and nothing iterates over it in this cell.
get_combined_edges(
    graph_specification=graph_spec,
    spark_config=spark_config,
    input_edge_path=tables_path,
    input_group_edge_path=tables_path,
    output_source_col=':START_ID',
    output_target_col=':END_ID',
    output_label_col=':TYPE'
)

<generator object get_combined_edges at 0x7f5fc4efc8e0>

### Entity resolution:

How should we disambiguate `S1234567G:Person` and `S1234567G:SoleProprietor`? Do we need to generate a canonical ID? If so, that should be done before Hive ingestion, not as part of graph building. Alternatively, let them be separate nodes but with 0 distance.

### Purge existing Graph

In [16]:
# Purge Neo4j over SSH, then run the purge check against the database
if 'neo4j_purger' in dolist:
    logging.info("Purging Neo4j...")
    neo4j_manager.purge(
        graph_spec, username=neo4j_ssh_username, port=neo4j_ssh_port
    )

    logging.info("Checking purging neo4j...")
    with get_neo4j_context(neo_config['uri']) as neo_context:
        assert test_neo4j_purger(neo_context)

In [17]:
# Count all nodes in the graph; right after the purge cell above this is
# expected to print a count of 0.
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run("MATCH (n) RETURN count(n) as count")
    print(cursor.data())

[{'count': 0}]


### Write nodes and edges to graph

Question for node groups: what if, on the same table, I define both a "common mother" group and a "common father" group? What should be done with the overlap in edges? Should the criteria be combined with OR?

In [18]:
# Graph writer: load the graph into Neo4j, then cross-check the node and edge
# counts in the database against the debug CSV files.
if 'neo4j_writer' in dolist:
    debug_write = True
    logging.info("Writing to Neo4j...")

    graph_to_neo4j.graph_to_neo4j(graph_specification=graph_spec,
                                  spark_config=SparkConfFactory()
                                  .set_master('local[*]')
                                  .set_app_name('write neo4j nodes')
                                  .set("spark.driver.maxResultSize",
                                       "1g")
                                  .set('spark.executor.memory',
                                       '1g'),
                                  input_node_path=tables_path,
                                  input_edge_path=tables_path,
                                  username=neo4j_ssh_username,
                                  port=neo4j_ssh_port,
                                  debug_write=debug_write,
                                  verbose=True
                                  )

    if debug_write:
        print("Debug mode node and edge count test...")

        # Sum the rows of the node*/edge* CSVs found in ./debug (presumably
        # written there by graph_to_neo4j when debug_write is set; TODO confirm).
        node_cnt = 0
        edge_cnt = 0
        debug_dir = "debug"
        for fname in os.listdir(debug_dir):
            is_node = fname.startswith("node")
            is_edge = fname.startswith("edge")
            # Skip anything that is neither a node nor an edge dump (stray
            # files, sub-directories) instead of trying to parse it as CSV.
            if not (is_node or is_edge):
                continue

            cnt = len(pd.read_csv(os.path.join(debug_dir, fname)))
            if is_node:
                node_cnt += cnt
            else:
                edge_cnt += cnt

        with get_neo4j_context(neo_config['uri']) as neo_context:
            nc = neo_context.run("MATCH (n) RETURN count(n) as count").data()[0]["count"]
            assert nc == node_cnt, (
                "Neo4j holds %s nodes but the debug CSVs contain %s" % (nc, node_cnt)
            )

            ec = neo_context.run("MATCH ()-[r]->() RETURN count(r) as count").data()[0]["count"]
            assert ec == edge_cnt, (
                "Neo4j holds %s edges but the debug CSVs contain %s" % (ec, edge_cnt)
            )

Wrote temp nodes .csv to /tmp/tmpk48b548h/tmpoxrfkj7y.csv
Wrote temp edges .csv to /tmp/tmpk48b548h/tmp0_e7rp68.csv
Wrote temp edges .csv to /tmp/tmpk48b548h/tmp2jaxy9hj.csv
Wrote temp edges .csv to /tmp/tmpk48b548h/tmpp7qdj5vs.csv
Wrote temp edges .csv to /tmp/tmpk48b548h/tmpztex62ia.csv
Debug mode node and edge count test...


## Checks

In [19]:
### Connect with py2neo
# NOTE(review): in this notebook `graph` is only referenced by the commented-out
# neo4jupyter cells further down; the credentials are given both inside the URI
# and as keyword arguments.
from py2neo import Graph
graph = Graph("bolt://neo4j:test@neo4j:7687", user="neo4j", password="test")

In [20]:
# Count all nodes again now that the writer cell has run
# (the recorded run printed 45); the value is printed, not asserted.
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run("MATCH (n) RETURN count(n) as count")
    print(cursor.data())

[{'count': 45}]


In [21]:
# Node with _canonical_id "1": verify both its label set and its properties
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run("""MATCH (n {_canonical_id: "1"}) RETURN n""")
    node = cursor.data()[0]['n']

    # Labels attached to the node
    expected_labels = {
        'chocolate', 'toffee', 'sweets', 'is_target',
        '8', '_searchable', 'toffee_groups',
    }
    print(node.labels)
    assert node.labels == expected_labels

    # Properties stored on the node, compared as a set of (key, value) pairs
    expected_items = {
        ('_canonical_id', '1'),
        ('edge_metadata', 'foo'),
        ('sweetness number', '1'),
    }
    print(node.items())
    assert set(node.items()) == expected_items

frozenset({'_searchable', 'chocolate', 'is_target', 'toffee_groups', 'toffee', '8', 'sweets'})
dict_items([('_canonical_id', '1'), ('edge_metadata', 'foo'), ('sweetness number', '1')])


In [22]:
# Count all directed relationships in the graph
# (the recorded run printed 131); the value is printed, not asserted.
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run("MATCH ()-[r]->() RETURN count(r) as count")
    print(cursor.data())

[{'count': 131}]


In [23]:
# Summarise connectivity: how many distinct node ids take part in at least one
# relationship, and how many distinct (source, target) pairs there are.
import pandas as pd

with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(
        """
        MATCH (s)-[r]-(t)
        RETURN s._canonical_id as source, t._canonical_id as target
        """)

    edge_table = pd.DataFrame(cursor.data())

    # Every id appearing at either end of a relationship
    endpoint_ids = edge_table["source"].tolist() + edge_table["target"].tolist()
    print(len(set(endpoint_ids)))

    # Distinct (source, target) rows
    print(len(edge_table.drop_duplicates()))
    

44
232


In [24]:
# List the node pairs that are joined by more than one relationship.
# The pattern has no direction, so (as the recorded output shows) each pair is
# reported in both orders, e.g. ('1', '9') and ('9', '1').
# NOTE(review): pandas is imported here but not used in this cell.
import pandas as pd

with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(
        """
        MATCH (s)-[r]-(t)
        WITH s, t, count(r) as rel_cnt
        WHERE rel_cnt > 1
        RETURN s._canonical_id as source, t._canonical_id as target
        """)
    
    print(cursor.data())

[{'source': '1', 'target': '9'}, {'source': '0', 'target': '1'}, {'source': '3', 'target': '19'}, {'source': '7', 'target': '5'}, {'source': '18', 'target': '3'}, {'source': '13', 'target': '29'}, {'source': '2', 'target': '1'}, {'source': '2', 'target': '3'}, {'source': '1', 'target': '2'}, {'source': '14', 'target': '13'}, {'source': '9', 'target': '1'}, {'source': '13', 'target': '14'}, {'source': '19', 'target': '3'}, {'source': '2', 'target': '13'}, {'source': '3', 'target': '18'}, {'source': '6', 'target': '9'}, {'source': '29', 'target': '13'}, {'source': '6', 'target': '4'}, {'source': '3', 'target': '2'}, {'source': '5', 'target': '7'}, {'source': '1', 'target': '0'}, {'source': '5', 'target': '6'}, {'source': '13', 'target': '2'}, {'source': '18', 'target': '28'}, {'source': '9', 'target': '6'}, {'source': '28', 'target': '18'}, {'source': '6', 'target': '5'}, {'source': '4', 'target': '6'}]


In [25]:
# Check one specific multi-edge: the relationships between nodes '9' and '6'
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(
        """
        MATCH (s)-[r]-(t)
        WHERE s._canonical_id='9' AND t._canonical_id='6'
        RETURN r
        """)
    d = cursor.data()
    print(d)
    
    assert len(d) == 2
    
    # Two relationships are expected, one in each direction (6->9 and 9->6)

[{'r': <Relationship id=110 nodes=(<Node id=35 labels=set() properties={}>, <Node id=30 labels=set() properties={}>) type='sweets' properties={}>}, {'r': <Relationship id=87 nodes=(<Node id=30 labels=set() properties={}>, <Node id=35 labels=set() properties={}>) type='sweets' properties={}>}]


In [26]:
# Inspect the relationships between nodes '1' and '0', including their
# properties. The result is only printed; nothing is asserted here.
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(
        """
        MATCH (s)-[r]-(t)
        WHERE s._canonical_id='1' AND t._canonical_id='0'
        RETURN r
        """)
    d = cursor.data()
    print(d)

[{'r': <Relationship id=129 nodes=(<Node id=15 labels=set() properties={}>, <Node id=34 labels=set() properties={}>) type='edge_group_type' properties={'common_group': 'A'}>}, {'r': <Relationship id=117 nodes=(<Node id=15 labels=set() properties={}>, <Node id=34 labels=set() properties={}>) type='b;f;toffee;toffee_extended' properties={'edge_prop_friendly': 'ep2', 'edge_prop_2': 'prop2'}>}]


In [27]:
# Count the directed relationships of type `edge_group_type` (node group edges).
# The count is only printed (9 in the recorded run); nothing is asserted here.
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(
        """
        MATCH ()-[r :edge_group_type]->()
        RETURN count(r)
        """)
    d = cursor.data()
    print(d)

[{'count(r)': 9}]


## Viz

In [28]:
# ### Connect with py2neo
# from py2neo import Graph
# graph = Graph("bolt://neo4j:test@neo4j:7687", user="neo4j", password="test")

# ### Plot with neo4jupyter
# import neo4jupyter
# neo4jupyter.init_notebook_mode()

# neo4jupyter.draw(graph, {"User": "id"})

### Build Lists - Should deprecate this too???

In [29]:
# # Build list
# if 'build_lists' in dolist:
#     logging.info("Building lists...")
#     build_node_lists(
#         graph_specification=graph_spec,
#         spark_config=(SparkConfFactory()
#                       .set_master('local[*]')
#                       .set_app_name('test create data')
#                       .set('spark.executor.memory', '1g')),
#         tables_path=tables_path,
#         node_path=n_path,
#         data_format=DATA_FORMAT,
#     )
#     build_edge_lists(
#         graph_specification=graph_spec,
#         spark_config=(SparkConfFactory()
#                       .set_master('local[*]')
#                       .set_app_name('test create data')
#                       .set('spark.executor.memory', '1g')),
#         tables_path=tables_path,
#         edge_path=e_path,
#         data_format=DATA_FORMAT,
#     )
#     logging.info("Checking build_lists...")
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         assert test_build_lists(spark_ctx, sql_context, spec)

In [30]:
# # Reads only needed columns and writes to HDFS, 1 file per nodekind and edgekind respectively
# with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#     sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#     sql_context.read.parquet("/datasets/finnet/test_data/node_list/fn_chocolate_nodes").show(5)

In [31]:
# # Reads only needed columns and writes to HDFS, 1 file per nodekind and edgekind respectively
# with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#     sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#     sql_context.read.parquet("/datasets/finnet/test_data/edge_list/fn_toffee_relations").show(5)

### ~Resolve Entities~ Drop... resolve entities to be done before persisting to hive

Let's drop the entity resolution module in this form; any entity resolution should be done at the point of ingest.

In [32]:
# # Resolve entities
# if 'resolve_entities' in dolist:
#     logging.info("Resolving entities...")
#     resolve_node_entities(
#         graph_specification=graph_spec,
#         spark_config=(SparkConfFactory()
#                       .set_master('local[*]')
#                       .set_app_name('test create data')
#                       .set('spark.executor.memory', '1g')),
#         entity_maps=dict(),
#         input_node_path=n_path,
#         output_node_path=n_path_res,
#         output_node_id='_canonical_id',
#         data_format=DATA_FORMAT
#     )
#     resolve_edge_entities(
#         graph_specification=graph_spec,
#         spark_config=(SparkConfFactory()
#                       .set_master('local[*]')
#                       .set_app_name('test create data')
#                       .set('spark.executor.memory', '1g')),
#         entity_maps=dict(),
#         input_edge_path=e_path,
#         output_edge_path=e_path_res,
#         output_edge_source_id='_canonical_id_source',
#         output_edge_target_id='_canonical_id_target',
#         data_format=DATA_FORMAT
#     )

In [33]:
# # Produces _canonical_id columns and writes to HDFS, 1 file per nodekind and edgekind respectively

# with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#     sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#     sql_context.read.parquet("/datasets/finnet/test_data/node_list_resolved/fn_toffee_nodes").show(5)

In [34]:
# # Produces _canonical_id columns and writes to HDFS, 1 file per nodekind and edgekind respectively

# with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#     sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#     sql_context.read.parquet("/datasets/finnet/test_data/edge_list_resolved/fn_toffee_relations").show(5)

In [35]:
# # What does EntityMapper do???
# with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#     from fncore.tasks.resolve_entities import EntityMapper
    
#     entityMapper = EntityMapper(spark_ctx, {})
#     # Think these are just placeholder column names
#     print(entityMapper._from)
#     print(entityMapper._to)
#     entityMapper._map.show()

In [36]:
# Seems like EntityMapper isn't doing anything here,
# since entity_map param is empty, _canonical_id just takes the id from index_column

## Viz

In [37]:
# ### Connect with py2neo
# from py2neo import Graph
# graph = Graph("bolt://neo4j:test@neo4j:7687", user="neo4j", password="test")

# ### Plot with neo4jupyter
# import neo4jupyter
# neo4jupyter.init_notebook_mode()

# neo4jupyter.draw(graph, {"User": "id"})

In [38]:
# # Second part?
# if 'neo4j_writer' in dolist:
#     # This part inserts the remainder of node properties that were not captured above
#     neo4j_writer.write_neo4j_nodes(graph_specification=spec,
#                                    spark_config=SparkConfFactory()
#                                    .set_master('local[*]')
#                                    .set_app_name('write neo4j nodes')
#                                    .set('spark.executor.memory',
#                                         '1g')
#                                    )

#     datetime_now = datetime.now()
#     logging.info("Backing up db, then purge it...")
#     neo4j_manager.backup(graph_spec, datetime_now,
#                          username=neo4j_ssh_username,
#                          port=neo4j_ssh_port)
#     neo4j_manager.purge(graph_spec,
#                         username=neo4j_ssh_username,
#                         port=neo4j_ssh_port)
#     logging.info("Restoring the backup to db...")
#     neo4j_manager.restore(graph_spec,
#                           datetime_now,
#                           username=neo4j_ssh_username,
#                           port=neo4j_ssh_port)

#     logging.info("Checking write neo4j...")
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         with get_neo4j_context(neo_config['uri']) as neo_context:
#             assert test_neo4j_writer(
#                 spark_ctx, sql_context, neo_context, spec
#             )

In [39]:
# if 'graph_tools' in dolist:
#     # Test graph_construction_coi.get_graph_dataframes
#     data_path = os.environ['PIPELINE_DATA_PATH']
#     graph_name = graph_spec.name
#     node_path_resolved = os.path.join(data_path, graph_name, 'node_list_resolved')
#     edge_path_resolved = os.path.join(data_path, graph_name, 'edge_list_resolved')
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         graph = get_graph_dataframes(graph_spec, sql_context,
#                                      node_path_resolved, edge_path_resolved,
#                                      DATA_FORMAT)

#         assert 'node_list' in graph
#         assert 'edge_list' in graph
#         assert len(graph['node_list']) == len(graph_spec.node_lists)
#         for cur_node_list in graph_spec.node_lists:
#             assert cur_node_list.safe_name in graph['node_list']
#         assert len(graph['edge_list']) == len(graph_spec.edge_lists)
#         for cur_edge_list in graph_spec.edge_lists:
#             assert cur_edge_list.safe_name in graph['edge_list']

#     # Test graph_construction_coi.data_loading
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         tables = load_node_edge_lists(sql_context, graph_spec,
#                                       node_path_resolved, edge_path_resolved,
#                                       DATA_FORMAT)
#         for cur_edge_list in graph_spec.edge_lists:
#             assert (cur_edge_list.safe_table_name,
#                     cur_edge_list.source_column.safe_name,
#                     cur_edge_list.target_column.safe_name) in tables
#         assert len(tables) == len(graph_spec.node_lists) + len(graph_spec.edge_lists)