In [4]:
import pandas as pd
import numpy as np
import json
import random

In [1]:
# Stop any Spark session already bound to `spark` so that the builder cell
# further down creates a new session instead of reusing this one.
# On a kernel where `spark` has not been defined yet there is nothing to stop,
# so the resulting NameError is deliberately ignored.
try:
    spark.stop()
except NameError:
    pass

In [3]:
# SparkSession is needed by the builder below and is not imported anywhere
# else in this notebook.
from pyspark.sql import SparkSession

# Scala build and Spark release of the Kafka connector jars. The jar file names
# below are derived from these two values so they cannot drift apart.
scala_version = '2.12'
spark_version = '3.5.1'

jars_dir = "/opt/spark/jars"
kafka_jars = [
    f"{jars_dir}/spark-sql-kafka-0-10_{scala_version}-{spark_version}.jar",
    f"{jars_dir}/kafka-clients-3.2.0.jar",
    f"{jars_dir}/commons-pool2-2.12.0.jar",
    f"{jars_dir}/spark-streaming-kafka-0-10-assembly_{scala_version}-{spark_version}.jar",
    f"{jars_dir}/spark-token-provider-kafka-0-10_{scala_version}-{spark_version}.jar",
]

# `spark.jars` takes a single comma-separated string of jar paths.
spark = (
    SparkSession.builder
    .appName("Consumer")
    .config("spark.jars", ",".join(kafka_jars))
    .getOrCreate()
)

# Keep the notebook output readable: only warnings and errors are logged.
spark.sparkContext.setLogLevel("WARN")

24/08/27 08:14:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Show the value the running session holds under `spark.jars`, to confirm the
# jar list configured on the builder was picked up.
print(spark.sparkContext.getConf().get("spark.jars"))

/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar,/opt/spark/jars/kafka-clients-3.2.0.jar,/opt/spark/jars/commons-pool2-2.12.0.jar,/opt/spark/jars/spark-streaming-kafka-0-10-assembly_2.12-3.5.1.jar,/opt/spark/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar


