# Exercícios - Structured Streaming com Kafka

In [1]:
# importando bibliotecas que vamos usar
from pyspark.sql.functions import *

In [21]:
#1. Ler o tópico do kafka "topic_kvspark" em modo batch
topic_read = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .load()

In [3]:
#2. Visualizar o schema do tópico
topic_read.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# 3. Visualizar o tópico com o campo key e value convertidos em string
topic_string = topic_read.select(col("key").cast("string"),col("value").cast("string"))
topic_string.show(5)

+---+-----+
|key|value|
+---+-----+
|  1| MSg1|
|  8| Msg8|
|  3| Msg3|
| 13|Msg13|
| 30|Msg30|
+---+-----+
only showing top 5 rows



In [14]:
#4. Ler o tópico do kafka "topic_kvspark" em modo streaming
topic_read_stream = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .load()

## opção de ler desde o início do topic
#    .option("startingOffsets, earliest")

In [16]:
#5. Visualizar o schema do tópico em streaming
topic_read_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
#6. Alterar o tópico em streaming com o campo key e value convertidos para string
topic_string_stream = topic_read_stream.select(col("key").cast("string"),col("value").cast("string"))

## observação que por enquanto não conseguimos mostrar, por que está em Streaming e no Jupyter não conseguimos monstrar.

In [19]:
#7. Salvar o tópico em streaming no tópico topic-kvspark-output a cada 5 segundos
topic_string_stream_output = topic_string_stream.writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("topic","topic-kvspark-output")\
    .option("checkpointLocation","/user/jader/kafka_checkpoint")\
    .trigger(continuous='5 second')\
    .start()

In [20]:
## para testar podemos visualizar os status
topic_string_stream_output.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [23]:
#8. Salvar o tópico na pasta hdfs://namenode:8020/user/<nome>/Kafka/topic-kvspark-output
topic_string_stream_output = topic_string_stream.writeStream\
    .format("parquet")\
    .option("checkpointLocation","/user/jader/kafka_checkpoint2")\
    .option("path","/user/jader/kafka/topic-kvspark-output")\
    .start()

In [25]:
# conferindo no HDFS se está salvo
!hdfs dfs -ls /user/jader/kafka/topic-kvspark-output/

Found 1 items
drwxr-xr-x   - root supergroup          0 2022-08-05 10:47 /user/jader/kafka/topic-kvspark-output/_spark_metadata
