In [1]:
import pyspark
from pyspark.sql import SparkSession
from itertools import islice
from pyspark.sql import Row
import pyspark.sql.functions as f

In [2]:
# Build (or reuse) the session for this notebook, running Spark locally
# with the 'local[2]' master.
spark = (
    SparkSession.builder
    .appName('pyspark-training-spark-streaming')
    .master('local[2]')
    .getOrCreate()
)

In [4]:
# Display the currently active session as this cell's output.
# getActiveSession is a classmethod, so it is called on the class itself.
SparkSession.getActiveSession()

### Rate format

In [5]:
# Synthetic streaming source: the built-in "rate" format, configured
# to generate 10 rows per second.
df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

In [6]:
# Stream the rate source to the console without truncating column values.
# `df2` holds whatever `start()` returns, i.e. the handle that is stopped
# in the next cell.
df2 = (
    df.writeStream
    .format('console')
    .option('truncate', 'false')
    .start()
)

In [7]:
# Stop the console query that was started from the rate source.
df2.stop()

In [None]:
#df2.awaitTermination()

### Kafka format

In [3]:
# Streaming read of the Kafka topic "test" from the broker at localhost:9092,
# with startingOffsets set to "earliest".
# NOTE: this rebinds `df`, which the rate-source cells also use.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()
)

### Split the lines into words

In [4]:
# Split every Kafka message into words, one output row per word.
# The Kafka source exposes `value` as binary, so it is cast to a string
# explicitly before splitting instead of relying on implicit type coercion
# (the user-activity cells below perform the same explicit cast).
words = df.select(
    f.explode(
        f.split(df.value.cast("string"), " ")
    ).alias("word")
)

### Generate running word count

In [5]:
# Group the exploded words and count the rows in each group.
wordCounts = (
    words
    .groupBy("word")
    .count()
)

In [7]:
# Write the word counts to the console using the "complete" output mode.
query = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

In [8]:
# Stop the word-count console query.
query.stop()

In [None]:
#query.awaitTermination()

#### User activity

In [3]:
from pyspark.sql.types import StructType

In [4]:
# Streaming read of the "user-activity" Kafka topic from localhost:9092,
# with startingOffsets set to "latest".
df_ua = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "user-activity")
    .option("startingOffsets", "latest")
    .load()
)

In [5]:
# Inspect the raw Kafka schema; note that `key` and `value` arrive as binary.
df_ua.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Kafka exposes `key` and `value` as binary; re-type both columns as strings
# (same column names, so the originals are replaced).
df_ua_casted = (
    df_ua
    .withColumn('key', f.col('key').cast('string'))
    .withColumn('value', f.col('value').cast('string'))
)

In [7]:
# Schema used to parse the JSON payload: a string `user_id` and a timestamp `ts`.
schema = (
    StructType()
    .add("user_id", "string")
    .add("ts", "timestamp")
)

In [8]:
# Parse the JSON string in `value` into an `event` struct, then promote its
# two fields (`user_id`, `ts`) to top-level columns.
df_ua_flattened = (
    df_ua_casted
    .withColumn('event', f.from_json(f.col('value'), schema))
    .withColumn('user_id', f.col('event').getField('user_id'))
    .withColumn('ts', f.col('event').getField('ts'))
)

In [9]:
# Confirm that `event`, `user_id` and `ts` were added to the schema.
df_ua_flattened.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- user_id: string (nullable = true)
 |    |-- ts: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- ts: timestamp (nullable = true)



#### Trigger - Fixed-interval micro-batches

In [16]:
# Console query using a processing-time trigger of '2 seconds'.
# The chain is wrapped in parentheses: the original ended `.start()` with a
# trailing backslash that continued only into a commented-out line, which
# breaks as soon as that comment is edited or removed.
df_query_mb = (
    df_ua_flattened.writeStream
    .format('console')
    .option('truncate', 'false')
    .trigger(processingTime='2 seconds')
    .start()
)
# df_query_mb.awaitTermination()  # uncomment to block this cell on the query

In [17]:
# Stop the fixed-interval micro-batch query.
df_query_mb.stop()

#### Trigger - Only once

In [19]:
# Console query started with the one-time trigger (`once=True`).
# The chain is wrapped in parentheses: the original ended `.start()` with a
# trailing backslash that continued only into a commented-out line.
# NOTE(review): recent Spark releases reportedly prefer
# `trigger(availableNow=True)` over `once=True` — confirm against the
# installed Spark version before switching.
df_query_once = (
    df_ua_flattened.writeStream
    .format('console')
    .option('truncate', 'false')
    .trigger(once=True)
    .start()
)
# df_query_once.awaitTermination()  # uncomment to block this cell on the query

In [20]:
# Stop the one-time-trigger query.
df_query_once.stop()

#### Trigger - Continuous

