diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index f6fba4488e238..9b7af07803b4d 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -14,12 +14,10 @@
     ssc = StreamingContext(conf=conf, duration=Seconds(1))

     lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
-    fm_lines = lines.flatMap(lambda x: x.split(" "))
-    mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduceByKey(add)
+    words = lines.flatMap(lambda line: line.split(" "))
+    mapped_words = words.map(lambda word: (word, 1))
+    count = mapped_words.reduceByKey(add)

-    reduced_lines.pyprint()
-    count_lines = mapped_lines.count()
-    count_lines.pyprint()
+    count.pyprint()
     ssc.start()
     ssc.awaitTermination()
diff --git a/examples/src/main/python/streaming/wordcount.py b/examples/src/main/python/streaming/wordcount.py
index ee52c4e178142..2426345711086 100644
--- a/examples/src/main/python/streaming/wordcount.py
+++ b/examples/src/main/python/streaming/wordcount.py
@@ -11,21 +11,14 @@
         exit(-1)
     conf = SparkConf()
     conf.setAppName("PythonStreamingWordCount")
-    conf.set("spark.default.parallelism", 1)

-# still has a bug
-#    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
     ssc = StreamingContext(conf=conf, duration=Seconds(1))

     lines = ssc.textFileStream(sys.argv[1])
-    fm_lines = lines.flatMap(lambda x: x.split(" "))
-    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
-    mapped_lines = fm_lines.map(lambda x: (x, 1))
-    reduced_lines = mapped_lines.reduceByKey(add)
+    words = lines.flatMap(lambda line: line.split(" "))
+    mapped_words = words.map(lambda x: (x, 1))
+    count = mapped_words.reduceByKey(add)

-    fm_lines.pyprint()
-    filtered_lines.pyprint()
-    mapped_lines.pyprint()
-    reduced_lines.pyprint()
+    count.pyprint()
     ssc.start()
     ssc.awaitTermination()
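
For reference, a minimal sketch of how network_wordcount.py reads after this change. Only the DStream pipeline from the StreamingContext line down is taken from the diff; the imports, usage check, app name, and the module paths for the prototype's StreamingContext/Seconds are assumptions about the code outside the hunk, and pyprint() is this prototype's print API. To try it, a text source such as nc -lk 9999 and launching the script with matching host/port arguments should work, though the exact launch command is not part of this diff.

# Sketch of the refactored network_wordcount.py (imports, app name, and
# module paths below are assumptions; the pipeline matches the + lines above).
import sys
from operator import add

from pyspark import SparkConf
from pyspark.streaming.context import StreamingContext  # assumed module path in this prototype
from pyspark.streaming.duration import Seconds          # assumed module path in this prototype

if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.stderr.write("Usage: network_wordcount.py <hostname> <port>\n")
        exit(-1)
    conf = SparkConf()
    conf.setAppName("PythonStreamingNetworkWordCount")   # assumed app name

    ssc = StreamingContext(conf=conf, duration=Seconds(1))

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    words = lines.flatMap(lambda line: line.split(" "))   # split each line into words
    mapped_words = words.map(lambda word: (word, 1))      # pair each word with a count of 1
    count = mapped_words.reduceByKey(add)                 # sum the counts per word in each batch

    count.pyprint()   # print each batch's word counts (prototype API, later renamed pprint())
    ssc.start()
    ssc.awaitTermination()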