In [0]:
# Criar um dataset com informações de Vendas de Produtos eletrônicos;
# Manipular o dataset de Vendas criado;
# Criar um dataset de dimensão com o NOME dos Clientes (Primary Key = cliente_id);
# Responder as principais perguntas de negócio com operações de manipulação, agregação, filtragem e mescla de dados

In [0]:
# Criando um dataset de VENDAS e renomeando as colunas

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, current_timestamp, date_format, from_utc_timestamp, col, rand, sum, desc, count, max, avg
from pyspark.sql.types import StringType
import random

# Iniciar uma SparkSession
spark = SparkSession.builder.appName("Exemplo de Dataset").getOrCreate()

# Dados de exemplo
Vendas_Loja1 = [
    (1, '2024-07-01', 'Celular', 2, 1500.00, 101),
    (2, '2024-07-02', 'Notebook', 1, 2500.00, 102),
    (3, '2024-07-03', 'Tablet', 3, 800.00, 103),
    (4, '2024-07-04', 'Smartwatch', 1, 600.00, 101),
    (5, '2024-07-05', 'Fone de Ouvido', 2, 200.00, 104),
    (6, '2024-07-06', 'Monitor', 1, 1200.00, 102),
    (7, '2024-07-07', 'Smartwatch', 1, 600.00, 101),
    (8, '2024-07-07', 'Fone de Ouvido', 2, 200.00, 104),
    (9, '2024-07-07', 'Monitor', 1, 1200.00, 102),
    (10, '2024-07-08', 'Monitor', 1, 1200.00, 102),
    (11, '2024-07-07', 'Fone de Ouvido', 2, 200.00, 104),
    (12, '2024-07-01', 'Notebook', 1, 2500.00, 101)
]

# Colunas do DataFrame
columns = ["id", "data", "produto", "quantidade", "preco_unitario", "cliente_id"]

# Criar o DataFrame
df_Vendas_Loja1 = spark.createDataFrame(Vendas_Loja1, columns)

display(df_Vendas_Loja1)

id,data,produto,quantidade,preco_unitario,cliente_id
1,2024-07-01,Celular,2,1500.0,101
2,2024-07-02,Notebook,1,2500.0,102
3,2024-07-03,Tablet,3,800.0,103
4,2024-07-04,Smartwatch,1,600.0,101
5,2024-07-05,Fone de Ouvido,2,200.0,104
6,2024-07-06,Monitor,1,1200.0,102
7,2024-07-07,Smartwatch,1,600.0,101
8,2024-07-07,Fone de Ouvido,2,200.0,104
9,2024-07-07,Monitor,1,1200.0,102
10,2024-07-08,Monitor,1,1200.0,102


In [0]:
# Adicionar coluna de preço total da transação
df_Vendas_Loja1 = df_Vendas_Loja1.withColumn("preco_total", col("quantidade") * col("preco_unitario"))

df_Vendas_Loja1.display()

id,data,produto,quantidade,preco_unitario,cliente_id,preco_total
1,2024-07-01,Celular,2,1500.0,101,3000.0
2,2024-07-02,Notebook,1,2500.0,102,2500.0
3,2024-07-03,Tablet,3,800.0,103,2400.0
4,2024-07-04,Smartwatch,1,600.0,101,600.0
5,2024-07-05,Fone de Ouvido,2,200.0,104,400.0
6,2024-07-06,Monitor,1,1200.0,102,1200.0
7,2024-07-07,Smartwatch,1,600.0,101,600.0
8,2024-07-07,Fone de Ouvido,2,200.0,104,400.0
9,2024-07-07,Monitor,1,1200.0,102,1200.0
10,2024-07-08,Monitor,1,1200.0,102,1200.0


In [0]:
# Lista de países
countries = ["Brasil", "Estados Unidos", "Alemanha", "China", "Japão", "França", "Reino Unido"]

# Função UDF para selecionar aleatoriamente um país da lista
def random_country():
    return random.choice(countries)

# Registrar a função UDF
udf_random_country = udf(random_country, StringType())

# Adicionar a coluna com nomes de países aleatórios
df_Vendas_Loja1 = df_Vendas_Loja1.withColumn("País_cliente", udf_random_country())

