diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a4324a748ae32..970594c7fa10c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -835,9 +835,9 @@ class MQTTStreamTests(PySparkStreamingTestCase): def setUp(self): super(MQTTStreamTests, self).setUp() - utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils") - self._utils = utilsClz.newInstance() + self._MQTTTestUtils = MQTTTestUtilsClz.newInstance() self._MQTTTestUtils.setup() def tearDown(self): @@ -850,7 +850,7 @@ def tearDown(self): def _randomTopic(self): return "topic-%d" % random.randint(0, 10000) - def _validateStreamResult(self, sendData, stream): + def _validateStreamResult(self, sendData, dstream): result = [] def get_output(_, rdd): @@ -862,16 +862,15 @@ def get_output(_, rdd): self.assertEqual(sendData, receiveData) def test_mqtt_stream(self): - """Test the Python Kafka stream API.""" + """Test the Python MQTT stream API.""" topic = self._randomTopic() sendData = "MQTT demo for spark streaming" ssc = self.ssc - self._MQTTTestUtils.createTopic(topic) self._MQTTTestUtils.waitForReceiverToStart(ssc) self._MQTTTestUtils.publishData(topic, sendData) - stream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic) + stream = MQTTUtils.createStream(ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic) self._validateStreamResult(sendData, stream)