In [2]:
# Print the installed Spark/Scala versions; the Kafka connector coordinates
# requested in the setup cell have to match them.
! pyspark --version

22/06/06 13:52:17 WARN Utils: Your hostname, vplentz-computer resolves to a loopback address: 127.0.1.1; using 192.168.1.19 instead (on interface wlp2s0)
22/06/06 13:52:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.15
Branch HEAD
Compiled by user hgao on 2022-01-20T19:26:14Z
Revision 4f25b3f71238a00508a356591553f2dfa89f8290
Url https://github.com/apache/spark
Type --help for more information.


In [5]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum, window, to_json, struct
from pyspark.sql.types import *
# Ask pyspark to pull in the Kafka connector packages (Scala 2.12 builds,
# version 3.2.1). This has to be in the environment before the SparkSession
# is created, otherwise the connectors are not on the classpath.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1,'
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 '
    'pyspark-shell'
)

In [6]:
# Obtain the SparkSession for this notebook (an already-active session is
# reused by getOrCreate, otherwise a new one is started under this app name).
spark_session = (
    SparkSession.builder
    .appName('ClickStreamUsersStreamMetrics')
    .getOrCreate()
)

### Use the `starting_offsets` variable to control whether the topic is processed from its beginning (`earliest`) or only from the latest messages (`latest`)

In [7]:
# 'latest' reads only messages that arrive from now on; switch to 'earliest'
# to process the topic from its beginning.
starting_offsets = 'latest'
# starting_offsets = 'earliest'

# Streaming source: subscribe to the `pageviews` topic on the local broker.
pageviews_stream_df = (
    spark_session.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': 'localhost:9092',
        'subscribe': 'pageviews',
        'startingOffsets': starting_offsets,
    })
    .load()
)

### Set pageviews schema

In [8]:
# Inspect the raw Kafka source schema: `key` and `value` arrive as binary and
# still need to be decoded.
pageviews_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# Layout of the JSON document carried in each message value; every field is
# nullable (the default).
schema = (
    StructType()
    .add('viewtime', IntegerType())
    .add('pageid', StringType())
    .add('userid', StringType())
)

In [10]:
# Decode the Kafka payload: the key becomes a plain string and the value is
# parsed from JSON into a struct following `schema`. The remaining Kafka
# metadata columns are passed through unchanged.
# NOTE: this rebinds `pageviews_stream_df`, so run the cell once per kernel.
pageviews_stream_df = pageviews_stream_df.select(
    col('key').cast('string'),
    from_json(col('value').cast('string'), schema).alias('value'),
    'topic',
    'partition',
    'offset',
    'timestamp',
    'timestampType',
)

In [11]:
# Confirm the decoding: `key` is now a string and `value` is a struct with
# viewtime / pageid / userid fields.
pageviews_stream_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- viewtime: integer (nullable = true)
 |    |-- pageid: string (nullable = true)
 |    |-- userid: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Pageviews as minutes

In [12]:
# Convert the nested `value.viewtime` field to minutes (assumes the raw value
# is in seconds - TODO confirm against the producer).
#
# withColumn('value.viewtime', ...) would NOT touch the struct: it adds a new
# top-level column whose name literally contains a dot, and later references
# to col('value.viewtime') keep resolving to the original, unconverted struct
# field. Column.withField (Spark >= 3.1) replaces the field inside the struct.
#
# NOTE: this rebinds `pageviews_stream_df`; re-running the cell divides again.
pageviews_stream_df = pageviews_stream_df.withColumn(
    'value',
    col('value').withField('viewtime', col('value.viewtime') / 60),
)

### Summing each user's view time per one-minute window (for User_1, User_2 and User_3)

In [13]:
# Restrict the stream to the three users of interest.
users_list = [f'User_{n}' for n in (1, 2, 3)]
users_views_per_minute_df = pageviews_stream_df.where(
    col('value.userid').isin(*users_list)
)

In [15]:
# The dataset has no event-time field, so the Kafka record `timestamp` column
# stands in as the time axis for both the watermark and the window.
# Group by user and one-minute tumbling window, then total the view time.
# (`sum` here is pyspark.sql.functions.sum, not the Python builtin.)
# NOTE: this rebinds `users_views_per_minute_df`, so run the cell once per kernel.
users_views_per_minute_df = (
    users_views_per_minute_df
    .withWatermark('timestamp', '1 minutes')
    .groupBy(
        col('value.userid'),
        window('timestamp', '1 minute').alias('grouped_minute'),
    )
    .agg(sum('value.viewtime').alias('sum_viewtime'))
)

In [17]:
# Publish the aggregates to the `user_pageview_stats` topic.
# Key: JSON of (userid, window); value: the summed view time as a string.
# The query fires every two minutes in append mode and checkpoints to
# ../checkpoints/.
(
    users_views_per_minute_df
    .select(
        to_json(struct('userid', 'grouped_minute')).alias('key'),
        col('sum_viewtime').cast('string').alias('value'),
    )
    .writeStream
    .format('kafka')
    .trigger(processingTime='2 minutes')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('checkpointLocation', '../checkpoints/')
    .option('topic', 'user_pageview_stats')
    .outputMode('append')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7febc7e23d60>

In [15]:
# Second query on the same aggregate, written to the console sink so the
# results can be inspected while the stream runs.
(
    users_views_per_minute_df
    .select('*')
    .writeStream
    .format('console')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7fbf25de7f70>