In [0]:
import delta
import sys

sys.path.insert(0, "../lib/")   # Importando funções da Lib.

import utils

In [0]:
catalog = 'bronze'
schema = 'olist_ecommerce'
tablename = dbutils.widgets.get('tablename')
id_field = dbutils.widgets.get('id_field')
timestamp_field = dbutils.widgets.get('timestamp_field')
df_schema = utils.import_schema(tablename)  # Função que importa o schema do JSON e relaciona com a tabela 
# a depender do nome. Para conseguir os Schemas, precisei cria-los a mão no código, tive essa abordagem pois arquivos CSV # tudo é considerado texto e string, ou seja, não existe valores bool, float ou int.


In [0]:
# Esta camada de código tem o objetivo de fazer a ingestão de um arquivo Full Load na pasta de Bronze. 
# O arquivo Full Load é uma Tabela em formato CSV, que contém dados mas que precisam ser atualizados através de arquivos
# CDC (Change Data Capture).

# Nesta parte do código, precisamos primeiro ler o arquivo CSV com o Spark que logo após a leitura se transforma em um
# Dataframe. 
# No código podemos notar .option("header", "true"), utilizei esse comando pois com ele conseguimos nos comunicar
# com o Spark e dizer que o arquivo CSV contém cabeçalho, ou seja, nome de colunas presentes na primeira.

if not utils.table_exists(spark, catalog, schema, tablename): # Utilizando um IF, Se a tabela existir a função retorna 1 
# e não faz nada. Mas se a tabela não existir faz a ingestão da full load na camada de Bronze.

  print('Tabela não existente. Criando...')


  df_full_table = spark.read.format("csv").option("header", "true").schema(df_schema).load(f"/Volumes/raw/olist_ecommerce/full_load/{tablename}/")


    # Logo após a leitura preciso salvar o Dataframe no Schema de Bronze.

  (df_full_table.coalesce(1)    # Com coalesce(1), garantimos que o Spark nos entregue apenas 1 arquivo salvo.
    .write
    .format("delta")    # Formato de salvamento do arquivo.
    .mode("overwrite")  # Modo de salvamento, aqui se caso ja contesse dados no Schema o "overwrite" iria subscreve-los.
    .saveAsTable(f"{catalog}.{schema}.{tablename}"))   # Aqui é onde dizemos para o Spark o caminho de salvamento, e especicamos que no Schema o arquivo será salvo como Tabela.
  
else:
  print('Tabela já existente :/')

In [0]:
%sql
SELECT * FROM bronze.olist_ecommerce.customers

In [0]:
bronze = delta.DeltaTable.forName(spark, f"{catalog}.{schema}.{tablename}") # Aqui conseguimos fazer com que, conversamos com nossa tabela através de uma variável.

In [0]:
# Nesta camada de código preciso da importação de arquivos CDC. Onde no futuro esses arquivos que precisam ser atualizados
# serão a principal fonte de dados para a atualização da Tabela presente no Schema Bronze.

def upsert(df, deltatable):

    df.drop("modifed_at")
    df.createOrReplaceGlobalTempView(f"view_{tablename}")   # Global View pode ser acessada de qualquer sessão do Spark, pois é uma View Global.

    query = f''' 
        SELECT *
        FROM global_temp.view_{tablename}
        QUALIFY ROW_NUMBER() OVER(PARTITION BY {id_field} ORDER BY {timestamp_field} DESC) = 1
    '''     # Nesta parte do código estamos fazendo uma consulta SQL, onde tenho o objetivo de fazer um filtro 
            # com que apareça os dados mais atualizados de cada cliente presente no arquivo CDC.

    df_cdc_table = spark.sql(query)    # Transformando o resultado da consulta SQL em um Dataframe.
    
# Merge para atualizar a tabela com dados recentes dos arquivos CDC.
    (deltatable.alias("b")
           .merge(df_cdc_table.alias("c"), f"b.{id_field} = c.{id_field}")    # Fazendo um JOIN entre as tabelas.
           .whenMatchedDelete(condition = "c.OP = 'D'")      # Condição para deletar um dado presente na Tabela Bronze.
           .whenMatchedUpdateAll(condition = "c.OP = 'U'")   # Condição para atualizar um dado presente na Tabela Bronze
           .whenNotMatchedInsertAll(condition = "c.OP = 'I' OR c.OP = 'U'") # Condição para inserir um dado novo 
                                                                            # na Tabela Bronze.
           .execute()

) 


df_stream = (spark.readStream  # Importando e fazendo a leitura de dados CDC com o Spark e Streaming.
                  .format("cloudFiles")
                  .option("cloudFiles.format", "csv")
                  .schema(df_schema)
                  .load(f"/Volumes/raw/olist_ecommerce/cdc/{tablename}/")
    )

stream = (df_stream.writeStream
            .option("checkpointLocation", f"/Volumes/raw/olist_ecommerce/cdc/{tablename}_checkpoint/")
            # Pasta que controla o último progresso feito pelo stream.
            .foreachBatch(lambda df, bathID: upsert(df, bronze)) 
# Para cada lote de dados que vier pelo stream, ele aplica uma função de upsert que recebe "df"(lote de dados), 
# faz a mesclagem com Merge e salva na base bronze (deltatable).
            .trigger(availableNow=True)

)


In [0]:
start = stream.start()