Skip to content

Commit

Permalink
wait for starting the receiver before publishing data
Browse files Browse the repository at this point in the history
  • Loading branch information
prabeesh committed Jul 24, 2015
1 parent 87fc677 commit a6747cb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt

import scala.reflect.ClassTag

import org.apache.spark.api.java.function.Function
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -121,4 +122,15 @@ private class MQTTTestUtils extends Logging {

assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
}

def waitForReceiverToStart(jssc: JavaStreamingContext) : Unit = {
val latch = new CountDownLatch(1)
jssc.addStreamingListener(new StreamingListener {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
latch.countDown()
}
})

assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
}
}
1 change: 1 addition & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ def test_mqtt_stream(self):
sendData = "MQTT demo for spark streaming"
topic = self._randomTopic()
result = self._startContext(topic)
self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc)
self._publishData(topic, sendData)
self.wait_for(result, len(sendData))
self._validateStreamResult(sendData, result)
Expand Down

0 comments on commit a6747cb

Please sign in to comment.