In [0]:
# Imports necessários
import requests
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, DateType, BinaryType
from pyspark.sql.functions import col, to_date
import urllib.request

***1a camada | Criação do dataset a partir do arquivo de origem***

In [0]:
#ANTES DE COMEÇAR A RODAR OS COMANDOS!

# Lista se há alguma tabela criada no diretório
dbutils.fs.ls("dbfs:/user/hive/warehouse/")

Out[24]: [FileInfo(path='dbfs:/user/hive/warehouse/base_historica_bronze/', name='base_historica_bronze/', size=0, modificationTime=0)]

In [0]:
# Remover o diretório e seu conteúdo
dbutils.fs.rm("dbfs:/user/hive/warehouse/base_historica_bronze", recurse=True),
dbutils.fs.rm("dbfs:/user/hive/warehouse/base_historica_silver", recurse=True),
dbutils.fs.rm("dbfs:/user/hive/warehouse/base_historica_gold", recurse=True),

Out[25]: (False,)

In [0]:
# Definir a URL do arquivo CSV no GitHub
csv_url = "https://raw.githubusercontent.com/joel-c-dutra/mvp-ed/main/base_historica.csv"

# Caminho onde o arquivo será salvo no dbfs (Databricks File System)
dbfs_path = "/tmp/base_historica.csv"

# Baixar o arquivo CSV diretamente para o dbfs
dbutils.fs.mkdirs("/tmp")
dbutils.fs.put(dbfs_path, requests.get(csv_url).text, True)


Wrote 1335499 bytes.
Out[26]: True

In [0]:
# Definir o schema com todos os campos como string
schema_bronze = "dt_venda STRING, loja STRING, uf STRING, produto STRING, canal_venda STRING, tipo_venda STRING, "\
                "vlr_venda STRING, qt_venda STRING, qt_dias_com_estoque STRING, qt_dias_sem_estoque STRING, "\
                "qt_dias_com_estoque_aberta STRING, qt_dias_sem_estoque_aberta STRING, qt_dias_loja_fechada STRING, "\
                "estoque_loja STRING, habilitador STRING"

# Ler o arquivo CSV como DataFrame, tratando todos os campos como string
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .option("delimiter", ",") \
    .schema(schema_bronze) \
    .load(dbfs_path)

# Salvar como tabela no Catálogo do Databricks
df_bronze.write.mode("overwrite").saveAsTable("base_historica_bronze")

print("Tabela 'base_historica_bronze' criada com sucesso no Catálogo do Databricks!")

Tabela 'base_historica_bronze' criada com sucesso no Catálogo do Databricks!


In [0]:
%sql

-- Adicionar comentário à coluna 'dt_venda'
ALTER TABLE base_historica_bronze ALTER COLUMN dt_venda COMMENT 'Data da venda';

-- Adicionar comentário à coluna 'loja'
ALTER TABLE base_historica_bronze ALTER COLUMN loja COMMENT 'Código da loja';

-- Adicionar comentário à coluna 'uf'
ALTER TABLE base_historica_bronze ALTER COLUMN uf COMMENT 'Unidade Federativa';

-- Adicionar comentário à coluna 'produto'
ALTER TABLE base_historica_bronze ALTER COLUMN produto COMMENT 'Código do produto';

-- Adicionar comentário à coluna 'canal_venda'
ALTER TABLE base_historica_bronze ALTER COLUMN canal_venda COMMENT 'Canal de venda';

-- Adicionar comentário à coluna 'tipo_venda'
ALTER TABLE base_historica_bronze ALTER COLUMN tipo_venda COMMENT 'Tipo de venda';

-- Adicionar comentário à coluna 'vlr_venda'
ALTER TABLE base_historica_bronze ALTER COLUMN vlr_venda COMMENT 'Valor da venda';

-- Adicionar comentário à coluna 'qt_venda'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_venda COMMENT 'Quantidade vendida';

-- Adicionar comentário à coluna 'qt_dias_com_estoque'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_dias_com_estoque COMMENT 'Quantidade de dias com estoque';

-- Adicionar comentário à coluna 'qt_dias_sem_estoque'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_dias_sem_estoque COMMENT 'Quantidade de dias sem estoque';

-- Adicionar comentário à coluna 'qt_dias_com_estoque_aberta'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_dias_com_estoque_aberta COMMENT 'Quantidade de dias em que a loja tinha estoque e estava aberta para venda';

-- Adicionar comentário à coluna 'qt_dias_sem_estoque_aberta'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_dias_sem_estoque_aberta COMMENT 'Quantidade de dias em que a loja não tinha estoque e estava aberta para venda';

-- Adicionar comentário à coluna 'qt_dias_loja_fechada'
ALTER TABLE base_historica_bronze ALTER COLUMN qt_dias_loja_fechada COMMENT 'Quantidade de dias em que a loja esteve fechada';

