# Lab 9.1: MultiQueryDemo

## Tổng quan bài tập
**Đề bài**: Ở bài Lab này, bạn sẽ được hướng dẫn các thao tác về việc Đọc dữ liệu Stream từ Kafka cũng như cách ghi dữ liệu ở cả dạng File Sink và Kafka Sink

## Tài nguyên
Do bài Lab này liên quan đến đến xử lý dữ liệu Stream với Kafka, vậy nên bạn sẽ cần cài đặt Kafka, bạn có thể tham khảo video sau về cách cài đặt:
- [Cài đặt Kafka](https://funix.udemy.com/course/spark-streaming-using-python/learn/lecture/21955580#overview)

Bạn sẽ cần tải các Kafka Script ở [link sau](https://drive.google.com/file/d/1RMiLzXVTiRlNsvfF3I63ylKVGjBXt7AK/view?usp=sharing) để có thể sử dụng Kafka.


Bạn cũng sẽ cần tải các dữ liệu ở [link sau](https://drive.google.com/file/d/18RrmmF1h2-HS6wvteZ5lPcxs4iPCm5CB/view?usp=sharing) để có thể kiểm thử cho bài Lab. 


Ngoài ra, bạn có thể tham khảo các video sau trong trường hợp chưa hiểu cách làm bài Lab:
- [Multi-query Streams Application](https://funix.udemy.com/course/spark-streaming-using-python/learn/lecture/21955620#overview)

Import các Package cần thiết

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

Khởi tạo Spark Session

In [None]:
spark = SparkSession \
	.builder \
	.appName([...]) \
	.master([...]) \
	.config("spark.streaming.stopGracefullyOnShutdown", "true") \
	.getOrCreate()

Tạo schema cho dữ liệu đầu vào

In [None]:
schema = StructType([
	StructField("InvoiceNumber", StringType()),
	StructField("CreatedTime", LongType()),
	StructField("StoreID", StringType()),
	StructField("PosID", StringType()),
	StructField("CashierID", StringType()),
	StructField("CustomerType", StringType()),
	StructField("CustomerCardNo", StringType()),
	StructField("TotalAmount", DoubleType()),
	StructField("NumberOfItems", IntegerType()),
	StructField("PaymentMethod", StringType()),
	StructField("CGST", DoubleType()),
	StructField("SGST", DoubleType()),
	StructField("CESS", DoubleType()),
	StructField("DeliveryType", StringType()),
	StructField("DeliveryAddress", StructType([
		StructField("AddressLine", StringType()),
		StructField("City", StringType()),
		StructField("State", StringType()),
		StructField("PinCode", StringType()),
		StructField("ContactNumber", StringType())
	])),
	StructField("InvoiceLineItems", ArrayType(StructType([
		StructField("ItemCode", StringType()),
		StructField("ItemDescription", StringType()),
		StructField("ItemPrice", DoubleType()),
		StructField("ItemQty", IntegerType()),
		StructField("TotalValue", DoubleType())
	]))),
])

Hãy hoàn thiện các phần `[...]` để hoàn thiện đoạn code và giải quyết bài toán theo yêu cầu.

In [None]:
# Đọc dữ liệu từ Kafka
kafka_df = spark.readStream \
	.format([...]) \
	.option("kafka.bootstrap.servers", [...]) \
	.option("subscribe", "invoices") \
	.option("startingOffsets", "earliest") \
	.load()

# Chuyển dữ liệu từ dạng JSON về MapType()
value_df = kafka_df.select([...].alias("value"))

# Xử lý dữ liệu
notification_df = value_df.select("value.InvoiceNumber", "value.CustomerCardNo", "value.TotalAmount") \
	.withColumn("EarnedLoyaltyPoints", expr("TotalAmount * 0.2"))

# Ghi dữ liệu ở dạng Kafka Sink
# Chuyển đổi Dataframe về dạng key - value
kafka_target_df = notification_df.selectExpr([...])

notification_writer_query = kafka_target_df \
	.writeStream \
	.queryName("Notification Writer") \
	.format([...]) \
	.option("kafka.bootstrap.servers", [...]) \
	.option("topic", "notifications") \
	.outputMode([...]) \
	.option("checkpointLocation", [...]) \
	.start()

# Trích xuất các dữ liệu
explode_df = value_df.selectExpr("value.InvoiceNumber", "value.CreatedTime", "value.StoreID",
									"value.PosID", "value.CustomerType", "value.PaymentMethod", "value.DeliveryType",
									"value.DeliveryAddress.City",
									"value.DeliveryAddress.State", "value.DeliveryAddress.PinCode",
									"explode(value.InvoiceLineItems) as LineItem")

flattened_df = explode_df \
	.withColumn("ItemCode", expr("LineItem.ItemCode")) \
	.withColumn("ItemDescription", expr("LineItem.ItemDescription")) \
	.withColumn("ItemPrice", expr("LineItem.ItemPrice")) \
	.withColumn("ItemQty", expr("LineItem.ItemQty")) \
	.withColumn("TotalValue", expr("LineItem.TotalValue")) \
	.drop("LineItem")


# Ghi dữ liệu dưới dạng File Sink
invoice_writer_query = flattened_df.writeStream \
	.format([...]) \
	.queryName("Flattened Invoice Writer") \
	.outputMode([...]) \
	.option("path", "output") \
	.option("checkpointLocation", [...]) \
	.start()

print("Waiting for Queries")
spark.streams.awaitAnyTermination()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
# import os

spark = SparkSession \
	.builder \
	.appName("MultiQueryDemo") \
	.master("local") \
	.config("spark.streaming.stopGracefullyOnShutdown", "true") \
	.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
	.getOrCreate()

schema = StructType([
	StructField("InvoiceNumber", StringType()),
	StructField("CreatedTime", LongType()),
	StructField("StoreID", StringType()),
	StructField("PosID", StringType()),
	StructField("CashierID", StringType()),
	StructField("CustomerType", StringType()),
	StructField("CustomerCardNo", StringType()),
	StructField("TotalAmount", DoubleType()),
	StructField("NumberOfItems", IntegerType()),
	StructField("PaymentMethod", StringType()),
	StructField("CGST", DoubleType()),
	StructField("SGST", DoubleType()),
	StructField("CESS", DoubleType()),
	StructField("DeliveryType", StringType()),
	StructField("DeliveryAddress", StructType([
		StructField("AddressLine", StringType()),
		StructField("City", StringType()),
		StructField("State", StringType()),
		StructField("PinCode", StringType()),
		StructField("ContactNumber", StringType())
	])),
	StructField("InvoiceLineItems", ArrayType(StructType([
		StructField("ItemCode", StringType()),
		StructField("ItemDescription", StringType()),
		StructField("ItemPrice", DoubleType()),
		StructField("ItemQty", IntegerType()),
		StructField("TotalValue", DoubleType())
	]))),
])


In [3]:
# Đọc dữ liệu từ Kafka
kafka_df = spark.readStream \
	.format("kafka") \
	.option("kafka.bootstrap.servers", "localhost:9092") \
	.option("subscribe", "invoices") \
	.option("startingOffsets", "earliest") \
	.load()
