In [None]:
%%configure -f
{
  "conf": {
    "spark.jars": "abfss://f7e2db24-d041-47bb-ae02-0c67982c6c4f@onelake.dfs.fabric.microsoft.com/8fcfa082-2992-46d9-9794-dea041d8c11a/Files/Drivers/neo4j-spark-connector-5.3.8-s_2.12.jar"
  }
}

In [None]:
%%spark


// Base folder holding the Northwind CSV extracts, as an ABFS URI into OneLake.
// Layout: abfss://<workspace id>@onelake.dfs.fabric.microsoft.com/<lakehouse id>/Files/Northwind/
// (presumably workspace / lakehouse GUIDs per the OneLake URI format — confirm if they change).
// The trailing "/" lets later cells append a bare file name.
val absfss_Base_Path = {
  val workspaceId = "f7e2db24-d041-47bb-ae02-0c67982c6c4f"
  val lakehouseId = "8fcfa082-2992-46d9-9794-dea041d8c11a"
  s"abfss://$workspaceId@onelake.dfs.fabric.microsoft.com/$lakehouseId/Files/Northwind/"
}

// Import required libraries
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Obtain the Spark session. SparkSession.builder().getOrCreate() returns an already
// running session when one exists (as it normally does inside a notebook), and only
// builds a new one named "Neo4j Workbook" otherwise.
val spark = SparkSession
  .builder()
  .appName("Neo4j Workbook")
  .getOrCreate()

In [None]:
%%spark
// Read Northwind data files.
// Explicit import so this cell does not depend on the interpreter having
// pre-imported org.apache.spark.sql.functions._ for us.
import org.apache.spark.sql.functions.{col, concat_ws}

/** Reads one Northwind CSV extract (first line is a header) from the lakehouse folder.
  *
  * No schema inference is requested, so every column comes back as a string; node keys
  * and relationship keys written later therefore share the same (string) type.
  *
  * @param fileName file name relative to `absfss_Base_Path`, e.g. "customers.csv"
  * @return the lazily-read DataFrame
  */
def readNorthwindCsv(fileName: String): DataFrame =
  spark.read.option("header", true).csv(absfss_Base_Path + fileName)

// Shipping columns on orders.csv, paired with the property name each gets on Address nodes.
// One list drives the synthetic key, the Address projection and the drop from orders,
// so the three can never disagree.
val shipColumnToAddressProperty = Seq(
  "shipName" -> "name",
  "shipAddress" -> "address",
  "shipCity" -> "city",
  "shipRegion" -> "region",
  "shipPostalCode" -> "postalCode",
  "shipCountry" -> "country")
val shipColumns = shipColumnToAddressProperty.map(_._1)

val customerDF = readNorthwindCsv("customers.csv")
val supplierDF = readNorthwindCsv("suppliers.csv")
// addressID is a synthetic key: the shipping fields joined with ", ".
// NOTE(review): concat_ws skips null inputs, so a missing region/postal code shifts the
// remaining parts; two different addresses could in principle yield the same addressID.
val stagedOrderDF = readNorthwindCsv("orders.csv")
  .withColumn("addressID", concat_ws(", ", shipColumns.map(col): _*))
val orderDetailDF = readNorthwindCsv("order-details.csv")
val productDF = readNorthwindCsv("products.csv")
val categoryDF = readNorthwindCsv("categories.csv")

// Create a separate, de-duplicated addressDF and finalize orderDF.
val addressDF = stagedOrderDF
  .select(col("addressID") +: shipColumnToAddressProperty.map {
    case (shipColumn, property) => col(shipColumn).alias(property)
  }: _*)
  .dropDuplicates("addressID")
// The shipping fields now live on Address rows; orderDF keeps addressID as the link.
val orderDF = stagedOrderDF.drop(shipColumns: _*)
//display(orderDF)

In [None]:
%%spark
// Neo4j connection details.
// Each setting can be overridden with an environment variable so real credentials do not
// have to be committed with the notebook; the literals below are only fallbacks.
// You can also load them from a JSON file stored in the lakehouse
// (see the Neo4jToLakehouse workbook for an example).
// NOTE(review): in Fabric a Key Vault secret is likely a better home for the password —
// confirm what the workspace supports before relying on environment variables.

/** Returns the value of environment variable `name`, or `default` when it is unset or blank.
  * A set value is returned unmodified (no trimming), so passwords are passed through as-is.
  */
def neo4jSetting(name: String, default: String): String =
  sys.env.get(name).filter(_.trim.nonEmpty).getOrElse(default)

