Fixes issue #161: Add PushDownFilters support #162
Conversation
…eo4j by a provided query
==== Schema

If APOC is available, the schema will be created with `apoc.meta.relTypeProperties`.
Otherwise the first 10 results (or any number specified by the `schema.flatten.limit` option) will be flattened and the schema will be created from those properties.
One sentence per line, please. It makes diffs easier and is the rule for Neo4j-related docs (asciidoc).
if (filters.nonEmpty) {
  val filtersMap: Map[PropertyContainer, Array[Filter]] = filters.map(filter => {
    if (filter.isAttribute(Neo4jUtil.RELATIONSHIP_SOURCE_ALIAS)) {
      (sourceNode, filter)
perhaps nicer to work with a case class
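A hedged sketch of what the reviewer means (the class and field names here are illustrative, not from the PR; the real code pairs a `PropertyContainer` with a `Filter`):

```scala
// Hypothetical: a named case class instead of a raw (container, filter) tuple
// makes the subsequent groupBy read without _._1 / _._2 noise.
case class ContainerFilter(container: String, filter: String)

val pairs = Seq(
  ContainerFilter("source", "name = 'John Doe'"),
  ContainerFilter("source", "age > 20"),
  ContainerFilter("target", "name = 'Jane Doe'")
)

// group filters by the container they apply to, keeping only the filter part
val grouped: Map[String, Seq[String]] =
  pairs.groupBy(_.container).mapValues(_.map(_.filter)).toMap
```

The same shape as `groupBy[PropertyContainer](_._1).mapValues(_.map(_._2))` above, but the intent is carried by field names rather than tuple positions.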
    }
  }).groupBy[PropertyContainer](_._1).mapValues(_.map(_._2))

matchQuery.where(
hard to reason about what happens in this code block
@@ -195,6 +200,15 @@ Spark works with data in a tabular fixed schema. To accomplish this Neo4j Connec

TK list of supported data types

=== Considerations on the filters

The Neo4j Spark Connector implements the `SupportsPushDownFilters` interface, which allows you to push the Spark filters down to the Neo4j layer.
This way, the data that Spark receives has already been filtered by Neo4j.
should probably mention that all filters are ANDed together?
done at line 213
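To make the pushdown behavior concrete, here is a hedged sketch (the `DataSource` format string and connection options are assumptions; the generated Cypher shown in the comment is taken from this PR's own test assertions):

```scala
// Sketch, not connector source. With pushdown enabled (the default in this PR),
// a Spark predicate is translated into the Cypher WHERE clause that Neo4j
// executes, instead of being applied by Spark after loading all rows.
val df = spark.read
  .format("org.neo4j.spark.DataSource")   // assumed format string
  .option("url", "bolt://localhost:7687") // assumed connection
  .option("labels", "Person")
  .load()

// Pushed down roughly as: MATCH (n:`Person`) WHERE n.name = 'John Doe' RETURN n
df.where("name = 'John Doe'").show()
```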
private def createNode(name: String, labels: Seq[String]) = {
  val primaryLabel = labels.head
  val otherLabels = labels.takeRight(labels.size - 1)
`labels.tail`
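A small sketch of the suggestion (values are illustrative): for a non-empty sequence, `tail` is equivalent to `takeRight(size - 1)` and states the intent directly.

```scala
val labels = Seq("Person", "Employee", "Manager")
val primaryLabel = labels.head
// tail drops the first element; equivalent to takeRight(labels.size - 1)
// for non-empty sequences, and it is a parameterless member (no parens)
val otherLabels = labels.tail
```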
  .toMap
  .asJava

def connectorVersion: String = properties.getOrDefault("version", "UNKNOWN").toString

def mapSparkFiltersToCypher(filter: Filter, container: org.neo4j.cypherdsl.core.PropertyContainer, attributeAlias: Option[String] = None): Condition = {
  filter match {
    case eqns: EqualNullSafe => container.property(attributeAlias.getOrElse(eqns.attribute)).isEqualTo(Cypher.literalOf(eqns.value))
todo parameters instead of literals, can be in a later PR
  .toMap
  .asJava

def connectorVersion: String = properties.getOrDefault("version", "UNKNOWN").toString

def mapSparkFiltersToCypher(filter: Filter, container: org.neo4j.cypherdsl.core.PropertyContainer, attributeAlias: Option[String] = None): Condition = {
  filter match {
    case eqns: EqualNullSafe => container.property(attributeAlias.getOrElse(eqns.attribute)).isEqualTo(Cypher.literalOf(eqns.value))
coalesce around the value?
val result = df.where("age > 20").collectAsList()

assertEquals(1, result.size())
these tests should probably check the actual entities are correct in the results, not just the count
  .option("relationship.target.labels", "Person")
  .load()

assertEquals(1, df.filter("`<source>`.`id` = '10' AND `<target>`.`id` = '1'").collectAsList().size())
df can do count without collectToList
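A hedged one-line sketch of the suggested change (reusing the filter string from the test above; assumes that test's `df`):

```scala
// count() runs the aggregation inside Spark without collecting the rows to the
// driver, so the test no longer materializes a list just to measure its size
assertEquals(1, df.filter("`<source>`.`id` = '10' AND `<target>`.`id` = '1'").count())
```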
  .option("relationship.target.labels", "Person")
  .load()

assertEquals(1, df.filter("`source.id` = 10 AND `target.id` = 1").collectAsList().size())
the syntax is different from before, where it was `<source>`.`id`?
why? we should allow only one (the simpler one)
Same explanation as my comment in quickstart.adoc: it is due to the presence of `.option("relationship.nodes.map", "false")`.
They are different because the result format is different (map vs column). We can talk about it anyway.
If `relationship.nodes.map` is set to **true**

* ``\`<source>`.\`[property]` `` for the source node map properties
why?
If I got your question right: this is used to filter the source, rel, or target columns when they are returned as a map (using `relationship.nodes.map` == true).
This is the syntax used to access properties inside said maps.
but why does it need to be a different syntax there? Can't we figure that out internally and use the appropriately adjusted filter? Just trying to understand.
We have two ways to represent a relationship:

- with `relationship.nodes.map=false` => we have the nodes and relationship flattened, so every node property has the namespace `source`/`target` and the relationship has `rel`. This is the most descriptive representation;
- with `relationship.nodes.map=true` => we have the nodes represented as maps, in order to compact the table representation.
  (p2:Person {name: 'Jane Doe'})
  """)

val result = df.select("name").where("NOT name = 'John Doe'").collectAsList()
is there also a not operator on the predicate API?
We added a test with the `!=` operator
I meant on the spark filter API i.e. can I do .not().where() ?
No you can't
@Test
def `should not return attribute if filter doesn't have it` {
not sure what this test means.
@@ -43,5 +43,27 @@ class Neo4jImplicitsTest {
  assertEquals(value, actual)
}

@Test
should the other implicits also be tested?
@@ -13,7 +14,7 @@ class Neo4jQueryServiceTest {
options.put(QueryType.LABELS.toString.toLowerCase, "Person")
val neo4jOptions: Neo4jOptions = new Neo4jOptions(options)

val query: String = new Neo4jQueryService(neo4jOptions, new Neo4jQueryReadStrategy()).createQuery()
val query: String = new Neo4jQueryService(neo4jOptions, new Neo4jQueryReadStrategy(Array[Filter]())).createQuery()
shouldn't it have a default value of an empty array instead?
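A hedged sketch of the reviewer's suggestion (class name follows the test above; the constructor shape and filter type are simplified to strings here, the real code takes `Array[Filter]`):

```scala
// Hypothetical: a default value of an empty array means existing call sites
// like `new Neo4jQueryReadStrategy()` keep compiling unchanged, while new
// code can pass filters explicitly.
class Neo4jQueryReadStrategy(filters: Array[String] = Array.empty) {
  def hasFilters: Boolean = filters.nonEmpty
}
```

With this, the test diff above would not need to change the no-filter call at all.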
val query: String = new Neo4jQueryService(neo4jOptions, new Neo4jQueryReadStrategy(filters)).createQuery()

assertEquals("MATCH (n:`Person`) WHERE n.name = 'John Doe' RETURN n", query)
please also test the generation of a few more queries
- and/or/not combos (esp. with parentheses around OR groups)
- source/target/rel filters both with map-mode and without
- multi label?
- label filters?
Looks great, really cool feature!!
Please have a look at my comments, and decide if you want to act on them and, if so, whether in this PR or later.
Are there also other predicates possible and functions possible? like
@@ -177,6 +177,11 @@ every single node property as column prefixed by `source` or `target`

|`sample`
|No

|`pushdown.filters.enabled`
just out of curiosity, why would I ever not want this enabled? What situation would mean I should disable it?
Maybe some users prefer to leave the filtering to Spark because, for instance, Spark would be faster at filtering some properties that might not be indexed in Neo4j.
Also, if we had a bug in the filtering, disabling it would be a handy workaround 😅
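A hedged sketch of the opt-out being discussed (the option name comes from this PR; the `DataSource` format string is an assumption):

```scala
// Sketch: with pushdown disabled, the connector sends no WHERE clause to Neo4j;
// any df.where(...) predicates are applied by Spark after the data is loaded.
val df = spark.read
  .format("org.neo4j.spark.DataSource")        // assumed format string
  .option("labels", "Person")
  .option("pushdown.filters.enabled", "false") // option added in this PR
  .load()
```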
For functions: you should use Spark functions, since they're not converted to Filters and pushed down to Neo4j; at the moment there is no way to handle them. For the labels: they are implicitly defined when we use the
assertEquals("MATCH (source:`Person`) " +
  "MATCH (target:`Person`) " +
  "MATCH (source)-[rel:`KNOWS`]->(target) WHERE (source.name = coalesce('John Doe', '') OR target.name = coalesce('John Doe', '')) RETURN source, rel, target", query)
hmm, not sure about the semantics of EqualTo in Spark?
Because now we also get all the records returned that have name='', and I'm not sure that's intended.
Actually I'm not sure we have to distinguish between null and not null equalto?
I don't think the coalesce approach works here, as it would return wrong data.
Do you have a pointer to the "null" semantics for the Spark pushdown filters?
Otherwise I would just use regular Cypher expressions and ignore the null bit.
If you want to specify `x IS NULL OR x = 'foo'`, I would generate exactly that, but that will add a penalty to the Cypher query performance.
Looks good otherwise.
@jexp I updated the PR as you requested, please lemme know what your thoughts are.
case notNull: IsNotNull => container.property(attributeAlias.getOrElse(notNull.attribute)).isNotNull
case isNull: IsNull => container.property(attributeAlias.getOrElse(isNull.attribute)).isNull
case startWith: StringStartsWith => container.property(attributeAlias.getOrElse(startWith.attribute))
  .startsWith(Functions.coalesce(Cypher.literalOf(startWith.value), Cypher.literalOf("")))
I thought you wanted to remove the coalesce?
There might be a missing test that would have spotted them. :)
@conker84 ping
val neo4jOptions: Neo4jOptions = new Neo4jOptions(options)

val filters: Array[Filter] = Array[Filter](
  EqualNullSafe("name", "John Doe"),
where are the semantics of EqualNullSafe defined?
See my comment
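To make the question concrete: Spark's `EqualNullSafe` corresponds to the SQL `<=>` operator, where `null <=> null` is true and `null <=> x` is false. A hedged model of those semantics (this is illustrative code, not connector source):

```scala
// Hypothetical model of Spark's EqualNullSafe (SQL `<=>`) semantics,
// using Option to stand in for nullable values.
def equalNullSafe(a: Option[String], b: Option[String]): Boolean = (a, b) match {
  case (None, None)          => true  // null <=> null is true
  case (None, _) | (_, None) => false // null <=> non-null is false
  case (Some(x), Some(y))    => x == y
}
```

This is why a plain `coalesce('John Doe', '')` translation is questionable: it would make a null property compare equal to the empty string, which these semantics do not allow.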
@jexp PR updated
Fixes #161

Added PushDownFilters support.
It is applied just with `labels` and `relationship`; it applies with `relationship` and the `relationship.nodes.map` option set to `true`.
When applied, the filters are executed in the Cypher query, moving less data from Neo4j to Spark. When not applied, the filters are executed by Spark.
It can be manually disabled by setting the `pushdown.filters.enabled` option to `false`.