display(df_Vendas_Loja1)

id,data,produto,quantidade,preco_unitario,cliente_id,preco_total,País_cliente
1,2024-07-01,Celular,2,1500.0,101,3000.0,Reino Unido
2,2024-07-02,Notebook,1,2500.0,102,2500.0,Brasil
3,2024-07-03,Tablet,3,800.0,103,2400.0,Alemanha
4,2024-07-04,Smartwatch,1,600.0,101,600.0,China
5,2024-07-05,Fone de Ouvido,2,200.0,104,400.0,Estados Unidos
6,2024-07-06,Monitor,1,1200.0,102,1200.0,Reino Unido
7,2024-07-07,Smartwatch,1,600.0,101,600.0,Estados Unidos
8,2024-07-07,Fone de Ouvido,2,200.0,104,400.0,Alemanha
9,2024-07-07,Monitor,1,1200.0,102,1200.0,Japão
10,2024-07-08,Monitor,1,1200.0,102,1200.0,Alemanha


In [0]:
# 5 inserir coluna com nome de atualização para usuario ver quando os dados foram atualizados

# from pyspark.sql. functions import current_timestamp, date_format, from_utc_timestamp
df_Vendas_Loja1 = df_Vendas_Loja1.withColumn ("Atualizacao",    
                  date_format(from_utc_timestamp(current_timestamp(), "America/Sao_Paulo"), "yyyy-MM-dd HH: mm: ss"))

display(df_Vendas_Loja1)

id,data,produto,quantidade,preco_unitario,cliente_id,preco_total,País_cliente,Atualizacao
1,2024-07-01,Celular,2,1500.0,101,3000.0,Brasil,2024-07-13 12: 49: 16
2,2024-07-02,Notebook,1,2500.0,102,2500.0,Alemanha,2024-07-13 12: 49: 16
3,2024-07-03,Tablet,3,800.0,103,2400.0,Reino Unido,2024-07-13 12: 49: 16
4,2024-07-04,Smartwatch,1,600.0,101,600.0,Brasil,2024-07-13 12: 49: 16
5,2024-07-05,Fone de Ouvido,2,200.0,104,400.0,Estados Unidos,2024-07-13 12: 49: 16
6,2024-07-06,Monitor,1,1200.0,102,1200.0,Estados Unidos,2024-07-13 12: 49: 16
7,2024-07-07,Smartwatch,1,600.0,101,600.0,Estados Unidos,2024-07-13 12: 49: 16
8,2024-07-07,Fone de Ouvido,2,200.0,104,400.0,Reino Unido,2024-07-13 12: 49: 16
9,2024-07-07,Monitor,1,1200.0,102,1200.0,Reino Unido,2024-07-13 12: 49: 16
10,2024-07-08,Monitor,1,1200.0,102,1200.0,França,2024-07-13 12: 49: 16


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS BD_ELETRONICOS_VENDAS;

In [0]:
# Salvando a tabela para poder manipular no SQL
df_Vendas_Loja1.write.mode("overwrite").saveAsTable("BD_ELETRONICOS_VENDAS.Vendas_Eletronicos_Loja1")

In [0]:
%sql
DESCRIBE TABLE BD_ELETRONICOS_VENDAS.Vendas_Eletronicos_Loja1;


col_name,data_type,comment
id,bigint,
data,string,
produto,string,
quantidade,bigint,
preco_unitario,double,
cliente_id,bigint,
preco_total,double,
País_cliente,string,
Atualizacao,string,


In [0]:
%sql

SELECT * FROM BD_ELETRONICOS_VENDAS.Vendas_Eletronicos_Loja1;

