# Prerrequisites

Installing Spark and Apache Kafka Library in VM


---



In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
!tar xf spark-3.0.2-bin-hadoop3.2.tgz
!pip -q install findspark

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.8.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'

In [3]:
import findspark
findspark.init()

Exception: Unable to find py4j, your SPARK_HOME may not be configured correctly

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# create the session
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .getOrCreate()

spark.version

'3.0.2'

Creating tunnel</br>
**To Check the Spark UI, open the URL printed by running the above command : https://######/jobs/, /SQL/**


In [5]:
 from google.colab.output import eval_js
 print(eval_js("google.colab.kernel.proxyPort(4040)"))

https://ojoa47pzdja-496ff2e9c6d22116-4040-colab.googleusercontent.com/


In [6]:
BROKER_IP = "ec2-100-24-117-5.compute-1.amazonaws.com"

# Bronze

## Customers Table

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_customers = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", BROKER_IP + ":39092") \
  .option("subscribe", "stream_customers_json") \
  .load()

In [8]:
schema_customers = StructType(
    [
     StructField('ID', StringType(), True),
     StructField('NAME', StringType(), True),
     StructField('__DELETED', StringType(), True)
    ]
)

In [9]:
dataset_customers = (
    df_customers
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .withColumn("value", from_json("value", schema_customers)) \
    .select(col('value.ID').cast(IntegerType()).alias("id"), col("value.NAME").alias("name"))
    .withColumn("timestamp_customer", current_timestamp())
)

In [10]:
dataset_customers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- timestamp_customer: timestamp (nullable = false)



In [11]:
spark.sql("DROP TABLE IF EXISTS customers_bronze")

DataFrame[]

In [12]:
!rm -Rf /delta/customers_bronze

In [13]:
spark.sql("""
CREATE TABLE customers_bronze
(id INT, name STRING, timestamp_customer TIMESTAMP)
USING delta
LOCATION '/delta/customers_bronze'
""")

DataFrame[]

In [14]:
# https://docs.delta.io/latest/delta-update.html#-merge-in-streaming&language-python

from delta.tables import *

deltaTable_customers_bronze = DeltaTable.forPath(spark, "/delta/customers_bronze")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertCustomersToDelta(microBatchOutputDF, batchId):
  deltaTable_customers_bronze.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.id = t.id") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()  

In [15]:
dataset_customers.writeStream \
  .format("delta") \
  .foreachBatch(upsertCustomersToDelta) \
  .outputMode("update") \
  .queryName("Customers_query") \
  .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f78e9a8cad0>

**Wait for a seconds until delta data is updated in the previous step**

In [41]:
spark.sql("SELECT * FROM customers_bronze LIMIT 10").show(truncate = False)

+---+---------+----------------------+
|id |name     |timestamp_customer    |
+---+---------+----------------------+
|1  |Alexander|2021-03-18 08:55:35.83|
|4  |Honorato |2021-03-18 08:55:35.83|
|2  |Lawrence |2021-03-18 08:55:35.83|
|3  |Andrew   |2021-03-18 08:55:35.83|
|5  |Darius   |2021-03-18 08:55:35.83|
+---+---------+----------------------+



## Orders Table

In [17]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_orders = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", BROKER_IP + ":39092") \
  .option("subscribe", "stream_orders_json") \
  .load()

In [18]:
schema_orders = StructType(
    [
     StructField('ID', StringType(), True),
     StructField('PRODUCT', StringType(), True),
     StructField('QUANTITY', StringType(), True),
     StructField('CUSTOMER_ID', StringType(), True),
     StructField('__DELETED', StringType(), True)
    ]
)

In [19]:
dataset_orders = (
    df_orders
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .withColumn("value", from_json("value", schema_orders)) \
    .select(col('value.ID').cast(IntegerType()).alias("id"), col("value.PRODUCT").cast(IntegerType()).alias("product_id"), col("value.QUANTITY").cast(IntegerType()).alias("quantity"), col("value.CUSTOMER_ID").cast(IntegerType()).alias("customer_id"))
    .withColumn("timestamp_order", current_timestamp())
)

In [20]:
spark.sql("DROP TABLE IF EXISTS orders_bronze")

DataFrame[]

In [21]:
!rm -Rf /delta/orders_bronze

In [22]:
spark.sql("""
CREATE TABLE orders_bronze
(id INT, product_id INT, quantity INT, customer_id INT, timestamp_order TIMESTAMP)
USING delta
LOCATION '/delta/orders_bronze'
""")

DataFrame[]

In [23]:
# https://docs.delta.io/latest/delta-update.html#-merge-in-streaming&language-python

from delta.tables import *

deltaTable_orders_bronze = DeltaTable.forPath(spark, "/delta/orders_bronze")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertOrdersToDelta(microBatchOutputDF, batchId):
  deltaTable_orders_bronze.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.id = t.id") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()  

