# In [None]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import from_json, col 
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql import DataFrame


# 1. Build (or reuse) the SparkSession used by the Kafka streaming ETL job.
spark = (
    SparkSession.builder
    .appName("KafkaSparkStreamingETL")
    # Run locally with two threads.
    .master("local[2]")
    # Pull in the Kafka source connector for Structured Streaming.
    # NOTE(review): the connector coordinates (Scala 2.12 / Spark 3.3.1)
    # presumably have to match the installed Spark build -- confirm.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    )
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
    .getOrCreate()
)

# 2. Turn off the native Windows API calls for Hadoop (avoids the nativeio
#    error).
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("hadoop.native.lib", "false")

# 3. Open a streaming read on Kafka. The "subscribe" value is a
#    comma-separated list naming the region and territory topics, and
#    "earliest" starts from the oldest available offsets.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9093",
    "subscribe": "server.public.region, server.public.territory",
    "startingOffsets": "earliest",
}
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# 4. Keep only the message `value` column, cast to a string.
kafka_df = kafka_stream_df.selectExpr("CAST(value AS STRING)")

# Schema matching the structure of the territory Kafka JSON message: a
# top-level "schema" object and a "payload" object holding the row columns.
territory_kafka_schema = (
    StructType()
    # Declared as an empty struct: the "schema" part is optional and ignored.
    .add("schema", StructType())
    .add(
        "payload",
        StructType()
        .add("territoryid", IntegerType(), False)
        .add("territorydescription", StringType(), True)
        .add("regionid", IntegerType(), True),
    )
)


# Schema matching the structure of the region Kafka JSON message, laid out
# the same way ("schema" ignored, row columns under "payload").
region_kafka_schema = (
    StructType()
    # Declared as an empty struct: the "schema" part is optional and ignored.
    .add("schema", StructType())
    .add(
        "payload",
        StructType()
        .add("regionid", IntegerType(), False)
        .add("regiondescription", StringType(), True),
    )
)

# Parse the JSON carried by the Kafka messages into a DataFrame.
# Only messages whose text contains the literal `"territoryid"` are kept;
# each one is parsed with the territory schema and its payload fields are
# expanded into top-level columns.
territory_json_col = from_json(col("value"), territory_kafka_schema).alias("json")
territory_parsed_df = (
    kafka_df
    .filter(col("value").contains('"territoryid"'))
    .select(territory_json_col)
    .select("json.payload.*")
)
# Disabled draft of the matching region parse, kept for reference.
# NOTE(review): it reads from `kafka_df2`, which is not defined in the
# visible code -- confirm the intended source before re-enabling.
# region_parsed_df = kafka_df2 \
#     .filter(col('value').contains('"regionid"')) \
#     .select(from_json(col('value'), region_kafka_schema).alias('json')) \
#     .select("json.payload.*")


# Stream the parsed territory rows to the console sink in append mode,
# printing full column values ("truncate" = "false").
query = (
    territory_parsed_df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Block until the query terminates. The `finally` makes sure the streaming
# query is stopped if the wait is interrupted (e.g. KeyboardInterrupt from a
# notebook cell interrupt) or raises, so it is not left running in the
# background. Stopping a query that has already terminated should be
# harmless.
try:
    query.awaitTermination()
finally:
    query.stop()