-- Adicionar comentário à coluna 'estoque_loja'
ALTER TABLE base_historica_bronze ALTER COLUMN estoque_loja COMMENT 'Quantidade de estoque na loja';

-- Adicionar comentário à coluna 'habilitador'
ALTER TABLE base_historica_bronze ALTER COLUMN habilitador COMMENT 'Determina se a loja deverá ser abastecida caso tenha menos que 5 itens em seu estoque (variável 1 ou 0)';



**Dataquality | base_historica_bronze**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Bronze - Campos nulos") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_bronze"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique se há campos em branco (valores nulos)
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Mostre as contagens de valores nulos
null_counts.show()


+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|dt_venda|loja| uf|produto|canal_venda|tipo_venda|vlr_venda|qt_venda|qt_dias_com_estoque|qt_dias_sem_estoque|qt_dias_com_estoque_aberta|qt_dias_sem_estoque_aberta|qt_dias_loja_fechada|estoque_loja|habilitador|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|       0|   0|  0|      0|          0|         0|        0|       0|                  0|                  0|                         0|                         0|                   0|           0|          0|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+------------------------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Bronze - Campos duplicados") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_bronze"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique duplicatas com base em todas as colunas
df_duplicates = df.groupBy(df.columns).count().filter("count > 1")

# Mostre as duplicatas (se houver)
df_duplicates.show()


+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+
|dt_venda|loja| uf|produto|canal_venda|tipo_venda|vlr_venda|qt_venda|qt_dias_com_estoque|qt_dias_sem_estoque|qt_dias_com_estoque_aberta|qt_dias_sem_estoque_aberta|qt_dias_loja_fechada|estoque_loja|habilitador|count|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+



**2a camada | Criação do dataset com o schema definido a partir do dataset inicial**

In [0]:
%sql

-- Remover a tabela se já existir
DROP TABLE IF EXISTS base_historica_silver;

-- Definir o novo schema
CREATE TABLE base_historica_silver (
    dt_venda DATE,
    loja STRING,
    uf STRING,
    produto STRING,
    canal_venda STRING,
    tipo_venda STRING,
    vlr_venda FLOAT,
    qt_venda INT,
    qt_dias_com_estoque INT,
    qt_dias_sem_estoque INT,
    qt_dias_com_estoque_aberta INT,
    qt_dias_sem_estoque_aberta INT,
    qt_dias_loja_fechada INT,
    estoque_loja INT,
    habilitador BOOLEAN
)
USING DELTA;


In [0]:
%sql
-- Inserir dados na nova tabela com o novo schema
INSERT INTO base_historica_silver
SELECT
    TO_DATE(dt_venda, 'yyyy-MM-dd') AS dt_venda,
    loja,
    uf,
    produto,
    canal_venda,
    tipo_venda,
    ROUND(CAST(vlr_venda AS FLOAT), 3) AS vlr_venda,
    CAST(qt_venda AS INT) AS qt_venda,
    CAST(qt_dias_com_estoque AS INT) AS qt_dias_com_estoque,
    CAST(qt_dias_sem_estoque AS INT) AS qt_dias_sem_estoque,
    CAST(qt_dias_com_estoque_aberta AS INT) AS qt_dias_com_estoque_aberta,
    CAST(qt_dias_sem_estoque_aberta AS INT) AS qt_dias_sem_estoque_aberta,
    CAST(qt_dias_loja_fechada AS INT) AS qt_dias_loja_fechada,
    CAST(estoque_loja AS INT) AS estoque_loja,
    habilitador
FROM base_historica_bronze;


num_affected_rows,num_inserted_rows
18599,18599


In [0]:
%sql

-- Adicionar comentário à coluna 'dt_venda'
ALTER TABLE base_historica_silver ALTER COLUMN dt_venda COMMENT 'Data da venda';

-- Adicionar comentário à coluna 'loja'
ALTER TABLE base_historica_silver ALTER COLUMN loja COMMENT 'Código da loja';

-- Adicionar comentário à coluna 'uf'
ALTER TABLE base_historica_silver ALTER COLUMN uf COMMENT 'Unidade Federativa';

-- Adicionar comentário à coluna 'produto'
ALTER TABLE base_historica_silver ALTER COLUMN produto COMMENT 'Código do produto';

-- Adicionar comentário à coluna 'canal_venda'
ALTER TABLE base_historica_silver ALTER COLUMN canal_venda COMMENT 'Canal de venda';

-- Adicionar comentário à coluna 'tipo_venda'
ALTER TABLE base_historica_silver ALTER COLUMN tipo_venda COMMENT 'Tipo de venda';

