From e1ee0163a7a09c5e64baea2c549126ca2b451f56 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Sat, 11 Jul 2015 10:04:26 +0400 Subject: [PATCH] scala style fix --- .../org/apache/spark/streaming/mqtt/MQTTTestUtils.scala | 2 +- .../org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 555c26aad1811..e5036fbc6d626 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -111,7 +111,7 @@ private class MQTTTestUtils extends Logging { /** * Block until at least one receiver has started or timeout occurs. */ - def waitForReceiverToStart(ssc: StreamingContext) = { + def waitForReceiverToStart(ssc: StreamingContext) : Unit = { val latch = new CountDownLatch(1) ssc.addStreamingListener(new StreamingListener { override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index 4d352cba96a3a..d406749c9b4f1 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -54,8 +54,10 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) ssc = new StreamingContext(sparkConf, Milliseconds(500)) val sendMessage = "MQTT demo for spark streaming" - val receiveStream = - MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY) + + val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, + StorageLevel.MEMORY_ONLY) + @volatile var receiveMessage: List[String] = List() receiveStream.foreachRDD { rdd => if (rdd.collect.length > 0) {