fixes #163: Add support for partitioning #164
Conversation
@@ -177,6 +188,13 @@ every single node property as column prefixed by `source` or `target` | |||
|`sample` | |||
|No | |||
|
|||
|`partitions` | |||
|This defines the parallelization level while pulling data from Neo4j. | |||
N.b. As More parallelization does not mean more performances so please tune wisely in according to |
"As more parallelization does not mean better performance, choose this setting wisely"
Note the interaction with "count". I think what you mean is that if the count query returns 100 and partitions=5, you're going to be getting 5 partitions of 20 records, using skip/limit. You should just say that.
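The skip/limit math this comment describes can be sketched as follows (illustrative only; `skipLimitWindows` is a hypothetical helper, not the connector's actual API):

```scala
// Split a total count into `partitions` SKIP/LIMIT windows,
// e.g. count = 100, partitions = 5 -> five windows of 20 records each.
def skipLimitWindows(count: Long, partitions: Int): Seq[(Long, Long)] = {
  val size = math.ceil(count.toDouble / partitions).toLong // records per window
  (0 until partitions).map(i => (i * size, size))          // (skip, limit) pairs
}
```

Each `(skip, limit)` pair then backs one partition's query, e.g. `MATCH (p:Person) RETURN p SKIP 0 LIMIT 20`.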
We should also mention that it only makes sense when you're pulling more than X (where X can be 1M or 10M)
```cypher | ||
MATCH (p:Person)-[r:BOUGHT]->(pr:Product) | ||
WHERE pr.name = 'An Awesome Product' | ||
RETUNR count(p) AS count |
Typo: `RETUNR` should be `RETURN`.
2. Relationship extraction | ||
3. Query extraction | ||
|
||
We adopt generally provide a general count on what you're trying to pull of and add build |
provide an example here, this isn't very clear. For example, I'm loading just Person nodes. I set partitions=5. This turns into:
MATCH (p:Person) RETURN p LIMIT 20 SKIP 0
MATCH (p:Person) RETURN p LIMIT 20 SKIP 20
...
In theory we could also leverage cursors:
- given there is a unique constraint
- we can return the data ordered (using an index-backed ORDER BY) with a limit
- then we can take the last value returned and add a
WHERE n.id > $cursor
which would then only query from that value onward; pure skip/limit queries need to re-execute and throw away rows, so they get slower over time
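Assuming a unique, indexed `id` property on the nodes being read, the cursor approach sketched above could look like this (illustrative only):

```cypher
// first page: index-backed order, no skipping
MATCH (p:Person)
RETURN p ORDER BY p.id LIMIT 20;

// later pages: $cursor = last p.id of the previous page
MATCH (p:Person)
WHERE p.id > $cursor
RETURN p ORDER BY p.id LIMIT 20;
```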
While for (1) and (2) we leverage the Neo4j count store in order to retrieve the total count | ||
about the nodes/relationships we're trying pulling off, for the (3) we have two possible approaches: | ||
|
||
* Compute a count over the query that we're using |
How would this be done in the general query case? Or it can't be done, and using more than 1 partition with a query where no count is specified will fail?
} | ||
} | ||
|
||
def countForRelationship(): Long = try { |
These count methods: please think about failure modes. In some of the APOC calls you are doing, you are coupling to the string format that APOC returns in a particular field. Remembering that APOC will get put into the product at some point, and that the string format of return columns isn't promised, this code has some potential to break in the future due to a newly published APOC release, just as an example of what can go wrong.
Suppose the user specifies a parallelism of 5, and everything goes wrong with the count feature, and there's no way to obtain a valid count. What happens then? My vote would be for a debug message, plus reverting to reading into one partition (with no need for counts).
There's the extra possibility that the user just plain screws up the count query. Say I specify the count query like this:
MATCH (p:Person)-[:BOUGHT]->(pr:Product)
RETURN count(pr) - 10 as count
Doesn't really matter how or why, but the count is wrong.
In this case, you might try to fetch one extra "empty" partition. If the user says 5 partitions, try to fetch 6, where the 6th partition has one record (SKIP whatever LIMIT 1). If you get nothing, that's good. If you get an actual record here, this is a warning that the user has messed up the count. Another warning is if any of your partitions end up empty; then chances are good the user messed up the count query.
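The over-fetch check proposed here can be sketched roughly like this (a hypothetical `countLooksWrong` helper, not the PR's code):

```scala
// After reading the partitions, probe one extra record past the expected end
// (SKIP expectedCount LIMIT 1). If the probe returns a record, or any partition
// came back empty, the user-supplied count was probably wrong.
def countLooksWrong(probeReturnedRecord: Boolean, partitionSizes: Seq[Long]): Boolean =
  probeReturnedRecord || partitionSizes.contains(0L)
```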
Just allow specifying a literal count value in the property.
It's always good to overfetch for stuff like that; in SDN we used to pull pages with pagesize+1 so we could say "there is a next page".
Just to be sure: should we over-fetch always, or only if the user provides the count query?
Regarding "just allow to specify a literal count value in the property": the user can define a query like this: RETURN 50 AS count. Is that fine? Or should we allow simply passing .option("query.count", "50")?
I added .option("query.count", "50") as a valid option.
assertEquals(5, partitionedDf.rdd.getNumPartitions) | ||
assertEquals(150, partitionedDf.collect().map(_.getAs[Long]("person")).toSet.size) | ||
|
||
val partitionedQueryCountDf = ss.read.format(classOf[DataSource].getName) |
Add tests:
- What happens when query.count is invalid, or fails to return a "count" field, or returns a "count" field that has nonsense?
We provided a check that it is a READ_ONLY query, and I added a check on the returned identifier, which must be only count.
What do you mean by nonsense? About the type? That will be a runtime error because, to my knowledge,
GitHub seems to have truncated my comment 😥
... to my knowledge there is no way to get the return type from the EXPLAIN
Actually the optimized version should be:
which uses get-degree
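The optimized query itself is elided above; given the earlier example, a get-degree-based count presumably has roughly this shape (an assumption, not the author's exact query):

```cypher
MATCH (pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN size((pr)<-[:BOUGHT]-()) AS count
```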
|Query count, used only in combination with `query` option, it's a query that returns a `count` | ||
field like the following | ||
``` | ||
MATCH (p:Person)-[r:BOUGHT]->(pr:Product) |
see my optimized version in the other comment.
Also a typo in RETURN.
Can I also opt out of the count query and just provide a literal value if I know how much data is there?
That would be really useful.
Done
"""CALL apoc.meta.relTypeProperties({ includeRels: $rels }) YIELD sourceNodeLabels, targetNodeLabels, | ||
| propertyName, propertyTypes | ||
|WITH * | ||
|WHERE sourceNodeLabels = $sourceLabels AND targetNodeLabels = $targetLabels |
shouldn't we rather pass this to the proc? seems a big waste. If it's based on apoc.meta.* it should support include and exclude functionality.
Unfortunately at this moment, the procedure does not take those parameters as configuration params
oh ok, that's really unfortunate.
But we can change that.
yes
// TODO get back to Cypher DSL | ||
options.nodeMetadata.labels | ||
.map(label => { | ||
val query = s"""MATCH (${Neo4jUtil.NODE_ALIAS}:$label) |
you can do a union query that uses
MATCH (:$label) RETURN { type: '$label', count: count(*) } AS counts UNION ALL ...
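Spelled out for two of the labels appearing in this thread, the suggested union count query would look like:

```cypher
MATCH (:Person)  RETURN { type: 'Person',  count: count(*) } AS counts
UNION ALL
MATCH (:Product) RETURN { type: 'Product', count: count(*) } AS counts
```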
|""".stripMargin).single().get("count")) | ||
.map(count => if (count.isNull) 0L else count.asLong()) | ||
.min | ||
Math.min(minFromSource, minFromTarget) |
why min and not max?
because I use the most selective; if we have this use-case:
val df = ss.read.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "false")
.option("relationship.source.labels", ":Person:RegisteredCustomer")
.option("relationship.target.labels", ":Product")
.load()
This will get three counts:
(:Person)-[:BOUGHT]->() => 1000 rows
(:RegisteredCustomer)-[:BOUGHT]->() => 250 rows
()-[:BOUGHT]->(:Product) => 1000 rows
We use the most selective.
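The selection logic described above boils down to taking the minimum of the candidate pattern counts (values copied from the comment; sketch only, not the connector's code):

```scala
// Three candidate counts for (:Person:RegisteredCustomer)-[:BOUGHT]->(:Product)
val counts = Seq(
  "(:Person)-[:BOUGHT]->()"             -> 1000L,
  "(:RegisteredCustomer)-[:BOUGHT]->()" -> 250L,
  "()-[:BOUGHT]->(:Product)"            -> 1000L)
// The most selective pattern gives the tightest upper bound on the row count.
val mostSelective = counts.map(_._2).min
```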
val query = if (options.queryMetadata.queryCount.trim.nonEmpty) { | ||
options.queryMetadata.queryCount | ||
} else { | ||
s"""CALL { ${options.query.value} } |
clever use of subquery
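The subquery trick being praised wraps the user's Cypher in a `CALL {}` block and counts its rows; schematically (with a placeholder user query):

```cypher
CALL {
  MATCH (p:Person) RETURN p   // the user's `query` option, verbatim
}
RETURN count(*) AS count
```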
I used this in order to create the queries: https://neo4j.com/developer/kb/fast-counts-using-the-count-store/
@jexp lemme know if there are no further comments
private val query: String = new Neo4jQueryService(options, new Neo4jQueryReadStrategy(filters)).createQuery() | ||
private val query: String = new Neo4jQueryService(options, new Neo4jQueryReadStrategy(filters)) | ||
.createQuery() | ||
.concat(if (skip != -1 && limit != -1) s" SKIP $$skip LIMIT $$limit" else "") |
it would be nicer if we had a proper method on createQuery i.e. pass it to the query builder
Done
with InputPartitionReader[InternalRow] | ||
with Logging { | ||
|
||
private var result: Iterator[Record] = _ | ||
private var session: Session = _ | ||
private var transaction: Transaction = _ | ||
private val driverCache: DriverCache = new DriverCache(options.connection, jobId) | ||
private val driverCache: DriverCache = new DriverCache(options.connection, |
why do we need the partitionid in the driver cache?
Because otherwise, in a multi-partition environment, the first partition that closes the driver cache closes it for everyone.
queryReadStrategy.createStatementForRelationshipCount(options) | ||
} | ||
log.info(s"Executing the following counting query on Neo4j: $query") | ||
session.run(query) |
how do you map them back to the source node / rel-type? or don't you need that?
What do you mean?
|
||
def countForRelationshipWithQuery(filters: Array[Filter]): Long = { | ||
val query = if (filters.isEmpty) { | ||
val sourceQueries = options.relationshipMetadata.source.labels |
why can't we use apoc.meta.stats here?
We do use it; this is the fallback in case it's not present, look here:
https://github.com/neo4j-contrib/neo4j-spark-connector/pull/164/files#diff-85bfb2242b03b769ed6b534ff8992f88R255
if (count <= 0) { | ||
Seq.empty | ||
} else { | ||
val partitionSize = Math.ceil(count / options.partitions).toLong |
isn't this a whole number division? so ceil has no effect
Great catch thank you!!!
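The bug being acknowledged: with two integral operands, `count / options.partitions` truncates before `Math.ceil` ever sees a fraction, so the `ceil` is a no-op. A minimal illustration with hypothetical values:

```scala
val count = 101L
val partitions = 5
// Integer division truncates first: 101 / 5 = 20, then ceil(20.0) = 20.
val wrong = Math.ceil(count / partitions).toLong
// Converting to Double first keeps the fraction: ceil(101.0 / 5) = ceil(20.2) = 21.
val right = Math.ceil(count.toDouble / partitions).toLong
```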
@@ -53,7 +55,8 @@ object Neo4jUtil { | |||
case _ => autoCloseable.close() | |||
} | |||
} catch { | |||
case _ => throw new Exception("This exception should be logged") // @todo Log | |||
case t: Throwable => if (logger != null) logger | |||
.error(s"Cannot close ${autoCloseable.getClass.getSimpleName} because of the following exception:", t) |
this should not be an error, at most a warning
Done
override def readSchema(): StructType = callSchemaService { schemaService => schemaService | ||
.struct() } | ||
|
||
private def callSchemaService[T](lambda: SchemaService => T): T = { |
supplier? or factory? instead of lambda?
I changed the name to function. If you agree I would leave it as a lambda, which keeps the code very simple.
lambda(schemaService) | ||
} catch { | ||
case e: Throwable => { | ||
hasError = true |
logging?
It will be logged by Spark; I can add it, but it would be redundant.
val neo4jPartitions = if (partitionSkipLimit.isEmpty) { | ||
Seq(new Neo4jInputPartitionReader(neo4jOptions, filters, schema, jobId)) | ||
} else { | ||
partitionSkipLimit.zipWithIndex |
this code is not very readable, can we use a case class instead of the triple?
Done
@@ -19,20 +21,43 @@ class Neo4jDataSourceReader(private val options: DataSourceOptions, private val | |||
private val neo4jOptions: Neo4jOptions = new Neo4jOptions(options.asMap()) | |||
.validate(options => Validations.read(options, jobId)) | |||
|
|||
override def readSchema(): StructType = { | |||
val schemaService = new SchemaService(neo4jOptions, jobId) | |||
|
this whole new section is quite hard to understand / reason about, can you do something with the naming of methods/params or add a few comments
done
Please have a look at my comments
Thanks @jexp for the feedback, I'm merging this
fixes #163
We support partitions via skip/limit queries. In order to do so, we compute the total count: for nodes/relationships we leverage the Neo4j count store to retrieve the total count of the nodes/relationships we're pulling, while for the query case we have two possible approaches: compute a count over the query we're using, or let the user provide it via the
.option("query.count", "<your cypher query>")
The query must always return only one field named count, which is the result of the count, i.e.:
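For instance, reusing the count query from the docs diff (with the `RETURN` typo fixed):

```cypher
MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count
```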