fixes #503: Not able to Insert Neo4j Map Data type using the neo4j-spark connector #507
Conversation
```scala
s"""
  |The field `${field.name}` which has a map {${scalaMap.mkString(", ")}}
  |contains the following duplicated keys: [${dupKeys.mkString(", ")}],
  |you will loose information stored in these keys
```
Suggested change:

```diff
-|you will loose information stored in these keys
+|you will lose some of the values associated with these duplicate keys
```
Should this warning be logged in the implicit directly instead? I'm not sure.
I thought about that, but I preferred to leave the implementation clean and then add the business logic for checking duplicates in the Service.
Done
common/src/main/scala/org/neo4j/spark/util/Neo4jImplicits.scala (outdated, resolved)
common/src/main/scala/org/neo4j/spark/service/MappingService.scala (outdated, resolved)
```
Because the information that we'll store into Neo4j will be this:

----
MyNodeWithMapFlattend {
```
Suggested change:

```diff
-MyNodeWithMapFlattend {
+MyNodeWithFlattenedMap {
```
done
```
----
The field `table` which has a map {key.inner -> {key=innerValue}, key -> {inner.key=value}}
contains the following duplicated keys: [table.key.inner.key],
you will loose information stored in these keys
```
See one of my first suggestions: if you accept it, you will need to change the message here as well.
done
```scala
val dupKeys = scalaMap.flattenKeys(field.name)
  .groupBy(identity)
  .collect { case (x, List(_, _, _*)) => x }
if (dupKeys.nonEmpty) {
```
Actually, it's a pity to iterate over the map twice. Could we move the duplicate check into `flattenMap`?
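To illustrate the reviewer's point, here is a minimal, hypothetical sketch of a single-pass alternative: the flattened map and the set of duplicated keys are both produced in one traversal, instead of flattening first and then calling `groupBy` on the result. The `flatten` helper below is standalone and assumes a plain `Map[String, Any]` with nested maps; it is not the connector's actual `flattenKeys`/`flattenMap` implementation.

```scala
object FlattenSketch {
  // Flatten nested maps to dot-separated keys, collecting duplicate keys
  // in the same traversal (single pass over the data).
  def flatten(
      prefix: String,
      m: Map[String, Any],
      acc: Map[String, Any] = Map.empty,
      dupes: Set[String] = Set.empty
  ): (Map[String, Any], Set[String]) =
    m.foldLeft((acc, dupes)) { case ((a, d), (k, v)) =>
      val key = s"$prefix.$k"
      v match {
        case nested: Map[_, _] =>
          // recurse into nested maps, threading the accumulators through
          flatten(key, nested.asInstanceOf[Map[String, Any]], a, d)
        case leaf =>
          // key already seen: record the collision instead of overwriting
          if (a.contains(key)) (a, d + key)
          else (a + (key -> leaf), d)
      }
    }
}
```

With the map from the PR's doc example (`{key.inner -> {key=innerValue}, key -> {inner.key=value}}` under field `table`), both entries flatten to `table.key.inner.key`, so the duplicate is detected during the same pass that builds the flattened map.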
I'm also thinking that we could actually aggregate values in a collection when there are duplicates (with the caveat that the order is not deterministic if the initial map does not guarantee any iteration order).
Since it's a breaking change, could we maybe allow this behavior only if a specific config flag is set? It would default to `false`, and the default value could change at the next major version.
wdyt?
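A minimal sketch of the aggregation behavior proposed here (names are hypothetical, not the connector's API): values sharing a flattened key are collected into a `Seq` instead of one silently overwriting the other. As the comment above notes, the element order is only as deterministic as the source map's iteration order.

```scala
object GroupDuplicatesSketch {
  // Aggregate duplicate keys: unique keys keep their scalar value,
  // colliding keys map to a sequence of all their values.
  def groupDuplicates(pairs: Seq[(String, Any)]): Map[String, Any] =
    pairs.groupBy(_._1).map {
      case (key, Seq((_, single))) => key -> single            // unique key
      case (key, collisions)       => key -> collisions.map(_._2) // array of values
    }
}
```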
Let's eliminate the logging: if we have a DataFrame whose map column has collisions and a row count of 5 billion, we risk printing the warning 5 billion times, which is not great...
Force-pushed from 2ac57bf to a07d708 (compare)
```
@@ -178,6 +178,23 @@ class Neo4jImplicitsTest {
    Assert.assertEquals(expected, actual)
  }

  @Test
  def `should not handle collision by aggregating values`(): Unit = {
```
Suggested change:

```diff
-def `should not handle collision by aggregating values`(): Unit = {
+def `should handle collision by aggregating values`(): Unit = {
```
```
=== Group duplicated keys to array of values

You can use the option `schema.map.group.duplicate.keys` to avoid this problem, what the connector will do is to group all and only the values with duplicate keys into an array with all the values. The default value for the option is `false`.
```
Suggested change:

```diff
-You can use the option `schema.map.group.duplicate.keys` to avoid this problem, what the connector will do is to group all and only the values with duplicate keys into an array with all the values. The default value for the option is `false`.
+You can use the option `schema.map.group.duplicate.keys` to avoid this problem. The connector will group all the values with the same keys into an array. The default value for the option is `false`.
```
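For context, a usage sketch of the documented option (untested; the option name `schema.map.group.duplicate.keys` comes from this PR, while the connection URL, label, and `df` are placeholders):

```scala
// Hypothetical write with duplicate-key grouping enabled.
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")          // placeholder connection
  .option("labels", ":MyNodeWithFlattenedMap")     // placeholder label
  .option("schema.map.group.duplicate.keys", "true")
  .save()
```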
```scala
  |)
  |RETURN count(n)
  |""".stripMargin)
.peek()
```
Suggested change:

```diff
-.peek()
+.single()
```
Same as above.
fixes #503