In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .master("local[*]")  # run inside this process, one worker thread per core
    .appName("Kafka_stream")
    # Kafka connector for Structured Streaming, resolved when the session starts.
    # NOTE(review): the 2.12 / 3.3.0 coordinates presumably have to match the
    # installed Spark build -- confirm before upgrading Spark.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 4)
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

In [None]:
# Streaming DataFrame over the Kafka topic; nothing is consumed until a
# writeStream query is started on it (or on a frame derived from it).
kafka_logs_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "ed-kafka:29092",
        "subscribe": "test-topic",
        # Begin from the oldest retained offsets on the first run.
        "startingOffsets": "earliest",
        # Keep the query running instead of failing it when data may have been lost.
        "failOnDataLoss": "false",
    })
    .load()
)

In [None]:
# `kafka_logs_df` is a streaming DataFrame, and calling .show() on one raises
# AnalysisException ("Queries with streaming sources must be executed with
# writeStream.start()"). To eyeball the raw records, run a bounded batch read
# of the same topic instead; this does not touch the streaming query.
(
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "test-topic")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")  # snapshot of what is in the topic right now
    .load()
    .show()
)

In [None]:
# Print the schema of the raw Kafka streaming DataFrame.
kafka_logs_df.printSchema()

In [None]:
from pyspark.sql.functions import expr

# The Kafka source exposes `value` as raw bytes; replace it with its string
# form so the payload can be parsed as JSON text in a later step.
kafka_json_df = kafka_logs_df.withColumn(
    "value",
    expr("cast(value as string)"),
)

In [None]:
# Inspect the frame whose `value` column was cast to string. Printing
# `kafka_logs_df` here would only repeat the raw schema and would not show
# the effect of the cast.
kafka_json_df.printSchema()

In [None]:
# .show() is not supported on a streaming DataFrame (it raises
# AnalysisException), so preview the string-cast payload through a bounded
# batch read of the same topic, applying the same cast as `kafka_json_df`.
(
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "test-topic")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")  # snapshot of what is in the topic right now
    .load()
    .withColumn("value", expr("cast(value as string)"))
    .show()
)

In [None]:
from pyspark.sql.types import StringType, StructField, StructType, LongType

# Schema of the JSON document carried in each Kafka message's `value`.
# Every field is nullable; field order is preserved from the table below.
log_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("timestamp", StringType()),
        ("level", StringType()),
        ("message", StringType()),
        ("source", StringType()),
        ("request_id", LongType()),
        ("user_id", LongType()),
        ("id", StringType()),
    )
])


In [None]:
from pyspark.sql.functions import from_json,col
# Parse the JSON text in `value` against `log_schema`, then promote the
# parsed struct's fields to top-level columns (the Kafka metadata columns
# are dropped by the final projection).
streaming_df = (
    kafka_json_df
    .select(from_json(col("value"), log_schema).alias("values_json"))
    .selectExpr("values_json.*")
)


In [None]:
# Print the schema of the flattened, parsed log stream.
streaming_df.printSchema()

In [None]:
# Start the streaming query: each micro-batch of newly parsed rows is printed
# to the console. The call blocks this cell until the query terminates.
(
    streaming_df.writeStream
    .outputMode("append")  # emit only rows added since the previous trigger
    .format("console")
    # Checkpoint directory (relative path) used to store the query's progress.
    .option("checkpointLocation", "brand_new")
    .start()
    .awaitTermination()
)