forked from abulbasar/pyspark-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark_stream_json_messages.py
57 lines (42 loc) · 1.52 KB
/
spark_stream_json_messages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
"""
Spark Streaming job: read newline-delimited JSON tweets from a TCP socket
and print the top hashtags for each micro-batch.

Before submitting the job, ensure the env variables are set properly:
    export PYSPARK_PYTHON=python3
    export PYSPARK_DRIVER_PYTHON=python3
    unset PYSPARK_DRIVER_PYTHON_OPTS

Start the Spark streaming application by running:
    $SPARK_HOME/bin/spark-submit spark_stream_json_messages.py

You may want to reduce the logging level to WARN in the log4j.properties
configuration found in $SPARK_HOME/conf/log4j.properties. Copy this file
from the template if not already present.
"""
# Socket source to read from; feed it with e.g. `nc -lk 9999`.
hostname, port = "localhost", 9999
# Micro-batch interval in seconds for the streaming context.
batch_interval = 2
sc = SparkContext()
print("Spark WebUI: ", sc.uiWebUrl)
# SQLContext is needed to parse each batch's RDD of JSON strings into a DataFrame.
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, batch_interval)
# One DStream element per line received on the socket; kept deserialized in memory.
lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_ONLY)
# Print the raw dstream
#lines.pprint()
def top10_hashtags(tweets):
    """Count hashtag occurrences in the "text" column of *tweets*.

    Parameters
    ----------
    tweets : pyspark.sql.DataFrame
        A DataFrame parsed from JSON messages; expected (but not required)
        to contain a string column named "text".

    Returns
    -------
    pyspark.sql.DataFrame or None
        Terms starting with '#' with their counts, ordered by count
        descending, or None when the batch has no "text" column
        (e.g. a malformed/empty batch).
    """
    # Guard clause with an explicit return: the original fell off the end
    # and returned None implicitly, which is easy to miss at the call site.
    if "text" not in tweets.columns:
        return None
    # One row per whitespace-separated term across all tweets.
    tweets_terms = tweets.select(explode(split("text", ' ')).alias("term"))
    return (tweets_terms
            .filter("term like '#%'")   # keep only hashtag terms
            .groupBy("term")
            .count()
            .orderBy(desc("count"))
            )
def convert_to_dataframe(rdd):
    """Per-batch handler: parse the RDD of JSON lines and show top hashtags.

    Parameters
    ----------
    rdd : pyspark.RDD
        One micro-batch of raw JSON strings from the socket DStream.

    Side effects: prints up to 10 hashtag counts to stdout on the driver.
    """
    # isEmpty() inspects at most one element instead of scanning the whole
    # RDD the way count() does — same decision, far cheaper per batch.
    if not rdd.isEmpty():
        tweets = sqlContext.read.json(rdd)
        top10 = top10_hashtags(tweets)
        # top10_hashtags returns None when the batch lacks a "text" column;
        # compare to None explicitly rather than relying on DataFrame
        # object truthiness.
        if top10 is not None:
            top10.show(10, False)   # False: do not truncate term strings
# Register the per-batch handler, then start the streaming pipeline.
lines.foreachRDD(convert_to_dataframe)
ssc.start()
# Block the driver until the stream is stopped (Ctrl-C / ssc.stop()).
ssc.awaitTermination()