val neo4jUrl = neo4jSetting("NEO4J_URL", "neo4j+s://a2e6569d.databases.neo4j.io")
val neo4jUsername = neo4jSetting("NEO4J_USERNAME", "neo4j")
val neo4jPassword = neo4jSetting("NEO4J_PASSWORD", "UpdateThis")

/** Writes every row of `dataFrame` to Neo4j as a node labelled `label`, using the
  * Neo4j Spark connector with SaveMode.Overwrite.
  *
  * `nodeKey` names the identifying column/property. With Overwrite mode the connector is
  * expected to upsert (MERGE) on these keys — confirm against the connector docs.
  * "schema.optimization.node.keys" = "KEY" asks the connector to create node key
  * constraints under the hood.
  *
  * @param dataFrame rows to write, one node per row
  * @param label     Neo4j label for the nodes
  * @param nodeKey   column used as the node key
  */
def writeNodesToNeo4j(dataFrame: DataFrame, label: String, nodeKey: String): Unit = {
  val connectorOptions = Map(
    "url" -> neo4jUrl,
    "authentication.basic.username" -> neo4jUsername,
    "authentication.basic.password" -> neo4jPassword,
    "labels" -> label,
    "node.keys" -> nodeKey,
    "schema.optimization.node.keys" -> "KEY"
  )

  dataFrame.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .options(connectorOptions)
    .save()
}

// Load every node set, one label per DataFrame, each keyed on its ID column.
// Order matters only for readability; it matches the original sequence of writes.
Seq(
  (customerDF, "Customer", "customerID"),
  (supplierDF, "Supplier", "supplierID"),
  (orderDF, "Order", "orderID"),
  (productDF, "Product", "productID"),
  (categoryDF, "Category", "categoryID"),
  (addressDF, "Address", "addressID")
).foreach { case (nodes, label, key) => writeNodesToNeo4j(nodes, label, key) }


In [None]:
%%spark
// Write relationships to Neo4j

/** Identifies one endpoint of a relationship write.
  *
  * @param labels Neo4j label(s) of the endpoint nodes
  * @param keys   key column(s) used to look the endpoint node up
  */
case class NodeInfo(
  labels: String,
  keys: String
)

/** Writes one `relType` relationship per row of `dataFrame` via the Neo4j Spark connector.
  *
  * The "keys" save strategy is used: each row carries the key values of both endpoints.
  * Both endpoints use save mode "Match", so the endpoint nodes are looked up rather than
  * created (per the connector docs) — they must already have been written.
  * Relationships are written with SaveMode.Overwrite.
  *
  * @param dataFrame  rows holding the source and target key columns
  * @param sourceNode label(s) and key column(s) of the start node
  * @param relType    relationship type to create
  * @param targetNode label(s) and key column(s) of the end node
  */
def writeRelsToNeo4j(dataFrame: DataFrame, sourceNode: NodeInfo, relType: String, targetNode: NodeInfo): Unit = {
  val connection = Map(
    "url" -> neo4jUrl,
    "authentication.basic.username" -> neo4jUsername,
    "authentication.basic.password" -> neo4jPassword)

  val relationship = Map(
    "relationship.save.strategy" -> "keys",
    "relationship" -> relType)

  val sourceEndpoint = Map(
    "relationship.source.save.mode" -> "Match",
    "relationship.source.labels" -> sourceNode.labels,
    "relationship.source.node.keys" -> sourceNode.keys)

  val targetEndpoint = Map(
    "relationship.target.save.mode" -> "Match",
    "relationship.target.labels" -> targetNode.labels,
    "relationship.target.node.keys" -> targetNode.keys)

  dataFrame.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .options(connection ++ relationship ++ sourceEndpoint ++ targetEndpoint)
    .save()
}

locally {
  // Endpoint descriptors that appear in more than one relationship type.
  val product = NodeInfo("Product", "productID")
  val order = NodeInfo("Order", "orderID")

  // Each projection keeps exactly the source-key and target-key columns of the relationship.
  writeRelsToNeo4j(productDF.select("productID", "categoryID"), product, "BELONGS_TO", NodeInfo("Category", "categoryID"))
  writeRelsToNeo4j(productDF.select("productID", "supplierID"), product, "SUPPLIED_BY", NodeInfo("Supplier", "supplierID"))
  writeRelsToNeo4j(orderDetailDF.select("orderID", "productID"), order, "ORDER_CONTAINS", product)
  writeRelsToNeo4j(orderDF.select("customerID", "orderID"), NodeInfo("Customer", "customerID"), "ORDERED", order)
  writeRelsToNeo4j(orderDF.select("orderID", "addressID"), order, "SHIPPED_TO", NodeInfo("Address", "addressID"))
}