# Kafka / Spark Streaming Assessment

Following is my VINSYS id: bigdata10
<br>I have attached the code path for each question. Please find it below.

__Abstract__
<br>The assessment covers Spark and Kafka, basic Python, streaming of records, and real-time analytics using on-premise Kafka and Spark.
<br>There are 3 assessments in total.

## Implement a Kafka + Spark Integration.
|Column|Type|Description|
|:--|:--|:--|
|order_id|Integer|Random number; use a random generator to generate an id up to 100000|
|item_id|String|Random number; use a random generator to generate an id up to 100, then convert it to a string|
|price|Integer|Random generator value from 1 to 50|
|qty|Integer|Random generator value from 1 to 10|
|state|String|User state; use USA state codes and pick a random state on every publish|
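
The record layout in the table above can be sketched as a small generator. This is a minimal illustration, not the assessment's actual producer: the function names `make_order` and `to_kafka_value`, the lower bound of 1 for `order_id` and `item_id`, and the shortened `STATE_CODES` list are all assumptions made for the example.

```python
import json
import random

# Assumption: a short sample of USA state codes; the full list would be used in practice.
STATE_CODES = ["NY", "CO", "CA", "TX", "WA", "FL", "IL", "MA"]


def make_order(rng=random):
    """Build one order record with the columns listed in the table."""
    return {
        "order_id": rng.randint(1, 100000),   # lower bound of 1 is an assumption
        "item_id": str(rng.randint(1, 100)),  # random number converted to a string
        "price": rng.randint(1, 50),
        "qty": rng.randint(1, 10),
        "state": rng.choice(STATE_CODES),     # a new random state on every call
    }


def to_kafka_value(order):
    """Serialize a record to UTF-8 JSON bytes, the form a producer would publish."""
    return json.dumps(order).encode("utf-8")


order = make_order()
print(to_kafka_value(order))
```

The bytes returned by `to_kafka_value` are what a Kafka producer client would send as the message value to the `orders` topic; the producer client itself is left out so the sketch runs without a broker.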

# Q2
Implement a Spark stream that subscribes to the Kafka topic `orders` [from Q1] and sums the amount [price * qty] by state over every `5 minutes` window.
<br>The result shall be published to Kafka as (state, amount).
<br>For example, if customers ordered 10 items from NY and CO within the last 5 minutes, we sum all items purchased in NY [sum (item.qty * item.price)] and in CO [sum (item.qty * item.price)].

a. Publish the consolidated output [State name, Amount] to the Kafka topic `statewise_earning` in JSON format.

b. Publish the consolidated output [State name, Amount] to an Amazon RDS table using JDBC in append mode.
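
The per-state, 5-minute sum described above can be worked through in plain Python before involving Spark. This is only a sketch of the arithmetic: the `ts` field name, the sample orders, and the helper names are made up for illustration, and the windows are assumed to be aligned to the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 300  # 5 minutes


def window_start(ts, size=WINDOW_SECONDS):
    """Return the start of the tumbling window that contains ts."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % size, tz=timezone.utc)


def statewise_earning(orders):
    """Sum price * qty for each (window start, state) pair."""
    totals = defaultdict(int)
    for o in orders:
        totals[(window_start(o["ts"]), o["state"])] += o["price"] * o["qty"]
    return dict(totals)


def at(minute):
    """Helper: a timestamp on 2021-05-01 at 10:<minute> UTC (illustrative data)."""
    return datetime(2021, 5, 1, 10, minute, tzinfo=timezone.utc)


orders = [
    {"ts": at(1), "state": "NY", "price": 10, "qty": 2},  # 20
    {"ts": at(4), "state": "NY", "price": 5, "qty": 3},   # 15
    {"ts": at(3), "state": "CO", "price": 20, "qty": 1},  # 20
    {"ts": at(6), "state": "NY", "price": 7, "qty": 1},   # 7, falls in the next window
]

result = statewise_earning(orders)
for (start, state), amount in sorted(result.items()):
    print(start.isoformat(), state, amount)
```

The first three orders land in the 10:00 window (NY totals 35, CO totals 20), while the 10:06 order opens a separate 10:05 window for NY.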

<br><br>

In [1]:
import findspark
findspark.init()

Consume from `orders` topic.
<br>Calculate aggregate `sum(qty * price)` grouped by `state` every `5 min` + print data to console.
<br>Publish the aggregated values back to Kafka as JSON.

```bash
# Run a console consumer to listen for messages on `orders`
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders


# Create a topic called `statewise_earning` to receive the aggregated data
$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic statewise_earning
```

In [2]:
from pyspark.sql import SparkSession

ss = (
    SparkSession.builder.master("local[*]")
    .appName("SparkStream:KafkaOrders").getOrCreate()
)

In [3]:
"""
Read from Kafka topic 'orders'
"""

df_orders = (
    ss.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "192.168.93.128:9092")
    .option("subscribe", "orders")
    .load()
)

In [4]:
"""
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
"""

df_orders.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
"""
Only two columns are needed: timestamp and value.
Deserialize the binary value to a string; nothing else is used downstream.
"""

ordersJsonRawDf = df_orders.selectExpr("timestamp", "CAST(value AS STRING)")
ordersJsonRawDf.printSchema()  # timestamp, plus value as a string

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: string (nullable = true)



In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType


"""
The value column holds a JSON string; from_json needs an explicit schema to parse it into a struct.
""" 
schema = StructType(
        [
            StructField("order_id", IntegerType(), True),
            StructField("item_id", StringType(), True),
            StructField("price", DoubleType(), True),
            StructField("qty", IntegerType(), True),
            StructField("state", StringType(), True),
        ]
)


"""
Replace the JSON string with a struct parsed against the schema.
value is still a single column, but it now contains a struct.
"""
jsonDf = ordersJsonRawDf.withColumn("value", F.from_json("value", schema))


"""
Extract JSON object elements as individual columns
"""
ordersDf = jsonDf.select("timestamp", F.col("value.*"))

In [7]:
"""
Q: What are the state-wise sales for the last 300 seconds/5 minutes?
"""

ordersDf = ordersDf.withColumn("Amount", F.col("qty") * F.col("price") )


"""
Aggregate the sales by state over tumbling 5-minute windows.
"""
windowedAmountSum_interim = (
    ordersDf
    .groupBy("state", F.window(ordersDf.timestamp, "5 minutes", "5 minutes"))
    .agg(F.sum("Amount").alias("Amount"))
)

windowedAmountSum = (
    windowedAmountSum_interim
    .selectExpr("to_json(struct(*)) AS value")
    .selectExpr("CAST(value as STRING)")
)

# windowedAmountSum = (
#     ordersDf
#     .groupBy("state", F.window(ordersDf.timestamp, "300 seconds", "300 seconds"))
#     .agg(F.sum("Amount").alias("Amount"))
#     .selectExpr("to_json(struct(*)) AS value")
#     .selectExpr("CAST(value as STRING)")
# )
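
Because `to_json(struct(*))` serializes every column of the aggregated row, each published message carries the `window` struct alongside `state` and `Amount`. The sketch below mimics that payload shape in plain Python; the sample values and the exact timestamp rendering are assumptions, and the helper names are made up for illustration.

```python
import json

# One aggregated row, with illustrative values. The timestamp strings are an
# assumed rendering; the exact format Spark emits may differ.
row = {
    "state": "NY",
    "window": {
        "start": "2021-05-01T10:00:00.000Z",
        "end": "2021-05-01T10:05:00.000Z",
    },
    "Amount": 35.0,
}


def full_payload(row):
    """All columns, mirroring what to_json(struct(*)) would include."""
    return json.dumps(row)


def state_amount_payload(row):
    """Only the two fields named in the task: state and Amount."""
    return json.dumps({"state": row["state"], "Amount": row["Amount"]})


print(full_payload(row))
print(state_amount_payload(row))
```

If consumers of `statewise_earning` should receive only (state, amount), selecting `to_json(struct(state, Amount)) AS value` in place of `struct(*)` would be one way to narrow the payload, at the cost of losing the window boundaries.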

### Print to console + publish

In [8]:

"""
Print to console to check what you'll be publishing next.
"""
echoOnconsole = (
    windowedAmountSum
    .writeStream
    .outputMode("complete")
    .format("console")
    .start() # start the query. spark will subscribe to this data
)


"""
Publish the aggregated results to a new topic `statewise_earning` to be consumed by other consumers.
"""
(
    windowedAmountSum
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .option("kafka.bootstrap.servers", "192.168.93.128:9092")
    .option("topic", "statewise_earning")
    .option("checkpointLocation", "file:///c:/spark/temp")
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x243376a03c8>

<br>

## Write/append to RDS

Make sure this statement has already been run in PostgreSQL:
```sql
CREATE TABLE IF NOT EXISTS statewise_earning(state varchar(100), Amount real);
```

In [9]:
# windowed_report_df = (
#     windowedAmountSum
#     .withWatermark("delivered_time", "24 hours")
#     .groupBy('source_id', window('delivered_time', '5 minute'))
#     .count()
# )

In [None]:
windowed_state_amount = windowedAmountSum_interim.selectExpr("state", "Amount")


def sparkstream_to_postgres(df, epoch_id):
    (
        df.write
        .mode('append')
        .format('jdbc')
        .option("url", "jdbc:postgresql://bond-psql.ckzprcrersc2.us-east-2.rds.amazonaws.com:5432/productdb")
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", "statewise_earning")
        .option("user", "postgres")
        .option("password", "AwCT$May21")
        .save()
    )

(
    windowed_state_amount.writeStream
    .foreachBatch(sparkstream_to_postgres)
    .option("checkpointLocation", 'file:///c:/spark/rds')
    .outputMode('update')
    .start()
    .awaitTermination()
)
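
The connection details passed to `df.write` inside `sparkstream_to_postgres` could be read from the environment instead of being hard-coded. The following is a minimal sketch under that assumption; the variable names `PG_URL`, `PG_USER`, and `PG_PASSWORD` and the helper `jdbc_options_from_env` are hypothetical and not part of the original notebook.

```python
import os

REQUIRED_VARS = ("PG_URL", "PG_USER", "PG_PASSWORD")  # hypothetical variable names


def jdbc_options_from_env(env=os.environ):
    """Collect JDBC writer options from environment variables."""
    missing = [name for name in REQUIRED_VARS if name not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "url": env["PG_URL"],
        "driver": "org.postgresql.Driver",
        "dbtable": "statewise_earning",
        "user": env["PG_USER"],
        "password": env["PG_PASSWORD"],
    }


# Demonstration with a stand-in environment; real values would come from os.environ.
sample_env = {
    "PG_URL": "jdbc:postgresql://example-host:5432/productdb",
    "PG_USER": "postgres",
    "PG_PASSWORD": "not-a-real-password",
}
opts = jdbc_options_from_env(sample_env)
print(sorted(opts))
```

Inside the batch function, the resulting dictionary could be passed as `df.write.mode("append").format("jdbc").options(**opts).save()`, which keeps the password out of the notebook itself.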

In [None]:
"""
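Writing the stream directly with format("jdbc") DOESN'T WORK:
Structured Streaming has no built-in JDBC sink, so foreachBatch (above) is used instead.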

(
    windowedAmountSum
    .select("state", "Amount")
    .writeStream
    .format("jdbc")
    .option("url", "jdbc:postgresql://bond-psql.ckzprcrersc2.us-east-2.rds.amazonaws.com:5432/productdb")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "statewise_earning")
    .option("user", "postgres")
    .option("password", "AwCT$May21")
    .outputMode("append")
    .start()
    .awaitTermination()
)
"""