Skip to content

Commit

Permalink
[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spa…
Browse files Browse the repository at this point in the history
…rk Streaming

## What changes were proposed in this pull request?

apache#15992 provided a solution to fix the bug, i.e. **receiver data can not be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. It may be a rational choice to close auto pick kryo serializer for Spark Streaming in the first step. I will continue apache#15992 to optimize the solution.

## How was this patch tested?

existing ut

Author: uncleGen <hustyugm@gmail.com>

Closes apache#16052 from uncleGen/SPARK-18617.
  • Loading branch information
uncleGen authored and Robert Kruszewski committed Dec 15, 2016
1 parent eb1f192 commit cb1a44b
Showing 1 changed file with 25 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,31 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}

class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {

val data: Array[Byte] = "test".getBytes
var receivingThreadOption: Option[Thread] = None

override def onStart(): Unit = {
val thread = new Thread() {
override def run() {
logInfo("Receiving started")
while (!isStopped) {
store(data)
}
logInfo("Receiving stopped")
}
}
receivingThreadOption = Some(thread)
thread.start()
}

override def onStop(): Unit = {
// no clean to be done, the receiving thread should stop on it own, so just wait for it.
receivingThreadOption.foreach(_.join())
}
}

/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
Expand Down

0 comments on commit cb1a44b

Please sign in to comment.