In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType

# Kafka connection settings: comma-separated broker list and the topic to consume.
KAFKA_BOOTSTRAP_SERVERS = ",".join(
    ("kafka1:19092", "kafka2:19093", "kafka3:19094")
)
KAFKA_TOPIC = "events"


# Schema applied to the JSON payload of each Kafka message (see from_json below).
# Every field is a nullable StringType, so fields that look numeric by name
# (ports, byte counts, durations) stay text and need an explicit cast downstream.
# Field order here is the column order produced by "value.*".
# NOTE(review): 'correletionID' looks like a misspelling of 'correlationID'.
# It has to match the producer's JSON key exactly, or the column will presumably
# always be null. Confirm against the producer before renaming it.
events_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        'timestamp', 'type', 'appName', 'appInstance', 'appID', 'probeID',
        'eventID', 'correletionID', 'locationID',
        'transactionStart', 'transactionEnd', 'transactionDuration',
        'clientIPAddress', 'clientPort', 'serverIPAddress', 'serverPort',
        'ipProtocol', 'category',
        'bytesFromClient', 'bytesToClient', 'bytesFromServer', 'bytesToServer',
        'subscriberID', 'applicationProtocol', 'applicationName', 'domain',
        'deviceType', 'networkType', 'contentType',
        'lostBytesClient', 'lostBytesServer', 'srttMsClient', 'srttMsServer',
    )
])



# Create (or reuse) a SparkSession attached to the standalone cluster master.
# - spark.jars.packages pulls the Kafka connector at runtime. Its Scala (2.12)
#   and Spark (3.3.0) versions presumably must match the cluster (TODO confirm).
# - NOTE(review): spark.streaming.stopGracefullyOnShutdown is a setting of the
#   legacy DStream API and likely has no effect on this Structured Streaming job.
# - spark.sql.shuffle.partitions is lowered to 4 (Spark's default is 200) to
#   keep shuffles small for this workload.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("stream-from-Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 4)
    .getOrCreate()
)

# Keep notebook output readable: only WARN-level and more severe Spark logs.
spark.sparkContext.setLogLevel("WARN")

# Streaming DataFrame over the Kafka topic. The Kafka source exposes the message
# payload in a binary `value` column, which is decoded in the next cell.
# startingOffsets="earliest" replays everything still retained on the topic
# when a new query starts.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "earliest",
    })
    .load()
)

In [2]:
from pyspark.sql.functions import from_json

# Decode the binary Kafka payload into a string column named `value`.
json_df = df.selectExpr("cast(value as string) as value")

# Parse the JSON string with events_schema, replacing `value` with a struct,
# then flatten the struct so each schema field becomes a top-level column.
json_expanded_df = (
    json_df
    .withColumn("value", from_json(json_df["value"], events_schema))
    .select("value.*")
)

In [None]:
# Debug sink: print each micro-batch of parsed events to the driver's stdout.
# Append mode emits only the rows that are new in each batch.
# NOTE(review): despite its name, `writing_df` holds the StreamingQuery handle
# returned by start(), not a DataFrame.
writing_df = (
    json_expanded_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block this cell until the query stops. That happens when:
# 1. the running query fails with an exception, or
# 2. it is interrupted manually.
# While this call waits, the notebook kernel cannot execute other cells.
writing_df.awaitTermination()