In [1]:
import html
import math

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import pyspark.sql.functions as f

In [3]:
# Start (or reuse, via getOrCreate) the Spark session. The Kafka connector is pulled in
# through spark.jars.packages; its coordinates pin Scala 2.12 / Spark 3.5.0, so keep them
# in sync with the installed Spark. If a session already exists, getOrCreate returns it
# and this package setting will not be applied to it.
spark = (
    SparkSession.builder
    .appName("KafkaSparkStructuredStreaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

In [4]:
# Schema of the JSON events published on the Kafka topic.
# Field names must match the payload keys exactly: from_json looks fields up by name,
# and any field that is not found comes back as null. The names below therefore mirror
# the raw record shown in the batch preview (cpf, first, last, trans_num, ...).
# Numeric ids/coordinates are kept as StringType; Spark's JSON parser keeps the raw
# text of a numeric JSON value when the target field is a string.
schema = StructType([
    StructField("cpf", StringType(), True),
    StructField("cc_num", StringType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("trans_num", StringType(), True),
    StructField("trans_date", StringType(), True),
    StructField("trans_time", StringType(), True),
    StructField("unix_time", StringType(), True),
    StructField("category", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
])

In [5]:
# Batch-read every message currently stored in the topic (from the earliest to the
# latest offset) into a static DataFrame.
kafka_df = (
    spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:29092",
        "subscribe": "com.miserani.credit_events",
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
)

In [6]:
# The Kafka source exposes `key` and `value` as binary columns; cast both to strings
# so the message payload can be displayed and parsed.
kafka_df = kafka_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [7]:
# Peek at a single raw message; vertical layout keeps the long payload readable.
kafka_df.show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 key   | 89719338548                                                                                                                                                                                                                                                                                                                 
 value | {'cpf': 89719338548, 'cc_num': 5476993877334938, 'first': 'Diogo', 'last': 'Castro', 'trans_num': 444435866330, 'trans_date': '2023-07-18 00:02:44', 'trans_time': '2023-07-18 00:02:44', 'unix_time': 1181605647, 'category': 'grocery', 'merchant': 'da Costa', 'value': 92.36, 'lat': '60.596218', 'long': '155.246136'} 
only showing top 1 row

In [8]:
# Parse the JSON payload in `value` with `schema`, then flatten the resulting struct
# into top-level columns. The raw payload uses single quotes (see the record above);
# Spark's JSON parser accepts that because its `allowSingleQuotes` option defaults to
# true. Schema fields missing from a payload come back as null.
events_df = (
    kafka_df
    .select(f.from_json("value", schema).alias("values"))
    .select("values.*")
)

In [9]:
# Peek at one parsed event to check the schema lines up with the payload.
events_df.show(n=1, truncate=False, vertical=True)

-RECORD 0-------------------------
 cpf        | 89719338548         
 cc_num     | 5476993877334938    
 first      | Diogo               
 last       | Castro              
 trans_num  | 444435866330        
 trans_date | 2023-07-18 00:02:44 
 trans_time | 2023-07-18 00:02:44 
 unix_time  | 1181605647          
 category   | grocery             
 merchant   | da Costa            
 value      | 92.36               
 lat        | 60.596218           
 long       | 155.246136          
only showing top 1 row



In [10]:
# Streaming counterpart of the batch reader: subscribe to the same topic and receive
# new messages as micro-batches. startingOffsets is not set, so the Kafka source's
# default for streaming queries ("latest") applies and messages already on the topic
# are not replayed.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker:29092",
        "subscribe": "com.miserani.credit_events",
    })
    .load()
)

In [11]:
# Cast the binary Kafka key/value columns to strings, as in the batch pipeline.
kafka_stream_df = kafka_stream_df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

In [12]:
# Parse each streamed JSON payload with `schema` and flatten it into columns.
events_stream_df = (
    kafka_stream_df
    .select(f.from_json("value", schema).alias("values"))
    .select("values.*")
)

In [13]:
def print(df, _):
    df.show(truncate=False)

In [14]:
query_print = events_stream_df.writeStream \
    .outputMode("append") \
    .foreachBatch(print) \
    .start()

+---+------+-----+----+---------+----------+----------+---------+--------+--------+-----+---+----+
|cpf|cc_num|first|last|trans_num|trans_date|trans_time|unix_time|category|merchant|value|lat|long|
+---+------+-----+----+---------+----------+----------+---------+--------+--------+-----+---+----+
+---+------+-----+----+---------+----------+----------+---------+--------+--------+-----+---+----+



In [56]:
from collections import Counter
from IPython.display import display, clear_output
import folium
# (lat, long, merchant, category) for every event seen so far, accumulated across batches.
global_markers = []


def update_map(spark_df, epoch_id, max_markers=None):
    """Add one micro-batch of events to the map and re-render it (``foreachBatch`` callback).

    Each event with usable coordinates is appended to the module-level
    ``global_markers`` list. The cell output is then cleared and replaced by a
    fresh folium map that contains every accumulated marker.

    Args:
        spark_df: Micro-batch DataFrame; must have ``lat``, ``long``, ``merchant``
            and ``category`` columns.
        epoch_id: Micro-batch id supplied by Spark; unused.
        max_markers: Optional cap on how many of the most recent markers are kept.
            ``None`` (the default, which ``foreachBatch`` uses) keeps them all, so
            memory use and render time grow with the stream.
    """
    # Collect only the columns the map needs, not the whole event record.
    batch = spark_df.select("lat", "long", "merchant", "category").toPandas()

    for lat, lon, merchant, category in zip(batch["lat"], batch["long"], batch["merchant"], batch["category"]):
        try:
            coords = (float(lat), float(lon))
        except (TypeError, ValueError):
            # Null or malformed coordinates (e.g. a payload the schema could not
            # parse): skip the event rather than failing the whole streaming query.
            continue
        if not all(math.isfinite(c) for c in coords):
            continue  # folium rejects NaN/inf locations
        global_markers.append((*coords, merchant, category))

    if max_markers is not None and len(global_markers) > max_markers:
        del global_markers[: len(global_markers) - max_markers]

    clear_output(wait=True)
    if not global_markers:
        return

    # Counted over every accumulated marker (not per region), once per batch rather
    # than once per marker.
    top_categories = ", ".join(
        f"{html.escape(str(name))} ({count})"
        for name, count in Counter(marker[3] for marker in global_markers).most_common(3)
    )

    m = folium.Map(location=[0, 0], zoom_start=2)
    for lat, lon, merchant, category in global_markers:
        # Event fields come from Kafka and folium renders popups as HTML: escape them.
        popup_content = (
            f"Merchant: {html.escape(str(merchant))}<br>"
            f"Category: {html.escape(str(category))}<br>"
            f"Top Categories Overall: {top_categories}"
        )
        tooltip = None if category is None else html.escape(str(category))
        folium.Marker([lat, lon], tooltip=tooltip, popup=popup_content).add_to(m)

    display(m)

In [57]:
# Feed each micro-batch of parsed events to update_map, which redraws the folium map;
# start() returns immediately and the query runs in the background.
query_map = (
    events_stream_df.writeStream
    .outputMode("append")
    .foreachBatch(update_map)
    .start()
)

In [None]:
# Wait for both streaming queries. Awaiting them one after the other would block on
# query_print forever, so a failure of query_map would go unnoticed. awaitAnyTermination
# returns when any query stops and re-raises the exception of a query that failed;
# resetTerminated lets the loop keep waiting on the queries that are still active.
while spark.streams.active:
    spark.streams.awaitAnyTermination()
    spark.streams.resetTerminated()