Skip to content

Commit

Permalink
[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup
Browse files Browse the repository at this point in the history
This PR contains a few clean-ups that are a part of SPARK-8638: a few style issues got fixed, and a few tests were moved.

Git commit message is wrong BTW :(...

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes apache#7513 from hvanhovell/SPARK-8638-cleanup and squashes the following commits:

4e69d08 [Herman van Hovell] Fixed Perfomance Regression for Shrinking Window Frames (+Rebase)
  • Loading branch information
hvanhovell authored and yhuai committed Jul 19, 2015
1 parent a803ac3 commit 7a81245
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,24 @@ case class Window(
val exprs = windowSpec.orderSpec.map(_.child)
val projection = newMutableProjection(exprs, child.output)
(windowSpec.orderSpec, projection(), projection())
}
else if (windowSpec.orderSpec.size == 1) {
} else if (windowSpec.orderSpec.size == 1) {
// Use only the first order expression when the offset is non-null.
val sortExpr = windowSpec.orderSpec.head
val expr = sortExpr.child
// Create the projection which returns the current 'value'.
val current = newMutableProjection(expr :: Nil, child.output)()
// Flip the sign of the offset when processing the order is descending
val boundOffset = if (sortExpr.direction == Descending) -offset
else offset
val boundOffset =
if (sortExpr.direction == Descending) {
-offset
} else {
offset
}
// Create the projection which returns the current 'value' modified by adding the offset.
val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
val bound = newMutableProjection(boundExpr :: Nil, child.output)()
(sortExpr :: Nil, current, bound)
}
else {
} else {
sys.error("Non-Zero range offsets are not supported for windows " +
"with multiple order expressions.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,47 @@ class HiveDataFrameWindowSuite extends QueryTest {
| (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row)
| FROM window_table""".stripMargin).collect())
}

test("reverse sliding range frame") {
val df = Seq(
(1, "Thin", "Cell Phone", 6000),
(2, "Normal", "Tablet", 1500),
(3, "Mini", "Tablet", 5500),
(4, "Ultra thin", "Cell Phone", 5500),
(5, "Very thin", "Cell Phone", 6000),
(6, "Big", "Tablet", 2500),
(7, "Bendable", "Cell Phone", 3000),
(8, "Foldable", "Cell Phone", 3000),
(9, "Pro", "Tablet", 4500),
(10, "Pro2", "Tablet", 6500)).
toDF("id", "product", "category", "revenue")
val window = Window.
partitionBy($"category").
orderBy($"revenue".desc).
rangeBetween(-2000L, 1000L)
checkAnswer(
df.select(
$"id",
avg($"revenue").over(window).cast("int")),
Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
Row(10, 6000) :: Nil)
}

// This is here to illustrate the fact that reverse order also reverses offsets.
test("reverse unbounded range frame") {
val df = Seq(1, 2, 4, 3, 2, 1).
map(Tuple1.apply).
toDF("value")
val window = Window.orderBy($"value".desc)
checkAnswer(
df.select(
$"value",
sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)

}
}

This file was deleted.

0 comments on commit 7a81245

Please sign in to comment.