# Run fullchain tests

See: finnet-pipeline/docker-tests/fullchain/run_tests.py

To add packages, append to `dags/requirements_py3.txt` and run `!pip3 install -r /usr/local/dags/requirements_py3.txt`

In [1]:
# !pip3 install -r /usr/local/dags/requirements_py3.txt

## Init Spark

In [2]:
import findspark
findspark.init("/usr/local/spark")

In [3]:
from pyspark import SparkContext

### Stop the current SparkContext: the tests assume that no SparkContext
### exists when they start, so fetch whatever context is active (creating
### one if necessary) and shut it down.
sc = SparkContext.getOrCreate()
sc.stop()

## Imports and Env

In [4]:
import os

# Neo4j connection settings, exported through the process environment.
# The two SSH values are read back further down via os.environ.get(...).
os.environ.update({
    "GRAPH_DB": "bolt://neo4j:test@neo4j:7687",
    "NEO4J_SSH_PORT": "22",
    "NEO4J_SSH_USERNAME": "root",
})

In [5]:
# Location and storage format of the pipeline datasets.
# NOTE(review): presumably consumed by run_tests when it is imported -- confirm.
os.environ.update({
    'PIPELINE_DATA_PATH': "/datasets/finnet",
    'PIPELINE_DATA_FORMAT': "parquet",
})

In [6]:
import sys
sys.path.insert(0, "/usr/local/dags")

from run_tests import *

## Run tests

In [7]:
# Pipeline stages that the cells below are allowed to run
dolist = [
    'build_lists',
    'resolve_entities',
    'neo4j_purger',
    'neo4j_writer',
    'graph_tools',
]

# SSH access to the neo4j host, with fallbacks when the variables are unset
neo4j_ssh_username = os.environ.get('NEO4J_SSH_USERNAME', 'neo4j')
neo4j_ssh_port = int(os.environ.get('NEO4J_SSH_PORT', 9000))

# Spark configuration handed to get_spark_context(...) by the checking cells
config = {
    'SparkConfiguration': (SparkConf()
                           .setMaster('local[*]')
                           .setAppName("test create data")
                           .set("spark.executor.memory", "1024m")),
}

# Graph specifications are the *.json entries found in LOCAL_DATA_PATH
datalist = os.listdir(LOCAL_DATA_PATH)
jsonlist = list(filter(lambda entry: re.match(r'.*\.json$', entry), datalist))

In [8]:
# Exactly one graph-spec JSON is expected in LOCAL_DATA_PATH.
# Fail with an explicit message when none was found, and make it visible
# when more than one is present instead of silently ignoring the rest.
if not jsonlist:
    raise FileNotFoundError(
        "No graph specification (*.json) found in {}".format(LOCAL_DATA_PATH))
if len(jsonlist) > 1:
    logging.warning("Expected a single graph spec but found %d; using %s",
                    len(jsonlist), jsonlist[0])
gspec = jsonlist[0]

### Load Graph Spec

In [9]:
# Parse the selected graph specification file into a GraphSpec object and
# keep its plain-dict form around as well
spec_file = os.path.join(LOCAL_DATA_PATH, gspec)
with open(spec_file, 'r') as f:
    raw_spec = json.load(f)
graph_spec = GraphSpec.from_dict(raw_spec)
spec = graph_spec.to_dict()

# Every intermediate dataset lives under <DATA_PATH>/<graph name>/<stage>
graph_root = os.path.join(DATA_PATH, graph_spec.name)
tables_path, n_path, e_path, n_path_res, e_path_res = (
    os.path.join(graph_root, stage)
    for stage in ('tables',
                  'node_list',
                  'edge_list',
                  'node_list_resolved',
                  'edge_list_resolved')
)

logging.info("Processing " + gspec)

# Neo4j connection settings: the URI comes from the graph specification,
# the remaining values fall back to defaults when absent from `config`
neo_config = dict(
    uri=spec['graph_uri'],
    max_retries=config.get('neo4j.max_retries', 5),
    max_batchsize=config.get('neo4j.max_batchsize', 10000),
)

In [10]:
spec

