Before running the notebook, install the required libraries on your Databricks cluster.

From PyPI:
* Faker==24.14
* dbldatagen
* graphdatascience

From Maven:
* org.neo4j:neo4j-connector-apache-spark_2.12:5.3.0_for_spark_3

Check the [compatibility table](https://neo4j.com/docs/spark/current/installation/) to make sure you have a version of the Neo4j Spark Connector that is compatible with your Spark runtime.

In [None]:
import dbldatagen as dg
from pyspark.sql.types import FloatType, IntegerType, StringType, DoubleType
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from graphdatascience import GraphDataScience
from getpass import getpass

In [None]:
# Ask for the Neo4j password interactively instead of hard-coding it in the notebook.
neo4j_password = getpass(prompt="Neo4j password")

Neo4j password [REDACTED]

In [None]:
# Neo4j connection target and user; `neo4j+s` is the TLS-encrypted Bolt scheme.
# The password is collected separately via getpass.
url, username = "neo4j+s://5f8297f1.databases.neo4j.io", "neo4j"

# Create constraints
This is a one-time step to establish the constraints in Neo4j. Every property key that you use for loading data with the Spark connector should have a unique constraint. A node key is a stronger constraint that requires both existence and uniqueness.

In [None]:
# Create one node-key constraint per label/property pair that the Spark
# connector loads below. `IF NOT EXISTS` makes each statement safe to re-run.
gds = GraphDataScience(url, auth=(username, neo4j_password))

for constraint_query in (
    """CREATE CONSTRAINT userDescription_node_key IF NOT EXISTS for (u:UserDescription) REQUIRE u.userId IS NODE KEY""",
    """CREATE CONSTRAINT personName_node_key IF NOT EXISTS for (u:PersonName) REQUIRE (u.fullName) IS NODE KEY""",
    """CREATE CONSTRAINT phoneNumber_node_key IF NOT EXISTS for (p:PhoneNumber) REQUIRE p.phoneNumber IS NODE KEY""",
    """CREATE CONSTRAINT socialSecurityNumber_node_key IF NOT EXISTS for (ssn:SocialSecurityNumber) REQUIRE ssn.socialSecurityNumber IS NODE KEY""",
    """CREATE CONSTRAINT email_node_key IF NOT EXISTS FOR (e:Email) REQUIRE e.email IS NODE KEY""",
    """CREATE CONSTRAINT dob_node_key IF NOT EXISTS FOR (d:DOB) REQUIRE d.birthdateString IS NODE KEY""",
):
    gds.run_cypher(constraint_query)

# Set basic Spark configuration

In [None]:
# Build (or reuse) the Spark session and store the Neo4j connection settings
# in its conf, so the connector writes below need no per-call url/auth options.
spark = (
    SparkSession.builder
    .appName("FakeProfiles")
    .config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", neo4j_password)
    .getOrCreate()
)

# Generate a data frame of fake data
This version of the code generates person names and email addresses that are strings of a realistic length but are not very realistic-sounding (they are built from lorem-ipsum words). No street addresses are generated.

In [None]:
# Number of fake profiles (rows) to generate.
row_count = 50000000

# Each string column comes from a dbldatagen template, e.g. r'A\wa' is an
# uppercase letter + a lorem-ipsum word + a lowercase letter ("Itemporw"),
# 'd' is a digit and 'k' a lowercase alphanumeric character.
testDataSpec = (
    dg.DataGenerator(spark, name="test_data_set1", rows=row_count, partitions=1, random=True, randomSeedMethod="hash_fieldname")
    .withColumn("firstName", template=r'A\wa')
    .withColumn("lastName", template=r'A\wa')
    .withColumn("middleName", template=r'A\wa')
    .withColumn("phoneNumber", template=r'ddd-ddd-dddd')
    .withColumn("socialSecurityNumber", template=r'ddd-dd-dddd')
    .withColumn("email", template=r'\wa.\waDD@\w.com')
    .withColumn(
        "birthdate",
        "date",
        data_range=dg.DateRange("1930-01-01 00:00:00", "2006-01-01 00:00:00", "days=1")
    )
    # userId becomes the UserDescription node key, so it must be unique (verified below).
    .withColumn("userId", template=r'kkkkkkkk-kkkk-kkkk-kkkk-kkkkkkkkkkkk')
)

# NOTE(review): generation runs on a single partition (partitions=1), which
# serializes it; raising it should speed generation up but may change the
# generated values.
# Repartition *before* caching. Caching first and repartitioning afterwards
# caches the single-partition frame, so every later action re-runs the
# 8-way shuffle over all rows. This order shuffles once, and the cached
# data is already spread across 8 partitions.
profiles_df = testDataSpec.build().repartition(8).cache()

In [None]:
# userId is the UserDescription node key and the MATCH key for every
# relationship write below, so fail fast if the generator produced duplicates.
distinct_user_ids = profiles_df.select("userId").distinct().count()
assert distinct_user_ids == row_count, f"expected {row_count} distinct userIds, got {distinct_user_ids}"
distinct_user_ids

50000000

In [None]:
# Derive the two string keys used as Neo4j node keys:
#   fullName        -> "Last, First Middle"  (PersonName key)
#   birthdateString -> "yyyy-MM-dd"          (DOB key)
profiles_df = profiles_df.withColumn(
    "fullName",
    F.concat_ws(
        " ",
        F.concat_ws(", ", F.col("lastName"), F.col("firstName")),
        F.col("middleName"),
    ),
).withColumn(
    "birthdateString",
    F.date_format(F.col("birthdate"), "yyyy-MM-dd"),
)
    
    


In [None]:
# Distinct full names; any shortfall from row_count means users will share a PersonName node.
profiles_df.select("fullName").dropDuplicates().count()

49999977

In [None]:
# Distinct phone numbers; duplicates mean users will share a PhoneNumber node.
profiles_df.select("phoneNumber").dropDuplicates().count()

49875010

In [None]:
# Distinct social security numbers; duplicates mean users will share a SocialSecurityNumber node.
profiles_df.select("socialSecurityNumber").dropDuplicates().count()

48770676

In [None]:
# Distinct birth dates, i.e. how many DOB nodes the load will create.
profiles_df.select("birthdate").dropDuplicates().count()

27760

In [None]:
# Distinct email addresses; duplicates mean users will share an Email node.
profiles_df.select("email").dropDuplicates().count()

49870195

In [None]:
# Preview the first 100 generated rows.
display(profiles_df.take(100))

firstName,lastName,middleName,phoneNumber,socialSecurityNumber,email,birthdate,userId,fullName,birthdateString
Itemporw,Fessel,Bind,159-210-2769,149-23-4963,autef.cupidatati84@consequat.com,1971-05-22,8fp4oihd-ey2o-uzk7-uic4-85ywzy3d323j,"Fessel, Itemporw Bind",1971-05-22
Rlaborumd,Iduisw,Gint,079-756-7883,007-42-4714,ipsumh.magnay14@pariatur.com,1936-03-31,9qpfaf0f-mg9w-oj0v-u4oz-qi9bnl9h0pro,"Iduisw, Rlaborumd Gint",1936-03-31
Qirurev,Xvoluptatem,Kautef,037-835-8408,116-68-3039,quif.consecteturu64@laborum.com,1949-05-19,8y4dn33m-orb4-hzw9-iudb-xjlvrocm895o,"Xvoluptatem, Qirurev Kautef",1949-05-19
Dametm,Zeiusmodi,Boccaecatw,295-298-5114,485-17-2684,utn.adipiscingk38@dolor.com,1995-07-01,nibysssf-lkbc-a2xf-f3dy-gsqvojzm6a8y,"Zeiusmodi, Dametm Boccaecatw",1995-07-01
Tlaborumk,Nmollitk,Uvelitu,682-509-9330,608-76-5818,utc.sity92@ad.com,1932-03-31,zb702gv7-s6w2-7unu-eib2-9d783eu7g95f,"Nmollitk, Tlaborumk Uvelitu",1932-03-31
Qdeseruntg,Wcillumb,Zeub,464-950-7310,051-35-3621,nonp.mollitl35@dolore.com,1998-06-06,mpa9lgqq-m74n-h2e9-o0op-lb50ucg9kbac,"Wcillumb, Qdeseruntg Zeub",1998-06-06
Cexm,Uanimr,Jautek,278-466-9524,930-22-4649,utz.utu54@nulla.com,1965-09-24,nzulz5l5-3ipa-lrp8-08eh-lsbkio2bl223,"Uanimr, Cexm Jautek",1965-09-24
Npariaturt,Qdod,Dlaborums,875-771-5649,201-61-1638,ipsumz.loreml94@commodo.com,1981-07-12,ovophl7b-u0ww-qx7l-2p3r-ex8rkf284zh3,"Qdod, Npariaturt Dlaborums",1981-07-12
Vutk,Pullamcop,Uoccaecatc,109-402-7817,891-23-3031,commodoe.mollith55@excepteur.com,1949-10-01,8kipskfl-w0tg-yu8s-mibk-l1lgmsoy5nuj,"Pullamcop, Vutk Uoccaecatc",1949-10-01
Cmollity,Aeuo,Futp,901-785-4791,462-26-0376,ipsuml.nonh35@deserunt.com,1966-12-05,39ywsro5-nfyb-zd81-hdaw-0cjs3pchdkpi,"Aeuo, Cmollity Futp",1966-12-05


# Send data to Neo4j
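Send the nodes first; each node label is written in parallel across the data frame's partitions.
Then repartition the data frame and send the relationships. The Neo4j Spark connector guidance is to use a single partition for relationship writes, so that relationships are sent serially and concurrent transactions cannot deadlock on shared nodes.
Note that the cells below use 8 partitions (and, for `HAS_DOB`, partition by birth decade), trading some of that safety for throughput.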

In [None]:
# Write one UserDescription node per distinct userId.
# Overwrite mode makes the connector MERGE on `node.keys` (userId, which has a
# node-key constraint) instead of the CREATE issued in Append mode. That way a
# re-run, or a Spark task retry after a partially committed batch, does not
# fail on the constraint. MERGE costs one index lookup per row.
(
    profiles_df.select(F.col('userId'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "UserDescription")
    .option("node.keys", "userId")
    .save()
)

In [None]:
# Write one PersonName node per distinct fullName, keeping the name parts as properties.
# Overwrite mode makes the connector MERGE on `node.keys` (fullName, which has a
# node-key constraint) instead of the CREATE issued in Append mode. That way a
# re-run, or a Spark task retry after a partially committed batch, does not
# fail on the constraint.
(
    profiles_df.select(F.col('firstName'), F.col('lastName'), F.col('middleName'), F.col('fullName'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "PersonName")
    .option("node.keys", "fullName")
    .save()
)

In [None]:
# Write one PhoneNumber node per distinct phone number.
# Overwrite mode makes the connector MERGE on `node.keys` (phoneNumber, which
# has a node-key constraint) instead of the CREATE issued in Append mode, so
# re-runs and Spark task retries do not fail on the constraint.
(
    profiles_df.select(F.col('phoneNumber'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "PhoneNumber")
    .option("node.keys", "phoneNumber")
    .save()
)

In [None]:
# Write one SocialSecurityNumber node per distinct SSN.
# Overwrite mode makes the connector MERGE on `node.keys` (socialSecurityNumber,
# which has a node-key constraint) instead of the CREATE issued in Append mode,
# so re-runs and Spark task retries do not fail on the constraint.
(
    profiles_df.select(F.col('socialSecurityNumber'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "SocialSecurityNumber")
    .option("node.keys", "socialSecurityNumber")
    .save()
)

In [None]:
# Write one Email node per distinct email address.
# Overwrite mode makes the connector MERGE on `node.keys` (email, which has a
# node-key constraint) instead of the CREATE issued in Append mode, so re-runs
# and Spark task retries do not fail on the constraint.
(
    profiles_df.select(F.col('email'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "Email")
    .option("node.keys", "email")
    .save()
)

In [None]:
# Write one DOB node per distinct birth date, storing both the date value and its string key.
# Overwrite mode makes the connector MERGE on `node.keys` (birthdateString,
# which has a node-key constraint) instead of the CREATE issued in Append mode,
# so re-runs and Spark task retries do not fail on the constraint.
(
    profiles_df.select(F.col('birthdate'), F.col('birthdateString'))
    .distinct()
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "DOB")
    .option("node.keys", "birthdateString")
    .save()
)

In [None]:
# Reshuffle into 8 partitions before the relationship writes.
# NOTE(review): the Neo4j connector guidance is a single partition for
# relationship writes, to avoid lock contention/deadlocks on shared nodes;
# 8 partitions trades that for throughput.
profiles_df = profiles_df.repartition(numPartitions=8)

In [None]:
# Link each user to its PersonName node. `event` is the connector-provided
# variable holding one data frame row.
# MERGE instead of CREATE makes the write idempotent: re-running the cell, or a
# Spark task retry after some batches were already committed, does not create
# duplicate HAS_NAME relationships. userId is unique per row, so on a clean run
# MERGE produces exactly what CREATE would.
# Append matches the other query-based writes; with the `query` option the
# Cypher itself decides between CREATE and MERGE.
(
        profiles_df.select(F.col('userId'), F.col('fullName'))
        .write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query",
                """MATCH (u:UserDescription {userId:event.userId}), (n:PersonName {fullName:event.fullName})
                        MERGE (u)-[:HAS_NAME]->(n) """)
        .save()
)

In [None]:
# Link each user to its PhoneNumber node. `event` is the connector-provided
# variable holding one data frame row.
# MERGE instead of CREATE makes the write idempotent, so re-runs and Spark task
# retries after partial commits do not create duplicate HAS_PHONE
# relationships. On a clean run the result is identical, since userId is unique.
(
        profiles_df.select(F.col('userId'), F.col('phoneNumber'))
        .write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query",
                """MATCH (u:UserDescription {userId:event.userId}), (n:PhoneNumber {phoneNumber:event.phoneNumber})
                   MERGE (u)-[:HAS_PHONE]->(n) """)
        .save()
)

In [None]:
    # Link each user to its SocialSecurityNumber node. `event` is the
    # connector-provided variable holding one data frame row.
    # MERGE instead of CREATE makes the write idempotent, so re-runs and Spark
    # task retries after partial commits (e.g. after a dropped connection) do not
    # create duplicate HAS_SOCIAL_SECURITY_NUMBER relationships.
    (
        profiles_df.select(F.col('userId'), F.col('socialSecurityNumber'))
        .write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query",
                """MATCH (u:UserDescription {userId:event.userId}), (n:SocialSecurityNumber {socialSecurityNumber:event.socialSecurityNumber})
                   MERGE (u)-[:HAS_SOCIAL_SECURITY_NUMBER]->(n) """)
        .save()
    )

ERROR:neo4j.io:Failed to write data to connection ResolvedIPv4Address(('34.28.32.244', 7687)) (ResolvedIPv4Address(('34.28.32.244', 7687)))
ERROR:neo4j.io:Failed to write data to connection IPv4Address(('5f8297f1.databases.neo4j.io', 7687)) (ResolvedIPv4Address(('34.28.32.244', 7687)))


In [None]:
    # Link each user to its Email node. `event` is the connector-provided
    # variable holding one data frame row.
    # MERGE instead of CREATE makes the write idempotent, so re-runs and Spark
    # task retries after partial commits do not create duplicate HAS_EMAIL
    # relationships.
    (
        profiles_df.select(F.col('userId'), F.col('email'))
        .write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query",
                """MATCH (u:UserDescription {userId:event.userId}), (n:Email {email:event.email})
                   MERGE (u)-[:HAS_EMAIL]->(n) """)
        .save()
    )

In [None]:
from pyspark.sql.functions import col, substring

# "decade" = first three characters of the yyyy-MM-dd birthdateString
# (e.g. "1971-05-22" -> "197"). Hash-partitioning on it puts every row for a
# given birth date into one partition, presumably so that parallel HAS_DOB
# writes never contend for the same DOB node.
profiles_df = (
    profiles_df
    .withColumn("decade", F.substring(F.col("birthdateString"), 1, 3))
    .repartition(col("decade"))
)

display(profiles_df.take(5))

firstName,lastName,middleName,phoneNumber,socialSecurityNumber,email,birthdate,userId,fullName,birthdateString,decade
Lculpap,Heiusmodg,Xadr,406-743-3425,429-87-4058,excepteurw.deserunty71@adipiscing.com,2001-06-12,zxvskq8i-76ug-vq7r-fww3-fiq2nv3v3vyq,"Heiusmodg, Lculpap Xadr",2001-06-12,200
Gquisz,Uloreml,Kaliquipj,805-983-6249,717-29-8716,essep.doloro62@ut.com,2005-01-29,2nhl24nz-418h-ek60-gcif-3erd6ehc13i8,"Uloreml, Gquisz Kaliquipj",2005-01-29,200
Feut,Vidl,Ddolorew,526-728-5834,192-11-8024,cillumj.proidentd28@occaecat.com,2000-12-31,dwqyso3e-n184-s9rv-ccbs-dw2606wqbyos,"Vidl, Feut Ddolorew",2000-12-31,200
Areprehenderitr,Ldeseruntv,Atempora,833-419-8701,547-26-5496,dolorl.duisp22@ad.com,2001-02-17,bpt2i5yk-8d06-cx8x-rqn1-r4fmobjwotex,"Ldeseruntv, Areprehenderitr Atempora",2001-02-17,200
Iamets,Yutf,Gins,881-154-0440,961-88-5357,estg.autes44@in.com,2001-05-31,jc88ajyr-sbhi-un8z-ml8h-e8wtf2aogsdx,"Yutf, Iamets Gins",2001-05-31,200


In [None]:
    # Link each user to its DOB node. `event` is the connector-provided variable
    # holding one data frame row. When the preceding cell's decade partitioning
    # is in effect, all rows for one DOB node come from a single partition.
    # MERGE instead of CREATE makes the write idempotent, so re-runs and Spark
    # task retries after partial commits do not create duplicate HAS_DOB
    # relationships.
    (
        profiles_df.select(F.col('userId'), F.col('birthdateString'))
        .write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query",
                """MATCH (u:UserDescription {userId:event.userId}), (n:DOB {birthdateString:event.birthdateString})
                   MERGE (u)-[:HAS_DOB]->(n) """)
        .save()
    )