feat(spark): support for FetchRel offset field #296
Conversation
    if (fetch.getCount.isPresent) {
      val limit = Literal(fetch.getCount.getAsLong.intValue(), IntegerType)
      fetch.getOffset match {
        case 1L => GlobalLimit(limitExpr = limit, child = child)
fwiw this is just misusing the field, but I know it was there already so not a problem for this PR 😅
If I understand this correctly, you're using offset 1 or -1 as a switching field? What would it take to not do this? At the very least, could we use something like -2 to signal this, to avoid using a valid offset value as a magic number?
@Blizzara Do you know if there's anything outside of substrait-spark that's relying on this magic number, or is it something that we can change?
I'm not aware of anything else, I'd be happy to see it changed! (I had already changed it in our internal fork of substrait-spark, but the code of this PR is nicer than mine 🎉 )
      fetch.getOffset match {
        case 1L => GlobalLimit(limitExpr = limit, child = child)
        case -1L => LocalLimit(limitExpr = limit, child = child)
        case _ => visitFallback(fetch)
Can Spark not handle the case where both the offset and count are set?
> Can Spark not handle the case where both the offset and count are set?
Yes, the unit test in this PR sets both limit and offset. Spark handles limit by breaking it into two logical relations, LocalLimit and GlobalLimit (there's an explanation here).
In the case of the test query in this PR:
select l_partkey from lineitem where l_shipdate < date '1998-01-01'
order by l_shipdate asc, l_discount desc limit 100 offset 1000
the Spark logical plan is:
GlobalLimit 100
+- Offset 1000
+- LocalLimit 1100
+- Project [l_partkey#7997L]
+- Sort [l_shipdate#8006 ASC NULLS FIRST, l_discount#8002 DESC NULLS LAST], true
+- Project [l_partkey#7997L, l_shipdate#8006, l_discount#8002]
+- Filter (isnotnull(l_shipdate#8006) AND (l_shipdate#8006 < 1998-01-01))
+- Relation ...
I'm not sure how this would best translate into a Substrait plan.
The handling of the limit clause was already in the codebase as inherited from Gluten. As @Blizzara noted, this PR adds the offset. But I can understand that this whole area needs rethinking.
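To make the split concrete, here is a toy model using plain Scala collections (the helpers are illustrative stand-ins, not Spark's actual operators): LocalLimit truncates each partition independently, GlobalLimit truncates the merged result, and an offset is absorbed into the local limit before being dropped globally.

```scala
// Toy model of Spark's two-phase limit over partitioned data.
// These names and helpers are illustrative stand-ins, not Spark APIs.
object LimitSketch {
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7), Seq(8, 9))

  // LocalLimit: each partition keeps at most n rows.
  def localLimit(parts: Seq[Seq[Int]], n: Int): Seq[Seq[Int]] = parts.map(_.take(n))

  // GlobalLimit: the merged stream keeps at most n rows.
  def globalLimit(parts: Seq[Seq[Int]], n: Int): Seq[Int] = parts.flatten.take(n)

  // `limit 5` => LocalLimit(5) on each partition, then GlobalLimit(5) on the merge.
  val limited: Seq[Int] = globalLimit(localLimit(partitions, 5), 5)

  // `limit 2 offset 3` => LocalLimit(2 + 3) per partition, then skip 3 and keep 2 globally,
  // mirroring the LocalLimit 1100 under GlobalLimit 100 / Offset 1000 in the plan above.
  def offsetThenLimit(parts: Seq[Seq[Int]], offset: Int, n: Int): Seq[Int] =
    localLimit(parts, n + offset).flatten.drop(offset).take(n)
}
```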
Based on what you've indicated, is converting FetchRel(count=X, offset = Y, <input>) into:
GlobalLimit X
+- Offset Y
+- LocalLimit X + Y
+- <converted input>
viable?
Generally speaking, Substrait consumers should be written in such a way that they accept all valid Substrait plans, even those not built specifically by or for Spark in this case. It's perfectly valid to set both fields of the FetchRel, even if Spark doesn't.
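A sketch of what that conversion could look like, with hypothetical case classes standing in for the Spark operators and the Substrait FetchRel (names and shapes are assumed for illustration, not taken from either codebase). The point is that branching on the actual field values removes the need for sentinel offsets like 1L / -1L:

```scala
// Hypothetical stand-ins for Spark's logical operators and Substrait's FetchRel.
sealed trait Plan
case class GlobalLimit(limit: Long, child: Plan) extends Plan
case class LocalLimit(limit: Long, child: Plan) extends Plan
case class Offset(offset: Long, child: Plan) extends Plan
case object Input extends Plan

case class FetchRel(count: Option[Long], offset: Long)

// FetchRel(count = X, offset = Y) => GlobalLimit X / Offset Y / LocalLimit X + Y.
def toSpark(fetch: FetchRel, input: Plan): Plan = fetch match {
  case FetchRel(Some(x), 0L) => GlobalLimit(x, LocalLimit(x, input))
  case FetchRel(Some(x), y)  => GlobalLimit(x, Offset(y, LocalLimit(x + y, input)))
  case FetchRel(None, y)     => Offset(y, input) // offset only, no count
}
```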
> viable?
Could be... I'll do some more testing to get a better understanding of this.
> Generally speaking, ...
Completely agree. One of the weaknesses of the current test suite is that it relies entirely on round-tripping the query plans. I need to improve on this, somehow.
From reading the Spark PRs, here is my understanding.
When converting from Substrait -> Spark:
Fetch(limit>=0, offset>0) => LocalLimit -> GlobalLimitAndOffset
Fetch(limit<0, offset>0) => Offset
Fetch(limit>=0, offset==0) => LocalLimit -> GlobalLimit
When converting from Spark -> Substrait then the following logic should work:
if current_node == LocalLimit:
    if next_node == GlobalLimitAndOffset:
        if current_node.limit < next_node.limit:
            raise Exception("this plan cannot be converted into Substrait")
        else:
            yield Fetch(limit=next_node.limit, offset=next_node.offset)
            skip(next_node)
    elif next_node == GlobalLimit:
        if current_node.limit < next_node.limit:
            raise Exception("this plan cannot be converted into Substrait")
        else:
            yield Fetch(limit=next_node.limit, offset=0)
            skip(next_node)
    else:
        raise Exception("this plan cannot be converted into Substrait")

# This cannot have been preceded by LocalLimit because we would have
# skipped it above, so it is just a global limit
if current_node == GlobalLimit:
    yield Fetch(limit=current_node.limit, offset=0)
if current_node == Offset:
    yield Fetch(limit=-1, offset=current_node.offset)
if current_node == GlobalLimitAndOffset:
    yield Fetch(limit=current_node.limit, offset=current_node.offset)
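The pseudocode above can be sketched in Scala by matching on the parent/child shape of the plan. The case-class names here are toy stand-ins (suffixed with N to make clear they are not Spark's operators), and the sketch assumes the Global* node sits directly above the LocalLimit, as in the plan printed earlier; Fetch uses limit = -1 to mean "no limit":

```scala
// Toy stand-ins for Spark's logical operators; not Spark APIs.
sealed trait Node
case class LocalLimitN(limit: Long, child: Node) extends Node
case class GlobalLimitN(limit: Long, child: Node) extends Node
case class OffsetN(offset: Long, child: Node) extends Node
case class GlobalLimitAndOffsetN(limit: Long, offset: Long, child: Node) extends Node
case object Scan extends Node

case class Fetch(limit: Long, offset: Long) // limit = -1 means "no limit"

def toFetch(node: Node): Either[String, (Fetch, Node)] = node match {
  // A Global* node over a LocalLimit with a compatible (not smaller) local limit.
  case GlobalLimitAndOffsetN(l, o, LocalLimitN(ll, child)) if ll >= l =>
    Right((Fetch(l, o), child))
  case GlobalLimitN(l, LocalLimitN(ll, child)) if ll >= l =>
    Right((Fetch(l, 0), child))
  // LocalLimit smaller than the limit above it: no Substrait equivalent.
  case GlobalLimitAndOffsetN(_, _, LocalLimitN(_, _)) | GlobalLimitN(_, LocalLimitN(_, _)) =>
    Left("this plan cannot be converted into Substrait")
  // Bare Global*/Offset nodes convert directly.
  case GlobalLimitAndOffsetN(l, o, child) => Right((Fetch(l, o), child))
  case GlobalLimitN(l, child)             => Right((Fetch(l, 0), child))
  case OffsetN(o, child)                  => Right((Fetch(-1, o), child))
  // A LocalLimit alone (or any other node) cannot be converted.
  case _ => Left("this plan cannot be converted into Substrait")
}
```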
The above logic should always round trip successfully (unless I made a mistake in my understanding)
The case where there is either a LocalLimit alone, or a LocalLimit followed by a GlobalLimit or GlobalLimitAndOffset whose limit is larger than the LocalLimit's, cannot be supported today.
This plan implements a per-group limit and the closest equivalent Substrait plan would be something like...
┌──────┐ ┌───────────────────────────┐ ┌───────┐ ┌───────────┐
│ SCAN │ │ FILTER (partition_id = 1) │ │ LIMIT │ ────┬────│ UNION ALL │
└──────┘ └───────────────────────────┘ └───────┘ │ └───────────┘
│
│
┌──────┐ ┌───────────────────────────┐ ┌───────┐ │
│ SCAN │ │ FILTER (partition_id = 2) │ │ LIMIT │ ────┼
└──────┘ └───────────────────────────┘ └───────┘ │
│
│
│
... │
│
│
│
┌──────┐ ┌───────────────────────────┐ ┌───────┐ │
│ SCAN │ │ FILTER (partition_id = N) │ │ LIMIT │ ────┘
└──────┘ └───────────────────────────┘ └───────┘
Thank you @westonpace, that's really helpful. I think I've captured that in the amended commit, and added a couple of extra tests.
Add missing support for the ‘offset’ clause in the spark module. Signed-off-by: Andrew Coleman <andrew_coleman@uk.ibm.com>
@vbarua I believe I have addressed all the comments and this is ready for re-review. Thanks!
vbarua
left a comment
This looks reasonable to me. Thanks for pushing this forward!