-- Adicionar comentário à coluna 'vlr_venda'
ALTER TABLE base_historica_silver ALTER COLUMN vlr_venda COMMENT 'Valor da venda';

-- Adicionar comentário à coluna 'qt_venda'
ALTER TABLE base_historica_silver ALTER COLUMN qt_venda COMMENT 'Quantidade vendida';

-- Adicionar comentário à coluna 'qt_dias_com_estoque'
ALTER TABLE base_historica_silver ALTER COLUMN qt_dias_com_estoque COMMENT 'Quantidade de dias com estoque';

-- Adicionar comentário à coluna 'qt_dias_sem_estoque'
ALTER TABLE base_historica_silver ALTER COLUMN qt_dias_sem_estoque COMMENT 'Quantidade de dias sem estoque';

-- Adicionar comentário à coluna 'qt_dias_com_estoque_aberta'
ALTER TABLE base_historica_silver ALTER COLUMN qt_dias_com_estoque_aberta COMMENT 'Quantidade de dias em que a loja tinha estoque e estava aberta para venda';

-- Adicionar comentário à coluna 'qt_dias_sem_estoque_aberta'
ALTER TABLE base_historica_silver ALTER COLUMN qt_dias_sem_estoque_aberta COMMENT 'Quantidade de dias em que a loja não tinha estoque e estava aberta para venda';

-- Adicionar comentário à coluna 'qt_dias_loja_fechada'
ALTER TABLE base_historica_silver ALTER COLUMN qt_dias_loja_fechada COMMENT 'Quantidade de dias em que a loja esteve fechada';

-- Adicionar comentário à coluna 'estoque_loja'
ALTER TABLE base_historica_silver ALTER COLUMN estoque_loja COMMENT 'Quantidade de estoque na loja';

-- Adicionar comentário à coluna 'habilitador'
ALTER TABLE base_historica_silver ALTER COLUMN habilitador COMMENT 'Determina se a loja deverá ser abastecida caso tenha menos que 5 itens em seu estoque (variável 1 ou 0)';

**Dataquality | base_historica_silver**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Silver - Campos nulos") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_silver"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique se há campos em branco (valores nulos)
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Mostre as contagens de valores nulos
null_counts.show()

+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|dt_venda|loja| uf|produto|canal_venda|tipo_venda|vlr_venda|qt_venda|qt_dias_com_estoque|qt_dias_sem_estoque|qt_dias_com_estoque_aberta|qt_dias_sem_estoque_aberta|qt_dias_loja_fechada|estoque_loja|habilitador|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|       0|   0|  0|      0|          0|         0|       70|       0|                  0|                  0|                         0|                         0|                   0|           0|          0|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+------------------------

In [0]:
from pyspark.sql import SparkSession

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Remove Null Records") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_silver"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Filtre os registros com valores nulos no campo vlr_venda
df_cleaned = df.filter(df.vlr_venda.isNotNull())

# Salve o DataFrame resultante de volta no Delta Lake, sobrescrevendo a tabela original
df_cleaned.write.format("delta").mode("overwrite").save(path)


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Silver - Campos nulos") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_silver"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique se há campos em branco (valores nulos)
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Mostre as contagens de valores nulos
null_counts.show()

+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|dt_venda|loja| uf|produto|canal_venda|tipo_venda|vlr_venda|qt_venda|qt_dias_com_estoque|qt_dias_sem_estoque|qt_dias_com_estoque_aberta|qt_dias_sem_estoque_aberta|qt_dias_loja_fechada|estoque_loja|habilitador|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+
|       0|   0|  0|      0|          0|         0|        0|       0|                  0|                  0|                         0|                         0|                   0|           0|          0|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+------------------------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Silver - Campos duplicados") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_silver"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique duplicatas com base em todas as colunas
df_duplicates = df.groupBy(df.columns).count().filter("count > 1")

# Mostre as duplicatas (se houver)
df_duplicates.show()

+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+
|dt_venda|loja| uf|produto|canal_venda|tipo_venda|vlr_venda|qt_venda|qt_dias_com_estoque|qt_dias_sem_estoque|qt_dias_com_estoque_aberta|qt_dias_sem_estoque_aberta|qt_dias_loja_fechada|estoque_loja|habilitador|count|
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+
+--------+----+---+-------+-----------+----------+---------+--------+-------------------+-------------------+--------------------------+--------------------------+--------------------+------------+-----------+-----+



**3a camada | Criação do dataset com os dados agregados p/ refinamento e análise dos dados**

In [0]:
%sql
-- Criar a tabela base_historica_gold com dados agregados por mês
CREATE OR REPLACE TABLE base_historica_gold
USING DELTA
AS
SELECT
    CAST(DATE_TRUNC('month', dt_venda) AS DATE) AS dt_anomes,
    uf,
    loja,
    produto,
    canal_venda,
    tipo_venda,
    ROUND(SUM(vlr_venda), 2) AS total_vendas,
    SUM(qt_venda) AS total_itens_vendidos
