In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

# Kafka connection settings used by the streaming reader further down.
kafka_bootstrap_servers = "localhost:9092"  # passed as "kafka.bootstrap.servers"
kafka_topic_name = "wiki-changes"           # topic the stream subscribes to

In [2]:
# 1) Start a Spark session; it is the entry point for creating DataFrames.
# The Kafka-related jars are listed once and reused for every jar/classpath setting.
_jar_prefix = "file:///C://spark_dependency_jars//"
_jar_uris = [
    _jar_prefix + jar_name
    for jar_name in (
        "commons-pool2-2.8.1.jar",
        "spark-sql-kafka-0-10_2.12-3.0.1.jar",
        "kafka-clients-2.6.0.jar",
        "spark-streaming-kafka-0-10-assembly_2.12-3.0.1.jar",
    )
]
_jars_comma_joined = ",".join(_jar_uris)  # value for "spark.jars"
_jars_colon_joined = ":".join(_jar_uris)  # value for the three classpath-style settings

spark = (
    SparkSession.builder
    .appName("PySpark Structured Streaming with Kafka and Message Format as JSON")
    .master("local[*]")
    .config("spark.jars", _jars_comma_joined)
    .config("spark.executor.extraClassPath", _jars_colon_joined)
    # NOTE(review): "spark.executor.extraLibrary" does not look like a documented Spark
    # property (the documented one is "spark.executor.extraLibraryPath") - confirm.
    .config("spark.executor.extraLibrary", _jars_colon_joined)
    .config("spark.driver.extraClassPath", _jars_colon_joined)
    .getOrCreate()
)

In [3]:
# Set the log level to ERROR through the session's SparkContext.
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# Since the data is continuous, it has to be processed with a streaming API: Spark Streaming (DStreams) or Structured Streaming.
# Structured Streaming is the newer API built on Spark SQL; it treats the stream as an unbounded DataFrame.
# This notebook uses Structured Streaming. Note that by default it still executes the query as a series of small micro-batches.

# https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#overview


#full project
#https://towardsdatascience.com/enabling-streaming-data-with-spark-structured-streaming-and-kafka-93ce91e5b435

In [5]:
# Build the streaming DataFrame from Kafka: the broker address, the topic
# subscription and the starting offset ("latest") are passed as reader options.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)


In [6]:
# readStream -> reads data from a topic on the Kafka server
# format("kafka") -> Kafka source (it is used with readStream here, so it is a source, not a sink)
# .option("subscribe", kafka_topic_name) -> subscribes to a single topic

In [7]:
type(df)
# type() only reports the class (pyspark.sql.dataframe.DataFrame), which batch and
# streaming DataFrames share; check df.isStreaming to confirm that this is a stream.

pyspark.sql.dataframe.DataFrame

In [8]:
# Show the structure of the Kafka source DataFrame: column names and data types.
print("Printing Schema/structure of _df: ")
df.printSchema()   # key and value arrive as binary; the remaining columns are Kafka record metadata

Printing Schema/structure of _df: 
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
df.columns  # list of the column names of the Kafka source DataFrame

['key', 'value', 'topic', 'partition', 'offset', 'timestamp', 'timestampType']

# Transformation

In [10]:
from pyspark.sql.types import StringType
# Method 1: cast the binary key and value columns to strings; every other column is kept.
df1 = df.withColumn("key", df["key"].cast("string")).withColumn(
    "value", df["value"].cast("string")
)

In [11]:
df1.printSchema()  # key and value are now reported as string instead of binary

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [12]:
# Method 2: cast the binary key and value columns to strings with SQL expressions.
# selectExpr() takes SQL expression strings and returns a new DataFrame.
# Only the selected columns (key and value) are present in the result, and this
# assignment replaces the df1 produced by Method 1.
df1 = df.selectExpr(
    "CAST(key AS string)",
    "CAST(value AS string)",
)
   

In [13]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *

# Schema applied when parsing the JSON strings: two nullable string fields.
new_schema = (
    StructType()
    .add("venue", StringType())
    .add("visibility", StringType())
)

In [26]:
# Parse the JSON strings held in df1 into struct columns using new_schema.
# NOTE(review): the key is parsed with the same schema as the value - confirm that
# the Kafka key really carries JSON of that shape.
df2 = df1.withColumn("value", from_json(col("value"), new_schema))
df2 = df2.withColumn("key", from_json(col("key"), new_schema))

In [27]:
df2.columns

['key', 'value']

In [None]:
#method1
#df1 = df.select("value")  
#df1  \
#    .writeStream \
#    .format("console")\
 #   .start()
## console sink
# Note that you have to call start() to actually start the execution of the query

In [30]:
#method2
#d=df2 \
#        .writeStream \
#        .queryName("aggregates") \
#        .format("memory") \
#        .start()
#spark.sql("select * from aggregates").show()

#.queryName("aggregates") -> the query name becomes the name of the in-memory table
# memory sink

In [31]:
# method 3
#df.createOrReplaceTempView("updates")
#spark.sql("select * from updates")

# Sinking

In [23]:
# Write the stream to the console sink for debugging purposes.
# The trigger defines how often the streaming data is processed (every 5 seconds here).
# NOTE(review): this writes the raw Kafka DataFrame `df`, not the parsed `df2` - confirm that is intended.
df7 = (
    df.writeStream
    .trigger(processingTime='5 seconds')
    .outputMode("update")
    .option("truncate", "false")
    .format("console")
    .start()
)

In [None]:
df7.awaitTermination()  # blocks this cell until the streaming query started above terminates