Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Neo4j export #91

Merged
merged 10 commits into from Oct 2, 2020
Expand Up @@ -2,6 +2,7 @@ package com.lynxanalytics.biggraph.frontend_operations

import com.lynxanalytics.biggraph.SparkFreeEnvironment
import com.lynxanalytics.biggraph.graph_api.Scripting._
import com.lynxanalytics.biggraph.graph_util.Scripting._
import com.lynxanalytics.biggraph.graph_operations
import com.lynxanalytics.biggraph.controllers._

Expand Down Expand Up @@ -153,4 +154,96 @@ class ExportOperations(env: SparkFreeEnvironment) extends OperationRegistry {
.map(_ => wc.createSnapshotFromState(user, params("path"), getState))
}
})

abstract class Neo4jAttributeExport(context: Operation.Context) extends TriggerableOperation(context) {
// Parameters shared by the Neo4j attribute export operations: where and how to
// connect, plus a numeric "version" parameter labeled "Snapshot revision ID".
// Subclasses append their own "labels", "keys" and "to_export" parameters.
params ++= List(
Param("url", "Neo4j connection", defaultValue = "bolt://localhost:7687"),
Param("username", "Neo4j username", defaultValue = "neo4j"),
Param("password", "Neo4j password", defaultValue = "neo4j"),
NonNegInt("version", "Snapshot revision ID", default = 1))
// The project connected to the "graph" input; lazy, so it is only looked up on first use.
lazy val project = projectInput("graph")
// Tells the backend operation whether nodes or relationships are updated.
// Concrete operations must provide the value.
val nodesOrRelationships: String
// This operation is always enabled.
override def enabled = FEStatus.enabled
override def trigger(wc: WorkspaceController, gdc: GraphDrawingController) = {
darabos marked this conversation as resolved.
Show resolved Hide resolved
gdc.getComputeBoxResult(List(exportResult.gUID))
}
/**
 * Validates the parameters and maps the box's first declared output to the
 * export result. The "password" parameter is removed from the parameters
 * recorded in the output state.
 */
override def getOutputs(): Map[BoxOutput, BoxOutputState] = {
  params.validate()
  val outputSlot = context.box.output(context.meta.outputs(0))
  val result = exportResult()
  val recordedParams = params.toMap - "password"
  Map(outputSlot -> BoxOutputState.from(result, recordedParams))
}
/**
 * Builds the operation that writes the selected attributes to Neo4j and returns
 * its `exportResult` output.
 *
 * The exported table contains the attributes chosen in "to_export" plus the key
 * attributes chosen in "keys". The attributes are looked up among the vertex
 * attributes when `nodesOrRelationships` is "nodes" and among the edge attributes
 * otherwise, matching the option lists the concrete operations offer.
 *
 * Fails with an assertion error if no key attribute is selected.
 */
def exportResult() = {
  val keys = splitParam("keys")
  // Check the precondition before building any table.
  assert(keys.nonEmpty, "You have to choose one or more attributes to use as the keys for identifying the nodes in Neo4j.")
  val attrs = (splitParam("to_export") ++ keys).toSet.toList
  // Edge attribute names are not present among the vertex attributes, so the
  // lookup has to follow the kind of entity being exported.
  val exportingNodes = nodesOrRelationships == "nodes"
  val t = graph_operations.AttributesToTable.run(attrs.map { a =>
    val attr = if (exportingNodes) project.vertexAttributes(a) else project.edgeAttributes(a)
    a -> attr
  })
  val op = graph_operations.ExportAttributesToNeo4j(
    params("url"), params("username"), params("password"), params("labels"), keys,
    params("version").toInt, nodesOrRelationships)
  op(op.t, t).result.exportResult
}
}