In [5]:
class Consumer:
    """Stream JSON records from a Kafka topic into an Iceberg table.

    The stages are chained by ``process``:
    ``ingest_from_kafka`` -> ``get_kafka_message`` -> ``get_quality_df`` ->
    streaming write to ``tableName``.

    ``ingest_from_kafka`` (and therefore ``process``) reads the notebook-level
    ``spark`` session, which must be defined before they are called.
    """

    def __init__(self,
                 bootstrap_server="192.168.1.53:9092",
                 base_dir="/home/iceberg/notebooks",
                 topic="botPredict",
                 table_name="bot_db.bot_ddvc_hcm_bot_predict"):
        """Store the connection settings.

        Args:
            bootstrap_server: Kafka ``host:port`` passed as ``kafka.bootstrap.servers``.
            base_dir: Directory under which the streaming checkpoint is kept.
            topic: Kafka topic to subscribe to.
            table_name: Target table of the streaming write.

        All arguments default to the values this notebook was written with, so
        ``Consumer()`` behaves as before.
        """
        self.BOOTSTRAP_SERVER = bootstrap_server
        self.base_dir = base_dir
        self.topic = topic
        self.tableName = table_name

    @property
    def checkpoint_location(self):
        """Checkpoint directory used by both ``clean`` and ``process``.

        Derived from ``base_dir`` on every access so the two methods can never
        point at different paths.
        """
        return f"{self.base_dir}/checkpoints/iceberg-consumer"

    def get_schema(self):
        """Return the ``StructType`` used to parse the JSON value of each Kafka record.

        Every field is nullable. ``STEP`` and ``LEN_CARD_DATA`` are integers,
        ``CREATED_TIME`` and ``LAST_UPDATED_TIME`` are timestamps, and all
        remaining fields are strings.
        """
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
        schema = StructType([
            StructField("ID", StringType(), True),
            StructField("BOT_ID", StringType(), True),
            StructField("TEXT", StringType(), True),
            StructField("INTENT_CONFIDENCE", StringType(), True),
            StructField("INTENT_NAME", StringType(), True),
            StructField("STEP", IntegerType(), True),
            StructField("NLU_THRESHOLD", StringType(), True),
            StructField("SENDER_ID", StringType(), True),
            StructField("SOURCE", StringType(), True),
            StructField("CREATED_TIME", TimestampType(), True),
            StructField("LAST_UPDATED_TIME", TimestampType(), True),
            StructField("ID_CHATLOG", StringType(), True),
            StructField("UPDATED_INTENT", StringType(), True),
            StructField("LEN_CARD_DATA", IntegerType(), True),
            StructField("STATUS_DELETE", StringType(), True),
            StructField("STATUS_CONFIRM", StringType(), True),
            StructField("INTENT_MAP_CLICK_BUTTON", StringType(), True)
        ])
        return schema

    def ingest_from_kafka(self):
        """Open a streaming read on ``self.topic`` and return the raw Kafka DataFrame.

        Reading starts from the earliest available offsets and each trigger is
        capped at 10 offsets via ``maxOffsetsPerTrigger``.
        """
        kafka_df = (
            spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.BOOTSTRAP_SERVER)
            .option("subscribe", self.topic)
            .option("startingOffsets", "earliest")
            .option("maxOffsetsPerTrigger", 10)
            .load()
        )
        return kafka_df

    def get_kafka_message(self, kafka_df):
        """Decode the Kafka records.

        Args:
            kafka_df: DataFrame returned by ``ingest_from_kafka``.

        Returns:
            A DataFrame with ``key`` (cast to string), ``value`` (the record
            value cast to string and parsed as JSON with ``get_schema()`` into
            a struct), ``topic`` and ``timestamp``.
        """
        from pyspark.sql.functions import from_json
        raw_df = kafka_df.select(
            kafka_df.key.cast("string").alias("key"),
            from_json(kafka_df.value.cast("string"), self.get_schema()).alias("value"),
            "topic",
            "timestamp",
        )

        return raw_df

    def get_quality_df(self, raw_df):
        """Flatten the parsed ``value`` struct and normalise its time columns.

        Args:
            raw_df: DataFrame returned by ``get_kafka_message``.

        Returns:
            A DataFrame with one top-level column per schema field, without the
            rows whose ``ID`` equals the literal string ``"ID"``.
        """
        from pyspark.sql.functions import to_timestamp

        time_pattern = "dd-MMM-yy hh.mm.ss.SSSSSSSSS a"

        # Rows with ID == "ID" are removed (presumably header lines that were
        # published to the topic - TODO confirm). Under SQL comparison rules a
        # null ID does not satisfy `!=` either, so such rows are dropped too.
        predict_df = raw_df.select("value.*").filter(raw_df.value.ID != "ID")

        # NOTE(review): get_schema() already declares both columns as
        # TimestampType, so from_json has parsed them before this point. If the
        # raw JSON carries strings in `time_pattern`, the schema would likely
        # need to read them as strings for this parse to take effect - confirm
        # against the actual payload.
        predict_df = (
            predict_df
            .withColumn("CREATED_TIME", to_timestamp("CREATED_TIME", time_pattern))
            .withColumn("LAST_UPDATED_TIME", to_timestamp("LAST_UPDATED_TIME", time_pattern))
        )

        return predict_df

    def clean(self):
        """Delete the streaming checkpoint directory, if there is one.

        A missing directory (for example on the very first run) is not an
        error: there is simply nothing to remove. Any other failure, such as a
        permission problem, is still raised.
        """
        import shutil
        try:
            shutil.rmtree(self.checkpoint_location)
        except FileNotFoundError:
            pass

    def process(self):
        """Start the Kafka -> Iceberg streaming write.

        Builds the pipeline, then starts an append-mode streaming query named
        ``iceberg-ingestion`` that triggers every 5 seconds and writes to
        ``self.tableName``, checkpointing under ``checkpoint_location``.

        Returns:
            The streaming query handle returned by ``toTable``.
        """
        print("Start streaming...", end='')
        kafka_df = self.ingest_from_kafka()
        raw_df = self.get_kafka_message(kafka_df)
        predict_df = self.get_quality_df(raw_df)
        # self.clean()
        sQuery = (
            predict_df.writeStream
            .format("iceberg")
            .queryName("iceberg-ingestion")
            .option("checkpointLocation", self.checkpoint_location)
            .outputMode("append")
            .trigger(processingTime="5 seconds")
            .toTable(self.tableName)
        )
        # Printed as soon as toTable() has returned the query handle.
        print("Done")
        return sQuery
    


