In [1]:
#Importando bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, TimestampType


In [2]:
#Criação da sessão do spark
spark = SparkSession.builder \
    .appName("KafkaToParquet") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()


In [3]:
#Definição schema esperado
schema = StructType() \
    .add("id", StringType()) \
    .add("user_id", StringType()) \
    .add("name", StringType()) \
    .add("description", StringType()) \
    .add("price", StringType()) \
    .add("department", StringType()) \
    .add("image", StringType()) \
    .add("createdAt", TimestampType())


In [None]:
"""
Leitura continua do kafka

-conecta com broker
-escuta o topico "products"
-caputra apenas das mensagens mais recentes
"""
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "products") \
    .option("startingOffsets", "latest") \
    .load()
    
"""
-Tranformamos em texto os arquivo binarios que vem do kafka
-interpretamos como json e aplica o schema
-extrai cada campo do json como a coluna do dataframe definido
"""
json_df = raw_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

"""
Escrita continua
-define a saida sera continua em streaming
-grava em formato parquet
-usamos o checkpointLocation para controlar o processo e evitar duplicações
"""
query = json_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "/home/jovyan/work/parquet_output") \
    .option("checkpointLocation", "/home/jovyan/work/checkpoint") \
    .start()


"""
Espera do processo terminar
- mantem o processo rodando infinitamente até cancelarmos manualmente
"""
query.awaitTermination()