diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 555c26aad1811..e5036fbc6d626 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -111,7 +111,7 @@ private class MQTTTestUtils extends Logging { /** * Block until at least one receiver has started or timeout occurs. */ - def waitForReceiverToStart(ssc: StreamingContext) = { + def waitForReceiverToStart(ssc: StreamingContext) : Unit = { val latch = new CountDownLatch(1) ssc.addStreamingListener(new StreamingListener { override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 4d352cba96a3a..d406749c9b4f1 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -54,8 +54,10 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) ssc = new StreamingContext(sparkConf, Milliseconds(500)) val sendMessage = "MQTT demo for spark streaming" - val receiveStream = - MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY) + + val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, + StorageLevel.MEMORY_ONLY) + @volatile var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => if (rdd.collect.length > 0) {