## Spark Streaming data with Kafka

To read a Kafka stream from Spark, we need to include the external Kafka connector package (spark-sql-kafka).
The package version must match the Spark version in use and the Scala version that Spark was built with.
For Spark 3.0 built with Scala 2.12, the package is spark-sql-kafka-0-10_2.12:3.0.0.
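
The coordinate follows the pattern `org.apache.spark:spark-sql-kafka-0-10_<scala version>:<spark version>`. As a minimal sketch, the helpers below assemble the coordinate and the `PYSPARK_SUBMIT_ARGS` value from the two versions, so they only need to be changed in one place. The function names are hypothetical (they are not part of PySpark), and the sketch assumes you already know which Scala version your Spark build uses.

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Spark Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def submit_args(spark_version, scala_version="2.12"):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the connector."""
    return f"--packages {kafka_package(spark_version, scala_version)} pyspark-shell"


# Hypothetical usage: print the coordinate for Spark 3.0.0 on Scala 2.12.
print(kafka_package("3.0.0"))
```

The resulting string would be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before the Spark session is created.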

Kafka must be installed on the system, and the Python code below also needs the kafka-python package.

To run Kafka on a local machine, go to the Kafka folder (/usr/local/kafka in my case) and follow the commands below.

 - Start the ZooKeeper instance: **bin/zookeeper-server-start.sh config/zookeeper.properties**
 - Start the Kafka server: **bin/kafka-server-start.sh config/server.properties**
 - Create a topic: **bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic covid19** (on Kafka 3.0 and later, kafka-topics.sh no longer accepts --zookeeper; use --bootstrap-server localhost:9092 instead)
 - List the created topics: **bin/kafka-topics.sh --list --zookeeper localhost:2181**
 - Start a console producer for testing: **bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic**
 - Start a console consumer for testing: **bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning**
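
If you prefer to run the topic commands from a notebook cell, the same command lines can be assembled in Python and passed to `subprocess.run`. This is only a sketch: `KAFKA_HOME` and the helper names are assumptions made for illustration, and the flags are copied from the commands above.

```python
import subprocess
from pathlib import Path

# Assumption: Kafka is installed here; adjust to your own installation.
KAFKA_HOME = Path("/usr/local/kafka")


def create_topic_cmd(topic, partitions=1, replication_factor=1,
                     zookeeper="localhost:2181"):
    """Return the argument list for creating a topic with kafka-topics.sh."""
    return [
        str(KAFKA_HOME / "bin" / "kafka-topics.sh"),
        "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def list_topics_cmd(zookeeper="localhost:2181"):
    """Return the argument list for listing topics with kafka-topics.sh."""
    return [str(KAFKA_HOME / "bin" / "kafka-topics.sh"),
            "--list", "--zookeeper", zookeeper]


def run(cmd):
    """Run a Kafka CLI command and return its standard output."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout


# Example (requires ZooKeeper and Kafka to be running):
# print(run(create_topic_cmd("covid19")))
# print(run(list_topics_cmd()))
```

Passing the command as a list avoids shell quoting problems, and `check=True` raises an exception if the Kafka script exits with an error.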
 

In [1]:
import json
import os
import time

import requests
from kafka import KafkaProducer, KafkaConsumer
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Must be set before the SparkSession is created so that the Kafka connector is loaded.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"

# Producer used to push the API records to Kafka.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

conf = SparkConf()

spark = SparkSession.builder \
        .config(conf=conf) \
        .appName('kafka-stream') \
        .getOrCreate()



### Read the COVID-19 data from the API and push it to a Kafka topic using Python

In [2]:
url = "https://api.covid19india.org/travel_history.json"

response = requests.get(url)
response.raise_for_status()  # fail early if the API request did not succeed
for record in response.json()['travel_history']:
    # Keep only the fields that the streaming schema below expects.
    tDict = {key: record[key] for key in ['pid', 'address', 'timefrom', 'timeto']}
    producer.send('covid19', json.dumps(tDict).encode('utf-8'))
    # print(json.dumps(tDict).encode('utf-8'))
    time.sleep(0.1)  # throttle to roughly 10 messages per second
producer.flush()  # make sure all buffered messages are delivered
    
fields = [
    StructField('pid',StringType(), True),
    StructField('address',StringType(), True),
    StructField('timefrom',StringType(), True),
    StructField('timeto',StringType(), True)
]
schema = StructType(fields=fields)

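# "earliest" also reads the records already produced above; with "latest",
# only messages that arrive after the query starts would be picked up.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "covid19") \
  .option("startingOffsets", "earliest") \
  .load()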

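# Kafka delivers the payload as bytes: cast it to a string, parse the JSON
# with the schema, then flatten the struct into columns.
kafka_df_string = df.selectExpr("CAST(value AS STRING)")
service_table = kafka_df_string.select(from_json(col('value'), schema).alias("covid"))
final_data = service_table.selectExpr("covid.pid", "covid.address", "covid.timefrom", "covid.timeto")

# Append each micro-batch to ../data/output as JSON files.
query = final_data.writeStream \
    .outputMode("append") \
    .queryName("covid") \
    .format("json") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("../data/output")
query.awaitTermination()  # blocks until the query is stopped, e.g. by interrupting the kernel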


# ds = df \
#   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#   .writeStream \
#   .format("console") \
#   .option("kafka.bootstrap.servers", "localhost:9092") \
#   .option("topic", "test-topic") \
#   .start()

# ds.awaitTermination()

b'{"pid": "P35", "address": "Hotel Aryas, Kuthattukulam, Muvattupuzha road", "timefrom": "29/02/2020 10:30:00", "timeto": "29/02/2020 11:30:00"}'
b'{"pid": "P35", "address": "Suresh Hotel Ranni", "timefrom": "01/03/2020 21:30:00", "timeto": "01/03/2020 23:00:00"}'
b'{"pid": "P35", "address": "Post Office Ranni Pazvangadi, Ranni", "timefrom": "02/03/2020 11:00:00", "timeto": "02/03/2020 11:30:00"}'
b'{"pid": "P35", "address": "Knanaya Church, Pazvangadi Ranni", "timefrom": "02/03/2020 11:30:00", "timeto": "02/03/2020 12:00:00"}'
b'{"pid": "P35", "address": "Post Office Ranni Pazvangadi", "timefrom": "02/03/2020 12:00:00", "timeto": "02/03/2020 13:00:00"}'
b'{"pid": "P35", "address": "Golde Emporium, New Hypermarket", "timefrom": "02/03/2020 13:15:00", "timeto": "02/03/2020 14:00:00"}'
b'{"pid": "P35", "address": "Muthoot mini supermarket, Ranni", "timefrom": "02/03/2020 14:30:00", "timeto": ""}'
b'{"pid": "P35", "address": "Imperial Backers, Punalur", "timefrom": "02/03/2020 18:00:00", 

b'{"pid": "P38", "address": "travel by Thachilethu Bus from Ranni to Kottayam", "timefrom": "03/06/2020 08:15", "timeto": "03/06/2020 10:15"}'
b'{"pid": "P38", "address": "halt at Palathara Textiles, Kanjikuzhy, Kottayam", "timefrom": "03/06/2020 10:30", "timeto": "03/06/2020 11:15"}'
b'{"pid": "P38", "address": "travel by Mahiniyam bus from Kanjikuzhy to Ranni", "timefrom": "03/06/2020 14:00", "timeto": "03/06/2020 16:00"}'
b'{"pid": "P38", "address": "Admitted to hospital", "timefrom": "8/3/2020", "timeto": ""}'
b'{"pid": "P39", "address": "halt at Jathanickal Cherukulangara Bakery, Ranni", "timefrom": "3/4/2020 06:00", "timeto": "3/4/2020 08:00"}'
b'{"pid": "P39", "address": "halt at Marthoma Hospital, Ranni", "timefrom": "03/04/2020 19:00", "timeto": "03/04/2020 20:30"}'
b'{"pid": "P39", "address": "travel by Thachilethu Bus from Ranni to Kottayam", "timefrom": "03/06/2020 08:15", "timeto": "03/06/2020 10:15"}'
b'{"pid": "P39", "address": "halt at Palathara Textiles, Kanjikuzhy, Ko

b'{"pid": "P1", "address": "Strict Home isolation", "timefrom": "11.03.2020 12:00 AM", "timeto": "11.03.2020 12:00 PM"}'
b'{"pid": "P1", "address": "Strict Home isolation", "timefrom": "12.03.2020 12:00 AM", "timeto": "12.03.2020 09:00 PM"}'
b'{"pid": "P1", "address": "Travelling in \\nAmbulance \\nby road", "timefrom": "12.03.2020 09:00 PM", "timeto": "12.03.2020 10:00 PM"}'
b'{"pid": "P1", "address": "Patient reached Isolation ward, GMC Kannur", "timefrom": "12.03.2020 10:00 PM", "timeto": ""}'
b'{"pid": "P4", "address": "Bangalore Airport - Came from USA through London,Heathrow", "timefrom": "08.03.2020 04:30 AM", "timeto": "08.03.2020 06:15 AM"}'
b'{"pid": "P4", "address": "HOME", "timefrom": "08.03.2020 06:15 AM", "timeto": "08.03.2020 02:30 PM"}'
b'{"pid": "P4", "address": "Visited RGICD at 02:30pm and sample was collected and sent to NIV Bangalore", "timefrom": "08.03.2020 02:30 PM", "timeto": "08.03.2020 02:30 PM"}'
b'{"pid": "P4", "address": "Whole day he was under home quaran

b'{"pid": "P37", "address": "Baby Palace, Ranni, Ittiyapara", "timefrom": "3/5/2020 11:30:00", "timeto": "3/5/2020 11:45:00"}'
b'{"pid": "P37", "address": "Unimoni/ UAE Exchange, Ranni", "timefrom": "3/5/2020 11:45:00", "timeto": "3/5/2020 12:00:00"}'
b'{"pid": "P37", "address": "SP Office, Pathanamthitta", "timefrom": "3/5/2020 12:15:00", "timeto": "3/5/2020 12:45:00"}'
b'{"pid": "P37", "address": "Royal Studio, Pathanamthitta", "timefrom": "3/5/2020 12:45:00", "timeto": "3/5/2020 13:15:00"}'
b'{"pid": "P37", "address": "Josco Jewlerry", "timefrom": "3/5/2020 13:15:00", "timeto": "3/5/2020 14:00:00"}'
b'{"pid": "P37", "address": "Ranni Gate Hospital", "timefrom": "3/5/2020 15:00:00", "timeto": "3/5/2020 15:00:00"}'
b'{"pid": "P116", "address": "Met father at Fathers House Near KNZ Function hall, Kalaburagi", "timefrom": "02/29/2020 0:00", "timeto": "02/29/2020 0:00"}'
b'{"pid": "P116", "address": "Stayed at own house Fort", "timefrom": "03/01/2020 00:00", "timeto": "03/02/2020 00:00"}

KeyboardInterrupt: 
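
The producer loop above keeps four fields from each API record and sends them as UTF-8 encoded JSON, which is what the byte strings in the output show. Below is a minimal sketch of that serialization step as standalone functions, so it can be checked without a running broker. `FIELDS`, `to_payload`, and `from_payload` are hypothetical names introduced here, and the `extra` key in the sample record is made up to show that unlisted fields are dropped.

```python
import json

# The four fields that the streaming schema expects.
FIELDS = ("pid", "address", "timefrom", "timeto")


def to_payload(record, fields=FIELDS):
    """Select the given fields and encode them as UTF-8 JSON bytes."""
    return json.dumps({key: record[key] for key in fields}).encode("utf-8")


def from_payload(payload):
    """Decode a Kafka message value back into a dictionary."""
    return json.loads(payload.decode("utf-8"))


# Sample record; "extra" is a made-up field that should not be sent.
sample = {
    "pid": "P35",
    "address": "Suresh Hotel Ranni",
    "timefrom": "01/03/2020 21:30:00",
    "timeto": "01/03/2020 23:00:00",
    "extra": "not part of the schema",
}
payload = to_payload(sample)
print(payload)
print(from_payload(payload))
```

A record that lacks one of the four fields raises a `KeyError` here, as it would in the original dictionary comprehension.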

### Read the output data written to the output folder

In [15]:
jsonData = spark.read.json("../data/output/*.json")
jsonData.show(2)

+--------------------+---+-------------------+-------------------+
|             address|pid|           timefrom|             timeto|
+--------------------+---+-------------------+-------------------+
|Hotel Aryas, Kuth...|P35|29/02/2020 10:30:00|29/02/2020 11:30:00|
|  Suresh Hotel Ranni|P35|01/03/2020 21:30:00|01/03/2020 23:00:00|
+--------------------+---+-------------------+-------------------+
only showing top 2 rows
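
Spark's JSON file sink writes one JSON object per line into part files in the output folder. As a rough sketch for inspecting that folder without starting a Spark session, the function below reads those files with the standard library only. `read_json_lines` is a hypothetical helper, and the `*.json` pattern is taken from the read call above.

```python
import glob
import json
import os


def read_json_lines(folder, pattern="*.json"):
    """Read every JSON-lines file in a folder into a list of dictionaries."""
    rows = []
    for path in sorted(glob.glob(os.path.join(folder, pattern))):
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:  # skip blank lines
                    rows.append(json.loads(line))
    return rows


# Example usage, assuming the streaming query has written some files:
# rows = read_json_lines("../data/output")
# print(rows[:2])
```

This loads everything into memory, so it only suits a quick look at small outputs; for anything larger, `spark.read.json` remains the better choice.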

