# Streaming processing

This notebook reads a stream from a Kafka input topic, processes it, and writes the processed records to an output topic.

Import the libraries and set up Spark

In [1]:
import findspark
import os

findspark.init()

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

Create the SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("KafkaApp") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5") \
    .getOrCreate()
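
The coordinate passed to `spark.jars.packages` encodes both the Scala version (`2.11`) and the Spark version (`2.4.5`), and these need to match the Spark installation in use. A minimal sketch of building the coordinate from the two version strings follows; the helper name `kafka_connector_package` is hypothetical and is not part of PySpark.

```python
def kafka_connector_package(spark_version: str, scala_version: str) -> str:
    """Build the Maven coordinate of the Structured Streaming Kafka connector.

    Hypothetical helper for illustration only; it just formats a string.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Same versions as in the builder above
package = kafka_connector_package("2.4.5", "2.11")
print(package)
```

The resulting string can be used as the value of `spark.jars.packages`, so the versions are stated in one place.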

Kafka topic parameters

In [4]:
kafka_input_config = {
    "kafka.bootstrap.servers" : "kafka:9092",
    "subscribe" : "input",
    "startingOffsets" : "latest",
    "failOnDataLoss" : "false"
}
kafka_output_config = {
    "kafka.bootstrap.servers" : "kafka:9092",
    "topic" : "output",
    # Spark treats this path as a checkpoint directory, despite the .txt suffix
    "checkpointLocation" : "./check.txt"
}
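
The broker address and topic names are hard-coded above. One possible way to make them configurable is to read them from environment variables and fall back to the same defaults. This is only a sketch: the function `build_kafka_configs` and the variable names `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_INPUT_TOPIC`, `KAFKA_OUTPUT_TOPIC` and `CHECKPOINT_LOCATION` are assumptions, not something this notebook or Spark defines.

```python
import os


def build_kafka_configs(env=os.environ):
    """Return (input_config, output_config) dicts for the Kafka source and sink.

    The environment variable names are hypothetical; the defaults mirror
    the values used in this notebook.
    """
    bootstrap = env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
    input_config = {
        "kafka.bootstrap.servers": bootstrap,
        "subscribe": env.get("KAFKA_INPUT_TOPIC", "input"),
        "startingOffsets": "latest",
        "failOnDataLoss": "false",
    }
    output_config = {
        "kafka.bootstrap.servers": bootstrap,
        "topic": env.get("KAFKA_OUTPUT_TOPIC", "output"),
        "checkpointLocation": env.get("CHECKPOINT_LOCATION", "./check.txt"),
    }
    return input_config, output_config


kafka_input_config, kafka_output_config = build_kafka_configs()
```

With no variables set, the two dictionaries are identical to the ones defined above, so the rest of the notebook would not need to change.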

Input schema

In [5]:
df_schema = StructType([
  StructField("transaction_id", StringType(), True),
  StructField("custumer_id", StringType(), True),
  StructField("amount", IntegerType(), True),
  StructField("transaction_timestamp", TimestampType(), True),
  StructField("merchant_id", StringType(), True)
])
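
To make the schema concrete, here is a sketch of a message value that would match it. All values are made up for illustration, and the ISO 8601 timestamp format is an assumption about what the producer sends. Because every field is nullable, a key missing from a message is read as null.

```python
import json

# Illustrative record only; the values are invented and the timestamp
# format is an assumption about the producer.
sample_message = {
    "transaction_id": "t-0001",
    "custumer_id": "c-42",
    "amount": 100,
    "transaction_timestamp": "2020-05-01T12:30:00.000Z",
    "merchant_id": "MerchantX",
}

# Kafka stores the message value as bytes, so the JSON text is UTF-8 encoded
payload = json.dumps(sample_message).encode("utf-8")

# Decoding the payload gives back the same five keys declared in the schema
decoded = json.loads(payload.decode("utf-8"))
schema_fields = {
    "transaction_id", "custumer_id", "amount",
    "transaction_timestamp", "merchant_id",
}
print(set(decoded) == schema_fields)
```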

Processing

In [6]:
# Read the stream and parse the JSON payload of each Kafka message

df = spark \
  .readStream \
  .format("kafka") \
  .options(**kafka_input_config) \
  .load() \
  .select(F.from_json(F.col("value").cast("string"), df_schema).alias("json_data")) \
  .select("json_data.*")

# Apply the business rule: 15% cashback on purchases of at least 15
# made at MerchantX on a Friday (dayofweek: 1 = Sunday, ..., 6 = Friday)

df = df.select(F.col("custumer_id"), F.col("amount"),
        F.col("transaction_timestamp"), F.col("merchant_id"),
        F.when(
          ((F.col("amount") >= 15) &
          (F.dayofweek(F.col("transaction_timestamp")) == 6) &
          (F.col("merchant_id") == "MerchantX")),
        (F.col("amount") * 0.15)).otherwise(0).alias("cashback"))

# Build the output: serialize every column into a JSON string in the Kafka "value" column

output_df = df.select(F.to_json(F.struct(*df.columns)).alias("value"))

write = output_df \
          .writeStream \
          .format("kafka") \
          .options(**kafka_output_config) \
          .start()

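# Block until the query stops; on a kernel interrupt, stop the stream cleanly
try:
    write.awaitTermination()
except KeyboardInterrupt:
    write.stop()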
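
The cashback rule in the processing cell can be checked without a Spark cluster by mirroring it in plain Python. The sketch below is not the notebook's implementation: the function names `spark_dayofweek` and `cashback` are hypothetical, and it does not model null values or time zone handling. It reproduces Spark's `dayofweek` numbering, where 1 is Sunday and 6 is Friday.

```python
from datetime import datetime


def spark_dayofweek(ts: datetime) -> int:
    """Mirror Spark's dayofweek numbering: 1 = Sunday, ..., 7 = Saturday."""
    # isoweekday() gives Monday = 1 ... Sunday = 7
    return ts.isoweekday() % 7 + 1


def cashback(amount: int, ts: datetime, merchant_id: str) -> float:
    """Plain-Python mirror of the rule: 15% back on Friday purchases
    of at least 15 at MerchantX, otherwise nothing."""
    eligible = (
        amount >= 15
        and spark_dayofweek(ts) == 6
        and merchant_id == "MerchantX"
    )
    return amount * 0.15 if eligible else 0.0


friday = datetime(2020, 5, 1, 12, 30)    # 1 May 2020 was a Friday
saturday = datetime(2020, 5, 2, 12, 30)

print(cashback(100, friday, "MerchantX"))    # eligible purchase
print(cashback(100, saturday, "MerchantX"))  # wrong day, no cashback
print(cashback(10, friday, "MerchantX"))     # below the minimum amount
```

Keeping a reference version like this alongside the Spark expression makes it easier to confirm that `== 6` selects Friday rather than Saturday.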
          