In [6]:
# Delete the checkpoint directory (<base_dir>/checkpoints/iceberg-consumer).
# process() uses the same path as its checkpointLocation, so the next streaming
# run starts without previously saved progress.
consumer = Consumer()
consumer.clean()

In [9]:
# Start the Kafka -> Iceberg streaming write. process() returns the handle
# produced by writeStream...toTable(), which is kept in `sQuery`.
consumer = Consumer()
sQuery = consumer.process()


Start streaming...Done


24/08/27 08:14:42 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/08/27 08:14:42 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

In [21]:
# Shut down every streaming query that is still active on this Spark session.
for running_query in spark.streams.active:
    running_query.stop()

In [8]:
%%sql
-- Target table of the streaming write. The column names and types mirror the
-- schema returned by Consumer.get_schema(); data is partitioned by the day of
-- CREATED_TIME.
CREATE TABLE IF NOT EXISTS bot_db.bot_ddvc_hcm_bot_predict (
    `ID` STRING,
    `BOT_ID` STRING,
    `TEXT` STRING,
    `INTENT_CONFIDENCE` STRING,
    `INTENT_NAME` STRING,
    `STEP` INT,
    `NLU_THRESHOLD` STRING,
    `SENDER_ID` STRING,
    `SOURCE` STRING,
    `CREATED_TIME` TIMESTAMP,
    `LAST_UPDATED_TIME` TIMESTAMP,
    `ID_CHATLOG` STRING,
    `UPDATED_INTENT` STRING,
    `LEN_CARD_DATA` INT,
    `STATUS_DELETE` STRING,
    `STATUS_CONFIRM` STRING,
    `INTENT_MAP_CLICK_BUTTON` STRING
) USING iceberg
PARTITIONED BY (days(`CREATED_TIME`));

In [13]:
%%sql
-- Number of rows currently in the table the stream writes to.
SELECT count(*) FROM bot_db.bot_ddvc_hcm_bot_predict

count(1)
296


In [22]:
%%sql
-- List the table's snapshots (commit time, snapshot id, operation, summary).
SELECT * FROM bot_db.bot_ddvc_hcm_bot_predict.snapshots

committed_at,snapshot_id,parent_id,operation,manifest_list,summary
2024-08-27 08:10:28.707000,1440165704216497019,,append,s3://warehouse/bot_db/bot_ddvc_hcm_bot_predict/metadata/snap-1440165704216497019-1-4357d42f-b151-4fd3-b7e8-642f5791df91.avro,"{'spark.app.id': 'local-1724746144551', 'changed-partition-count': '0', 'total-equality-deletes': '0', 'total-position-deletes': '0', 'total-delete-files': '0', 'spark.sql.streaming.epochId': '0', 'total-files-size': '0', 'spark.sql.streaming.queryId': '9950349d-e6e0-4541-a862-d86b7ac52e9e', 'total-records': '0', 'total-data-files': '0'}"


In [7]:
%%sql
-- Remove the target table so it can be recreated from scratch. IF EXISTS keeps
-- this cell from failing when the table has not been created yet.
DROP TABLE IF EXISTS bot_db.bot_ddvc_hcm_bot_predict

24/08/27 08:14:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
