# Imports, inicialização dos logs e Workspace Client

In [0]:
pip install databricks-labs-dqx

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()

In [0]:
import logging
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum
import yaml

logging.basicConfig(level = logging.INFO)
logger = logging.getLogger(__name__)

ws_client = WorkspaceClient()


# Funções de qualidade de dados

In [0]:

# Função que analisa os dados da tabela e retorna suas estatísticas
def profile_data(input_df):
    try:
        profiler = DQProfiler(ws_client)
        summary_stats, profiles = profiler.profile(input_df)
        logger.info("Profiling OK")
        return summary_stats, profiles
    except Exception as e:
        logger.error(F'erro no profiling: {str(e)}')
        raise

# Função que gera as regras de qualidade com base nos perfis gerados
def generator_dq_check(profiles):
    try:
        generator = DQGenerator(ws_client)
        checks = generator.generate_dq_rules(profiles)
        logger.info("Generating DQ Check OK")
        return checks
    except Exception as e:
        logger.error(F'erro no generator: {str(e)}')   
        raise


# Função que aplica as regras de qualidade na tabela
def apply_quality_checks(input_df, checks):
    try:
        dq_engine = DQEngine(ws_client)
        silver_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
        logger.info("Checagens aplicadas com sucesso")
        return silver_df, quarantine_df
    except Exception as e:
        logger.error(f"Erro na aplicação de checagens: {str(e)}")
        raise

# Lendo a tabela criada

In [0]:
logger.info("Lendo a tabela de entrada")

try:
    df_inicial = spark.table('workspace.default.vendas_livros')
    logger.info(f'Tabela lida com sucesso')
except Exception as e:
    logger.error(f'Erro ao ler a tabela: {str(e)}')

df_inicial.show()

+----------+--------+--------------+--------+---------+--------------------+----------+--------------------+------+
|data_venda|id_venda|id_linha_venda|id_livro|qtde_item|          nome_livro| categoria|               autor| preco|
+----------+--------+--------------+--------+---------+--------------------+----------+--------------------+------+
|2025-02-04|    4073|          4403|       5|        2|        Dom Casmurro|    Ficção|    Machado de Assis| 86.81|
|2024-08-25|    5136|          5466|       5|        4|        Dom Casmurro|    Ficção|    Machado de Assis|131.48|
|2024-12-18|    NULL|          4485|       2|        5|                1984|    Ficção|       George Orwell| 92.92|
|2025-05-08|    2451|          2781|       9|        2|A Revolução dos B...|    Ficção|       George Orwell|139.69|
|2024-12-29|    4615|          4945|       1|        1|A Menina que Roub...|    Ficção|        Markus Zusak| 92.76|
|2025-02-27|    2825|          3155|       8|        5|  O Pequeno Prínc

In [0]:
# Criacao do schema de tabelas bronze
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")

DataFrame[]

In [0]:
# salvamento da tabela no schema bronze
df_inicial.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("bronze.vendas_livros")


# Testes de qualidade na tabela utilizando o DQX

In [0]:
logger.info("Iniciando o Profiling")

summary_stats, profiles = profile_data(df_inicial)

for coluna, estatistica in summary_stats.items():
    print(f'{coluna}: {estatistica}')

for i in profiles: print(i)


