Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neo4j export #91

Merged
merged 10 commits into from Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,6 +2,7 @@ package com.lynxanalytics.biggraph.frontend_operations

import com.lynxanalytics.biggraph.SparkFreeEnvironment
import com.lynxanalytics.biggraph.graph_api.Scripting._
import com.lynxanalytics.biggraph.graph_util.Scripting._
import com.lynxanalytics.biggraph.graph_operations
import com.lynxanalytics.biggraph.controllers._

Expand Down Expand Up @@ -153,4 +154,98 @@ class ExportOperations(env: SparkFreeEnvironment) extends OperationRegistry {
.map(_ => wc.createSnapshotFromState(user, params("path"), getState))
}
})

// Common base for the two attribute-export boxes ("Export vertex attributes to
// Neo4j" and "Export edge attributes to Neo4j"). Subclasses supply the attribute
// lookup and whether we are updating Neo4j nodes or relationships.
abstract class Neo4jAttributeExport(context: Operation.Context) extends TriggerableOperation(context) {
  params ++= List(
    Param("url", "Neo4j connection", defaultValue = "bolt://localhost:7687"),
    Param("username", "Neo4j username", defaultValue = "neo4j"),
    Param("password", "Neo4j password", defaultValue = "neo4j"),
    // Bumping "version" forces a re-run of an otherwise identical export.
    NonNegInt("version", "Export repetition ID", default = 1))
  lazy val project = projectInput("graph")
  // "nodes" or "relationships" — decides which Cypher MATCH pattern is used.
  val nodesOrRelationships: String
  override def enabled = FEStatus.enabled
  override def trigger(wc: WorkspaceController, gdc: GraphDrawingController): scala.concurrent.Future[Unit] = {
    gdc.getComputeBoxResult(List(exportResult.gUID))
  }
  override def getOutputs(): Map[BoxOutput, BoxOutputState] = {
    params.validate()
    // The password must not be persisted in the box output state.
    Map(context.box.output(
      context.meta.outputs(0)) -> BoxOutputState.from(exportResult, params.toMap - "password"))
  }
  // Resolves an attribute name to the vertex or edge attribute, per subclass.
  def getAttribute(a: String): com.lynxanalytics.biggraph.graph_api.Attribute[_]
  def exportResult() = {
    val keys = splitParam("keys")
    // Validate before building anything: without keys there is nothing to match on in Neo4j.
    assert(keys.nonEmpty, "You have to choose one or more attributes to use as the keys for identifying the nodes in Neo4j.")
    // The key attributes are exported too, so they can be used in the MATCH clause.
    val attrs = (splitParam("to_export") ++ keys).toSet.toList
    val t = graph_operations.AttributesToTable.run(attrs.map(a => a -> getAttribute(a)))
    val op = graph_operations.ExportAttributesToNeo4j(
      params("url"), params("username"), params("password"), params("labels"), keys,
      params("version").toInt, nodesOrRelationships)
    op(op.t, t).result.exportResult
  }
}

// Updates existing Neo4j nodes with LynxKite vertex attribute values.
// The "keys" attributes identify the target nodes; "to_export" become node properties.
registerOp(
  "Export vertex attributes to Neo4j", defaultIcon, ExportOperations,
  List("graph"), List("exported"), new Neo4jAttributeExport(_) {
    import Operation.Implicits._
    params ++= List(
      Param("labels", "Node labels", defaultValue = ""),
      Choice("keys", "Attribute(s) to use as key",
        // Cannot join on internal ID ("<id>") and stuff like that.
        options = project.vertexAttrList.filter(!_.id.startsWith("<")), multipleChoice = true),
      Choice("to_export", "Exported attributes", options = project.vertexAttrList, multipleChoice = true))
    val nodesOrRelationships = "nodes"
    def getAttribute(a: String) = project.vertexAttributes(a)
  })
