# Movendo os dados do Kafka para o Delta

Na primeira etapa, você enviou os dados de clientes, pedidos e itens de pedidos do Postgres para o Kafka. O Apache Kafka armazena as mensagens em estrutura de tópicos, de forma confivel e escalável. Agora, na segunda etapa, iremos ler os dados do Kafka e armazenar na tabela bronze no Delta Lake. A tabela bronze armazena os dados brutos, sem nenhuma transformação, para que posteriormente seja possível aplicar qualquer transformação sobre os dados, sem perder a origem da informação. 

## Preparando o ambiente

O código abaixo adiciona a **raiz** do projeto, que contém códigos e dados necessários para o "Hands on".

In [1]:
root = '/home/bigdata/jupyterhub'

import sys
sys.path.append(root)

wd = '/delta'

O trecho de código abaixo prepara o ambiente, carregando códigos auxiliares e dados de configuração.O código disponível no pacote *commom.utils* na classe *DataframeUtils* contém vários métodos que facilitam a leitura e escrita dos dados do Postgres. A classe *DataframeUtils* também inicia uma instância do Apache Spark com o Delta Lake integrado ao Spark.

Já o arquivo *config.yaml* tem os dados de acesso ao Postgres e Kafka.

In [2]:
import yaml

from common.utils import DataframeUtils
import pyspark.sql.functions as F

config = yaml.safe_load(open('../config.yaml'))
dfu = DataframeUtils(config)

## Leitura dos dados do Kafka e escrita no Delta Lake

O método *process* facilita a leitura dos dados do Kafka utilizando o Apache Spark. Com o trecho do código abaixo, os dados são lidos do tópico do Kafka e armazenados no Dataframe **df** (de forma lazy). Para ler os dados Kafka, tem que passar como argumento o *format('kafka')* para indicar que é uma leitura do Kafka, o host do servidor boostrap, o tópico que irá ler as mensagens e o offset para indicar a partir de onde irá ler as mensagens.

```
df = dfu.spark() \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', kafka) \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .option("maxOffsetsPerTrigger", max_triggers) \
        .load()
```

Os dados lidos do Kafka estão no formato **JSON** e, por isso, tem que ser transformados para o formato tabular para poder inserir no formato Delta. Por isso, a chave **key** do Kafka torna uma coluna do Delta, com nome *key*, que identifica a tupla e o conteúdo da mensagem em **value** é escrito em outra coluna com o nome *value*. Como os dados do Kafka chegam no formato de *stream*, eles são armazenados utilizando a função **writeStream**, que vai recebendo os dados do Kafka, fazendo a transformação para o formato tabular e armazenando na tabela Gold. O trecho `option('mergeSchema', 'true')` indica que se houver mudanças no esquema de dados, será feito um "merge" dos esquemas no Delta Lake. O Delta Lake armazena o histórico de alterações dos dados e permite indicar o diretório de checkpoint através da opção `.option('checkpointLocation', checkpoint_dir)`.

```
df
 .withColumn('key', F.col('key').cast('string'))
 .withColumn('value', F.col('value').cast('string'))
 .writeStream
 .option('mergeSchema', 'true')
 .format('delta')
 .outputMode('append')
 .option('checkpointLocation', checkpoint_dir)
 .start(output_dir)
```


In [3]:
def process(topic, output_dir, checkpoint_dir, kafka, max_triggers:int=1000):
    df = dfu.spark() \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', kafka) \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .load()

    return (df
     .withColumn('key', F.col('key').cast('string'))
     .withColumn('value', F.col('value').cast('string'))
     .writeStream
     .option('mergeSchema', 'true')
     .format('delta')
     .outputMode('append')
     .option('checkpointLocation', checkpoint_dir)
     .start(output_dir))

Nos próximos três trechos de código, serão escritos os dados de cliente em *'/delta/data/clientes-bronze'* e os dados de pedidos em *'/delta/data/pedidos-bronze'*.

In [4]:
kafka = dfu.config()['kafka']['host']

In [5]:
dsc = process('clientes', wd + '/data/clientes-bronze', wd + '/checkpoints/clientes-checkpoint', kafka, 1000)

In [14]:
dsi = process('itens', wd + '/data/itens-bronze', wd + '/checkpoints/itens-checkpoint', kafka, 1000)

# Exercício

Agora é com **você**. Neste exercício você irá utilizar os códigos apresentados acima como exemplo para fazer a **leitura dos dados no Kafka e escrita dos dados com o Delta Lake**.

**Não será permitido utilizar a função process()**. Você deverá construir seu próprio código utilizando os dados fornecidos abaixo:

In [None]:
spark = dfu.spark()
topic = 'itens'
output_dir = wd + '/data/itens-bronze'
checkpoint_dir = wd + '/checkpoints/itens-checkpoint'
max_triggers=1000

No trecho de código abaixo, você deverá ler o fluxo de dados do tópico do Kafka e armazenar no Dataframe Spark **df_itens**.

In [None]:
df_itens = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', kafka) \
        .option('subscribe', topic) \
        .option('startingOffsets', 'earliest') \
        .option("maxOffsetsPerTrigger", max_triggers) \
        .load()

No trecho de código abaixo, você deverá escrever o fluxo de dados que está no Dataframe *df_itens* no formato **delta**.

In [None]:
dsi = df_itens \
     .withColumn('key', F.col('key').cast('string')) \
     .withColumn('value', F.col('value').cast('string')) \
     .writeStream \
     .option('mergeSchema', 'true') \
     .format('delta') \
     .outputMode('append') \
     .option('checkpointLocation', checkpoint_dir) \
     .start(output_dir)

## Monitorando o fluxo de dados
Os códigos abaixo monitoram o fluxo de dados que está sendo processado nos Dataframes Spark e escritos no formato Delta. 

In [13]:
dsp.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [None]:
dsc.status

In [24]:
dsi.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [None]:
dsc.lastProgress

In [None]:
dsp.lastProgress

In [None]:
dsi.lastProgress

In [None]:
dfu.print_streaming_chart(dsc)

In [None]:
dfu.print_streaming_chart(dsp)

In [None]:
dfu.print_streaming_chart(dsi)