Skip to content

Commit

Permalink
[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug
Browse files Browse the repository at this point in the history
`reduceByWindow` for Java API is actually not Java compatible, change to make it Java compatible.

Current solution is to deprecate the old one and add a new API, but since old API actually is not correct, so is keeping the old one meaningful? just to keep the binary compatible? Also even adding new API still need to add to Mima exclusion, I'm not sure to change the API, or deprecate the old API and add a new one, which is the best solution?

Author: jerryshao <saisai.shao@intel.com>

Closes apache#4104 from jerryshao/SPARK-5315 and squashes the following commits:

5bc8987 [jerryshao] Address the comment
c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible
8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error
  • Loading branch information
jerryshao authored and tdas committed Jan 23, 2015
1 parent 3c3fa63 commit e0f7fb7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
Expand Up @@ -90,6 +90,10 @@ object MimaExcludes {
// SPARK-5297 Java FileStream do not work with custom key/values
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
) ++ Seq(
// SPARK-5315 Spark Streaming Java API returns Scala DStream
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
)

case v if v.startsWith("1.2") =>
Expand Down
Expand Up @@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @deprecated As this API is not Java compatible.
*/
@deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
Expand All @@ -220,6 +222,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}

/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
windowDuration: Duration,
slideDuration: Duration
): JavaDStream[T] = {
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}

/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally
Expand Down
Expand Up @@ -306,7 +306,17 @@ public void testReduce() {

@SuppressWarnings("unchecked")
@Test
public void testReduceByWindow() {
public void testReduceByWindowWithInverse() {
testReduceByWindow(true);
}

@SuppressWarnings("unchecked")
@Test
public void testReduceByWindowWithoutInverse() {
testReduceByWindow(false);
}

private void testReduceByWindow(boolean withInverse) {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),
Expand All @@ -319,8 +329,14 @@ public void testReduceByWindow() {
Arrays.asList(24));

JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
JavaDStream<Integer> reducedWindowed = null;
if (withInverse) {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
} else {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new Duration(2000), new Duration(1000));
}
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);

Expand Down

0 comments on commit e0f7fb7

Please sign in to comment.