// Updates existing Neo4j relationships with LynxKite edge attribute values.
// The "keys" attributes identify the target relationships; "to_export" become properties.
registerOp(
  "Export edge attributes to Neo4j", defaultIcon, ExportOperations,
  List("graph"), List("exported"), new Neo4jAttributeExport(_) {
    import Operation.Implicits._
    params ++= List(
      Param("labels", "Relationship labels", defaultValue = ""),
      Choice("keys", "Attribute(s) to use as key",
        // Cannot join on internal ID ("<id>") and stuff like that.
        options = project.edgeAttrList.filter(!_.id.startsWith("<")), multipleChoice = true),
      Choice("to_export", "Exported attributes", options = project.edgeAttrList, multipleChoice = true))
    val nodesOrRelationships = "relationships"
    def getAttribute(a: String) = project.edgeAttributes(a)
  })

// Exports the whole graph (vertices and edges) as new entities in Neo4j.
// NOTE: The source this was recovered from had review-comment page text pasted
// into the middle of the params list; the code below restores the valid form.
registerOp(
  "Export graph to Neo4j", defaultIcon, ExportOperations,
  List("graph"), List("exported"), new TriggerableOperation(_) {
    import Operation.Implicits._
    params ++= List(
      Param("url", "Neo4j connection", defaultValue = "bolt://localhost:7687"),
      Param("username", "Neo4j username", defaultValue = "neo4j"),
      Param("password", "Neo4j password", defaultValue = "neo4j"),
      // Bumping "version" forces a re-run of an otherwise identical export.
      NonNegInt("version", "Export repetition ID", default = 1),
      // Optional String attributes; the empty option means "no labels/type".
      // The label attribute's value may be a comma separated list of labels.
      Choice("node_labels", "Attribute with node labels",
        options = List(FEOption("", "")) ++ project.vertexAttrList[String]),
      Choice("relationship_type", "Attribute with relationship type",
        options = List(FEOption("", "")) ++ project.edgeAttrList[String]))
    lazy val project = projectInput("graph")
    override def enabled = FEStatus.enabled
    override def trigger(wc: WorkspaceController, gdc: GraphDrawingController): scala.concurrent.Future[Unit] = {
      gdc.getComputeBoxResult(List(exportResult.gUID))
    }
    override def getOutputs(): Map[BoxOutput, BoxOutputState] = {
      params.validate()
      // The password must not be persisted in the box output state.
      Map(context.box.output(
        context.meta.outputs(0)) -> BoxOutputState.from(exportResult, params.toMap - "password"))
    }
    def exportResult() = {
      // Add internal IDs so exported edges can find their endpoints in Neo4j.
      val vsAttr = project.vertexAttributes.toMap +
        (graph_operations.ExportGraphToNeo4j.VID -> project.vertexSet.idAttribute)
      val esAttr = project.edgeAttributes.toMap +
        (graph_operations.ExportGraphToNeo4j.SRCDST -> project.edgeBundle.srcDstAttribute)
      val vs = graph_operations.AttributesToTable.run(vsAttr)
      val es = graph_operations.AttributesToTable.run(esAttr)
      val op = graph_operations.ExportGraphToNeo4j(
        params("url"), params("username"), params("password"), params("node_labels"),
        params("relationship_type"), params("version").toInt)
      op(op.vs, vs)(op.es, es).result.exportResult
    }
  })
}
Expand Up @@ -167,6 +167,15 @@ class ImportOperations(env: SparkFreeEnvironment) extends ProjectOperations(env)
Map(context.box.output(context.meta.outputs(0)) -> BoxOutputState.from(state))
}

// Returns true when `name` passes SubProject.validateName. validateName signals
// rejection by throwing an AssertionError, which we translate into a Boolean.
def isNameValid(name: String) = {
  try {
    SubProject.validateName(name)
    true
  } catch {
    // The binding was unused; `_` makes the intent explicit.
    case _: AssertionError => false
  }
}

