In [3]:
import pyspark
from pyspark.sql import SparkSession

# The Kafka connector must match the running Spark version; a connector built for a
# different Spark release (e.g. 3.3.0 on a 3.5.0 runtime) can fail at query start with
# classpath / binary-compatibility errors. Deriving the version from the installed
# PySpark keeps the two in lockstep.
# NOTE(review): assumes a Scala 2.12 build of Spark (the default for PySpark 3.x
# wheels) — change the `_2.12` suffix if the cluster runs Scala 2.13.
KAFKA_PACKAGE = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

# getOrCreate() returns an already-running session unchanged, and
# `spark.jars.packages` is only honoured when the JVM starts — restart the kernel
# after changing it.
spark = (
    SparkSession.builder
    .appName("KafkaExample")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)
print(spark.version)

3.5.0


In [4]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)


# Column layout of one `customers` row; shared by the `before` and `after` images
# of a change event so the two cannot drift apart.
# NOTE(review): `created_at` looks like epoch microseconds (e.g. 1748633568152619 in
# the sample output below) — confirm against the source table / connector settings.
customer_row_type = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("created_at", LongType(), True),
])

# One entry of the envelope's `schema.fields` list. `optional`/`default` stay as
# strings: for a StringType field from_json keeps non-string JSON values (such as
# booleans) as their text.
field_descriptor_type = StructType([
    StructField("type", StringType(), True),
    StructField("optional", StringType(), True),
    StructField("default", StringType(), True),
    StructField("field", StringType(), True),
])

# JSON schema for the Kafka message value: a Kafka Connect JSON envelope with a
# `schema` part (message metadata) and a `payload` part (the change event itself).
# The `source` fields (lsn, txId, xmin) suggest the Debezium PostgreSQL connector.
schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType(), True),
        # In the envelope `fields` is a JSON *array* of descriptors; declaring it as a
        # single struct is a type mismatch for from_json.
        StructField("fields", ArrayType(field_descriptor_type), True),
        StructField("optional", StringType(), True),
        StructField("name", StringType(), True),
        StructField("version", StringType(), True),
    ]), True),

    StructField("payload", StructType([
        # Row image before the change (null for inserts) and after it (null for deletes).
        StructField("before", customer_row_type, True),
        StructField("after", customer_row_type, True),
        StructField("source", StructType([
            StructField("version", StringType(), True),
            StructField("connector", StringType(), True),
            StructField("name", StringType(), True),
            StructField("ts_ms", LongType(), True),
            StructField("snapshot", StringType(), True),
            StructField("db", StringType(), True),
            StructField("sequence", StringType(), True),
            StructField("schema", StringType(), True),
            StructField("table", StringType(), True),
            StructField("txId", LongType(), True),
            StructField("lsn", LongType(), True),
            StructField("xmin", LongType(), True),
        ]), True),
        # Operation code; the next cell keeps only "u" (UPDATE) events.
        StructField("op", StringType(), True),
        StructField("ts_ms", LongType(), True),
        StructField("transaction", StructType([
            StructField("id", StringType(), True),
            StructField("total_order", LongType(), True),
            StructField("data_collection_order", LongType(), True),
        ]), True),
    ]), True),
])

In [5]:
import os

# Connection settings. The broker can be overridden via the KAFKA_BOOTSTRAP_SERVERS
# environment variable instead of editing (and committing) an address in the notebook.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "34.13.181.106:9092")
KAFKA_TOPIC = "dbserver1.public.customers"
# "latest" (Spark's default for streaming reads) only sees messages produced after the
# query starts; use "earliest" to replay the whole topic.
KAFKA_STARTING_OFFSETS = "latest"

raw_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", KAFKA_STARTING_OFFSETS)
    .load()
)

# The Kafka `value` column is binary; cast it to a JSON string.
kafka_df = raw_stream_df.selectExpr("CAST(value AS STRING)")

# Parse the JSON envelope into a single struct column named `data`.
parsed_df = kafka_df.select(from_json(col("value"), schema).alias("data"))

# Keep only UPDATE events.
updates_df = parsed_df.filter(col("data.payload.op") == "u")

# Flatten the post-update row image into top-level columns
# (id, first_name, last_name, email, phone, created_at).
json_df = updates_df.select("data.payload.after.*")

In [None]:
# Name of the in-memory table the sink writes to; queried by the next cell.
CUSTOMERS_QUERY_NAME = "customers"

# Re-running this cell would otherwise fail because a query with the same name is
# still active in this SparkSession — stop the previous run first.
for active_query in spark.streams.active:
    if active_query.name == CUSTOMERS_QUERY_NAME:
        active_query.stop()

# Memory sink: collects the streamed UPDATE rows into a queryable in-memory table.
# Keep the handle so the query can be inspected / stopped later.
customers_query = (
    json_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName(CUSTOMERS_QUERY_NAME)
    .start()
)
customers_query

In [12]:
# Show the rows the "customers" memory sink has collected so far.
spark.table("customers").show()

+---+----------+---------+------------------+-------+----------------+
| id|first_name|last_name|             email|  phone|      created_at|
+---+----------+---------+------------------+-------+----------------+
|  3|    Alice1|     lsdd|example3@email.com|1591515|1748633568152619|
+---+----------+---------+------------------+-------+----------------+