FROM base_historica_silver
GROUP BY CAST(DATE_TRUNC('month', dt_venda) AS DATE), uf, loja, produto, canal_venda, tipo_venda
ORDER BY CAST(DATE_TRUNC('month', dt_venda) AS DATE), uf, loja, produto, canal_venda, tipo_venda;

num_affected_rows,num_inserted_rows


In [0]:
%sql

-- Adicionar comentário à coluna 'dt_venda'
ALTER TABLE base_historica_gold ALTER COLUMN dt_anomes COMMENT 'Data da venda agregada por mês';

-- Adicionar comentário à coluna 'loja'
ALTER TABLE base_historica_gold ALTER COLUMN uf COMMENT 'Unidade Federativa';

-- Adicionar comentário à coluna 'uf'
ALTER TABLE base_historica_gold ALTER COLUMN loja COMMENT 'Código da loja';

-- Adicionar comentário à coluna 'produto'
ALTER TABLE base_historica_gold ALTER COLUMN produto COMMENT 'Código do produto';

-- Adicionar comentário à coluna 'canal_venda'
ALTER TABLE base_historica_gold ALTER COLUMN canal_venda COMMENT 'Canal de venda';

-- Adicionar comentário à coluna 'tipo_venda'
ALTER TABLE base_historica_gold ALTER COLUMN tipo_venda COMMENT 'Tipo de venda';

-- Adicionar comentário à coluna 'vlr_venda'
ALTER TABLE base_historica_gold ALTER COLUMN total_vendas COMMENT 'Total de vendas';

-- Adicionar comentário à coluna 'qt_venda'
ALTER TABLE base_historica_gold ALTER COLUMN total_itens_vendidos COMMENT 'Quantidade de unidades vendidas';

**Dataquality | base_historica_gold**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Gold - Campos nulos") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_gold"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique se há campos em branco (valores nulos)
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Mostre as contagens de valores nulos
null_counts.show()

+---------+---+----+-------+-----------+----------+------------+--------------------+
|dt_anomes| uf|loja|produto|canal_venda|tipo_venda|total_vendas|total_itens_vendidos|
+---------+---+----+-------+-----------+----------+------------+--------------------+
|        0|  0|   0|      0|          0|         0|           0|                   0|
+---------+---+----+-------+-----------+----------+------------+--------------------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Inicialize a sessão Spark
spark = SparkSession.builder \
    .appName("Dataquality Tabela Gold - Campos duplicados") \
    .getOrCreate()

# Caminho para a tabela Delta Lake
path = "dbfs:/user/hive/warehouse/base_historica_gold"

# Leia os dados a partir do caminho de arquivo Delta Lake
df = spark.read.format("delta").load(path)

# Verifique duplicatas com base em todas as colunas
df_duplicates = df.groupBy(df.columns).count().filter("count > 1")

# Mostre as duplicatas (se houver)
df_duplicates.show()

+---------+---+----+-------+-----------+----------+------------+--------------------+-----+
|dt_anomes| uf|loja|produto|canal_venda|tipo_venda|total_vendas|total_itens_vendidos|count|
+---------+---+----+-------+-----------+----------+------------+--------------------+-----+
+---------+---+----+-------+-----------+----------+------------+--------------------+-----+



In [0]:
%sql
SELECT * FROM base_historica_gold

dt_anomes,uf,loja,produto,canal_venda,tipo_venda,total_vendas,total_itens_vendidos
2023-01-01,BA,ABC20007,Produto X,LOJA,PROMOCAO,3715.79,2
2023-01-01,BA,ABC20007,Produto Z,LOJA,PROMOCAO,6563.63,4
2023-01-01,BA,ABC21382,Produto X,LOJA,PROMOCAO,3702.02,2
2023-01-01,BA,ABC21382,Produto Z,LOJA,PROMOCAO,6340.96,4
2023-01-01,BA,ABC5228,Produto X,SITE,PROMOCAO,904.0,0
2023-01-01,BA,ABC5228,Produto X,SITE,REGULAR,2833.05,4
2023-01-01,BA,ABC5228,Produto Z,SITE,REGULAR,6355.0,0
2023-01-01,BA,ABC9048,Produto X,SITE,REGULAR,2709.8,3
2023-01-01,BA,ABC9048,Produto Z,SITE,REGULAR,4325.82,3
2023-01-01,CE,ABC12953,Produto X,SITE,PROMOCAO,1365.0,37


Ao final do pipeline, a tabela **base_historica_gold** é disponibilizada para o processo da dataviz na etapa de análise e acompanhamento dos dados.