Skip to content

Commit

Permalink
Fix the variable name and check null in finally
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Aug 4, 2015
1 parent 4a9c79e commit 9570bec
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,28 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
private val topic = "def"

private var ssc: StreamingContext = _
private var MQTTTestUtils: MQTTTestUtils = _
private var mqttTestUtils: MQTTTestUtils = _

before {
ssc = new StreamingContext(master, framework, batchDuration)
MQTTTestUtils = new MQTTTestUtils
MQTTTestUtils.setup()
mqttTestUtils = new MQTTTestUtils
mqttTestUtils.setup()
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
if (MQTTTestUtils != null) {
MQTTTestUtils.teardown()
MQTTTestUtils = null
if (mqttTestUtils != null) {
mqttTestUtils.teardown()
mqttTestUtils = null
}
}

test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic,
val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic,
StorageLevel.MEMORY_ONLY)

@volatile var receiveMessage: List[String] = List()
Expand All @@ -71,7 +71,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter

// Retry it because we don't know when the receiver will start.
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
MQTTTestUtils.publishData(topic, sendMessage)
mqttTestUtils.publishData(topic, sendMessage)
assert(sendMessage.equals(receiveMessage(0)))
}
ssc.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ private class MQTTTestUtils extends Logging {
}
}
} finally {
client.disconnect()
client.close()
client = null
if (client != null) {
client.disconnect()
client.close()
client = null
}
}
}

Expand Down

0 comments on commit 9570bec

Please sign in to comment.