# Spark Streaming with Kafka

## Initialize Spark

In [5]:
import os
import findspark
# Let findspark locate the local Spark installation and make PySpark importable.
# Run this before the `pyspark` imports in the later cells.
findspark.init()

## Preparing the Environment

In [6]:
import os

# Ask the PySpark launcher to download the Kafka source for Structured Streaming.
# NOTE(review): the artifact's Scala suffix (_2.12) and version (2.4.3) presumably
# have to match the local Spark build -- confirm against the installed Spark.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.3 pyspark-shell'
)

## Import dependencies
- We need to import the necessary PySpark modules: the Spark SQL session, types and functions used by Structured Streaming. The Kafka connector itself is pulled in through the `--packages` argument set above.

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

## Create Spark Session

In [8]:
# Build (or reuse) the SparkSession: local master with 4 threads,
# application name "KafkaWordCount".
spark = (
    SparkSession.builder
    .appName("KafkaWordCount")
    .master("local[4]")
    .getOrCreate()
)

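## Define the Message Schema
- Structured Streaming needs no separate streaming context or batch duration; this cell only declares the schema of a parsed message: four string fields (`timestamp`, `info`, `country`, `message`).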

In [9]:
# Schema of a parsed message: four fields, all kept as strings.
# NOTE(review): `myschema` is not referenced by any of the cells visible here.
myschema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("timestamp", "info", "country", "message")
    ]
)

## Connect to Kafka
- The topic subscribed to is **deneme2** on the broker at `localhost:9092`, read from the earliest available offsets; no consumer group is set explicitly.

In [10]:
# Streaming DataFrame over the Kafka topic "deneme2" on the local broker,
# starting from the earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("startingOffsets", "earliest")
    .option("subscribe", "deneme2")
    .load()
)

## Message Processing
- The Structured Streaming (DataFrame) API provides functions to count the number of messages in the log and to print the result to the output:

**timestamp** <br/> **country** <br/> **count**


In [11]:
# Mirror the raw Kafka records into an in-memory table named "qraw"
# so they can be inspected with plain SQL.
rawQuery = (
    df.writeStream
    .format("memory")
    .queryName("qraw")
    .start()
)

In [None]:
# Query the "qraw" in-memory table and print its current rows.
# The table only contains rows the memory sink has received so far; re-run to refresh.
raw = spark.sql("select * from qraw")
raw.show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



In [None]:
# Keep only the Kafka record's payload (cast to string) and its record timestamp.
df2 = df.selectExpr(
    "CAST(value AS STRING)",
    "CAST(timestamp AS TIMESTAMP)",
)

In [None]:
# Split each payload on single spaces and expose the first four tokens as
# named string columns. The Kafka record timestamp carried in df2 is not kept:
# `timestamp` here is the first token of the payload, and `value` is dropped.
split_col = F.split(df2['value'], ' ')
df3 = df2.select(
    [
        split_col.getItem(position).alias(column_name)
        for position, column_name in enumerate(('timestamp', 'info', 'country', 'message'))
    ]
)

In [None]:
# Display the schema of the df3 DataFrame (the four string columns parsed from the payload).
df3.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- message: string (nullable = true)



In [None]:
# Count messages per (timestamp, country) pair, largest counts first.
# NOTE(review): Structured Streaming supports sorting a streaming aggregate only
# in "complete" output mode, so any sink for this DataFrame must use that mode.
word_counts = (
    df3.groupBy("timestamp", "country")
    .count()
    .orderBy(F.desc("count"))
)

In [None]:
# The output of word_counts is meant to be written to Cassandra, so the target
# Cassandra table must have the same schema as the word_counts DataFrame; print it for reference.
word_counts.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- count: long (nullable = false)



## Write Messages

In [18]:
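# Write to Cassandra (currently disabled).
# NOTE(review): no outputMode is set below, while word_counts is a sorted aggregate -- confirm the output mode before re-enabling.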
# query = word_counts.writeStream \
#                    .option("checkpointLocation", '/tmp/check_point/') \
#                    .format("org.apache.spark.sql.cassandra") \
#                    .option("keyspace", "demo") \
#                    .option("table", "test") \
#                    .start()

In [None]:
# Start a streaming query that prints the full, updated result table of
# word_counts to the console sink on every trigger ("complete" output mode).
query = (
    word_counts.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

In [None]:
# Block this cell until the console query terminates (stopped or failed).
# While it is blocking, no later cell can run; interrupt the kernel to regain control.
query.awaitTermination()

In [None]:
# Stop the console streaming query.
# NOTE(review): `rawQuery` (the "qraw" memory sink) is not stopped in any visible cell.
query.stop()