In [None]:
%%spark

// Root OneLake folder (abfss URI); the connection file and the Northwind CSV files
// are all resolved relative to this path.
val absfss_Base_Path: String =
  "abfss://Neo4j_Workspace@onelake.dfs.fabric.microsoft.com/Northwind_Lakehouse.Lakehouse/Files/Northwind/"

// Import required libraries
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Reuse the active Spark session if one exists; otherwise create one for this notebook.
val spark = SparkSession.builder().appName("Neo4j Notebook").getOrCreate()

// Load the connection file as a single string. `textFile` returns a Dataset[String] directly,
// so this line does not rely on an implicit Encoder (`spark.implicits._`) being in scope.
val jsonString = spark.read
  .textFile(absfss_Base_Path + "neo4j-conn.json")
  .collect()
  .mkString("\n")

// Parse the JSON document into a generic key/value map.
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val data = mapper.readValue[Map[String, Any]](jsonString)

/**
 * Looks up a mandatory string entry of the parsed connection file.
 *
 * @param key name of the entry in `neo4j-conn.json`
 * @return the entry's string value
 * @throws NoSuchElementException if the key is absent
 * @throws ClassCastException if the value is present but is not a JSON string (including null)
 */
def connectionSetting(key: String): String =
  data.get(key) match {
    case Some(value: String) => value
    case Some(other) =>
      // Report only the type, never the value: the entry may hold a credential.
      val found = Option(other).fold("null")(_.getClass.getName)
      throw new ClassCastException(s"neo4j-conn.json: '$key' must be a JSON string but was $found")
    case None =>
      throw new NoSuchElementException(s"neo4j-conn.json: required key '$key' is missing")
  }

// Extract Neo4j connection details
val neo4jUrl = connectionSetting("neo4j-url")
val neo4jUsername = connectionSetting("neo4j-username")
val neo4jPassword = connectionSetting("neo4j-password")

// Read the Northwind CSV files from the base path. Only the `header` option is set,
// so the first line of each file supplies the column names.
def readNorthwindCsv(fileName: String): DataFrame =
  spark.read
    .option("header", true)
    .csv(s"$absfss_Base_Path$fileName")

val customerDF = readNorthwindCsv("customers.csv")
val supplierDF = readNorthwindCsv("suppliers.csv")
val orderDF = readNorthwindCsv("orders.csv")
val orderDetailDF = readNorthwindCsv("order-details.csv")
val productDF = readNorthwindCsv("products.csv")
val categoryDF = readNorthwindCsv("categories.csv")

// Cypher statements: one `CREATE CONSTRAINT IF NOT EXISTS ... IS NODE KEY` per node label,
// all built from the same template so the four statements cannot drift apart.
def nodeKeyConstraintStmt(label: String, keyProperty: String): String =
  s"CREATE CONSTRAINT IF NOT EXISTS FOR (n:$label) REQUIRE (n.$keyProperty) IS NODE KEY;"

val cypherStmtCreateCustConstraint = nodeKeyConstraintStmt("Customer", "customerID")
val cypherStmtCreateOrderConstraint = nodeKeyConstraintStmt("Order", "orderID")
val cypherStmtCreateProdConstraint = nodeKeyConstraintStmt("Product", "productID")
val cypherStmtCreateSupplierConstraint = nodeKeyConstraintStmt("Supplier", "supplierID")

/**
 * Hands a Cypher statement to the Neo4j Spark connector as the `query` option of a read,
 * using the connection details parsed from the connection file.
 *
 * The DataFrame returned by `load()` is discarded and no action is run on it.
 * NOTE(review): whether the connector actually executes the statement against Neo4j at
 * `load()` time is not visible from this code — confirm that schema statements such as
 * CREATE CONSTRAINT take effect when issued this way.
 *
 * @param spark           session used to build the reader
 * @param cypherStatement Cypher text passed to the connector
 */
def executeCypher(spark: SparkSession, cypherStatement: String): Unit = {
  val connectorOptions = Map(
    "url" -> neo4jUrl,
    "authentication.basic.username" -> neo4jUsername,
    "authentication.basic.password" -> neo4jPassword,
    "query" -> cypherStatement
  )

  spark.read
    .format("org.neo4j.spark.DataSource")
    .options(connectorOptions)
    .load()
  ()
}

// Submit the constraint statements one by one: Customer, Order, Product, then Supplier.
Seq(
  cypherStmtCreateCustConstraint,
  cypherStmtCreateOrderConstraint,
  cypherStmtCreateProdConstraint,
  cypherStmtCreateSupplierConstraint
).foreach(statement => executeCypher(spark, statement))


