In [1]:
# 1. Ler o tópico do kafka “topic-kvspark” em modo batch
from pyspark.sql.functions import *


In [2]:
# batch
topic_read = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .load()

**2. Visualizar o schema do tópico**

In [3]:
# o topico criado aceita todos os comandos em batch
# para o streaming temos que alterar o key e value para string
# 2. Visualizar o schema do tópico

topic_read.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



**3. Visualizar o tópico com o campo key e value convertidos em string**

In [4]:
# 3. Visualizar o tópico com o campo key e value convertidos em string
# Mostrando o topico em binario
topic_string = topic_read.select(col("key"),col("value"))
topic_string.show()

+-------+--------------------+
|    key|               value|
+-------+--------------------+
|   [31]|       [4D 73 67 31]|
|   [31]|       [4D 53 47 32]|
|   [33]|                [64]|
|   [34]|                [73]|
|   [37]|[61 73 64 66 2C 3...|
|   [38]|[48 65 6C 6C 6F 2...|
|   [39]|[56 61 69 20 63 7...|
|   [31]|          [6D 73 67]|
|   [31]|                [61]|
|   [31]|                [62]|
|   [31]|                [63]|
|   [31]|                [64]|
|   [31]|                [65]|
|   [31]|             [65 66]|
|   [31]|                [66]|
|   [31]|             [67 2C]|
|   [34]|                [61]|
|   [34]|                [35]|
|[64 34]|             [64 35]|
|   null|[74 65 73 74 65 2...|
+-------+--------------------+
only showing top 20 rows



In [5]:
# Alterando para STRING e mostrando mensagem em string
topic_string = topic_read.select(col("key").cast("string"),col("value").cast("string"))
topic_string.show()

+----+-------------+
| key|        value|
+----+-------------+
|   1|         Msg1|
|   1|         MSG2|
|   3|            d|
|   4|            s|
|   7|  asdf,75,fds|
|   8|  Hello World|
|   9|Vai curintcha|
|   1|          msg|
|   1|            a|
|   1|            b|
|   1|            c|
|   1|            d|
|   1|            e|
|   1|           ef|
|   1|            f|
|   1|           g,|
|   4|            a|
|   4|            5|
|  d4|           d5|
|null|    teste 123|
+----+-------------+
only showing top 20 rows



**4. Ler o tópico do kafka “topic-kvspark” em modo streaming**

In [6]:
# Lendo em modo Streaming
topic_read_streaming = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark")\
    .option("startingOffsets","earliest")\
    .load()

**5. Visualizar o schema do tópico em streaming**

In [7]:
topic_read_streaming.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



**6. Alterar o tópico em streaming com o campo key e value convertidos para string**

In [8]:
# Alterando o Key e Valeu para String
# nao tem como usar o Show pois esta em streaming
# tem que salvar e usar o start no console

topic_string_streaming = topic_read_streaming.select(col("key").cast("string"),col("value").cast("string"))
#topic_string_streaming.show()

**7. Salvar o tópico em streaming no tópico topic-kvspark-output a cada 5 segundos**

In [9]:
# Leitura
topic_string_stream_output = topic_string_streaming.writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("topic","topic-kvspark-output")\
    .option("checkpointLocation","/user/marcos/kafka_checkpoint1")\
    .trigger(continuous="5 second")\
    .start()
# criar um console pra poder aparecer o topic topic-kvspark-output, pois esta lendo apartir da criação do topico e não os
# antigos

In [10]:
topic_string_stream_output.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [11]:
# testando leitura do novo topico criado usando o batch
topic_read2 = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers","kafka:9092")\
    .option("subscribe","topic-kvspark-output")\
    .load()

In [12]:
topic_string2 = topic_read2.select(col("key").cast("string"),col("value").cast("string"),"partition","offset")
topic_string2.show()

+----+--------------------+---------+------+
| key|               value|partition|offset|
+----+--------------------+---------+------+
|null|           teste 123|        0|     0|
|null|              marcos|        0|     1|
|null|testando novo top...|        0|     2|
|null|leitura stream de...|        0|     3|
|   2|                MSG3|        0|     4|
|   1|                Msg1|        0|     5|
|   1|                MSG2|        0|     6|
|   3|                   d|        0|     7|
|   4|                   s|        0|     8|
|   7|         asdf,75,fds|        0|     9|
|   8|         Hello World|        0|    10|
|   9|       Vai curintcha|        0|    11|
|   1|                 msg|        0|    12|
|   1|                   a|        0|    13|
|   1|                   b|        0|    14|
|   1|                   c|        0|    15|
|   1|                   d|        0|    16|
|   1|                   e|        0|    17|
|   1|                  ef|        0|    18|
|   1|    

**8. Salvar o tópico na pasta hdfs://namenode:8020/user/<nome>/Kafka/topic-kvspark-output**

In [None]:
# caso necessite parar a sessão 
topic_string_stream_output.stop()

In [19]:
topic_string_stream_output = topic_string_streaming.writeStream\
    .format("csv")\
    .option("checkpointLocation","/user/marcos/kafka_checkpoint2")\
    .option("path","/user/marcos/kafka/topic_kvspark-output")\
    .start()

In [20]:
!hdfs dfs -ls /user/marcos/kafka/topic_kvspark-output/

Found 3 items
drwxr-xr-x   - root supergroup          0 2021-07-07 00:55 /user/marcos/kafka/topic_kvspark-output/_spark_metadata
-rw-r--r--   2 root supergroup        161 2021-07-07 00:55 /user/marcos/kafka/topic_kvspark-output/part-00000-fc551508-67a5-4613-a0e7-55777eebc1b2-c000.csv
-rw-r--r--   2 root supergroup        129 2021-07-07 00:55 /user/marcos/kafka/topic_kvspark-output/part-00001-880b2dd8-21d3-4c09-97a3-7a4880140ee7-c000.csv


In [21]:
!hdfs dfs -cat /user/marcos/kafka/topic_kvspark-output/part-00000-fc551508-67a5-4613-a0e7-55777eebc1b2-c000.csv

1,Msg1
1,MSG2
3,d
4,s
7,"asdf,75,fds"
8,Hello World
9,Vai curintcha
1,msg
1,a
1,b
1,c
1,d
1,e
1,ef
1,f
1,"g,"
4,a
4,5
d4,d5
"",teste 123
"",testando novo topico