{'node_lists': [{'safe_name': 'fn_chocolate_nodes',
   'labels': ['chocolate'],
   'safe_table_name': 'fn_test_data_chocolate_node_list',
   'metadata_columns': [],
   'table_name': 'test_data_chocolate_node_list',
   'index_column': {'resolution_alias': 'chocolate',
    'no_append': False,
    'variable_definition': 'String',
    'safe_name': 'fn_id',
    'hidden': False,
    'name': 'id'},
   'name': 'chocolate nodes'},
  {'safe_name': 'fn_sweets_nodes',
   'labels': ['sweets'],
   'safe_table_name': 'fn_test_data_sweets_node_list',
   'metadata_columns': [{'no_append': True,
     'variable_definition': 'String',
     'safe_name': 'fn_prop',
     'friendly_name': 'sweetness number',
     'hidden': False,
     'name': 'prop'}],
   'table_name': 'test_data_sweets_node_list',
   'index_column': {'resolution_alias': 'sweets',
    'no_append': False,
    'variable_definition': 'String',
    'safe_name': 'fn_id',
    'hidden': False,
    'name': 'id'},
   'name': 'sweets nodes'},
  {'safe_

In [11]:
graph_spec.table_details

{'connection': 'data_uri_value',
 'poll_frequency': '0 2 * * *',
 'tables': {('test_data_chocolate_node_list',
   'fn_test_data_chocolate_node_list'): {('id', 'fn_id')},
  ('test_data_sweets_node_list',
   'fn_test_data_sweets_node_list'): {('id', 'fn_id'), ('prop', 'fn_prop')},
  ('test_data_toffee_node_list',
   'fn_test_data_toffee_node_list'): {('hide', 'fn_hide'),
   ('id', 'fn_id'),
   ('prop', 'fn_prop')},
  ('test_data_chocolate_edge_list',
   'fn_test_data_chocolate_edge_list'): {('chocolate_s',
    'fn_chocolate_s'), ('chocolate_t', 'fn_chocolate_t')},
  ('test_data_sweets_edge_list',
   'fn_test_data_sweets_edge_list'): {('sweets_s', 'fn_sweets_s'), ('sweets_t',
    'fn_sweets_t')},
  ('test_data_toffee_edge_list',
   'fn_test_data_toffee_edge_list'): {('src_meta', 'fn_src_meta'), ('toffee_s',
    'fn_toffee_s'), ('toffee_t', 'fn_toffee_t')}}}

### Build Lists

In [12]:
# Build the node and edge lists from the source tables, then verify them
if 'build_lists' in dolist:
    logging.info("Building lists...")

    # Each build step receives its own freshly created Spark configuration
    node_build_conf = (SparkConfFactory()
                       .set_master('local[*]')
                       .set_app_name('test create data')
                       .set('spark.executor.memory', '1g'))
    build_node_lists(graph_specification=graph_spec,
                     spark_config=node_build_conf,
                     tables_path=tables_path,
                     node_path=n_path,
                     data_format=DATA_FORMAT)

    edge_build_conf = (SparkConfFactory()
                       .set_master('local[*]')
                       .set_app_name('test create data')
                       .set('spark.executor.memory', '1g'))
    build_edge_lists(graph_specification=graph_spec,
                     spark_config=edge_build_conf,
                     tables_path=tables_path,
                     edge_path=e_path,
                     data_format=DATA_FORMAT)

    logging.info("Checking build_lists...")
    with get_spark_context(config['SparkConfiguration']) as spark_ctx:
        session = SparkSession(spark_ctx)
        sql_context = SQLContext(spark_ctx, sparkSession=session)
        assert test_build_lists(spark_ctx, sql_context, spec)

In [13]:
# Preview of the build_lists output for the chocolate nodes.
# (Author's note on that step: it reads only the needed columns and writes
# to HDFS, 1 file per nodekind and edgekind respectively.)
parquet_path = "/datasets/finnet/test_data/node_list/fn_chocolate_nodes"

with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    session = SparkSession(spark_ctx)
    sql_context = SQLContext(spark_ctx, sparkSession=session)
    preview = sql_context.read.parquet(parquet_path)
    preview.show(5)

+-----+
|fn_id|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
+-----+
only showing top 5 rows



In [14]:
# Preview of the build_lists output for the toffee relations.
# (Author's note on that step: it reads only the needed columns and writes
# to HDFS, 1 file per nodekind and edgekind respectively.)
parquet_path = "/datasets/finnet/test_data/edge_list/fn_toffee_relations"

with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    session = SparkSession(spark_ctx)
    sql_context = SQLContext(spark_ctx, sparkSession=session)
    preview = sql_context.read.parquet(parquet_path)
    preview.show(5)

+-----------+-----------+-----------+
|fn_toffee_s|fn_toffee_t|fn_src_meta|
+-----------+-----------+-----------+
|          1|          0|        foo|
|          1|          9|        foo|
|          2|          3|        foo|
|          2|          4|        foo|
|          3|          4|        foo|
+-----------+-----------+-----------+
only showing top 5 rows



### Resolve Entities

In [15]:
# Resolve node and edge entities into the *_resolved datasets
if 'resolve_entities' in dolist:
    logging.info("Resolving entities...")

    # Nodes: an empty entity map is supplied for this run
    node_resolve_conf = (SparkConfFactory()
                         .set_master('local[*]')
                         .set_app_name('test create data')
                         .set('spark.executor.memory', '1g'))
    resolve_node_entities(graph_specification=graph_spec,
                          spark_config=node_resolve_conf,
                          entity_maps={},
                          input_node_path=n_path,
                          output_node_path=n_path_res,
                          output_node_id='_canonical_id',
                          data_format=DATA_FORMAT)

    # Edges: same treatment, with separate source and target id columns
    edge_resolve_conf = (SparkConfFactory()
                         .set_master('local[*]')
                         .set_app_name('test create data')
                         .set('spark.executor.memory', '1g'))
    resolve_edge_entities(graph_specification=graph_spec,
                          spark_config=edge_resolve_conf,
                          entity_maps={},
                          input_edge_path=e_path,
                          output_edge_path=e_path_res,
                          output_edge_source_id='_canonical_id_source',
                          output_edge_target_id='_canonical_id_target',
                          data_format=DATA_FORMAT)

In [25]:
# Preview of the resolve_entities output for the toffee nodes.
# (Author's note on that step: it produces the _canonical_id columns and
# writes to HDFS, 1 file per nodekind and edgekind respectively.)
parquet_path = "/datasets/finnet/test_data/node_list_resolved/fn_toffee_nodes"

with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    session = SparkSession(spark_ctx)
    sql_context = SQLContext(spark_ctx, sparkSession=session)
    preview = sql_context.read.parquet(parquet_path)
    preview.show(5)

+-----+-------+-------+-------------+
|fn_id|fn_prop|fn_hide|_canonical_id|
+-----+-------+-------+-------------+
|    7|      2|      8|            7|
|    3|      6|      4|            3|
|    8|      1|      9|            8|
|    0|      9|      1|            0|
|    5|      4|      6|            5|
+-----+-------+-------+-------------+
only showing top 5 rows



In [17]:
# Preview of the resolve_entities output for the toffee relations.
# (Author's note on that step: it produces the _canonical_id columns and
# writes to HDFS, 1 file per nodekind and edgekind respectively.)
parquet_path = "/datasets/finnet/test_data/edge_list_resolved/fn_toffee_relations"

with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    session = SparkSession(spark_ctx)
    sql_context = SQLContext(spark_ctx, sparkSession=session)
    preview = sql_context.read.parquet(parquet_path)
    preview.show(5)

+-----------+-----------+-----------+--------------------+--------------------+
|fn_toffee_s|fn_toffee_t|fn_src_meta|_canonical_id_source|_canonical_id_target|
+-----------+-----------+-----------+--------------------+--------------------+
|          3|          4|        foo|                   3|                   4|
|          2|          4|        foo|                   2|                   4|
|          8|          9|        bar|                   8|                   9|
|          1|          9|        foo|                   1|                   9|
|          4|          9|        bar|                   4|                   9|
+-----------+-----------+-----------+--------------------+--------------------+
only showing top 5 rows



In [18]:
# Exploration: what does EntityMapper hold when built from an empty mapping?
from fncore.tasks.resolve_entities import EntityMapper

with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    entityMapper = EntityMapper(spark_ctx, {})
    # The from/to attributes look like placeholder column names
    for column_name in (entityMapper._from, entityMapper._to):
        print(column_name)
    # ...and the mapping table itself
    entityMapper._map.show()

from-c2733e7f-e535-4f9f-9c0c-43c2e811f594
to-278f55cc-d089-4b98-ae6a-6e153ed81d74
+-----------------------------------------+---------------------------------------+
|from-c2733e7f-e535-4f9f-9c0c-43c2e811f594|to-278f55cc-d089-4b98-ae6a-6e153ed81d74|
+-----------------------------------------+---------------------------------------+
+-----------------------------------------+---------------------------------------+



In [19]:
# It seems EntityMapper is not doing anything here: since the entity_maps
# param is empty, _canonical_id just takes the id from index_column

### Purge existing Graph

In [20]:
# Empty the Neo4j database, then verify that the purge worked
if 'neo4j_purger' in dolist:
    logging.info("Purging Neo4j...")
    ssh_access = {'username': neo4j_ssh_username, 'port': neo4j_ssh_port}
    neo4j_manager.purge(graph_spec, **ssh_access)

    logging.info("Checking purging neo4j...")
    graph_uri = neo_config['uri']
    with get_neo4j_context(graph_uri) as neo_context:
        assert test_neo4j_purger(neo_context)

In [21]:
# Count the nodes currently stored in the graph
count_query = "MATCH (n) RETURN count(n) as count"
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(count_query)
    node_counts = cursor.data()
    print(node_counts)

[{'count': 0}]


### Write nodes and edges to graph

In [22]:
# Write the resolved node and edge lists into Neo4j
if 'neo4j_writer' in dolist:
    logging.info("Writing to Neo4j...")

    writer_conf = (SparkConfFactory()
                   .set_master('local[*]')
                   .set_app_name('write neo4j nodes')
                   .set("spark.driver.maxResultSize", "1g")
                   .set('spark.executor.memory', '1g'))

    graph_to_neo4j.graph_to_neo4j(
        graph_specification=graph_spec,
        spark_config=writer_conf,
        input_node_path=n_path_res,
        input_edge_path=e_path_res,
        username=neo4j_ssh_username,
        port=neo4j_ssh_port,
        debug_write=True,
        verbose=True,
    )
    

Wrote temp nodes .csv to /tmp/tmp65903z1o/tmpc0dqmpxr.csv
Wrote temp edges .csv to /tmp/tmp65903z1o/tmptw562_l4.csv
Wrote temp edges .csv to /tmp/tmp65903z1o/tmp9nmm1u7z.csv
Wrote temp edges .csv to /tmp/tmp65903z1o/tmp8ubqxy_w.csv


## Viz

In [27]:
### Open a py2neo connection to the graph database
from py2neo import Graph

bolt_uri = "bolt://neo4j:test@neo4j:7687"
graph = Graph(bolt_uri, user="neo4j", password="test")

In [28]:
### Plot with neo4jupyter
import neo4jupyter
neo4jupyter.init_notebook_mode()

# The options dict maps a node label to the property used as its caption.
# The nodes written by this pipeline carry the labels below and expose
# `_canonical_id` (there is no `User` label or `id` property in this graph),
# so caption every label with `_canonical_id`.
# NOTE(review): label set taken from the node dump in this notebook --
# extend it if the graph specification gains more labels.
caption_by_label = {
    label: "_canonical_id"
    for label in ("chocolate", "sweets", "toffee", "edge_label", "_searchable")
}

neo4jupyter.draw(graph, caption_by_label)

<IPython.core.display.Javascript object>

In [29]:
# Count the nodes currently stored in the graph
count_query = "MATCH (n) RETURN count(n) as count"
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(count_query)
    node_counts = cursor.data()
    print(node_counts)

[{'count': 45}]


In [30]:
# Print every node in the graph, one record per line
all_nodes_query = "MATCH (n) RETURN n"
with get_neo4j_context(neo_config['uri']) as neo_context:
    cursor = neo_context.run(all_nodes_query)
    records = cursor.data()
    for record in records:
        print(record)

{'n': <Node id=0 labels={'_searchable', 'chocolate', 'sweets'} properties={'_canonical_id': '16', '_label': '16'}>}
{'n': <Node id=1 labels={'_searchable', 'chocolate'} properties={'_canonical_id': '35'}>}
{'n': <Node id=2 labels={'_searchable', 'chocolate'} properties={'_canonical_id': '20'}>}
{'n': <Node id=3 labels={'_searchable', 'chocolate'} properties={'_canonical_id': '36'}>}
{'n': <Node id=4 labels={'_searchable', 'chocolate', 'sweets'} properties={'_canonical_id': '12', '_label': '12'}>}
{'n': <Node id=5 labels={'_searchable', 'chocolate'} properties={'_canonical_id': 'illegal \u0381code'}>}
{'n': <Node id=6 labels={'_searchable', 'sweets', 'chocolate', 'toffee', 'edge_label'} properties={'_canonical_id': '3', '_label': '3;foo;6'}>}
{'n': <Node id=7 labels={'_searchable', 'chocolate', 'sweets', 'toffee'} properties={'_canonical_id': '8', '_label': '1;bar;8'}>}
{'n': <Node id=8 labels={'_searchable', 'chocolate'} properties={'_canonical_id': '31'}>}
{'n': <Node id=9 labels={'_s

Okay, the odd-looking `'_label': '1;8;foo'` is actually correct: all the properties (`_canonical_id`, `fn_prop` and `fn_src_meta`) are concatenated, separated by `;`, under `'_label'`.

In [26]:
# Note: finnet contains a lot of vestigial code

In [27]:
spec

{'node_lists': [{'labels': [],
   'metadata_columns': [],
   'safe_name': 'fn_chocolate_nodes',
   'table_name': 'test_data_chocolate_node_list',
   'safe_table_name': 'fn_test_data_chocolate_node_list',
   'name': 'chocolate nodes',
   'index_column': {'safe_name': 'fn_id',
    'hidden': False,
    'resolution_alias': 'chocolate',
    'no_append': False,
    'name': 'id',
    'variable_definition': 'String'}},
  {'labels': [],
   'metadata_columns': [{'friendly_name': 'sweetness number',
     'safe_name': 'fn_prop',
     'hidden': False,
     'no_append': True,
     'name': 'prop',
     'variable_definition': 'String'}],
   'safe_name': 'fn_sweets_nodes',
   'table_name': 'test_data_sweets_node_list',
   'safe_table_name': 'fn_test_data_sweets_node_list',
   'name': 'sweets nodes',
   'index_column': {'safe_name': 'fn_id',
    'hidden': False,
    'resolution_alias': 'sweets',
    'no_append': False,
    'name': 'id',
    'variable_definition': 'String'}},
  {'labels': [],
   'metadat

In [35]:
node_list = spec.get('node_lists')

In [36]:
node_kind = node_list[2]

In [40]:
from fncore_py3.utils.neo4j_tools import *
from fncore_py3.utils.graph_specification import get_friendly_name_mapping, get_fields_with_property

In [42]:
# Friendly display names for the fields of this node kind
mapping = get_friendly_name_mapping(node_kind)

# Fields flagged `hidden`
hiddenfields = get_fields_with_property(node_kind, prop='hidden')

# Fields flagged `no_append`
labelledfields = get_fields_with_property(node_kind, prop='no_append')

# One label name per labelled field: '_label', then '_label_1', '_label_2', ...
# (the original author suspected these are temporary labels)
indexfields = ['_label_' + str(position) if position else '_label'
               for position, _ in enumerate(labelledfields)]

# The index column goes last in both lists, paired with '_node_id'
index_safe_name = node_kind['index_column'].get('safe_name')
labelledfields.append(index_safe_name)
indexfields.append('_node_id')

noappendfields = [*indexfields, labelledfields[-1]]

# Labels from the node kind plus the '_searchable' label
tags = node_kind['labels'] + ['_searchable']

In [43]:
mapping

{'fn_id': 'fn_id', 'fn_prop': 'sweetness number', 'fn_hide': 'hidden value'}

In [44]:
hiddenfields

['fn_hide']

In [45]:
labelledfields

['fn_prop', 'fn_id']

In [46]:
indexfields

['_label', '_node_id']

In [47]:
noappendfields

['_label', '_node_id', 'fn_id']

In [48]:
tags

['toffee', '_searchable']

In [49]:
# Create a uniqueness constraint on `_canonical_id` for every tag
unique_property = '_canonical_id'
with get_neo4j_context(neo_config['uri']) as neo_context:
    for tag in tags:
        create_uniqueness_constraint(neo_context, tag, unique_property)

In [50]:
# List the constraints currently defined in the database
with get_neo4j_context(neo_config['uri']) as neo_context:
    constraints = neo_context.run("CALL db.constraints").data()
    print(constraints)

[{'description': 'CONSTRAINT ON ( _searchable:_searchable ) ASSERT _searchable._canonical_id IS UNIQUE'}, {'description': 'CONSTRAINT ON ( toffee:toffee ) ASSERT toffee._canonical_id IS UNIQUE'}]


In [51]:
generate_set_label_statements(tags)

'set n:`toffee`:`_searchable`'

In [None]:
# `tag_stmt` was never assigned in this notebook, so build it here from the
# tags before displaying it.
tag_stmt = generate_set_label_statements(tags)
tag_stmt

In [None]:
# Load the resolved toffee nodes and keep the first row as a plain dict,
# for use by the statement-generation cells that follow.
with get_spark_context(config['SparkConfiguration']) as spark_ctx:
    # Bind the SQLContext to a session created from this context, as every
    # other cell in this notebook does.
    sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))

    # The `header` / `inferschema` reader options apply to CSV sources only
    # and have no effect on parquet, so they are not set here.
    data = (sql_context.read
            .parquet('/datasets/finnet/test_data/node_list_resolved/fn_toffee_nodes')
            .dropna(how='any', subset=['_canonical_id']))

    rows = data.take(10)
    rowdict = rows[0].asDict()

In [None]:
generate_set_prop_statements(rowdict, hiddenfields, noappendfields, mapping)

In [None]:
rowdict

In [None]:
# Show the inputs passed to generate_set_prop_statements. The list of
# excluded fields is held in `hiddenfields`; no `excludefields` variable is
# defined in this notebook.
hiddenfields, noappendfields, mapping

In [None]:
!hdfs dfs -ls /datasets/finnet/test_data/node_list_resolved

In [None]:
# Fetch the description of every index directly through Cypher
statement = "call db.indexes() yield description return description"
with get_neo4j_context(neo_config['uri']) as neo_context:
    index_result = neo_context.run(statement)
    d = index_result.data()

In [None]:
d

In [None]:
# Fetch the indexes again, this time through the get_indexes helper.
with get_neo4j_context(neo_config['uri']) as neo_context:
    # The star import runs after the context has been opened; it is kept in
    # place because hoisting it could rebind names used on the line above.
    # NOTE(review): get_indexes is presumably provided by this import -- confirm.
    from fncore_py3.tasks.neo4j_writer import *
    x = get_indexes(neo_context)

In [None]:
x

In [None]:
# # Second part?
# if 'neo4j_writer' in dolist:
#     # This part inserts the remainder of node properties that were not captured above
#     neo4j_writer.write_neo4j_nodes(graph_specification=spec,
#                                    spark_config=SparkConfFactory()
#                                    .set_master('local[*]')
#                                    .set_app_name('write neo4j nodes')
#                                    .set('spark.executor.memory',
#                                         '1g')
#                                    )

#     datetime_now = datetime.now()
#     logging.info("Backing up db, then purge it...")
#     neo4j_manager.backup(graph_spec, datetime_now,
#                          username=neo4j_ssh_username,
#                          port=neo4j_ssh_port)
#     neo4j_manager.purge(graph_spec,
#                         username=neo4j_ssh_username,
#                         port=neo4j_ssh_port)
#     logging.info("Restoring the backup to db...")
#     neo4j_manager.restore(graph_spec,
#                           datetime_now,
#                           username=neo4j_ssh_username,
#                           port=neo4j_ssh_port)

#     logging.info("Checking write neo4j...")
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         with get_neo4j_context(neo_config['uri']) as neo_context:
#             assert test_neo4j_writer(
#                 spark_ctx, sql_context, neo_context, spec
#             )

In [None]:
# if 'graph_tools' in dolist:
#     # Test graph_construction_coi.get_graph_dataframes
#     data_path = os.environ['PIPELINE_DATA_PATH']
#     graph_name = graph_spec.name
#     node_path_resolved = os.path.join(data_path, graph_name, 'node_list_resolved')
#     edge_path_resolved = os.path.join(data_path, graph_name, 'edge_list_resolved')
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         graph = get_graph_dataframes(graph_spec, sql_context,
#                                      node_path_resolved, edge_path_resolved,
#                                      DATA_FORMAT)

#         assert 'node_list' in graph
#         assert 'edge_list' in graph
#         assert len(graph['node_list']) == len(graph_spec.node_lists)
#         for cur_node_list in graph_spec.node_lists:
#             assert cur_node_list.safe_name in graph['node_list']
#         assert len(graph['edge_list']) == len(graph_spec.edge_lists)
#         for cur_edge_list in graph_spec.edge_lists:
#             assert cur_edge_list.safe_name in graph['edge_list']

#     # Test graph_construction_coi.data_loading
#     with get_spark_context(config['SparkConfiguration']) as spark_ctx:
#         sql_context = SQLContext(spark_ctx, sparkSession=SparkSession(spark_ctx))
#         tables = load_node_edge_lists(sql_context, graph_spec,
#                                       node_path_resolved, edge_path_resolved,
#                                       DATA_FORMAT)
#         for cur_edge_list in graph_spec.edge_lists:
#             assert (cur_edge_list.safe_table_name,
#                     cur_edge_list.source_column.safe_name,
#                     cur_edge_list.target_column.safe_name) in tables
#         assert len(tables) == len(graph_spec.node_lists) + len(graph_spec.edge_lists)