registerOp(
"Export vertex attributes to Neo4j", defaultIcon, ExportOperations,
List("graph"), List("exported"), new Neo4jAttributeExport(_) {
import Operation.Implicits._
params ++= List(
Param("labels", "Node labels", defaultValue = ""),
Choice("keys", "Attribute to use as key",
darabos marked this conversation as resolved.
Show resolved Hide resolved
// Cannot join on internal ID ("<id>") and stuff like that.
options = project.vertexAttrList.filter(!_.id.startsWith("<")), multipleChoice = true),
Choice("to_export", "Exported attributes", options = project.vertexAttrList, multipleChoice = true))
val nodesOrRelationships = "nodes"
})
registerOp(
"Export edge attributes to Neo4j", defaultIcon, ExportOperations,
List("graph"), List("exported"), new Neo4jAttributeExport(_) {
import Operation.Implicits._
params ++= List(
Param("labels", "Relationship labels", defaultValue = ""),
Choice("keys", "Attribute to use as key",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Choice("keys", "Attribute to use as key",
Choice("keys", "Attributes to use as key",

// Cannot join on internal ID ("<id>") and stuff like that.
options = project.edgeAttrList.filter(!_.id.startsWith("<")), multipleChoice = true),
Choice("to_export", "Exported attributes", options = project.edgeAttrList, multipleChoice = true))
val nodesOrRelationships = "edges"
})

registerOp(
"Export graph to Neo4j", defaultIcon, ExportOperations,
List("graph"), List("exported"), new TriggerableOperation(_) {
import Operation.Implicits._
params ++= List(
Param("url", "Neo4j connection", defaultValue = "bolt://localhost:7687"),
Param("username", "Neo4j username", defaultValue = "neo4j"),
Param("password", "Neo4j password", defaultValue = "neo4j"),
NonNegInt("version", "Snapshot revision ID", default = 1),
Choice("node_labels", "Attribute with node labels",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that the value of this attribute can be a comma separated list of label values if user wants multiple labels. Would be more elegant to allow vectors here, but probably now is not the time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that the value of this attribute can be a comma separated list of label values if user wants multiple labels. Would be more elegant to allow vectors here, but probably now is not the time.

I expect you usually just use one label. Described the commas in the help.

options = List(FEOption("", "")) ++ project.vertexAttrList[String]),
Choice("relationship_type", "Attribute with relationship type",
options = List(FEOption("", "")) ++ project.edgeAttrList[String]))
lazy val project = projectInput("graph")
override def enabled = FEStatus.enabled
override def trigger(wc: WorkspaceController, gdc: GraphDrawingController) = {
darabos marked this conversation as resolved.
Show resolved Hide resolved
gdc.getComputeBoxResult(List(exportResult.gUID))
}
/**
 * Validates the parameters and maps the box's first declared output to the
 * export result. The "password" parameter is removed from the parameters
 * recorded in the output state.
 */
override def getOutputs(): Map[BoxOutput, BoxOutputState] = {
  params.validate()
  val outputSlot = context.box.output(context.meta.outputs(0))
  val result = exportResult()
  val recordedParams = params.toMap - "password"
  Map(outputSlot -> BoxOutputState.from(result, recordedParams))
}
/**
 * Builds the whole-graph export operation and returns its `exportResult` output.
 *
 * Two tables are passed to the operation: all vertex attributes plus the vertex
 * ID under the `VID` column, and all edge attributes plus the edge endpoints
 * under the `SRCDST` column.
 */
def exportResult() = {
  val vertexColumns = project.vertexAttributes.toMap +
    (graph_operations.ExportGraphToNeo4j.VID -> project.vertexSet.idAttribute)
  val edgeColumns = project.edgeAttributes.toMap +
    (graph_operations.ExportGraphToNeo4j.SRCDST -> project.edgeBundle.srcDstAttribute)
  val vertexTable = graph_operations.AttributesToTable.run(vertexColumns)
  val edgeTable = graph_operations.AttributesToTable.run(edgeColumns)
  val exporter = graph_operations.ExportGraphToNeo4j(
    url = params("url"),
    username = params("username"),
    password = params("password"),
    nodeLabelsColumn = params("node_labels"),
    relationshipTypeColumn = params("relationship_type"),
    version = params("version").toInt)
  exporter(exporter.vs, vertexTable)(exporter.es, edgeTable).result.exportResult
}
})
}
135 changes: 135 additions & 0 deletions app/com/lynxanalytics/biggraph/graph_operations/ExportToNeo4j.scala
@@ -0,0 +1,135 @@
package com.lynxanalytics.biggraph.graph_operations

import com.lynxanalytics.biggraph.graph_api._
import com.lynxanalytics.biggraph.graph_util.HadoopFile
import org.apache.spark
import org.apache.spark.sql.SaveMode

/** Companion of the attribute export operation: its signatures and JSON deserialization. */
object ExportAttributesToNeo4j extends OpFromJson {
  /** The single input: the table whose rows are written to Neo4j. */
  class Input extends MagicInputSignature {
    val t = table
  }

  /** The single output: a string scalar holding the export status message. */
  class Output(implicit instance: MetaGraphOperationInstance) extends MagicOutput(instance) {
    val exportResult = scalar[String]
  }

  /** Rebuilds the operation from the fields written by its `toJson`. */
  def fromJson(j: JsValue) = {
    def text(field: String) = (j \ field).as[String]
    ExportAttributesToNeo4j(
      url = text("url"),
      username = text("username"),
      password = text("password"),
      labels = text("labels"),
      keys = (j \ "keys").as[Seq[String]],
      version = (j \ "version").as[Long],
      nodesOrRelationships = text("nodesOrRelationships"))
  }
}

/**
 * Connection settings for a Neo4j instance. Creating an instance only stores the
 * settings; nothing is contacted until `send` is called.
 *
 * @param url the value passed as the data source's "url" option
 * @param username user name for basic authentication
 * @param password password for basic authentication
 */
case class Neo4jConnection(url: String, username: String, password: String) {
  /**
   * Writes `df` through the "org.neo4j.spark.DataSource" Spark format using basic
   * authentication, with `query` passed as the data source's "query" option.
   * The queries built in this file refer to the row being written as `event`.
   */
  def send(df: spark.sql.DataFrame, query: String): Unit = {
    df.write
      .format("org.neo4j.spark.DataSource")
      .option("authentication.type", "basic")
      .option("authentication.basic.username", username)
      .option("authentication.basic.password", password)
      .option("url", url)
      .option("query", query)
      .save()
  }
}

case class ExportAttributesToNeo4j(
url: String, username: String, password: String, labels: String, keys: Seq[String],
version: Long, nodesOrRelationships: String)
extends SparkOperation[ExportAttributesToNeo4j.Input, ExportAttributesToNeo4j.Output] {
val neo = Neo4jConnection(url, username, password)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I first saw this (and of course didn't realize I just reviewed the definition of this class :( ) I thought why would you put a connection kind of class up here, why not create the connection only when needed, in execute. But of course, there is no connection being made here, it just collects some metadata which will be used in a helper method of this object. So then what about naming it differently? E.g. Neo4JHelper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I first saw this (and of course didn't realize I just reviewed the definition of this class :( ) I thought why would you put a connection kind of class up here, why not create the connection only when needed, in execute. But of course, there is no connection being made here, it just collects some metadata which will be used in a helper method of this object. So then what about naming it differently? E.g. Neo4JHelper?

I changed it to Neo4jConnectionParameters.

// The input signature is rebuilt lazily and is excluded from Java serialization (@transient).
@transient override lazy val inputs = new ExportAttributesToNeo4j.Input()
// Creates the output signature for the given operation instance.
def outputMeta(instance: MetaGraphOperationInstance) = new ExportAttributesToNeo4j.Output()(instance)
// Serializes every constructor parameter; the field names must match those read by
// ExportAttributesToNeo4j.fromJson. Note that the password is written as plain text.
override def toJson = Json.obj(
"url" -> url, "username" -> username, "password" -> password, "labels" -> labels,
"keys" -> keys, "version" -> version, "nodesOrRelationships" -> nodesOrRelationships)
/**
 * Sends the rows of the input table to Neo4j as attribute updates.
 *
 * Rows where any key column is null are dropped. For each remaining row the
 * query matches the entities whose key properties equal the row's key values
 * and adds all columns of the row to them (`SET ... += event`).
 * `labels` is inserted verbatim after the matched variable in the pattern.
 *
 * `nodesOrRelationships` selects the pattern: "nodes" matches nodes, while
 * "relationships" (or "edges", the value passed by the edge attribute export
 * frontend operation) matches relationships in either direction.
 *
 * @throws IllegalArgumentException if `nodesOrRelationships` has any other value
 */
def execute(
    inputDatas: DataSet,
    o: ExportAttributesToNeo4j.Output,
    output: OutputBuilder,
    rc: RuntimeContext): Unit = {
  implicit val ds = inputDatas
  // Drop null keys.
  val df = keys.foldLeft(inputs.t.df)((df, key) => df.filter(df(key).isNotNull))
  val keyMatch = keys.map(k => s"`$k`: event.`$k`").mkString(", ")
  val query = nodesOrRelationships match {
    case "nodes" => s"MATCH (n$labels {$keyMatch}) SET n += event"
    // "edges" is accepted as an alias so that edge exports do not fail with a MatchError.
    case "relationships" | "edges" => s"MATCH ()-[r$labels {$keyMatch}]-() SET r += event"
    case other => throw new IllegalArgumentException(
      s"Unexpected value for nodesOrRelationships: '$other'. Expected 'nodes' or 'relationships'.")
  }
  neo.send(df, query)
  output(o.exportResult, "Export done.")
}
}

/** Companion of the graph export operation: signatures, column names and JSON deserialization. */
object ExportGraphToNeo4j extends OpFromJson {
  // Names of the helper columns that carry LynxKite's internal IDs in the exported tables.
  val VID = "!LynxKite ID"
  val SRCDST = "!LynxKite endpoint IDs"
  val SRCID = "!Source LynxKite ID"
  val DSTID = "!Destination LynxKite ID"

  /** The two inputs: the table of vertices and the table of edges. */
  class Input extends MagicInputSignature {
    val vs = table
    val es = table
  }

  /** The single output: a string scalar holding the export status message. */
  class Output(implicit instance: MetaGraphOperationInstance) extends MagicOutput(instance) {
    val exportResult = scalar[String]
  }

  /** Rebuilds the operation from the fields written by its `toJson`. */
  def fromJson(j: JsValue) = {
    def text(field: String) = (j \ field).as[String]
    ExportGraphToNeo4j(
      url = text("url"),
      username = text("username"),
      password = text("password"),
      nodeLabelsColumn = text("nodeLabelsColumn"),
      relationshipTypeColumn = text("relationshipTypeColumn"),
      version = (j \ "version").as[Long])
  }
}

case class ExportGraphToNeo4j(
url: String, username: String, password: String, nodeLabelsColumn: String,
relationshipTypeColumn: String, version: Long)
extends SparkOperation[ExportGraphToNeo4j.Input, ExportGraphToNeo4j.Output] {
import ExportGraphToNeo4j._
val neo = Neo4jConnection(url, username, password)
// The input signature is rebuilt lazily and is excluded from Java serialization (@transient).
@transient override lazy val inputs = new ExportGraphToNeo4j.Input()
// Creates the output signature for the given operation instance.
def outputMeta(instance: MetaGraphOperationInstance) = new ExportGraphToNeo4j.Output()(instance)
// Serializes every constructor parameter; the field names must match those read by
// ExportGraphToNeo4j.fromJson. Note that the password is written as plain text.
override def toJson = Json.obj(
"url" -> url, "username" -> username, "password" -> password,
"nodeLabelsColumn" -> nodeLabelsColumn, "relationshipTypeColumn" -> relationshipTypeColumn,
"version" -> version)
def execute(
inputDatas: DataSet,
o: ExportGraphToNeo4j.Output,
output: OutputBuilder,
rc: RuntimeContext): Unit = {
implicit val ds = inputDatas
val F = spark.sql.functions
// Prefix the internal IDs with this operation's GUID so different exports don't collide.
val guid = F.lit(this.gUID.toString + "-")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this is perfect! But what about adding GUID as vertex and edge attribute as well? That would allow users to clean up results of somehow screwed up exports with relative ease. Maybe we can add export date as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this is perfect! But what about adding GUID as vertex and edge attribute as well? That would allow users to clean up results of somehow screwed up exports with relative ease. Maybe we can add export date as well?

Actually the timestamp is better than the GUID. Maybe you've recreated your LynxKite instance. Maybe two LynxKite instances use the same Neo4j instance. If everyone exports the example graph, you can end up with two exports with the same GUID quick enough.

I'll switch to just using the timestamp. Easier to interpret too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

val vs = inputs.vs.df
.withColumn(VID, F.concat(guid, F.col(VID)))
val es = inputs.es.df
.withColumn(SRCID, F.concat(guid, F.col(SRCDST + "._1")))
.withColumn(DSTID, F.concat(guid, F.col(SRCDST + "._2")))
.drop(SRCDST)
if (nodeLabelsColumn.isEmpty) {
neo.send(vs, s"""
CREATE (n)
SET n += event
""")
} else {
neo.send(vs, s"""
CALL apoc.create.node(split(event.`$nodeLabelsColumn`, ','), event) YIELD node
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://neo4j.com/labs/apoc/4.1/overview/apoc.create/apoc.create.node/.
This is the way to create a node with a dynamically picked label. Requires apoc to be installed.

RETURN 1
""")
}
if (relationshipTypeColumn.isEmpty) {
neo.send(es, s"""
MATCH (src {`$VID`: event.`$SRCID`}), (dst {`$VID`: event.`$DSTID`})
CREATE (src)-[r:EDGE]->(dst)
SET r += event
""")
} else {
neo.send(es, s"""
MATCH (src {`$VID`: event.`$SRCID`}), (dst {`$VID`: event.`$DSTID`})
CALL apoc.create.relationship(src, event.`$relationshipTypeColumn`, event, dst) YIELD rel
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RETURN 1
""")
}
val exportResult = "Export done."
output(o.exportResult, exportResult)
}
}
5 changes: 5 additions & 0 deletions app/com/lynxanalytics/biggraph/graph_util/Scripting.scala
Expand Up @@ -72,6 +72,11 @@ object Scripting {
val op = graph_operations.AddReversedEdges()
op(op.es, self).result.esPlus
}

/**
 * The result of applying `EdgeBundleAsAttribute` to `self`: an attribute of
 * `(ID, ID)` pairs. The Neo4j graph export reads the pair's two components as
 * the source and destination IDs of each edge.
 */
def srcDstAttribute: Attribute[(ID, ID)] = {
  val asAttribute = graph_operations.EdgeBundleAsAttribute()
  asAttribute(asAttribute.edges, self).result.attr
}
}

implicit class RichContainedAttribute[T](
Expand Down