In [24]:
dataset_orders.writeStream \
  .format("delta") \
  .foreachBatch(upsertOrdersToDelta) \
  .outputMode("update") \
  .queryName("Orders_query") \
  .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f78e9a32c50>

**Wait for a seconds until delta data is updated in the previous step**

In [25]:
spark.sql("SELECT * FROM orders_bronze LIMIT 10").show(truncate = False)

+---+----------+--------+-----------+---------------+
|id |product_id|quantity|customer_id|timestamp_order|
+---+----------+--------+-----------+---------------+
+---+----------+--------+-----------+---------------+



# Silver

## Joining customers and orders

**Streaming from orders table**

In [26]:
df_customers_delta = spark.read.format("delta").load("/delta/customers_bronze")


In [27]:
df_orders_delta = (spark.readStream.format("delta")  
  .option("startingVersion", "0")  
  .load("/delta/orders_bronze")
)

In [28]:
orders_customers = (df_customers_delta
                    .join(df_orders_delta, df_customers_delta.id == df_orders_delta.customer_id)
                    .select(df_orders_delta.customer_id, df_customers_delta.name, df_orders_delta.product_id, df_orders_delta.quantity, df_orders_delta.timestamp_order)                    
)

In [29]:
spark.sql("DROP TABLE IF EXISTS customers_orders_silver")

DataFrame[]

In [30]:
!rm -Rf /delta/customers_orders_silver

In [31]:
spark.sql("""
CREATE TABLE IF NOT EXISTS customers_orders_silver 
(customer_id INT, name STRING, product_id INT, quantity INT, timestamp_order TIMESTAMP) 
USING DELTA 
LOCATION '/delta/customers_orders_silver'
""")

DataFrame[]

In [32]:
(orders_customers.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/customers_orders_silver/_checkpoints/customers_orders_silver-json")
  .queryName("Join_to_silver_query")
  .start("/delta/customers_orders_silver"))

<pyspark.sql.streaming.StreamingQuery at 0x7f78e9a980d0>

**Wait for a seconds until delta data is updated in the previous step**

In [33]:
spark.sql("SELECT * FROM customers_orders_silver LIMIT 10").show(truncate = False)

+-----------+----+----------+--------+---------------+
|customer_id|name|product_id|quantity|timestamp_order|
+-----------+----+----------+--------+---------------+
+-----------+----+----------+--------+---------------+



## Updating customers to silver table

**Updates in customer tables won't be propagated into customers_orders_silver table. </br>To fix that, it's necessary to merge customers into customers_orders_silver table**

In [34]:
# https://docs.delta.io/latest/delta-update.html#-merge-in-streaming&language-python

from delta.tables import *

deltaTable_customers_orders_silver = DeltaTable.forPath(spark, "/delta/customers_orders_silver")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):

  latestChangeForEachKey = microBatchOutputDF \
    .selectExpr("id", "struct(name, timestamp_customer) as otherCols") \
    .groupBy("id") \
    .agg(max("otherCols").alias("latest")) \
    .select("id", "latest.*") \

  deltaTable_customers_orders_silver.alias("t").merge(
      latestChangeForEachKey.alias("s"),
      "s.id = t.customer_id") \
    .whenMatchedUpdate(
      set = {
        "name": "s.name"
      }
    ).whenNotMatchedInsert(
      values = {
        "customer_id": "s.id",
        "name": "s.name",
        "product_id": "null",
        "quantity": "null",
        "timestamp_order": "null"
      }
    ).execute()

In [35]:
df_customers_delta_streaming = (spark.readStream.format("delta")  
  .option("startingVersion", "0")
  .option("ignoreChanges", True)
  .load("/delta/customers_bronze")
)

In [36]:
# Write the output of a streaming aggregation query into Delta table
df_customers_delta_streaming.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .queryName("Customers_to_silver_query") \
  .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f78e9a98f50>

In [37]:
spark.sql("SELECT * FROM customers_orders_silver").show(truncate = False)

+-----------+----+----------+--------+---------------+
|customer_id|name|product_id|quantity|timestamp_order|
+-----------+----+----------+--------+---------------+
+-----------+----+----------+--------+---------------+



# Gold

In [43]:
customer_grouped = spark.sql("""
SELECT customer_id, name, count(*) as count FROM customers_orders_silver
GROUP BY customer_id, name
""")

customer_grouped.show()

+-----------+---------+-----+
|customer_id|     name|count|
+-----------+---------+-----+
|          4| Honorato|   32|
|          2| Lawrence|   19|
|          1|Alexander|   13|
|          5|   Darius|   17|
|          3|   Andrew|   19|
+-----------+---------+-----+



In [44]:
import plotly.express as px

fig = px.pie(customer_grouped.toPandas(), values='count', names='name', title='By Customer')
fig.show()

**Stoping all Streaming queries**

In [40]:
#for query in spark.streams.active:
#  query.stop()