feat(spark): add Window support #307
Conversation
I've gone ahead and reviewed and merged the other PRs, so we shouldn't have any conflict issues there beyond the one that exists now. I can review this PR tomorrow.
Force-pushed from 03ac4c8 to f093ec4
    WindowSpecDefinition(_, _, SpecifiedWindowFrame(frameType, lower, upper))) =>
  (fromSpark(frameType), fromSparkPreceding(lower), fromSparkFollowing(upper))
case WindowExpression(_, WindowSpecDefinition(_, _, UnspecifiedFrame)) =>
  (WindowBoundsType.ROWS, UNBOUNDED, CURRENT_ROW)
I was curious about the default behaviour when the frame is unspecified: for Postgres it is RANGE UNBOUNDED PRECEDING, but for Spark it depends on whether the ordering is defined:
@note When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding,
unboundedFollowing) is used by default. When ordering is defined, a growing window frame
(rangeFrame, unboundedPreceding, currentRow) is used by default.
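The defaulting rule quoted in that note can be sketched as a tiny decision function. This is a hypothetical stand-in using simplified frame types, not Spark's real catalyst classes:

```scala
// Simplified stand-ins for Spark's two default frames (illustrative only):
sealed trait Frame
// ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
case object RowsUnboundedBoth extends Frame
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
case object RangeUnboundedToCurrent extends Frame

// Spark's documented rule: the default frame depends on whether an
// ORDER BY clause is present in the window specification.
def defaultFrame(orderingDefined: Boolean): Frame =
  if (orderingDefined) RangeUnboundedToCurrent else RowsUnboundedBoth
```

This mirrors the orderSpec.isEmpty check that the converter ends up doing further down in this PR.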
Good spot, I hadn't noticed that comment.
val (frameType, lower, upper) = sparkExp match {
  case WindowExpression(_: OffsetWindowFunction, _) =>
    (WindowBoundsType.ROWS, UNBOUNDED, CURRENT_ROW)
What is an OffsetWindowFunction? Does it somehow override the frame bounds?
That's my understanding. TBH I'm not sure exactly how it should translate to Substrait, but it's ignored by Spark (it matches the lag/lead functions):
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L404
You should actually get the frame here properly as well. Otherwise the bound will be incorrect for other consumers (unbounded preceding and current row will not work for Lead, for example) and Spark will throw. You'll see that if you add logicalPlan2.show() into SubstraitPlanTestBase::assertSqlToSubstraitRoundTrip and re-run the lag/lead test. (We should also make the tests always evaluate the converted plans to ensure they are actually valid; I'll try to get to that.)
Can be fixed by just removing this case.
OK, I've removed it for now.
Blizzara
left a comment
Thanks! Overall this looks good, but there are some things in the bounds that I think aren't always correct.
    .append("sorts=")
    .append(window.getSorts)
  })
}
FWIW, in our internal fork I just deleted the whole RelToVerboseString thing. I don't see it bringing that much value over pretty-printing just the protobuf. And there's a fair amount of work to maintain this.
val WINDOW_SIGS: Seq[Sig] = Seq(
  s[RowNumber]("row_number"),
  s[Rank]("rank"),
From looking at our internal fork's window impl, the rank functions in Spark have some child column but Substrait doesn't define any. I wonder how it works here, i.e. how do you still get a match?
  s[CumeDist]("cume_dist"),
  s[NTile]("ntile"),
  s[Lead]("lead"),
  s[Lag]("lag"),
I also have a note about Lead and Lag having a NullType null as a child, which I think Substrait doesn't support. Did you run into anything like that?
I haven't seen this - do you have a test case?
Looks like you had, you just handle it here: https://github.com/substrait-io/substrait-java/pull/307/files#diff-78e1a9d4e9f4c7d8968b3ba83d3cc5222ade95d28e16b0328277c3f8c8a9d313R183 😄
Oh yes - not known for my memory! 😂
  case other => throw new UnsupportedOperationException(s"Unsupported bounds type: $other.")
}

def fromSparkPreceding(bound: Expression): WindowBound = bound match {
Substrait defines the bounds as being strictly positive integers, but IIRC Spark may have a "preceding -1 row", for example. This is what I used:
expr match {
  case UnboundedPreceding => WindowBound.UNBOUNDED
  case UnboundedFollowing => WindowBound.UNBOUNDED
  case CurrentRow => WindowBound.CURRENT_ROW
  case e: Literal =>
    e.dataType match {
      case IntegerType =>
        val offset = e.eval().asInstanceOf[Int]
        if (offset < 0) WindowBound.Preceding.of(-offset)
        else if (offset == 0) WindowBound.CURRENT_ROW
        else WindowBound.Following.of(offset)
    }
  case _ => throw new UnsupportedOperationException(s"Unexpected bound: $expr")
}
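The sign convention in that snippet can be illustrated in isolation. This is a minimal sketch with a hypothetical Bound ADT standing in for substrait-java's WindowBound; only the sign-flipping logic is the point:

```scala
// Hypothetical stand-ins for Substrait's window bounds (illustrative only):
sealed trait Bound
case class Preceding(offset: Int) extends Bound  // offset must be strictly positive
case class Following(offset: Int) extends Bound  // offset must be strictly positive
case object CurrentRowBound extends Bound

// Spark encodes a frame bound as a (possibly negative) integer literal;
// Substrait requires strictly positive offsets, so the sign selects the
// bound's direction instead.
def fromSparkOffset(offset: Int): Bound =
  if (offset < 0) Preceding(-offset)
  else if (offset == 0) CurrentRowBound
  else Following(offset)
```

So a Spark "-1" (one row preceding) becomes Preceding(1), which is why a lower bound can legitimately turn out to be a Following and vice versa.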
Thanks, I've changed it to this.
case UNBOUNDED => UnboundedPreceding
case CURRENT_ROW => CurrentRow
case p: Preceding => Literal(p.offset())
case _ => throw new UnsupportedOperationException(s"Unsupported bounds expression $bound")
I think this also doesn't work, for the same reason as above: func.lowerBound() may very well be a Following (and vice versa below), so you should handle both in both.
Our version was a bit different, yours might be nice (once fixed):
def toSparkFrame(
    boundsType: WindowBoundsType,
    lowerBound: WindowBound,
    upperBound: WindowBound): WindowFrame = {
  val frameType = boundsType match {
    case WindowBoundsType.ROWS => RowFrame
    case WindowBoundsType.RANGE => RangeFrame
    case WindowBoundsType.UNSPECIFIED => return UnspecifiedFrame
  }
  SpecifiedWindowFrame(
    frameType,
    toSparkBound(lowerBound, isLower = true),
    toSparkBound(upperBound, isLower = false))
}

private def toSparkBound(bound: WindowBound, isLower: Boolean): Expression = {
  bound.accept(new WindowBoundVisitor[Expression, Exception] {
    override def visit(preceding: WindowBound.Preceding): Expression =
      Literal(-preceding.offset().intValue())
    override def visit(following: WindowBound.Following): Expression =
      Literal(following.offset().intValue())
    override def visit(currentRow: WindowBound.CurrentRow): Expression = CurrentRow
    override def visit(unbounded: WindowBound.Unbounded): Expression =
      if (isLower) UnboundedPreceding else UnboundedFollowing
  })
}
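The reverse-direction sign mapping in that visitor can be sketched with stand-in types. SBound and toSparkOffset are illustrative names, not the real substrait-java or Spark APIs:

```scala
// Illustrative stand-ins for Substrait bounds on the consume side:
sealed trait SBound
case class SPreceding(n: Int) extends SBound
case class SFollowing(n: Int) extends SBound
case object SUnbounded extends SBound

// Preceding(n) becomes a negative Spark literal, Following(n) a positive
// one, and Unbounded resolves differently depending on which side of the
// frame it sits on (hence the isLower flag threaded through).
def toSparkOffset(b: SBound, isLower: Boolean): Either[String, Int] = b match {
  case SPreceding(n) => Right(-n)
  case SFollowing(n) => Right(n)
  case SUnbounded   => Left(if (isLower) "UnboundedPreceding" else "UnboundedFollowing")
}
```

Because both directions are handled for both bounds, a lower bound that is a Following (or an upper bound that is a Preceding) round-trips correctly.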
Yours is nicer :).
      |
      |""".stripMargin
  assertSqlSubstraitRelRoundTrip(query)
}
Maybe add a test with two different partitions in the same select? I think it should pass fine, but just in case.
  assertSqlSubstraitRelRoundTrip(query)
}

test("min") {
Nit: maybe name this "aggregate" or something, as I think that's what it's testing (the fact that it's min specifically is less relevant)?
- test("min") {
+ test("aggregate") {
Thanks @Blizzara, there's some really useful feedback here. I hadn't realised you had already been implementing this; it would be awesome if you want to collaborate :) I'd only implemented as much as was necessary to enable the windowing tests in the TPC-DS suite to pass. If you've got other tests that require additional logic in the converter, it would be great to add these, but perhaps in a future PR in the interest of moving things forward incrementally?
Force-pushed from f093ec4 to fc0f15a
Yeah, shame on me: it's been on my todo list ever since you added this into substrait-java to pull up our changes. Some of those changes are non-trivial refactorings which might be annoying, but I'll try to get to it!
I'd like to see the bounds things fixed, since currently this implementation doesn't adhere to the bounds (preceding, following) being strictly positive. That's not a problem for the roundtrip tests, but it means a plan generated here may not be valid Substrait that can be consumed by other consumers.
Starting with #311! After that I'll add structs (+ fix the name handling), and then I have some more complicated changes in how FunctionMappings works to allow for more complex mappings.
Force-pushed from 82cb729 to 73d3bc9
To support the OVER clause in SQL

Signed-off-by: Andrew Coleman <andrew_coleman@uk.ibm.com>
Force-pushed from 73d3bc9 to 86a0548
    WindowSpecDefinition(_, _, SpecifiedWindowFrame(frameType, lower, upper))) =>
  (fromSpark(frameType), fromSpark(lower), fromSpark(upper))
case WindowExpression(_, WindowSpecDefinition(_, orderSpec, UnspecifiedFrame)) =>
  if (orderSpec.isEmpty) {
Just for posterity, this comes from Spark's notes: https://github.com/apache/spark/blob/250f8affd04e4be14446dd02a1c52716e54a226d/sql/api/src/main/scala/org/apache/spark/sql/expressions/Window.scala#L36
val successfulSQL: Set[String] = Set("q1", "q3", "q4", "q5", "q7", "q8",
  "q21", "q22", "q23a", "q23b", "q24a", "q24b", "q25", "q26", "q27", "q28", "q29",
  "q30", "q31", "q32", "q33", "q37", "q38",
  "q40", "q41", "q42", "q43", "q46", "q48",
Given how this is growing, it might be worth starting to list the ones that don't work instead 😄
Exactly, I was going to do this after my next PR which will add support for more numeric functions. The number of failing tests will be relatively small then.
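The suggestion to track failures instead can be sketched as deriving the passing set from the full query list. The names and the failing set here are purely illustrative, and the generated list ignores TPC-DS variants like q23a/q23b for brevity:

```scala
// Generate the full (simplified) TPC-DS query list, then subtract the
// hypothetical failures to get the passing set.
val allQueries: Set[String] = (1 to 99).map(i => s"q$i").toSet
val failingSQL: Set[String] = Set("q2", "q6") // illustrative failures only
val successfulSQL: Set[String] = allQueries -- failingSQL
```

Once the failing set is small, this keeps the test file short and makes new regressions obvious in review.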
To support the OVER clause in SQL
This fixes some of the TPC-DS tests.