id,data,produto,quantidade,preco_unitario,cliente_id,preco_total,País_cliente,Atualizacao
8,2024-07-07,Fone de Ouvido,2,200.0,104,400.0,Reino Unido,2024-07-13 12: 49: 18
9,2024-07-07,Monitor,1,1200.0,102,1200.0,Brasil,2024-07-13 12: 49: 18
5,2024-07-05,Fone de Ouvido,2,200.0,104,400.0,Brasil,2024-07-13 12: 49: 18
6,2024-07-06,Monitor,1,1200.0,102,1200.0,Reino Unido,2024-07-13 12: 49: 18
2,2024-07-02,Notebook,1,2500.0,102,2500.0,Estados Unidos,2024-07-13 12: 49: 18
3,2024-07-03,Tablet,3,800.0,103,2400.0,França,2024-07-13 12: 49: 18
11,2024-07-07,Fone de Ouvido,2,200.0,104,400.0,França,2024-07-13 12: 49: 18
12,2024-07-01,Notebook,1,2500.0,101,2500.0,China,2024-07-13 12: 49: 18
10,2024-07-08,Monitor,1,1200.0,102,1200.0,Estados Unidos,2024-07-13 12: 49: 18
4,2024-07-04,Smartwatch,1,600.0,101,600.0,Brasil,2024-07-13 12: 49: 18


In [0]:
# Criando tabela de 'dimensão':CLIENTES para obter o nome dos Clientes

# Lista de nomes para os clientes
nomes = ["Alice", "Bob", "Carol", "David"]

# Lista de IDs de clientes
ids = list(range(101, 105))

# Garantindo que cada ID receba um nome único
clientes = list(zip(ids, nomes))

# Criando um DataFrame com os dados dos clientes
df_clientes = spark.createDataFrame(clientes, ["cliente_id", "nome_cliente"])

# Selecionando apenas as colunas desejadas
df_clientes = df_clientes.select("cliente_id", "nome_cliente")

# Exibindo o DataFrame para verificação
df_clientes.show()

# Remover a tabela existente para evitar conflitos de esquema
spark.sql("DROP TABLE IF EXISTS BD_ELETRONICOS_VENDAS.CLIENTES")

# Salvando a tabela para poder manipular no SQL, sobrescrevendo completamente a tabela original
df_clientes.write.option("mergeSchema", "true").mode("overwrite").saveAsTable("BD_ELETRONICOS_VENDAS.CLIENTES")

+----------+------------+
|cliente_id|nome_cliente|
+----------+------------+
|       101|       Alice|
|       102|         Bob|
|       103|       Carol|
|       104|       David|
+----------+------------+



In [0]:
%sql

SELECT * FROM BD_ELETRONICOS_VENDAS.CLIENTES;

cliente_id,nome_cliente
101,Alice
103,Carol
104,David
102,Bob


In [0]:
# Descobrindo o produto mais vendido
df_Top_Produto_Qtd = df_Vendas_Loja1\
                .groupBy("produto")\
                .agg(sum("quantidade").alias("total_vendido"))\
                .orderBy(desc("total_vendido"))\
                .limit(1)

df_Top_Produto_Qtd.display()

# Extrair o valor do produto mais vendido
produto_mais_vendido = df_Top_Produto_Qtd.collect()[0]["produto"]

# Mostrar a frase
print(f"O produto mais vendido foi '{produto_mais_vendido}'")

produto,total_vendido
Fone de Ouvido,6
Tablet,3
Monitor,3


Databricks visualization. Run in Databricks to view.

O produto mais vendido foi 'Fone de Ouvido'


In [0]:
# Descobrindo o produto que mais trouxe faturamento
df_Top_Produto_Fat = df_Vendas_Loja1\
                    .groupBy("produto")\
                    .agg((sum(col("quantidade") * col("preco_unitario"))).alias("Valor_Total"))\
                    .orderBy(desc("Valor_Total"))\
                    .limit(1)

df_Top_Produto_Fat.display()

produto,Valor_Total
Notebook,5000.0


In [0]:

# Descobrindo o dia de maior faturamento
df_Top_Dia_Fat = df_Vendas_Loja1\
                    .groupBy("data")\
                    .agg((sum(col("quantidade") * col("preco_unitario"))).alias("Valor_Total"))\
                    .orderBy(desc("Valor_Total"))\
                    .limit(1)

df_Top_Dia_Fat.display()

data,Valor_Total
2024-07-01,5500.0


In [0]:
# Mostrando o dia de maior faturamento, os itens que ajudaram a alcançar essa marca, suas quantidades e seus preços médios

# Coletando o resultado do dia de maior faturamento
top_dia_fat = df_Top_Dia_Fat.collect()[0]

