'org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

In [4]:
import os
from pathlib import Path

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col


def _installed_scala_version(default):
    """Return the Scala binary version (e.g. "2.12") of the local Spark install.

    Looks for the scala-library jar under $SPARK_HOME/jars, or under the
    installed pyspark package when SPARK_HOME is not set.

    Args:
        default: Value returned when no scala-library jar is found.

    Returns:
        The "major.minor" Scala version as a string, or `default`.
    """
    spark_home = Path(os.environ.get("SPARK_HOME") or Path(pyspark.__file__).parent)
    scala_jars = sorted(spark_home.glob("jars/scala-library-*.jar"))
    if not scala_jars:
        return default
    # "scala-library-2.12.17.jar" -> "2.12.17" -> "2.12"
    full_version = scala_jars[0].stem.rsplit("-", 1)[-1]
    return ".".join(full_version.split(".")[:2])


# The Kafka connector has to be built for the same Scala binary version and the
# same Spark release as the Spark runtime that loads it; a mismatch only shows up
# as a JVM error once the Kafka source is used. Derive both from the installation
# instead of hard-coding them ('2.13' is kept as the fallback).
scala_version = _installed_scala_version(default='2.13')
spark_version = pyspark.__version__
packages = [
    # kafka-clients is resolved transitively at the version this connector was
    # built against, so it is not pinned separately here.
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
]

# Local session with the Kafka connector (resolved from Maven) and the
# PostgreSQL JDBC driver (local jar, path relative to the working directory).
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("kafka-example-1")\
    .config("spark.jars.packages", ",".join(packages))\
    .config("spark.jars", "./jars/postgresql-42.6.2.jar")\
    .getOrCreate()

spark

In [5]:
import os
from dotenv import dotenv_values, find_dotenv, load_dotenv

# Export the .env entries to the process environment, exactly as before.
# load_dotenv() does not override variables that are already set.
load_dotenv()

# USER and PWD (and HOST in some shells) are already set by the operating system,
# so os.getenv() would return the login name and the working directory rather
# than the Postgres credentials from .env. Read the file directly and let its
# values win; anything it does not define falls back to the process environment.
_env_file_values = {
    key: value for key, value in dotenv_values().items() if value is not None
}
_settings = {**os.environ, **_env_file_values}

#Kafka
TOPIC = _settings.get('TOPIC')
BOOTSTRAP_SERVERS = _settings.get('BOOTSTRAP_SERVERS')

# Postgres
HOST = _settings.get('HOST')
PORT = _settings.get('PORT')
TABLE = _settings.get('TABLE')
URL = _settings.get('URL')
DRIVER = _settings.get('DRIVER')
USER = _settings.get('USER')
PWD = _settings.get('PWD')

In [8]:
# Streaming source: subscribe to TOPIC on BOOTSTRAP_SERVERS, starting from the
# earliest available offsets.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("subscribe", TOPIC)
    .option("startingOffsets", "earliest")
    # failOnDataLoss is read by the Kafka source, so it has to be set on the
    # reader to take effect: do not fail the query when offsets are missing.
    .option("failOnDataLoss", "false")
    .load()
)

In [9]:
from pyspark.sql.types import *

def _string_struct(*field_names):
    """Return a struct whose fields are all nullable strings, in the order given."""
    return StructType([StructField(field_name, StringType()) for field_name in field_names])


# Schema used to parse the JSON payload; every leaf field is kept as a string.
json_schema = StructType([
    StructField("gender", StringType()),
    StructField("name", _string_struct("title", "first", "last")),
    StructField("location", StructType([
        StructField("street", _string_struct("number", "name")),
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("country", StringType()),
        StructField("postcode", StringType()),
        StructField("coordinates", _string_struct("latitude", "longitude")),
        StructField("timezone", _string_struct("offset", "description")),
    ])),
    StructField("email", StringType()),
    StructField("login", _string_struct(
        "uuid", "username", "password", "salt", "md5", "sha1", "sha256")),
    StructField("dob", _string_struct("date", "age")),
    StructField("registered", _string_struct("date", "age")),
    StructField("phone", StringType()),
    StructField("cell", StringType()),
    StructField("id", _string_struct("name", "value")),
    StructField("picture", _string_struct("large", "medium", "thumbnail")),
    StructField("nat", StringType()),
])


In [10]:
# Cast the `value` column to a string, parse it with json_schema, then promote
# the parsed struct's fields to top-level columns.
parsed_df = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), json_schema).alias("parsed_value"))
    .select("parsed_value.*")
)

# Transform data: keep a flat subset of the fields and cast the two date
# strings to timestamps.
flattened_df = parsed_df.select(
    "gender",
    col("name.first").alias("first_name"),
    col("name.last").alias("last_name"),
    col("location.city").alias("city"),
    col("location.country").alias("country"),
    "email",
    col("login.username").alias("username"),
    *[
        col(source_path).cast(TimestampType()).alias(column_name)
        for source_path, column_name in (
            ("registered.date", "registered_date"),
            ("dob.date", "dob"),
        )
    ],
    "phone",
)

In [11]:
# query = flattened_df \
#     .writeStream \
#     .format("json") \
#     .outputMode("append") \
#     .option("path", "output") \
#     .option("checkpointLocation", "checkpoint") \
#     .option("failOnDataLoss", "false") \
#     .trigger(processingTime="1 minute") \
#     .start()

# query.awaitTermination()

In [7]:
def write_to_postgresql(df, epoch_id):
    """Append one micro-batch to the JDBC table named by TABLE.

    Connection settings come from the module-level URL, DRIVER, TABLE, USER
    and PWD values, read at call time.

    Args:
        df: DataFrame holding the rows of the micro-batch.
        epoch_id: Identifier of the micro-batch; not used here.
    """
    jdbc_options = {
        "url": URL,
        "driver": DRIVER,
        "dbtable": TABLE,
        "user": USER,
        "password": PWD,
    }
    batch_writer = df.write.format('jdbc').options(**jdbc_options).mode('append')
    batch_writer.save()

In [14]:
# Sink: once a minute, hand the micro-batch to write_to_postgresql.
# NOTE(review): "failOnDataLoss" is documented as a Kafka source option — confirm
# whether it has any effect when set on the writer.
query = (
    flattened_df.writeStream
    .foreachBatch(write_to_postgresql)
    .outputMode("update")
    .options(checkpointLocation="checkpoint", failOnDataLoss="false")
    .trigger(processingTime="1 minute")
    .start()
)

In [15]:
# Block this cell until the streaming query stops or fails.
query.awaitTermination()

Phần bên dưới dùng Python thuần để chạy consumer thì hoạt động bình thường, nhưng PySpark ở phía trên thì không chạy được => lỗi không phải do Kafka.

In [None]:
import json 
from kafka import KafkaConsumer

# Stand-alone check with kafka-python: consume 'user-topic' from localhost:9092
# (auto_offset_reset='earliest') and print each message decoded from JSON.
consumer = KafkaConsumer(
    'user-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
)
for record in consumer:
    payload = json.loads(record.value)
    print(payload)