#!/usr/bin/env python
# pyspark_structured_stream_kafka.py
################################################################################################################################
#
# PySpark Structured Streaming with Kafka 0.10
#
################################################################################################################################
'''
Usage:
/usr/hdp/current/spark2-client/bin/pyspark \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
/usr/hdp/current/spark2-client/bin/spark-submit \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
pyspark_structured_stream_kafka.py
'''
import os,sys
import datetime,time
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window, udf, desc, asc
# Import only the type classes this script actually uses (for the CSV casts
# below) instead of a wildcard import, keeping the module namespace explicit.
from pyspark.sql.types import StringType, TimestampType, FloatType

# Create (or reuse) the SparkSession that drives this structured-streaming job.
spark = SparkSession \
    .builder \
    .appName("StructuredStream_with_Kafka") \
    .getOrCreate()
# Attach to Kafka as an unbounded streaming source, subscribed to the
# "clickstream" topic on the listed broker.
kafka_options = {
    "kafka.bootstrap.servers": "dzaratsian2.field.hortonworks.com:6667",
    "subscribe": "clickstream",
}

events = spark.readStream.format("kafka").options(**kafka_options).load()

# Kafka delivers the payload as binary; expose it as a single string column
# named "value" for the CSV parsing that follows.
events = events.selectExpr("CAST(value AS STRING)")
# Option #1: Parse by column, using withColumn
#
# Each Kafka value is one CSV line with this positional layout:
#   uid,user,datetime,state,duration,rate,action
# Evaluate split(...) once and derive every typed column from it, instead of
# re-computing the identical split expression for all seven columns.
_CSV_FIELDS = [
    ('uid',      StringType()),
    ('user',     StringType()),
    ('datetime', TimestampType()),   # event time used for windowing below
    ('state',    StringType()),
    ('duration', FloatType()),
    ('rate',     FloatType()),
    ('action',   StringType()),
]

_parts = split(events['value'], ',')
parsed_events = events
for _idx, (_col_name, _col_type) in enumerate(_CSV_FIELDS):
    parsed_events = parsed_events.withColumn(_col_name, _parts[_idx].cast(_col_type))
###################################################################################################
# Displaying user count. 60 second window with 15 sec sliding duration...
###################################################################################################
# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.window
# pyspark.sql.functions.window(timeColumn, windowDuration, slideDuration=None, startTime=None)
count_window = window(parsed_events.datetime, "1 minutes", "15 seconds")
windowedCounts = (
    parsed_events
    .groupBy(count_window, parsed_events.user)
    .count()
    .sort(desc("count"))
)

# Emit the full (re-sorted) aggregate to the console on every trigger.
query1 = windowedCounts.writeStream.outputMode("complete").format("console").start()
###################################################################################################
# Displaying average duration by user. 60 second window with 15 sec sliding duration...
###################################################################################################
avg_window = window(parsed_events.datetime, "1 minutes", "15 seconds")
windowedAvg = (
    parsed_events
    .groupBy(avg_window, parsed_events.user)
    .agg({'duration': 'mean'})
    .sort(desc("avg(duration)"))
)

# Second console sink, sharing the same parsed stream as query1.
query2 = windowedAvg.writeStream.outputMode("complete").format("console").start()

# Block the driver until the streaming queries terminate.
query1.awaitTermination()
query2.awaitTermination()
'''
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper dzaratsian2.field.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic clickstream
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --zookeeper dzaratsian2.field.hortonworks.com:2181 --topic clickstream
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper dzaratsian2.field.hortonworks.com:2181 --list
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper dzaratsian2.field.hortonworks.com:2181 --topic clickstream --from-beginning
'''
#ZEND