From b34c3c117fbec0d5cbdca32037c72ee4c8fb7110 Mon Sep 17 00:00:00 2001 From: prabs Date: Fri, 6 Feb 2015 01:42:28 +0530 Subject: [PATCH] address comments --- examples/src/main/python/streaming/mqtt_wordcount.py | 5 ++--- python/pyspark/streaming/mqtt.py | 8 +++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py index 524d177a74e34..3c5a73d25c2e3 100644 --- a/examples/src/main/python/streaming/mqtt_wordcount.py +++ b/examples/src/main/python/streaming/mqtt_wordcount.py @@ -44,11 +44,10 @@ sc = SparkContext(appName="PythonStreamingMQTTWordCount") ssc = StreamingContext(sc, 1) - broker_url = sys.argv[1] + brokerUrl = sys.argv[1] topic = sys.argv[2] - data = MQTTUtils.createStream(ssc, topic, broker_url) - lines = data.map(lambda x: x[1]) + lines = MQTTUtils.createStream(ssc, brokerUrl, topic) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index 423869bffdf74..d87e9fcbfd67c 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -19,7 +19,7 @@ from py4j.java_gateway import java_import, Py4JError from pyspark.storagelevel import StorageLevel -from pyspark.serializers import PairDeserializer, NoOpSerializer +from pyspark.serializers import UTF8Deserializer from pyspark.streaming import DStream __all__ = ['MQTTUtils'] @@ -28,7 +28,7 @@ class MQTTUtils(object): @staticmethod - def createStream(ssc, brokerUrl, topic + def createStream(ssc, brokerUrl, topic, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2): """ Create an input stream that pulls messages from a Mqtt Broker. 
@@ -52,6 +52,4 @@ def createStream(ssc, brokerUrl, topic print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \ "scala-*/spark-streaming-mqtt-assembly-*.jar" raise e - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - stream = DStream(jstream, ssc, ser) - return stream + return DStream(jstream, ssc, UTF8Deserializer())