In [1]:
"""
22520960 Phan Phuoc Loc Ngoc
22521486 Huynh La Viet Toan
22520675 Nguyen Anh Khoa
"""
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

scala_version = '2.12'  # your scala version
spark_version = '3.5.5' # your spark version
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.0' #your kafka version
]
spark = SparkSession.builder.master("local").appName("kafka-example").config("spark.jars.packages", ",".join(packages)).getOrCreate()
spark

In [33]:
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, StructField
import logging
import os
import base64

@udf(StringType())
def decode_and_save_video(video_data, video_name):
    try:
        if video_data is None or video_name is None:
            logging.error("video_data or video_name is None")
            return None
        
        # Tạo đường dẫn đầy đủ để lưu video
        output_dir = r"D:\\video_test"
        full_path = os.path.join(output_dir, video_name)

        # Giải mã base64 và lưu file
        video_bytes = base64.b64decode(video_data)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "wb") as f:
            f.write(video_bytes)
        
        return full_path
    except Exception as e:
        logging.error(f"Error decoding and saving video to {video_name}: {e}")
        return None

In [35]:

# Đọc dữ liệu từ Kafka
schema = StructType([
            StructField("label", StringType()),
            StructField("video_name", StringType()),
            StructField("video_path", StringType()),
            StructField("video_data", StringType())
        ])
# Đọc từ Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-topic") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON
json_df = df.select(
            from_json(col("value").cast("string"), schema).alias("data")
        ).select("data.*")
# df_parsed = df_raw.selectExpr("CAST(value AS STRING) as json_str") \
#     .select(from_json(col("json_str"), schema).alias("data")) \
#     .select("data.*")
decoded_df = json_df.withColumn("saved_video_path", decode_and_save_video(col("video_data"), col("video_name")))



In [37]:
# Tạo queryName ngẫu nhiên để tránh trùng
from random import randint
randNum = str(randint(0, 10000))
table_name = f"trainStreamTable_{randNum}"

# Ghi dữ liệu vào memory table
query = decoded_df.writeStream \
    .queryName(table_name) \
    .format("memory") \
    .outputMode("append") \
    .trigger(processingTime="5 seconds") \
    .start()

In [5]:
from time import sleep
from IPython.display import display, clear_output

In [38]:
for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x * 5}")

        # Truy vấn từ memory table
        result1 = spark.sql(f"SELECT * FROM {table_name}")

        # Hiển thị bằng Pandas (ví dụ trong notebook)
        display(result1.toPandas())

        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break

print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 45


Unnamed: 0,label,video_name,video_path,video_data,saved_video_path
0,horrible,0001.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0001.mp4
1,horrible,0002.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0002.mp4
2,horrible,0003.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0003.mp4
3,horrible,0004.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0004.mp4
4,horrible,0005.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0005.mp4
5,horrible,0006.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0006.mp4
6,horrible,0007.mp4,D:\bigdata_dataset\home\anhkhoa\spark_video_st...,AAAAGGZ0eXBtcDQyAAAAAG1wNDFpc29tAAAAKHV1aWRcpw...,D:\\video_test\0007.mp4


break
Live view ended...
