
# Spark Consumer


## Install Spark


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

## Set java and spark home

In [None]:
import os

# Point the runtime at the installed JDK and at the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})

## Install Kafka Dependencies

In [None]:
!wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.7/spark-sql-kafka-0-10_2.11-2.4.7.jar
!wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.11/2.4.7/spark-streaming-kafka-0-10-assembly_2.11-2.4.7.jar

## Add Kafka Dependencies to the Spark shell

In [None]:
import os

# Hand both downloaded Kafka jars to the JVM that pyspark launches.
# The jar paths are comma-separated after --jars; "pyspark-shell" closes the argument string.
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    "--jars",
    ",".join([
        "/content/spark-streaming-kafka-0-10-assembly_2.11-2.4.7.jar",
        "/content/spark-sql-kafka-0-10_2.11-2.4.7.jar",
    ]),
    "pyspark-shell",
])

## Initialize Spark

In [None]:
import findspark
# Keep this call ahead of the pyspark import below: no cell visible here
# pip-installs pyspark, so the import relies on findspark having initialised first.
findspark.init()
from pyspark.sql import SparkSession
# Notebook-wide session in local mode; getOrCreate() reuses an existing session if one is already running.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from pyspark.sql.functions import from_json, expr
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, DoubleType, StringType

import pandas as pd

import time

import chartify


## Declare JSON Schema

In [None]:
# JSON schema with six nullable fields: start and end (timestamps), count (long),
# lat and lon (doubles) and locationName (string).
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("start", TimestampType()),
        ("end", TimestampType()),
        ("count", LongType()),
        ("lat", DoubleType()),
        ("lon", DoubleType()),
        ("locationName", StringType()),
    )
])

## Set kafka variables

In [None]:
# Connection settings shared by the Kafka consumers defined below.
username = "pujo"
server_ip = "34.87.150.250"
# The same host is listed once per broker port (9092-9094), comma-separated.
bootstrap_servers = ",".join(f"{server_ip}:{port}" for port in (9092, 9093, 9094))
schema_registry_url = "http://" + server_ip + ":8081"

## Create Kafka Consumer to Console

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, expr
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, IntegerType, StringType
import time


class KafkaToConsoleApp:
    """
    The KafkaToConsoleApp reads records from a Kafka topic and shows them on the console.

    Connection details are taken from the notebook-level variables
    ``bootstrap_servers`` and ``username`` (the subscribed topic is
    ``f"{username}-spark-events"``), so both must be defined before
    ``get_events_df`` is called.

    Subclasses customise the pipeline by overriding ``get_events_df`` (what is
    streamed) and ``write_micro_batch`` (how each micro-batch is displayed).
    """

    def __init__(self, processing_time):
        """
        Get (or create) the local Spark session and remember the trigger interval.

        :param processing_time: interval handed to ``trigger(processingTime=...)``
            when the query is started, e.g. ``"10 seconds"``.
        """
        self.spark = SparkSession.builder.master("local[*]").getOrCreate()
        print("Spark version is: %s" % self.spark.version)
        print(self.spark.sparkContext.getConf().getAll())
        self.processingTime = processing_time

    @staticmethod
    def write_micro_batch(micro_batch_df, batch_id):
        """
        Print a timestamped header and the rows of one micro-batch, ordered by ``ordinal``.

        :param micro_batch_df: DataFrame holding the rows of the micro-batch.
        :param batch_id: identifier of the micro-batch, echoed in the header line.
        """
        ts = time.localtime()
        print("Showing batch %s at %s" % (batch_id, time.strftime("%Y-%m-%d %H:%M:%S", ts)))
        micro_batch_df.orderBy("ordinal").show(truncate=False)

    def load(self, output_mode):
        """
        Start the streaming query and block until it terminates.

        :param output_mode: output mode handed to ``outputMode(...)``, e.g. ``"append"``.
        :raises: whatever ``awaitTermination()`` raises if the query fails; a
            ``KeyboardInterrupt`` (kernel interrupt) also propagates after the
            query has been stopped.
        """
        query = self.get_events_df().writeStream \
            .outputMode(output_mode) \
            .trigger(processingTime=self.processingTime) \
            .foreachBatch(self.write_micro_batch) \
            .start()
        try:
            # Wait on the query started by this call only. The session-wide
            # awaitAnyTermination() returns (or raises) immediately when any
            # earlier query in the same session has already terminated, which
            # breaks a second load() in the same notebook session.
            query.awaitTermination()
        finally:
            # Always stop the query, including on a kernel interrupt, so it does
            # not keep running in the background next to the next app's query.
            query.stop()

    def get_events_df(self):
        """
        Build the streaming DataFrame of events read from Kafka.

        The Kafka ``key`` is cast to a string and the ``value`` is parsed as
        JSON; its fields are exposed as the top-level columns ``ordinal``,
        ``locationId``, ``eventTimestamp`` and ``amount``, and the parsed
        ``value`` column itself is dropped.

        :return: streaming DataFrame watermarked on ``eventTimestamp``.
        """
        # Shape of the JSON document carried in the Kafka message value.
        event_schema = StructType([
            StructField("ordinal", LongType(), True),
            StructField("locationId", IntegerType(), True),
            StructField("timestamp", TimestampType(), True),
            StructField("amount", LongType(), True),
        ])

        # The events are watermarked on the eventTimestamp custom field (not the kafka timestamp)
        # Delay threshold indicates how much time the system will wait for new events based on the watermark
        return self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", f"{username}-spark-events") \
            .load() \
            .withColumn("key", expr("string(key)")) \
            .withColumn("value", from_json(expr("string(value)"), event_schema)) \
            .withColumn("ordinal", expr("value.ordinal")) \
            .withColumn("locationId", expr("value.locationId")) \
            .withColumn("eventTimestamp", expr("value.timestamp")) \
            .withColumn("amount", expr("value.amount")) \
            .withWatermark(eventTime="eventTimestamp", delayThreshold="30 seconds") \
            .drop("value")

In [None]:
# Console consumer whose streaming query is triggered every 10 seconds.
x = KafkaToConsoleApp(processing_time="10 seconds")
# load() ends by waiting on the streaming query, so this cell keeps running
# until the query terminates or the kernel is interrupted.
x.load(output_mode="append")

## Count Event

In [None]:
from pyspark.sql.functions import window
import time

class KafkaEventCountApp(KafkaToConsoleApp):
    """
    Variant of KafkaToConsoleApp that shows event counts instead of raw events.

    The events read from Kafka are grouped into 60-second windows on
    ``eventTimestamp`` and the number of events per window is displayed.
    """

    @staticmethod
    def write_micro_batch(micro_batch_df, batch_id):
        """Print a timestamped header, then the batch rows ordered by window start."""
        shown_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print("Showing batch: %s, at %s" % (batch_id, shown_at))
        ordered_df = micro_batch_df.orderBy(micro_batch_df["window.start"])
        ordered_df.show(truncate=False)

    def get_events_df(self):
        """Return the parent's event stream aggregated to a count per 60-second window."""
        events = super().get_events_df()
        sixty_second_window = window(events["eventTimestamp"], "60 seconds")
        return events.groupBy(sixty_second_window).count()

In [None]:
# Windowed event counts, triggered every 10 seconds, written with the "complete" output mode.
x = KafkaEventCountApp(processing_time="10 seconds")
x.load(output_mode="complete")