/**
 * Writes a DataFrame to Neo4j through the Neo4j Spark connector in `SaveMode.Overwrite`.
 *
 * Node write (default): every row is saved as a node with `label`, keyed on `nodeKey`.
 *
 * Relationship write: when `relationship`, `targetLabel` and `targetNodeKey` are all given,
 * every row is saved as a relationship of type `relationship` from the node identified by
 * (`label`, `nodeKey`) to the node identified by (`targetLabel`, `targetNodeKey`), using the
 * connector's `keys` save strategy. No node save mode is set for either end.
 * NOTE(review): the connector is expected to default to matching existing nodes, which is why
 * nodes are written before relationships below — confirm for the connector version in use.
 *
 * If `relationship` is given without both target parameters there is not enough information
 * to build a relationship; a warning is printed and the rows are written as nodes, which is
 * what this function did for every call before the target parameters existed.
 *
 * @param dataFrame     rows to write
 * @param label         node label (source node label for a relationship write)
 * @param nodeKey       node key column(s), comma-separated (source side for a relationship write)
 * @param relationship  relationship type to create, if any
 * @param targetLabel   label of the relationship's target node
 * @param targetNodeKey key column(s) of the relationship's target node
 */
def writeToNeo4j(
    dataFrame: DataFrame,
    label: String,
    nodeKey: String,
    relationship: Option[String] = None,
    targetLabel: Option[String] = None,
    targetNodeKey: Option[String] = None
): Unit = {
  val baseWriter = dataFrame.write
    .format("org.neo4j.spark.DataSource")
    .mode(SaveMode.Overwrite)
    .option("url", neo4jUrl)
    .option("authentication.basic.username", neo4jUsername)
    .option("authentication.basic.password", neo4jPassword)

  val configuredWriter = (relationship, targetLabel, targetNodeKey) match {
    case (Some(relType), Some(tgtLabel), Some(tgtKey)) =>
      baseWriter
        .option("relationship", relType)
        .option("relationship.save.strategy", "keys")
        .option("relationship.source.labels", label)
        .option("relationship.source.node.keys", nodeKey)
        .option("relationship.target.labels", tgtLabel)
        .option("relationship.target.node.keys", tgtKey)
    case _ =>
      relationship.foreach { relType =>
        Console.err.println(
          s"writeToNeo4j: relationship '$relType' requested without targetLabel/targetNodeKey; " +
            s"writing $label nodes only"
        )
      }
      baseWriter
        .option("labels", label)
        .option("node.keys", nodeKey)
  }

  configuredWriter.save()
}

// Write nodes to Neo4j
writeToNeo4j(customerDF, ":Customer", "customerID")
writeToNeo4j(supplierDF, ":Supplier", "supplierID")
writeToNeo4j(orderDF, ":Order", "orderID")
// Keyed on orderID alone, all detail rows of one order would be merged into a single node.
// NOTE(review): assumes order-details.csv has a productID column and at most one row per
// (orderID, productID) — confirm against the file.
writeToNeo4j(orderDetailDF, ":Order_Detail", "orderID,productID")
writeToNeo4j(productDF, ":Product", "productID")
writeToNeo4j(categoryDF, ":Category", "categoryID")

// Write relationships to Neo4j. Only the two key columns are passed for each relationship so
// that no unrelated columns can end up on the relationship or on the matched nodes.
// NOTE(review): the relationship types contain '-'; confirm the connector/Neo4j version in
// use accepts them as written.
writeToNeo4j(
  productDF.select("productID", "categoryID"),
  ":Product",
  "productID",
  Some("PRODUCT-CATEGORY"),
  targetLabel = Some(":Category"),
  targetNodeKey = Some("categoryID")
)
writeToNeo4j(
  productDF.select("supplierID", "productID"),
  ":Supplier",
  "supplierID",
  Some("SUPPLIER-PRODUCT"),
  targetLabel = Some(":Product"),
  targetNodeKey = Some("productID")
)
writeToNeo4j(
  orderDetailDF.select("orderID", "productID"),
  ":Order",
  "orderID",
  Some("ORDER-PRODUCT"),
  targetLabel = Some(":Product"),
  targetNodeKey = Some("productID")
)
writeToNeo4j(
  orderDF.select("customerID", "orderID"),
  ":Customer",
  "customerID",
  Some("CUSTOMER-ORDERED"),
  targetLabel = Some(":Order"),
  targetNodeKey = Some("orderID")
)
