#### Spark Streaming
* Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
<img src="Spark_Streaming.png" width="700">
* Structured Streaming has four built-in streaming data sources:
    * Socket Source
    * Rate Source
    * File Source
    * Kafka Source
* Kafka Data Source
    * Kafka Data Source is the streaming data source for Apache Kafka in Spark Structured Streaming.
    * Kafka Data Source provides a streaming source and a streaming sink for micro-batch and continuous stream processing
<img src="Kafka_Data_Source.png" width="700">
* Micro-Batch Stream Processing vs Continuous Stream Processing
    * Kafka Data Source supports Micro-Batch Stream Processing (i.e. Trigger.Once and Trigger.ProcessingTime triggers) via KafkaMicroBatchReader.
    * Kafka Data Source supports Continuous Stream Processing (i.e. Trigger.Continuous trigger) via KafkaContinuousReader.
<img src="Structured_Streaming.png" width="700">
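
The trigger types listed above map onto keyword arguments of PySpark's `DataStreamWriter.trigger()`. The sketch below is only an illustration: `apply_trigger` and its `mode` names are hypothetical helpers introduced here, and `writer` is assumed to be the object returned by `df.writeStream`.

```python
def apply_trigger(writer, mode, interval="10 seconds"):
    """Return `writer` configured with the trigger that matches `mode`.

    `writer` is assumed to be a pyspark.sql.streaming.DataStreamWriter,
    i.e. what `df.writeStream` returns. `mode` is a hypothetical label.
    """
    if mode == "micro-batch":
        # Trigger.ProcessingTime: start a new micro-batch every `interval`.
        return writer.trigger(processingTime=interval)
    if mode == "once":
        # Trigger.Once: process the available data in one micro-batch, then stop.
        return writer.trigger(once=True)
    if mode == "continuous":
        # Trigger.Continuous: here `interval` is the checkpoint interval.
        return writer.trigger(continuous=interval)
    raise ValueError(f"unknown trigger mode: {mode!r}")
```

For example, `apply_trigger(df.writeStream.format("kafka"), "micro-batch", "5 seconds")` would request a micro-batch every five seconds before the remaining options and `start()` are chained on.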

#### Create EMR Cluster for Running our Kafka Streaming Application
* Create Cluster
    * Go to AWS Services > EMR > Create Cluster.
    * Go to advanced options.
    * Select the release emr-6.1.0 with applications Hadoop 3.2.1, Hive 3.1.2 and Spark 3.0.0.
    <img src="Create_Cluster.png" width="700">
    * Choose 1 master and 2 core nodes with Instance Type m5a.xlarge.
    <img src="Cluster_Nodes.png" width="700">
    * Under Security Options, select your EC2 key pair so that you can SSH into the cluster, then click Create Cluster.
    <img src="Advanced_Options.png" width="700">
    * Under Security Group Settings for the master node, allow SSH from your IP.
    <img src="Security_Group.png" width="700">

#### Deploy the Spark Application as per the Deployment section of the "Structured Streaming + Kafka Integration Guide"
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

#### Kafka Setup 
* Download Kafka
    * Go to https://kafka.apache.org/downloads
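    * Under 2.4.1, Binary downloads, choose Scala 2.12. Note that the commands below download release 0.10.2.2 from the Apache archive, which matches the kafka-clients-0.10.2.2.jar used later.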
    * Go to shell and type
<code>
wget https://archive.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
tar xzf kafka_2.12-0.10.2.2.tgz
mv kafka_2.12-0.10.2.2 kafka
cd kafka
</code>
<img src="Download_Kafka.png" width="700">
* Running Zookeeper
<code>
bin/zookeeper-server-start.sh config/zookeeper.properties
</code>
* Running Kafka broker
    * Open a second terminal session (duplicate the SSH session) and run the command below
    <code>
    bin/kafka-server-start.sh config/server.properties
    </code>
    <img src="Kafka_Broker.png" width="700">
* Create producer topic
<code>
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-a
</code>
<img src="Producer_Topic.png" width="700">
* Create consumer topic
<code>
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-b
</code>
* Check topic creation
<code>
bin/kafka-topics.sh --list --zookeeper ip-172-31-34-198:2181
</code>
<img src="Topic_Creation.png" width="700">
* Launch producer job
<code>
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-a
</code>
<img src="Producer_Job.png" width="700">
* Launch consumer job
<code>
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-b --from-beginning
</code>
<img src="Consumer_Job.png" width="700">

#### PySpark Setup
* Install PySpark
<code>
pip install pyspark --user
</code>
* Getting additional jar files
    * spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
    <code>
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
    </code>
    * spark-sql-kafka-0-10_2.12-3.0.0.jar
    <code>
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
    </code>
    * commons-pool2-2.8.0.jar
    <code>
    wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar
    </code>
    * kafka-clients-0.10.2.2.jar
    <code>
    wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.2/kafka-clients-0.10.2.2.jar
    </code>
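
The download URLs above follow the standard Maven repository layout, so each one can be derived from a `groupId:artifactId:version` coordinate. The following is a minimal sketch of that mapping; `maven_jar_url` is a hypothetical helper written for this README, not part of Spark or Maven.

```python
def maven_jar_url(coordinate, repo="https://repo1.maven.org/maven2"):
    """Build the jar URL for a 'groupId:artifactId:version' coordinate."""
    group_id, artifact_id, version = coordinate.split(":")
    # Dots in the groupId become directory separators in the repository path.
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


if __name__ == "__main__":
    # The _2.12 suffix in the artifactId is the Scala version, 3.0.0 the Spark version.
    print(maven_jar_url("org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"))
```

The artifact's Scala suffix and version should match the Spark build on the cluster (Scala 2.12 and Spark 3.0.0 for the EMR release chosen earlier).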

#### Running PySpark Job
* Run PySpark Kafka Streaming Code 
<code>
# The Maven dependency is passed to --packages as groupId:artifactId:version
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 /home/hadoop/capstone/Kafka-Sink-Job.py
</code>

    * The load() method returns a streaming DataFrame
    <code>
    kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic-a") \
    .option("startingOffsets", "earliest") \
    .load() 
    </code>
    * A schema can be applied to a JSON string with the from_json() function
    <code>
    from pyspark.sql.functions import col, from_json
    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
    </code>
    * writeStream gives us a DataStreamWriter; start() behaves like an action and launches a background Spark job
    <code>
    notification_writer_query = kafka_target_df \
    .writeStream \
    .queryName("Notification Writer") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "topic-b") \
    .outputMode("append") \
    .option("checkpointLocation", "chk-point-dir") \
    .start()
    </code>
    * start() is non-blocking: it launches the background job and returns immediately, so we call awaitTermination() to block until the query stops
    <code>
    notification_writer_query.awaitTermination()
    </code>
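
The `schema` passed to `from_json()` is not shown in the snippets above. As a sketch only, it could be written as a DDL-style string (which `from_json()` accepts in place of a `StructType`). The field names below are taken from the sample record in this README; the column types and the helper names `COVID_SCHEMA_DDL`, `schema_field_names` and `parse_value` are assumptions made for illustration.

```python
import json

# Assumed schema: field names come from the sample input record,
# the STRING/INT types are a guess based on the sample values.
COVID_SCHEMA_DDL = (
    "Date STRING, Province STRING, Confirmed INT, "
    "Recovered INT, Deaths INT, NumberToday INT"
)


def schema_field_names(ddl):
    """Return the column names declared in a simple 'name TYPE, ...' DDL string."""
    return [part.split()[0] for part in ddl.split(",")]


def record_matches_schema(json_line, ddl=COVID_SCHEMA_DDL):
    """Check that a JSON record has exactly the fields the schema declares."""
    return list(json.loads(json_line)) == schema_field_names(ddl)


def parse_value(kafka_df, schema=COVID_SCHEMA_DDL):
    """Parse the Kafka `value` column into a struct column named `value`."""
    # Imported lazily so the helpers above also work without Spark installed.
    from pyspark.sql.functions import col, from_json
    return kafka_df.select(
        from_json(col("value").cast("string"), schema).alias("value")
    )
```

`record_matches_schema` is a quick way to check a record before typing it into the producer. This matters because `from_json()` returns null for input it cannot parse rather than raising an error.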
    
* Input Records
    * Our input records consist of the total confirmed cases, total recovered, total deaths, and today's confirmed cases of COVID-19 for the date 30-09-2020.
<img src="Input_Records.png" width="700">

* Pushing first record to the producer node
    * {"Date":"30-09-2020","Province":"Alberta","Confirmed":18062,"Recovered":16213,"Deaths":267,"NumberToday":153}
    <img src="Producer_Record_1.png" width="700">
    * The record is processed in the micro-batch and pushed to the consumer node
    <img src="Consumer_Record_1.png" width="700">
    * Pushing second record to the producer node
    <img src="Producer_Record_2.png" width="700">
    * As soon as new input arrives, the Spark background process triggers a new micro-batch, which reads the new record from the producer topic, applies the transformation, and pushes the output DataFrame to the consumer topic
    <img src="Consumer_Record_2.png" width="700">
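
The console producer sends each line typed into it as one message, so every record has to be a single-line JSON string. A minimal sketch of producing such a line from a Python dict follows; `to_producer_line` is a hypothetical helper, and the record is the first one pushed above.

```python
import json


def to_producer_line(record):
    """Serialize a record as compact, single-line JSON for the console producer."""
    # Compact separators avoid extra spaces; json.dumps never emits raw newlines.
    return json.dumps(record, separators=(",", ":"))


if __name__ == "__main__":
    first_record = {
        "Date": "30-09-2020",
        "Province": "Alberta",
        "Confirmed": 18062,
        "Recovered": 16213,
        "Deaths": 267,
        "NumberToday": 153,
    }
    # Paste the printed line into the kafka-console-producer.sh session.
    print(to_producer_line(first_record))
```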