Skip to content

Commit

Permalink
scala style fix
Browse files Browse the repository at this point in the history
  • Loading branch information
prabeesh committed Jul 11, 2015
1 parent a5a8f9f commit e1ee016
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private class MQTTTestUtils extends Logging {
/**
* Block until at least one receiver has started or timeout occurs.
*/
def waitForReceiverToStart(ssc: StreamingContext) = {
def waitForReceiverToStart(ssc: StreamingContext) : Unit = {
val latch = new CountDownLatch(1)
ssc.addStreamingListener(new StreamingListener {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val sendMessage = "MQTT demo for spark streaming"
val receiveStream =
MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY)

val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic,
StorageLevel.MEMORY_ONLY)

@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect.length > 0) {
Expand Down

0 comments on commit e1ee016

Please sign in to comment.