id_venda: {'count': 1000, 'mean': 4574.755308392316, 'stddev': 2822.6645544642715, 'min': 2, '25%': 2109, '50%': 4698, '75%': 6900, 'max': 9657, 'count_non_null': 989, 'count_null': 11}
id_linha_venda: {'count': 1000, 'mean': 4913.413, 'stddev': 2838.9122031031397, 'min': 4, '25%': 2445, '50%': 5087, '75%': 7253, 'max': 9987, 'count_non_null': 1000, 'count_null': 0}
id_livro: {'count': 1000, 'mean': 5.009, 'stddev': 2.5759570172179957, 'min': 1, '25%': 3, '50%': 5, '75%': 7, 'max': 9, 'count_non_null': 1000, 'count_null': 0}
qtde_item: {'count': 1000, 'mean': 3.025, 'stddev': 1.440294917525381, 'min': 1, '25%': 2, '50%': 3, '75%': 4, 'max': 5, 'count_non_null': 1000, 'count_null': 0}
nome_livro: {'count': 1000, 'mean': 1984.0, 'stddev': 0.0, 'min': '1984', '25%': '1984.0', '50%': '1984.0', '75%': '1984.0', 'max': 'Sapiens', 'count_non_null': 1000, 'count_null': 0}
categoria: {'count': 1000, 'mean': None, 'stddev': None, 'min': 'Ficção', '25%': None, '50%': None, '75%': None, 'max': 'Nã

In [0]:
logger.info("Gerando as regras de checagem")

checks = generator_dq_check(profiles)

for i in (checks): print(i)

{'check': {'function': 'is_not_null', 'arguments': {'column': 'data_venda'}}, 'name': 'data_venda_is_null', 'criticality': 'error'}
{'check': {'function': 'is_in_range', 'arguments': {'column': 'id_venda', 'min_limit': 2, 'max_limit': 9657}}, 'name': 'id_venda_isnt_in_range', 'criticality': 'error'}
{'check': {'function': 'is_not_null', 'arguments': {'column': 'id_linha_venda'}}, 'name': 'id_linha_venda_is_null', 'criticality': 'error'}
{'check': {'function': 'is_in_range', 'arguments': {'column': 'id_linha_venda', 'min_limit': 4, 'max_limit': 9987}}, 'name': 'id_linha_venda_isnt_in_range', 'criticality': 'error'}
{'check': {'function': 'is_not_null', 'arguments': {'column': 'id_livro'}}, 'name': 'id_livro_is_null', 'criticality': 'error'}
{'check': {'function': 'is_in_list', 'arguments': {'column': 'id_livro', 'allowed': [8, 2, 7, 9, 3, 6, 4, 5, 1]}}, 'name': 'id_livro_other_value', 'criticality': 'error'}
{'check': {'function': 'is_in_range', 'arguments': {'column': 'id_livro', 'min_

In [0]:
logger.info("Aplicando as regras de checagem")

df_validado, df_quarentena = apply_quality_checks(df_inicial, checks)

df_validado.limit(5).show()

+----------+--------+--------------+--------+---------+--------------------+---------+----------------+------+
|data_venda|id_venda|id_linha_venda|id_livro|qtde_item|          nome_livro|categoria|           autor| preco|
+----------+--------+--------------+--------+---------+--------------------+---------+----------------+------+
|2025-02-04|    4073|          4403|       5|        2|        Dom Casmurro|   Ficção|Machado de Assis| 86.81|
|2024-08-25|    5136|          5466|       5|        4|        Dom Casmurro|   Ficção|Machado de Assis|131.48|
|2024-12-18|    NULL|          4485|       2|        5|                1984|   Ficção|   George Orwell| 92.92|
|2025-05-08|    2451|          2781|       9|        2|A Revolução dos B...|   Ficção|   George Orwell|139.69|
|2024-12-29|    4615|          4945|       1|        1|A Menina que Roub...|   Ficção|    Markus Zusak| 92.76|
+----------+--------+--------------+--------+---------+--------------------+---------+----------------+------+



In [0]:
logger.info("Verificação de nullos após a aplicação das regras")
nulls = df_validado.select([sum(F.col(c).isNull().cast('int')).alias(c) for c in df_validado.columns])

nulls.show()

+----------+--------+--------------+--------+---------+----------+---------+-----+-----+
|data_venda|id_venda|id_linha_venda|id_livro|qtde_item|nome_livro|categoria|autor|preco|
+----------+--------+--------------+--------+---------+----------+---------+-----+-----+
|         0|     101|             0|       0|        0|         0|        0|    0|    0|
+----------+--------+--------------+--------+---------+----------+---------+-----+-----+



In [0]:
logger.info("Remoção dos nulos")

try: 
    df_validado = df_validado.dropDuplicates(['id_venda'])\
            .na.drop(subset=['id_venda'])
except Exception as e:
    logger.error(f'Erro ao remover os nulos: {str(e)}')


In [0]:
def gerar_tb_autor_mais_popular(df_validado):
    try:
        valor_autor = df_validado.groupBy('data_venda','autor').agg(F.sum('qtde_item').alias('valor_autor_popular'))

        valor_autor = valor_autor.orderBy('data_venda', F.desc('valor_autor_popular')).dropDuplicates(['data_venda']).drop('autor')

        return valor_autor
    except Exception as e:
        logger.info(f'Erro ao gerar a tabela valor_autor: {str(e)}')

In [0]:
logger.info("Criação da tabela valor_autor com o autor mais popular da livraria")

valor_autor = gerar_tb_autor_mais_popular(df_validado)
valor_autor.orderBy('data_venda').limit(5).show()


+----------+-------------------+
|data_venda|valor_autor_popular|
+----------+-------------------+
|2024-06-16|                 29|
|2024-06-17|                 23|
|2024-06-18|                 19|
|2024-06-19|                 12|
|2024-06-20|                 19|
+----------+-------------------+



In [0]:
df_validado.schema

StructType([StructField('data_venda', DateType(), True), StructField('id_venda', LongType(), True), StructField('id_linha_venda', LongType(), True), StructField('id_livro', LongType(), True), StructField('qtde_item', LongType(), True), StructField('nome_livro', StringType(), True), StructField('categoria', StringType(), True), StructField('autor', StringType(), True), StructField('preco', DoubleType(), True)])

In [0]:
def gerar_tb_resumo_vendas_diarias(df_validado):
    try: 
        resumo_vendas_diarias = df_validado.groupBy('data_venda')\
            .agg(
                F.sum('qtde_item').alias('total_vendas'),
                F.sum(
                    F.when(F.col('categoria').rlike('(?i)^(?!.*Não)Ficção'), F.col('qtde_item')).otherwise(0)
                ).alias('quantidade_ficcao'),
                F.sum(
                    F.when(F.col('categoria').rlike('(?i)Não Ficção'), F.col('qtde_item')).otherwise(0)
                ).alias('quantidade_n_ficcao'),
                F.max(F.round(F.col('qtde_item')*F.col('preco'), 2)).alias('valor_maximo_venda'),
                F.min(F.round(F.col('qtde_item')*F.col('preco'), 2)).alias('valor_minimo_venda'),
                F.round(F.avg((F.col('qtde_item')*F.col('preco'))), 2).alias('valor_medio_venda')
            )
        resumo_vendas_diarias = resumo_vendas_diarias.join(valor_autor, how='inner', on = 'data_venda')
        return resumo_vendas_diarias
    except Exception as e:
        logger.error(f'Erro ao criar a tabela: {str(e)}')

In [0]:
logger.info("Criação da tabela resumo_vendas_diarias com todas as colunas exigidas")

resumo_vendas_diarias = gerar_tb_resumo_vendas_diarias(df_validado)
resumo_vendas_diarias.orderBy('data_venda').limit(5).show()

+----------+------------+-----------------+-------------------+------------------+------------------+-----------------+-------------------+
|data_venda|total_vendas|quantidade_ficcao|quantidade_n_ficcao|valor_maximo_venda|valor_minimo_venda|valor_medio_venda|valor_autor_popular|
+----------+------------+-----------------+-------------------+------------------+------------------+-----------------+-------------------+
|2024-06-16|          89|               83|                  6|            743.35|             42.72|           213.84|                 29|
|2024-06-17|         127|              104|                 23|             678.1|             34.87|           245.68|                 23|
|2024-06-18|          63|               45|                 18|            746.65|             21.85|           267.73|                 19|
|2024-06-19|          58|               52|                  6|             620.0|             21.73|           210.12|                 12|
|2024-06-20|        

In [0]:
# Criacao do schema de tabelas silver
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

DataFrame[]

In [0]:
logger.info("Salvando a tabela silver resumo_vendas_diarias")

# salvando a tabela resumo_vendas_diarias
resumo_vendas_diarias.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').saveAsTable('silver.resumo_vendas_diarias')

In [0]:
# qual autor mais vende livros na livraria?

autor_mais_vendido = df_validado.groupBy('autor').agg(F.sum('qtde_item').alias('vendas'))

autor_mais_vendido = autor_mais_vendido.orderBy(F.desc('vendas')).collect()[0][0]

print(autor_mais_vendido)
logger.info(f'O autor que mais vende livros: {autor_mais_vendido}')


George Orwell


In [0]:
# qual a data campeã de vendas na livraria?

data_maior_venda = df_validado.groupBy('data_venda').agg(F.sum('qtde_item').alias('vendas_por_data'))

data_maior_venda = data_maior_venda.orderBy(F.desc('vendas_por_data')).collect()[0][0]

print(data_maior_venda)
logger.info(f'A data campeã de vendas: {data_maior_venda}')

2024-06-17
