Support Neo4jIO #4488
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##             main    #4488      +/-   ##
==========================================
- Coverage   60.48%   60.18%    -0.31%
==========================================
  Files         275      279        +4
  Lines        9882    10086      +204
  Branches      438      840      +402
==========================================
+ Hits         5977     6070       +93
- Misses       3905     4016      +111
```
Frankly, I couldn't come up with a good way to parallelise a Cypher query in a generic way. If you were reading just nodes or relationships, you could split up the natural internal ID address space; for a generic Cypher query it's not that trivial.
Thanks for your answer! I may be mistaken, but while that indeed prevents using Splittable DoFn to get parallel IO, it should still be possible to have a naïve read. To say things differently, this is not about the parallelisation feature, but about the availability of a plain `read`.
Thanks a lot for the contribution! This looks quite good.
Maybe we can also create a very basic use-case in scio-examples, if you know one.
@turb for parallel reading in jdbc, we have the
We can potentially do something like this:

```scala
sc.parallelize(Seq(Range(min, shard1), Range(shard1, shard2), Range(shard2, max))) // ranges to be computed?
  .applyTransform(
    beam.Neo4jIO
      .readAll[Range, T]()
      ...
      .withCypher(shardedCypher) // ensures we have $min $max filter
      .withParametersFunction((r: Range) => Map("min" -> r.min, "max" -> r.max))
  )
```
Added one in fixup 7669f0a
I suppose @mattcasters has an opinion on it. However, since the Beam implementation is still experimental, is it a good idea to build this kind of pattern on top of it yet?
IMHO it's more a direction for further work.
I'd be happy to consider any and all changes to Neo4jIO. It was really hard to get it across the finish line because of all the new rules and regulations in effect in the Beam project, which do not apply to existing code like JdbcIO. Because of this, the two IOs are not the same. Neo4jIO was a bit of a challenge since there was no other clean code for an IO available. Months were spent on it.
I took some liberty and pushed some integration tests for Neo4j onto your branch.
Playing with the API raised a few questions.
"""UNWIND $origin AS origin | ||
|MATCH | ||
| (m:Movie {name: origin.movie}), | ||
| (c:Country {name: origin.country}) | ||
|CREATE (m)-[:ORIGIN]->(c) | ||
|""".stripMargin, | ||
"origin", | ||
{ case (movie, country) => Map("movie" -> movie, "country" -> country) } |
Same thing here:

```scala
.saveAsNeo4j(neo4jOptions, cypher)(x => Map(...))
```

As a Neo4j newbie, I'm also confused by the requirement of the UNWIND and the obligation to declare the `unwindMapName`. As I understand it, this is to enable batching. Can this be hidden from the user?

```cypher
MATCH
  (m:Movie {name: $movie}),
  (c:Country {name: $country})
CREATE (m)-[:ORIGIN]->(c)
```

could be automatically transformed with a carefully picked unwind name, replacing the `$` parameters.
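The automatic transformation suggested here could be sketched roughly as follows. This is a minimal sketch: the `UnwindRewriter` object, the `toUnwind` helper, and the default `rows` alias are hypothetical names, not part of any actual API.

```scala
// Hypothetical sketch: rewrite a per-row parameterized Cypher statement into
// a batched UNWIND form, by replacing each $param with row.param and
// prepending an UNWIND clause over a single list parameter.
object UnwindRewriter {
  private val Param = raw"\$$(\w+)".r // matches $movie, $country, ...

  def toUnwind(cypher: String, unwindMapName: String = "rows"): String = {
    val body = Param.replaceAllIn(cypher, m => s"row.${m.group(1)}")
    s"UNWIND $$$unwindMapName AS row\n$body"
  }
}
```

A real implementation would additionally need to skip `$` occurrences inside Cypher string literals and pick an alias that does not clash with existing identifiers in the query.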
> As a Neo4j newbie, I'm also confused by the requirement of the UNWIND and the obligation to declare the unwindMapName. As I understand, this is to enable batching. Can this be hidden from the user?

Maybe so. However, I hereby notify (bother) @mattcasters again, since he is the best expert we can find here, I believe!
Just to clarify the UNWIND situation: I should mention that without UNWIND you really can't get decent performance with Neo4j. It is also painful to note that the dialect for UNWIND is a bit different from regular Cypher statements. However, I'm told this is something my colleagues in engineering are working on for the upcoming version 5 of Neo4j. I propose to wait until that situation becomes a bit clearer, before making additional changes. Fingers crossed.
That's great! Many thanks. I have locally updated the Neo4j docker image pulled by testcontainers from 3.5.0 to 4.4.10, as the latter has no Apple Silicon version, hence it runs forever and times out on my laptop. Should I share this change?
As many of our APIs actually use typeclasses, I think we can leverage this also for the Neo4jIO:

- `RowMapper` would be the read typeclass to transform `Record` to `T`
- `ParametersBuilder` would extract the `Map[String, AnyRef]` parameter map from `T`

I'm less sure about the 2nd; maybe we should actually leave it as a function instead of a typeclass.
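The two typeclasses above could be sketched like this. Assumptions: `Record` is simplified to a `Map[String, Any]` stand-in for `org.neo4j.driver.Record`, and the `Movie` case class with its instances is purely illustrative.

```scala
object Neo4jTypeclasses {
  // Simplified stand-in for org.neo4j.driver.Record (assumption for this sketch).
  type Record = Map[String, Any]

  // Read side: turn a query Record into a T.
  trait RowMapper[T] { def from(record: Record): T }

  // Write side: extract the Cypher parameter map from a T.
  trait ParametersBuilder[T] { def parameters(value: T): Map[String, AnyRef] }

  final case class Movie(name: String, year: Int)

  implicit val movieMapper: RowMapper[Movie] = r =>
    Movie(r("name").asInstanceOf[String], r("year").asInstanceOf[Int])

  implicit val movieParams: ParametersBuilder[Movie] = m =>
    Map("name" -> m.name, "year" -> Int.box(m.year))
}
```

Keeping the write side as a plain function `T => Map[String, AnyRef]` instead of a typeclass would mirror how scio-jdbc takes statement preparation functions directly.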
```scala
object WriteParam {
  private[neo4j] val BeamDefaultBatchSize = 5000L
}

final case class WriteParam(batchSize: Long = WriteParam.BeamDefaultBatchSize)
```
If we use `batchSize` as a write param, we can have a common `Neo4jOptions`.
```scala
private[neo4j] val unwindMapName: String = cypher match {
  case Neo4jIO.UnwindParameterRegex(name) => name
  case _ =>
    throw new IllegalArgumentException(
      s"""Expected unwind cypher with parameter but got:
         |$cypher
         |See: https://neo4j.com/docs/cypher-manual/current/clauses/unwind/#unwind-creating-nodes-from-a-list-parameter
         |""".stripMargin
    )
}
```
Trying to be smart here: we can skip passing `unwindMapName` manually and extract it from the query. Since we only support `writeUnwind`, I think this is safe.
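In isolation, that extraction could look like the following sketch. The regex here is an assumption; the actual `UnwindParameterRegex` used in the wrapper may differ.

```scala
// Hypothetical sketch of pulling the unwind map name out of the Cypher query:
// the query must start with "UNWIND $<name>", otherwise no name is found.
object UnwindName {
  private val UnwindParameterRegex = raw"(?s)\s*UNWIND\s+\$$(\w+)\s.*".r

  def extract(cypher: String): Option[String] = cypher match {
    case UnwindParameterRegex(name) => Some(name)
    case _                          => None
  }
}
```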
```diff
 val modifiedEntities = entities.map(e => e.copy(property = e.property.map(_ + " modified")))

-modifiedEntities.write(Neo4jWrite(Neo4jWriteOptions(neo4jConf, writeCypher, "rows", setter)))
+modifiedEntities.saveAsNeo4j(neo4jConf, writeCypher)
```
Adapted `Neo4JExample` with the new interface.
Thanks, I missed that part. Also noticed that there already exists a typeclass out there, as you mentioned in the comment.
We'll do a squash merge for the whole PR.
```scala
  * @param cypher
  *   Neo4J cypher query
  */
def neo4jCypher[T: RowMapper: Coder](neo4jOptions: Neo4jOptions, cypher: String): SCollection[T] =
```
Instead of an implicit `RowMapper`, it'd be more idiomatic to provide a function `Record => T` and just pass that directly to the read method. See e.g. https://github.com/spotify/scio/blob/main/scio-jdbc/src/main/scala/com/spotify/scio/jdbc/JdbcOptions.scala#L79
If a mapper could be easily derived via magnolify, we should introduce a type there and potentially also support that as a shorthand.
```scala
    EmptyTap
  }

override def tap(params: ReadP): Tap[Nothing] =
```
Is it for some reason not possible to provide a tap to read these results?
It is too difficult to get the query that represents the written data, I think.
```scala
  neo4jOptions: Neo4jOptions,
  unwindCypher: String,
  batchSize: Long = WriteParam.BeamDefaultBatchSize
)(implicit params: ParametersBuilder[T]): ClosedTap[Nothing] =
```
Similar to `RowMapper`, it'd be more idiomatic to provide this as a function rather than an implicit parameter.
We now have a single `Neo4jIO` supporting both read and write. Conversion is handled by a magnolify typeclass for `Value`.

The `Record` result is a 'neighbor' of `Value`, and only allows index-based access. Here we lose this feature: single values or tuples aren't supported on the read side.
```scala
implicit private def recordConverter(record: Record): Value =
  Values.value(record.asMap(identity))
```
By doing this, we lose the index-based access capability.
```scala
.withTransactionConfig(neo4jOptions.transactionConfig)
.withCypher(cypher)
.withRowMapper(neo4jType.from(_))
.withCoder(CoderMaterializer.beam(sc, coder))
```
The coder was missing.
Support the experimental `org.apache.beam.sdk.io.neo4j.Neo4jIO` introduced in Beam 2.38. This is inspired by `JdbcIO`.

Request for comment: Beam only provides `readAll`, hence the usage of `sc.parallelize(Seq(())).applyTransform` to simulate a `read`; `read` is not provided in Beam.
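For reference, the simulated `read` described above looks roughly like the following sketch. It is not runnable on its own: `sc`, `driverConfiguration`, `cypher`, and `rowMapper` are assumed to exist in scope, and the builder calls illustrate the Beam `Neo4jIO` API rather than prescribe the final scio wrapper.

```scala
import org.apache.beam.sdk.io.{neo4j => beam}

val output: SCollection[T] =
  sc.parallelize(Seq(()))               // single dummy element to trigger readAll once
    .applyTransform(
      beam.Neo4jIO
        .readAll[Unit, T]()
        .withDriverConfiguration(driverConfiguration) // connection settings
        .withCypher(cypher)                           // user query
        .withRowMapper(rowMapper)                     // Record => T
    )
```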