override def runImport(env: com.lynxanalytics.biggraph.BigGraphEnvironment): ImportResult = {
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types
Expand Down Expand Up @@ -202,6 +211,16 @@ class ImportOperations(env: SparkFreeEnvironment) extends ProjectOperations(env)
val esp = table.toProject.viewer
val srcAttr = esp.vertexAttributes("<source.id>").asString
val dstAttr = esp.vertexAttributes("<target.id>").asString
// Copies the imported Neo4j vertex attributes onto the project's edges,
// pulling each one through the given embedding. Neo4j property names that
// would be illegal as LynxKite attribute names are sanitized or skipped.
def pullAttributes(embedding: EdgeBundle) = {
  for ((name, attr) <- esp.vertexAttributes) {
    // LynxKite attribute names cannot have '.'.
    val renamed = name.replace(".", "_")
    // Skip names that have other issues.
    if (isNameValid(renamed)) {
      project.edgeAttributes(renamed) = attr.pullVia(embedding)
    }
  }
}
if (params("vertex_query").isEmpty) {
val es = {
val op = graph_operations.VerticesToEdges()
Expand All @@ -210,19 +229,13 @@ class ImportOperations(env: SparkFreeEnvironment) extends ProjectOperations(env)
project.vertexSet = es.vs
project.newVertexAttribute("<id>", es.stringId)
project.edgeBundle = es.es
for ((name, attr) <- esp.vertexAttributes) {
// LynxKite attribute names cannot have '.'.
project.edgeAttributes(name.replace(".", "_")) = attr.pullVia(es.embedding)
}
pullAttributes(es.embedding)
} else {
val idAttr = project.vertexAttributes("<id>").asString
val es = graph_operations.ImportEdgesForExistingVertices.run(
idAttr, idAttr, srcAttr, dstAttr)
project.edgeBundle = es.edges
for ((name, attr) <- esp.vertexAttributes) {
// LynxKite attribute names cannot have '.'.
project.edgeAttributes(name.replace(".", "_")) = attr.pullVia(es.embedding)
}
pullAttributes(es.embedding)
}
}
val state = json.Json.toJson(project.state).toString
Expand Down
139 changes: 139 additions & 0 deletions app/com/lynxanalytics/biggraph/graph_operations/ExportToNeo4j.scala
@@ -0,0 +1,139 @@
// Backend operations for Neo4j export.
package com.lynxanalytics.biggraph.graph_operations
import com.lynxanalytics.biggraph.graph_api._
import com.lynxanalytics.biggraph.graph_util.Timestamp
import org.apache.spark

// Companion of the attribute-export operation: declares its one table input,
// its single string output, and the JSON deserializer.
object ExportAttributesToNeo4j extends OpFromJson {
  class Input extends MagicInputSignature {
    val t = table
  }
  class Output(implicit instance: MetaGraphOperationInstance) extends MagicOutput(instance) {
    val exportResult = scalar[String]
  }
  // Inverse of ExportAttributesToNeo4j.toJson.
  def fromJson(j: JsValue) = ExportAttributesToNeo4j(
    url = (j \ "url").as[String],
    username = (j \ "username").as[String],
    password = (j \ "password").as[String],
    labels = (j \ "labels").as[String],
    keys = (j \ "keys").as[Seq[String]],
    version = (j \ "version").as[Long],
    nodesOrRelationships = (j \ "nodesOrRelationships").as[String])
}

// Makes it easy to send a DataFrame to a specified Neo4j instance.
case class Neo4jConnectionParameters(url: String, username: String, password: String) {
def send(df: spark.sql.DataFrame, query: String) {
df.write
.format("org.neo4j.spark.DataSource")
.option("authentication.type", "basic")
.option("authentication.basic.username", username)
.option("authentication.basic.password", password)
.option("url", url)
.option("query", query)
.save()
}
}

// Writes attribute values onto nodes or relationships that already exist in
// Neo4j, matched by the given key attributes.
case class ExportAttributesToNeo4j(
  url: String, username: String, password: String, labels: String, keys: Seq[String],
  version: Long, nodesOrRelationships: String)
  extends SparkOperation[ExportAttributesToNeo4j.Input, ExportAttributesToNeo4j.Output] {
  val neo = Neo4jConnectionParameters(url, username, password)
  @transient override lazy val inputs = new ExportAttributesToNeo4j.Input()
  def outputMeta(instance: MetaGraphOperationInstance) = new ExportAttributesToNeo4j.Output()(instance)
  override def toJson = Json.obj(
    "url" -> url, "username" -> username, "password" -> password, "labels" -> labels,
    "keys" -> keys, "version" -> version, "nodesOrRelationships" -> nodesOrRelationships)
  def execute(
    inputDatas: DataSet,
    o: ExportAttributesToNeo4j.Output,
    output: OutputBuilder,
    rc: RuntimeContext): Unit = {
    implicit val ds = inputDatas
    // A row with a null key could not be matched to anything in Neo4j. Drop them.
    val withoutNullKeys = keys.foldLeft(inputs.t.df) {
      (partial, key) => partial.filter(partial(key).isNotNull)
    }
    // Builds the property map for MATCH, e.g. "`name`: event.`name`, `id`: event.`id`".
    val keyMatch = keys.map(k => s"`$k`: event.`$k`").mkString(", ")
    val query = nodesOrRelationships match {
      case "nodes" => s"MATCH (n$labels {$keyMatch}) SET n += event"
      case "relationships" => s"MATCH ()-[r$labels {$keyMatch}]-() SET r += event"
    }
    neo.send(withoutNullKeys, query)
    output(o.exportResult, "Export done.")
  }
}

// Companion of the full-graph export: declares the vertex and edge table
// inputs, the string output, the JSON deserializer, and the reserved column
// names used to carry LynxKite's internal IDs through the export.
object ExportGraphToNeo4j extends OpFromJson {
  class Input extends MagicInputSignature {
    val vs = table
    val es = table
  }
  class Output(implicit instance: MetaGraphOperationInstance) extends MagicOutput(instance) {
    val exportResult = scalar[String]
  }
  // Inverse of ExportGraphToNeo4j.toJson.
  def fromJson(j: JsValue) = ExportGraphToNeo4j(
    url = (j \ "url").as[String],
    username = (j \ "username").as[String],
    password = (j \ "password").as[String],
    nodeLabelsColumn = (j \ "nodeLabelsColumn").as[String],
    relationshipTypeColumn = (j \ "relationshipTypeColumn").as[String],
    version = (j \ "version").as[Long])
  // The "!" prefix keeps these out of the way of ordinary attribute names.
  val VID = "!LynxKite ID"
  val SRCDST = "!LynxKite endpoint IDs"
  val SRCID = "!Source LynxKite ID"
  val DSTID = "!Destination LynxKite ID"
}

// Creates new nodes and relationships in Neo4j from the whole graph.
// NOTE: The source this was recovered from had review-comment page text pasted
// into the middle of the Cypher query strings; the code below restores them.
case class ExportGraphToNeo4j(
  url: String, username: String, password: String, nodeLabelsColumn: String,
  relationshipTypeColumn: String, version: Long)
  extends SparkOperation[ExportGraphToNeo4j.Input, ExportGraphToNeo4j.Output] {
  import ExportGraphToNeo4j._
  val neo = Neo4jConnectionParameters(url, username, password)
  @transient override lazy val inputs = new ExportGraphToNeo4j.Input()
  def outputMeta(instance: MetaGraphOperationInstance) = new ExportGraphToNeo4j.Output()(instance)
  override def toJson = Json.obj(
    "url" -> url, "username" -> username, "password" -> password,
    "nodeLabelsColumn" -> nodeLabelsColumn, "relationshipTypeColumn" -> relationshipTypeColumn,
    "version" -> version)
  def execute(
    inputDatas: DataSet,
    o: ExportGraphToNeo4j.Output,
    output: OutputBuilder,
    rc: RuntimeContext): Unit = {
    implicit val ds = inputDatas
    val F = spark.sql.functions
    // Prefix the internal IDs with the timestamp so different exports don't collide.
    // Also save the timestamp so the created entities can be easily cleaned up.
    val timestamp = F.lit(Timestamp.human)
    val vs = inputs.vs.df
      .withColumn(VID, F.concat(timestamp, F.lit(" "), F.col(VID)))
      .withColumn("!LynxKite export timestamp", timestamp)
    val es = inputs.es.df
      // Split the (src, dst) tuple column into two prefixed ID columns.
      .withColumn(SRCID, F.concat(timestamp, F.lit(" "), F.col(SRCDST + "._1")))
      .withColumn(DSTID, F.concat(timestamp, F.lit(" "), F.col(SRCDST + "._2")))
      .drop(SRCDST)
      .withColumn("!LynxKite export timestamp", timestamp)

    if (nodeLabelsColumn.isEmpty) {
      neo.send(vs, s"""
      CREATE (n)
      SET n += event
      """)
    } else {
      // apoc.create.node is the way to create a node with dynamically picked
      // labels; the label column may hold a comma separated list of labels.
      // See https://neo4j.com/labs/apoc/4.1/overview/apoc.create/apoc.create.node/.
      // Requires the APOC plugin to be installed in Neo4j.
      neo.send(vs, s"""
      CALL apoc.create.node(split(event.`$nodeLabelsColumn`, ','), event) YIELD node
      RETURN 1
      """)
    }
    if (relationshipTypeColumn.isEmpty) {
      neo.send(es, s"""
      MATCH (src {`$VID`: event.`$SRCID`}), (dst {`$VID`: event.`$DSTID`})
      CREATE (src)-[r:EDGE]->(dst)
      SET r += event
      """)
    } else {
      // apoc.create.relationship creates a relationship with a dynamically
      // picked type. Also requires the APOC plugin.
      neo.send(es, s"""
      MATCH (src {`$VID`: event.`$SRCID`}), (dst {`$VID`: event.`$DSTID`})
      CALL apoc.create.relationship(src, event.`$relationshipTypeColumn`, event, dst) YIELD rel
      RETURN 1
      """)
    }
    output(o.exportResult, "Export done.")
  }
}
5 changes: 5 additions & 0 deletions app/com/lynxanalytics/biggraph/graph_util/Scripting.scala
Expand Up @@ -72,6 +72,11 @@ object Scripting {
val op = graph_operations.AddReversedEdges()
op(op.es, self).result.esPlus
}

// Packs each edge's (source ID, destination ID) pair into a single edge attribute.
def srcDstAttribute: Attribute[(ID, ID)] = {
  val packer = graph_operations.EdgeBundleAsAttribute()
  packer(packer.edges, self).result.attr
}
}

