In [None]:
%%configure -f
{"driverMemory": "32000M","executorCores":15,
"conf": {
    "spark.jars": "s3://<s3_bucket_name>/neo4j/neo4j-connector-apache-spark_2.11-4.1.2_for_spark_2.4.jar"
}
}

In [11]:
import datetime
import os
import boto3

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Read the parquet files under the main bucket prefix into a DataFrame.
# NOTE(review): `spark` is not defined in the visible cells; presumably it is
# the session object supplied by the notebook kernel -- confirm.
final_df = spark.read.parquet("s3://<s3_bucket_name>/parquet_files/")

# Register the DataFrame as the temporary view "final".
final_df.createOrReplaceTempView("final")

# Count of the DataFrame, printed with thousands separators.
print(f'Total Rows: {final_df.count():,}')

# Example of filtering with a condition string; "<column_name>" is a
# placeholder that has to be replaced with a real column name.
filtered_df = final_df.filter("<column_name> is not null")

# Count of the filtered DataFrame.
print(f'Total Rows after filter: {filtered_df.count():,}')

In [15]:

# Base configuration for Neo4j, read by the read/write helpers in this cell.
# Every "<...>" value is a placeholder that must be replaced before running.
config = {
    # Fixed: the placeholder previously ended in ">>", which left a stray ">"
    # in the Bolt URL once "<neo_db_ip>" was substituted.
    "bolt_url": "bolt://<neo_db_ip>:7687",
    "database": "<database_name>",
    "user": "<user_name>",
    "password": "<password>",
    # Passed to the connector as the "batch.size" write option.
    "batch_size": "500",
}

# Map describing the nodes and relationships passed into the Neo4j read/write
# methods. Each top-level key names one source/target/relationship triple.
# TODO: move to YAML and maintain externally in git and/or S3, with validation.
model_map = {
    "node1_node2": {
        "source": {"node": ":Node1", "property": "id_field:id"},
        "target": {"node": ":Node2", "property": "id_field:id"},
        "relationship": {"rel": "IS_RELATED", "properties": ""},
    },
}

def get_cypher_query(node_name):
    """Return the Cypher statement used to write nodes of type ``node_name``.

    Args:
        node_name: Node key; it is lower-cased before the lookup, so
            "Node1", "NODE1" and "node1" all resolve to the same entry.

    Returns:
        The Cypher ``MERGE`` string registered for that node. The statements
        read their values from a variable named ``event``.

    Raises:
        KeyError: If no statement is registered for ``node_name``.
    """
    cypher_map = {
        "node1": "MERGE (n:Node1 {id: event.id_field, createdDate: event.date})",
        # Fixed copy-paste error: this entry previously merged :Node1 too, so
        # "node2" writes never produced any :Node2 nodes.
        "node2": "MERGE (n:Node2 {id: event.id_field, createdDate: event.date})",
    }
    return cypher_map[node_name.lower()]

def write_neo4j_nodes(df, node_name):
    """Write ``df`` to Neo4j using the Cypher statement for ``node_name``.

    The statement is obtained by calling ``get_cypher_query(node_name)`` and is
    handed to the connector through its ``query`` option. Connection settings
    come from the module-level ``config`` dict.

    Args:
        df: DataFrame whose rows are written.
        node_name: Name of the node, forwarded to ``get_cypher_query``.
    """
    writer = df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")

    # Same options, in the same order, as a chained .option(...) sequence.
    writer_options = {
        "url": config["bolt_url"],
        "batch.size": config["batch_size"],
        "database": config["database"],
        "authentication.type": "basic",
        "authentication.basic.username": config["user"],
        "authentication.basic.password": config["password"],
        "query": get_cypher_query(node_name),
    }
    for option_name, option_value in writer_options.items():
        writer = writer.option(option_name, option_value)

    writer.save()

def write_neo4j_node_relationship(df, model_map_key):
    """Write a source node, a target node and their relationship to Neo4j.

    Uses the connector's "keys" relationship save strategy, see
    https://neo4j.com/docs/spark/current/writing/#write-rel

    Args:
        df: DataFrame whose rows are written.
        model_map_key: Key into the module-level ``model_map``; the entry it
            resolves to supplies the source/target labels, the node keys and
            the relationship type and properties.
    """
    writer = df.write.format("org.neo4j.spark.DataSource").mode("overwrite")

    connection_options = {
        "url": config["bolt_url"],
        "batch.size": config["batch_size"],
        "database": config["database"],
        "authentication.type": "basic",
        "authentication.basic.username": config["user"],
        "authentication.basic.password": config["password"],
    }

    mapping = model_map[model_map_key]
    relationship = mapping["relationship"]
    source = mapping["source"]
    target = mapping["target"]
    relationship_options = {
        "relationship": relationship["rel"],
        "relationship.properties": relationship["properties"],
        "relationship.save.strategy": "keys",
        "relationship.source.labels": source["node"],
        "relationship.source.save.mode": "overwrite",
        "relationship.source.node.keys": source["property"],
        "relationship.target.labels": target["node"],
        "relationship.target.node.keys": target["property"],
        "relationship.target.save.mode": "overwrite",
    }

    # Connection options first, then the relationship options.
    for option_group in (connection_options, relationship_options):
        for option_name, option_value in option_group.items():
            writer = writer.option(option_name, option_value)

    writer.save()

    
def read_neo4j_nodes(node_name):
    """Read nodes from Neo4j, display them, and return the DataFrame.

    Connection settings come from the module-level ``config`` dict.

    Args:
        node_name: Value passed to the connector's ``labels`` option.

    Returns:
        The loaded DataFrame. The function used to discard it and return
        ``None``, which made the result unusable by callers; ``show()`` is
        still called, so the displayed output is unchanged.
    """
    nodes_df = (
        spark.read.format("org.neo4j.spark.DataSource")
        .option("url", config["bolt_url"])
        .option("database", config["database"])
        .option("authentication.type", "basic")
        .option("authentication.basic.username", config["user"])
        .option("authentication.basic.password", config["password"])
        .option("labels", node_name)
        .load()
    )
    nodes_df.show()
    return nodes_df


# Use to reduce partitions if locks in Neo4j occur.
#final_df = final_df.coalesce(1)

# Further filtering examples.
# NOTE(review): md5, regexp_replace, lower and concat are not imported in any
# visible cell; presumably they come from pyspark.sql.functions and must be in
# scope before this cell runs -- confirm.
no_empty_strings_in_column_df = final_df.filter("Column1 != ''")
concatenate_columns_for_unique_id_df = final_df.withColumn("ID", md5(regexp_replace(lower(concat("Column1","Column2","Column3","Column4", "Column5")), "\\s+", "")))
not_null_in_column_df = final_df.filter("Column1 is not null")

# Print one record for debugging. first() fetches a single row, whereas
# collect()[1] materialised the entire DataFrame on the driver, skipped the
# first row, and raised IndexError when fewer than two rows existed.
print(final_df.first())

# Write out nodes.
write_neo4j_nodes(final_df, "node1")
write_neo4j_nodes(final_df, "node2")

# Write out nodes/relationships.
write_neo4j_node_relationship(no_empty_strings_in_column_df, "node1_node2")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…