From a6747cb58f504bd681162bba67e0679b33b42915 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 24 Jul 2015 16:20:46 +0400 Subject: [PATCH] wait for starting the receiver before publishing data --- .../org/apache/spark/streaming/mqtt/MQTTUtils.scala | 1 - .../apache/spark/streaming/mqtt/MQTTTestUtils.scala | 12 ++++++++++++ python/pyspark/streaming/tests.py | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 22dabb36efa11..38a1114863d15 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt import scala.reflect.ClassTag -import org.apache.spark.api.java.function.Function import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream} diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 34e81b3f0f84f..e48760a0a08f4 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -28,6 +28,7 @@ import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.util.Utils @@ -121,4 +122,15 @@ private class MQTTTestUtils extends Logging { assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") } + + def waitForReceiverToStart(jssc: JavaStreamingContext) : Unit = { + val latch = new CountDownLatch(1) + jssc.addStreamingListener(new StreamingListener { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { + latch.countDown() + } + }) + + assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.") + } } diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index d3592f0a1f7c2..b27577f878618 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -884,6 +884,7 @@ def test_mqtt_stream(self): sendData = "MQTT demo for spark streaming" topic = self._randomTopic() result = self._startContext(topic) + self._MQTTTestUtils.waitForReceiverToStart(self.ssc._jssc) self._publishData(topic, sendData) self.wait_for(result, len(sendData)) self._validateStreamResult(sendData, result)