1. Ler o tópico do kafka “topic-kvspark” em modo batch
2. Visualizar o schema do tópico
3. Visualizar o tópico com o campo key e value convertidos em string
4. Ler o tópico do kafka “topic-kvspark” em modo streaming
5. Visualizar o schema do tópico em streaming
6. Alterar o tópico em streaming com o campo key e value convertidos para string
7. Salvar o tópico em streaming no tópico topic-kvspark-output a cada 5 segundos
8. Salvar o tópico na pasta hdfs://namenode:8020/user/<nome>/Kafka/topic-kvspark-output


In [1]:
# Import the Spark SQL functions; the cells below use the "col" function from this module
from pyspark.sql.functions import *

In [2]:
# Batch-read the Kafka topic "topic-kvspark" from the broker at kafka:9092
# using the "kafka" data source format.
topic_read = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "topic-kvspark")
    .load()
)

In [3]:
# Print the schema of the DataFrame obtained from the batch Kafka read
topic_read.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Requires `col` from pyspark.sql.functions to be imported.
# The Kafka source exposes key and value as binary columns, so cast both to
# string and display the resulting rows.
topic_string = topic_read.select(
    [col(field).cast("string") for field in ("key", "value")]
)
topic_string.show()

+----+-----+
| key|value|
+----+-----+
|   1| Msg1|
|   3| Msg3|
|   4| Msg4|
|null| msg2|
|null| msg4|
|null| msg6|
|null| msg8|
|null|msg10|
|null|  ola|
|null|  ola|
|   2| Msg2|
|   5| Msg5|
|   6| Msg6|
|null| msg1|
|null| msg3|
|null| msg5|
|null| msg7|
|null| msg9|
|null|  ola|
|null|  ola|
+----+-----+



In [20]:
# Read the same Kafka topic again, this time as a streaming source.
# startingOffsets="earliest" makes the stream read the topic from the beginning
# instead of only picking up newly produced messages.
topic_read_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "topic-kvspark")
    .option("startingOffsets", "earliest")
    .load()
)

In [21]:
# Print the schema of the streaming DataFrame read from Kafka
topic_read_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Streaming version of the key/value conversion.
# Requires `col` from pyspark.sql.functions to be imported.
# Cast the binary key and value columns to string; nothing is displayed here,
# the result is only a transformed streaming DataFrame.
topic_string_stream = topic_read_stream.select(
    [col(field).cast("string") for field in ("key", "value")]
)


In [22]:
# Stream the string-cast key/value records into the Kafka topic
# "topic-kvspark-output" (the sink topic is selected with the "topic" option).
#
# The requirement is to write the output every 5 seconds, so a processing-time
# (micro-batch) trigger is used. Note that trigger(continuous=...) would instead
# enable the experimental continuous-processing mode, where the given interval is
# only the checkpoint interval, not the cadence at which output is written.
#
# checkpointLocation is the directory where the query persists its progress so it
# can resume after a restart.
topic_string_stream_output = (
    topic_string_stream.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "topic-kvspark-output")
    .option("checkpointLocation", "/user/ronnan/kafka_checkpoint_csv3")
    .trigger(processingTime="5 seconds")
    .start()
)

In [23]:
# Show the current status of the topic_string_stream_output streaming query
topic_string_stream_output.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [None]:
# Batch-read the output topic "topic-kvspark-output" to verify what the
# streaming query wrote to it.
topic_read2 = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "topic-kvspark-output")
    .load()
)

In [None]:
# Verification, part 2: cast key and value to string, keep each record's
# partition and offset, and display the rows of the output topic.
topic_string2 = topic_read2.select(
    *(col(field).cast("string") for field in ("key", "value")),
    "partition",
    "offset",
)
topic_string2.show()


In [11]:
# Persist the streaming key/value records as CSV files under the HDFS directory
# given by the "path" option; checkpointLocation stores the query's progress.
# NOTE(review): this rebinds topic_string_stream_output, the same name used for the
# Kafka-sink query handle in this notebook, so only one of the two running queries
# stays reachable through that variable — confirm this is intended.
topic_string_stream_output = (
    topic_string_stream.writeStream
    .format("csv")
    .option("checkpointLocation", "/user/ronnan/kafka_checkpoint_csv1")
    .option("path", "/user/ronnan/kafka/topic-kvspark-output")
    .start()
)

In [12]:
# List the HDFS output directory to check that the topic data was saved there
!hdfs dfs -ls /user/ronnan/kafka/topic-kvspark-output

Found 6 items
drwxr-xr-x   - root supergroup          0 2022-04-18 22:15 /user/ronnan/kafka/topic-kvspark-output/_spark_metadata
-rw-r--r--   2 root supergroup          0 2022-04-18 22:07 /user/ronnan/kafka/topic-kvspark-output/part-00000-14e8bddf-3695-428a-bde7-42c5cb20aac4-c000.csv
-rw-r--r--   2 root supergroup          7 2022-04-18 22:15 /user/ronnan/kafka/topic-kvspark-output/part-00000-ae7ef6b4-6a9e-4c5b-8c5a-7d754d8b63b8-c000.csv
-rw-r--r--   2 root supergroup          7 2022-04-18 22:15 /user/ronnan/kafka/topic-kvspark-output/part-00000-b1165005-0ae8-4d4c-af22-5dd14809c2c6-c000.csv
-rw-r--r--   2 root supergroup          7 2022-04-18 22:15 /user/ronnan/kafka/topic-kvspark-output/part-00001-2fde33b5-ba2c-4bed-a290-6a16a2095aec-c000.csv
-rw-r--r--   2 root supergroup          7 2022-04-18 22:15 /user/ronnan/kafka/topic-kvspark-output/part-00001-98958cef-7dcc-4334-a8f6-21dd488c67bd-c000.csv


In [None]:
# Print the contents of the CSV part files in the HDFS output directory
!hdfs dfs -cat /user/ronnan/kafka/topic-kvspark-output/*.csv