implicit class RichContainedAttribute[T](
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Expand Up @@ -95,6 +95,9 @@ libraryDependencies ++= Seq(
"javax.media" % "jai_core" % "1.1.3" from "https://repo.osgeo.org/repository/geotools-releases/javax/media/jai_core/1.1.3/jai_core-1.1.3.jar",
// Used for working with AVRO files.
"org.apache.spark" %% "spark-avro" % sparkVersion.value,
// For Neo4j tests.
"org.testcontainers" % "testcontainers" % "1.14.3" % Test,
"org.testcontainers" % "neo4j" % "1.14.3" % Test,
// Used for working with Delta tables.
"io.delta" %% "delta-core" % "0.6.1"
)
Expand Down
@@ -0,0 +1,70 @@
package com.lynxanalytics.biggraph.frontend_operations

import com.lynxanalytics.biggraph.controllers.DirectoryEntry
import com.lynxanalytics.biggraph.graph_api.Edge
import com.lynxanalytics.biggraph.graph_api.Scripting._
import com.lynxanalytics.biggraph.graph_api.GraphTestUtils._

// Testcontainers uses a self-referential type parameter so that the fluent
// `with*` methods return the concrete subclass; this class ties that knot.
// NOTE(review): uses the enterprise image — presumably needed for the test
// features; the license is accepted via NEO4J_ACCEPT_LICENSE_AGREEMENT below.
class Neo4jContainer
  extends org.testcontainers.containers.Neo4jContainer[Neo4jContainer]("neo4j:4.0.8-enterprise")

// End-to-end tests for the Neo4j export and import boxes, run against a real
// Neo4j instance in a Docker container (via Testcontainers).
class Neo4jExportImportTest extends OperationsTestBase {
  val server = new Neo4jContainer()
    .withoutAuthentication
    .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
    .withEnv("NEO4JLABS_PLUGINS", "[\"apoc\"]")

  // Starts the Neo4j container for the duration of `body` and guarantees it is
  // stopped even if an assertion fails. (Previously a failing test leaked the
  // running container because server.stop() was only reached on success.)
  private def withServer(body: => Unit): Unit = {
    server.start()
    try {
      body
    } finally {
      server.stop()
    }
  }

  // Exports the example graph into the running Neo4j instance.
  def exportExampleGraph() = {
    val res = box("Create example graph")
      .box("Export graph to Neo4j", Map("url" -> server.getBoltUrl)).exportResult
    dataManager.get(res)
  }

  test("full graph export and import") {
    withServer {
      exportExampleGraph()
      val p = importBox("Import from Neo4j", Map("url" -> server.getBoltUrl)).project
      assert(p.vertexAttributes.toMap.keySet == Set(
        "!LynxKite ID", "!LynxKite export timestamp", "<id>", "<labels>",
        "age", "gender", "id", "income", "location", "name"))
      assert(p.edgeAttributes.toMap.keySet == Set(
        "<rel_id>", "<rel_type>", "<source_id>", "<target_id>", "comment", "weight"))
      assert(get(p.vertexAttributes("name")).values.toSet == Set("Adam", "Bob", "Eve", "Isolated Joe"))
      assert(get(p.edgeAttributes("weight")).values.toSet == Set(1.0, 2.0, 3.0, 4.0))
    }
  }

  test("attribute export") {
    withServer {
      exportExampleGraph()
      val g = box("Create example graph").box("Compute PageRank").box("Compute dispersion")
      dataManager.get(g.box(
        "Export vertex attributes to Neo4j",
        Map("url" -> server.getBoltUrl, "keys" -> "name", "to_export" -> "page_rank")).exportResult)
      dataManager.get(g.box(
        "Export edge attributes to Neo4j",
        Map("url" -> server.getBoltUrl, "keys" -> "comment", "to_export" -> "dispersion")).exportResult)
      val p = importBox("Import from Neo4j", Map("url" -> server.getBoltUrl)).project
      assert(get(p.vertexAttributes("page_rank")).values.toSet ==
        get(g.project.vertexAttributes("page_rank")).values.toSet)
      assert(get(p.edgeAttributes("dispersion")).values.toSet ==
        get(g.project.edgeAttributes("dispersion")).values.toSet)
    }
  }

  test("export with labels and types") {
    withServer {
      val res = box("Create example graph").box("Export graph to Neo4j", Map(
        "url" -> server.getBoltUrl,
        "node_labels" -> "gender", "relationship_type" -> "comment")).exportResult
      dataManager.get(res)
      val p = importBox("Import from Neo4j", Map("url" -> server.getBoltUrl)).project
      assert(
        get(p.vertexAttributes("<labels>")).values.toList
          .flatMap(_.asInstanceOf[scala.collection.mutable.WrappedArray[String]]).sorted
          == Seq("Female", "Male", "Male", "Male"))
      assert(get(p.edgeAttributes("<rel_type>")).values.toSet
        == Set("Adam loves Eve", "Bob envies Adam", "Bob loves Eve", "Eve loves Adam"))
    }
  }
}