In [21]:
# Console query using the continuous trigger with a '1 second' interval and
# its own checkpoint directory.
# The chain is wrapped in parentheses: the original ended `.start()` with a
# trailing backslash that continued only into a commented-out line.
# NOTE(review): the checkpoint location is an absolute, machine-specific
# Windows path — consider deriving it from a configurable base directory.
df_query_cont = (
    df_ua_flattened.writeStream
    .format('console')
    .option('truncate', 'false')
    .option('checkpointLocation', 'D:\\data\\coep\\checkpoint\\cont')
    .trigger(continuous='1 second')
    .start()
)
# df_query_cont.awaitTermination()  # uncomment to block this cell on the query

In [22]:
# Stop the continuous-trigger query.
df_query_cont.stop()

In [None]:
### Window

#### Tumbling window

In [12]:
# Tumbling window: count events per user within 5-second windows over `ts`.
df_windowed = (
    df_ua_flattened
    .groupBy(f.col('user_id'), f.window('ts', '5 seconds'))
    .count()
)

In [13]:
# Emit the tumbling-window counts to the console on a '5 seconds'
# processing-time trigger, in 'update' output mode.
# The chain is wrapped in parentheses: the original statement ended on a
# dangling backslash after `.start()` with nothing following it.
# NOTE(review): the checkpoint location is an absolute, machine-specific path.
df_windowed_tumbl = (
    df_windowed.writeStream
    .format('console')
    .option('truncate', 'false')
    .option('checkpointLocation', 'D:\\data\\coep\\checkpoint\\tumbl')
    .trigger(processingTime='5 seconds')
    .outputMode('update')
    .start()
)

In [14]:
# Stop the tumbling-window query.
df_windowed_tumbl.stop()

##### Tumbling window with watermark

In [15]:
# Tumbling window with a watermark: declare a '5 seconds' watermark on `ts`,
# then count events per user within 5-second windows.
df_windowed_wm = (
    df_ua_flattened
    .withWatermark('ts', '5 seconds')
    .groupBy(f.col('user_id'), f.window('ts', '5 seconds'))
    .count()
)

In [None]:
# Emit the watermarked tumbling-window counts to the console on a '5 seconds'
# processing-time trigger, in 'update' output mode.
# The chain is wrapped in parentheses: the original statement ended on a
# dangling backslash after `.start()` with nothing following it.
# NOTE(review): the checkpoint location is an absolute, machine-specific path.
df_windowed_tumbl_wm = (
    df_windowed_wm.writeStream
    .format('console')
    .option('truncate', 'false')
    .option('checkpointLocation', 'D:\\data\\coep\\checkpoint\\tumbl_wm')
    .trigger(processingTime='5 seconds')
    .outputMode('update')
    .start()
)

In [None]:
# Stop the watermarked tumbling-window query.
df_windowed_tumbl_wm.stop()

#### Sliding window

In [10]:
# Sliding window: count events per user within 10-second windows over `ts`
# that advance every 5 seconds.
# NOTE: this rebinds `df_windowed`, which the tumbling-window cells also use.
df_windowed = (
    df_ua_flattened
    .groupBy(f.window('ts', '10 seconds', '5 seconds'), f.col('user_id'))
    .count()
)

In [11]:
# Emit the sliding-window counts to the console on a '5 seconds'
# processing-time trigger, in 'update' output mode.
# The chain is wrapped in parentheses: the original statement ended on a
# dangling backslash after `.start()` with nothing following it.
# NOTE(review): the checkpoint location is an absolute, machine-specific path.
df_windowed_sld = (
    df_windowed.writeStream
    .format('console')
    .option('truncate', 'false')
    .option('checkpointLocation', 'D:\\data\\coep\\checkpoint\\sld')
    .trigger(processingTime='5 seconds')
    .outputMode('update')
    .start()
)

In [12]:
# Stop the sliding-window query.
df_windowed_sld.stop()

##### Sliding window with watermark

In [13]:
# Sliding window with a watermark: declare a '5 seconds' watermark on `ts`,
# then count events per user within 10-second windows advancing every 5 seconds.
# NOTE: this rebinds `df_windowed_wm`, which the tumbling-window cells also use.
df_windowed_wm = (
    df_ua_flattened
    .withWatermark('ts', '5 seconds')
    .groupBy(f.window('ts', '10 seconds', '5 seconds'), f.col('user_id'))
    .count()
)

In [14]:
# Emit the watermarked sliding-window counts to the console on a '5 seconds'
# processing-time trigger, in 'update' output mode.
# The chain is wrapped in parentheses: the original statement ended on a
# dangling backslash after `.start()` with nothing following it.
# NOTE(review): the checkpoint location is an absolute, machine-specific path.
df_windowed_sld_wm = (
    df_windowed_wm.writeStream
    .format('console')
    .option('truncate', 'false')
    .option('checkpointLocation', 'D:\\data\\coep\\checkpoint\\sld_wm')
    .trigger(processingTime='5 seconds')
    .outputMode('update')
    .start()
)

In [15]:
# Stop the watermarked sliding-window query.
df_windowed_sld_wm.stop()