# Obtendo o dia de maior faturamento e o valor total faturado
dia_maior_faturamento = top_dia_fat["data"]
total_faturado = top_dia_fat["Valor_Total"]

# Filtrando os produtos vendidos apenas no dia de maior faturamento
produtos_maior_faturamento = df_Vendas_Loja1\
    .filter(col("data") == dia_maior_faturamento)\
    .groupBy("data", "produto")\
    .agg(sum("quantidade").alias("Total_Quantidade"), 
         sum(col("quantidade") * col("preco_unitario")).alias("Total_Valor"),
         avg("preco_unitario").alias("Preco_Medio"))\
    .orderBy(desc("Total_Quantidade"))

# Mostrar os detalhes dos produtos vendidos no dia de maior faturamento
print(f"Dia de maior faturamento: {dia_maior_faturamento} e o faturamento foi de R$ {total_faturado}")
produtos_maior_faturamento.show()

Dia de maior faturamento: 2024-07-01 e o faturamento foi de R$ 5500.0
+----------+--------+----------------+-----------+-----------+
|      data| produto|Total_Quantidade|Total_Valor|Preco_Medio|
+----------+--------+----------------+-----------+-----------+
|2024-07-01| Celular|               2|     3000.0|     1500.0|
|2024-07-01|Notebook|               1|     2500.0|     2500.0|
+----------+--------+----------------+-----------+-----------+



In [0]:
# MESCLA DE TABELAS

#Tabela "Clientes" (Dimensão)
df_clientes = spark.read.table("BD_ELETRONICOS_VENDAS.CLIENTES")

# tabela "Vendas" (Fato)
df_Vendas_Loja1 = spark.read.table("BD_ELETRONICOS_VENDAS.Vendas_Eletronicos_Loja1")

# Fazendo a mescla (left join) para a nova tabela ter todas as informações de vendas + nome dos clientes
df_tabela_consolidada = df_Vendas_Loja1.join(df_clientes, on='cliente_id', how='left')

# Mostrando o resultado
display(df_tabela_consolidada)

cliente_id,id,data,produto,quantidade,preco_unitario,preco_total,País_cliente,Atualizacao,nome_cliente
104,8,2024-07-07,Fone de Ouvido,2,200.0,400.0,Reino Unido,2024-07-13 12: 49: 18,David
102,9,2024-07-07,Monitor,1,1200.0,1200.0,Brasil,2024-07-13 12: 49: 18,Bob
104,5,2024-07-05,Fone de Ouvido,2,200.0,400.0,Brasil,2024-07-13 12: 49: 18,David
102,6,2024-07-06,Monitor,1,1200.0,1200.0,Reino Unido,2024-07-13 12: 49: 18,Bob
102,2,2024-07-02,Notebook,1,2500.0,2500.0,Estados Unidos,2024-07-13 12: 49: 18,Bob
103,3,2024-07-03,Tablet,3,800.0,2400.0,França,2024-07-13 12: 49: 18,Carol
104,11,2024-07-07,Fone de Ouvido,2,200.0,400.0,França,2024-07-13 12: 49: 18,David
101,12,2024-07-01,Notebook,1,2500.0,2500.0,China,2024-07-13 12: 49: 18,Alice
102,10,2024-07-08,Monitor,1,1200.0,1200.0,Estados Unidos,2024-07-13 12: 49: 18,Bob
101,4,2024-07-04,Smartwatch,1,600.0,600.0,Brasil,2024-07-13 12: 49: 18,Alice


In [0]:
# Descobrindo o nome do cliente que mais gastou $$$

df_cliente_mais_rentavel = df_tabela_consolidada\
    .groupBy(df_tabela_consolidada.nome_cliente)\
    .agg(sum(col("quantidade") * col("preco_unitario")).alias("Total_Valor"))\
    .orderBy(desc("Total_Valor"))#\
    #.limit(1)

df_cliente_mais_rentavel.show(5)

+------------+-----------+
|nome_cliente|Total_Valor|
+------------+-----------+
|       Alice|     6700.0|
|         Bob|     6100.0|
|       Carol|     2400.0|
|       David|     1200.0|
+------------+-----------+

