In [None]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

# Start (or reuse) the Spark session with the Kafka connector on the classpath.
# Structured Streaming's "kafka" source ships as a separate artifact
# (spark-sql-kafka-0-10). Without it, readStream.format("kafka") fails with
# "AnalysisException: Failed to find data source: kafka".
# The artifact version must match the running Spark version. Its Scala suffix
# must match Spark's Scala build: 2.12 for pip-installed pyspark 3.x, 2.13 for 4.x.
# Set SPARK_KAFKA_PACKAGE to override the derived Maven coordinate
# (e.g. for an offline mirror or a custom Spark build).
# NOTE: spark.jars.packages only takes effect when getOrCreate() launches a new
# JVM. If a session already exists in this kernel, restart the kernel first.
_spark_major_version = int(pyspark.__version__.split(".")[0])
_scala_binary_version = "2.13" if _spark_major_version >= 4 else "2.12"
SPARK_KAFKA_PACKAGE = os.environ.get(
    "SPARK_KAFKA_PACKAGE",
    f"org.apache.spark:spark-sql-kafka-0-10_{_scala_binary_version}:{pyspark.__version__}",
)

spark = (
    SparkSession.builder
    .appName("KafkaDebezium")
    .config("spark.jars.packages", SPARK_KAFKA_PACKAGE)
    .getOrCreate()
)

In [None]:
# Define JSON schema for one Debezium change-event message on the
# turnstile_activity topic. Every field is declared nullable, so keys that are
# absent from a message parse to null instead of rejecting the record.


def _nullable_struct(*fields):
    """Return a StructType built from (name, data_type) pairs, all fields nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in fields])


def _turnstile_row():
    """Return the column layout of one turnstile_activity row (before/after image)."""
    return _nullable_struct(
        ("employee_id", StringType()),
        ("timestamp", LongType()),
        ("entry_exit", StringType()),  # entry/exit status
        ("location", StringType()),
        ("device_id", StringType()),
        ("access_type", StringType()),
    )


# The envelope's "schema" section: field metadata describing the payload.
_schema_section = _nullable_struct(
    ("type", StringType()),
    ("fields", ArrayType(_nullable_struct(
        ("type", StringType()),
        ("optional", StringType()),
        ("default", StringType()),
        ("field", StringType()),
    ))),
    ("optional", StringType()),
    ("name", StringType()),
    ("version", StringType()),
)

# Metadata about where the change came from (connector, database, table, tx).
_source_section = _nullable_struct(
    ("version", StringType()),
    ("connector", StringType()),
    ("name", StringType()),
    ("ts_ms", LongType()),
    ("snapshot", StringType()),
    ("db", StringType()),
    ("sequence", StringType()),
    ("schema", StringType()),
    ("table", StringType()),
    ("txid", LongType()),
    ("lsn", LongType()),
    ("xmin", StringType()),
)

_transaction_section = _nullable_struct(
    ("id", StringType()),
    ("total_order", LongType()),
    ("data_collection_order", LongType()),
)

# The envelope's "payload" section: row images plus change metadata.
_payload_section = _nullable_struct(
    ("before", _turnstile_row()),
    ("after", _turnstile_row()),
    ("source", _source_section),
    ("op", StringType()),
    ("ts_ms", LongType()),
    ("transaction", _transaction_section),
)

schema = _nullable_struct(
    ("schema", _schema_section),
    ("payload", _payload_section),
)

# Kafka connection settings. Environment variables override the defaults, so the
# notebook can point at another broker/topic without editing code.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "34.136.76.36:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "dbserver1.public.turnstile_activity")
# "latest" (Spark's streaming default) only sees events produced after the query
# starts. Use "earliest" to replay what is already on the topic.
KAFKA_STARTING_OFFSETS = os.environ.get("KAFKA_STARTING_OFFSETS", "latest")

# Read the topic as a stream and keep only the message value, decoded as a string.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", KAFKA_STARTING_OFFSETS)
    .load()
    .selectExpr("CAST(value AS STRING)")
)

# Parse each JSON message into the change-event struct defined by `schema`.
parsed_df = kafka_df.select(from_json(col("value"), schema).alias("data"))

# Keep only events that carry a row image, then flatten it into columns
# (employee_id, timestamp, entry_exit, location, device_id, access_type).
# Delete events have a null "after" image, and a delete's tombstone message has
# a null Kafka value. Either would otherwise surface as an all-null row.
# Filtering on a non-null "after" still keeps creates, snapshot reads and updates.
json_df = (
    parsed_df
    .filter(col("data.payload.after").isNotNull())
    .select("data.payload.after.*")
)

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [None]:

# Write the parsed events to Spark's in-memory sink. The sink exposes them as a
# table named MEMORY_TABLE_NAME that spark.sql can query. The data lives in
# driver memory, so this is meant for inspection/debugging only.
MEMORY_TABLE_NAME = "turnstile_activity"

# Spark refuses to start a query whose name is already used by an active query
# in this session. Stop a previous run of this cell first so the cell can be
# re-executed.
for active_query in spark.streams.active:
    if active_query.name == MEMORY_TABLE_NAME:
        active_query.stop()

query = (
    json_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName(MEMORY_TABLE_NAME)
    .start()
)


In [None]:
# Print the rows currently held in the in-memory turnstile_activity table.
# The streaming query runs in the background after start(), so this may show an
# empty table until the first micro-batch has completed.
spark.table("turnstile_activity").show()