In [0]:
%sql
-- Make workspace.datamart the session's current catalog and schema, so that
-- table names written without a catalog/schema prefix resolve against it.
USE CATALOG workspace;
USE SCHEMA datamart

In [0]:
# Importando bibliotecas e funções
from pyspark.sql.functions import lower, col, explode, trim, initcap, when
from pyspark.sql.types import FloatType

In [0]:
# Load the five Bronze tables. The names carry no catalog/schema prefix, so
# they are resolved against the session's current catalog and schema.
df_customers, df_orders, df_payments, df_products, df_reviews = [
    spark.read.table(bronze_table)
    for bronze_table in (
        "Bronze_customers",
        "Bronze_orders",
        "Bronze_payments",
        "Bronze_products",
        "Bronze_reviews",
    )
]

In [0]:
# 1 - Customers: normalise the e-mail column, then remove duplicate rows.
#
# The e-mail is lower-cased BEFORE dropDuplicates() on purpose: deduplicating
# first would keep rows that differ only in the letter case of the e-mail, and
# those rows would then become exact duplicates once the e-mail is lower-cased.
df_customers_silver = (
    df_customers
    .withColumn("email", lower(col("email")))
    .dropDuplicates()  # compares all columns, now including the normalised e-mail
)

In [0]:
# 2 - Products: keep only rows with a positive price and label missing categories.
#
# The two steps touch different columns, so their order does not affect the result.
df_products_tratada = (
    df_products
    # Rows for which `price > 0` is not true are discarded.
    .filter(col("price") > 0)
    # More column names can be added to `subset` to give them the same placeholder.
    .fillna(value="Sem Informação", subset=["category"])
)

In [0]:
# 3 - Orders: turn the nested `items` column into a flat, tabular layout.

# Step 1: emit one row per element of `items` (held in `novo_item`), then drop
# the original nested column.
df_orders_tratada = df_orders.withColumn("novo_item", explode("items")).drop("items")

# Step 2: keep the order-level keys and expand every field of `novo_item`
# into its own top-level column ("novo_item.*").
# NOTE(review): this expects each element of `items` to be a struct - confirm
# against the Bronze schema.
df_orders_estruturada = df_orders_tratada.select(
    "customer_id",
    "order_date",
    "order_id",
    "novo_item.*",
)

In [0]:
# 4 - Payments: normalise the `status` label.
#
# Surrounding whitespace is trimmed and the first letter of each word is
# capitalised. Both functions are applied to one col("status") expression in a
# single withColumn: a reference such as df_payments["status"] is bound to the
# input DataFrame and would not see a `status` column rewritten earlier in the
# same chain.
df_payments_tratada = df_payments.withColumn(
    "status",
    initcap(trim(col("status"))),
)

In [0]:
# 5 - Reviews: convert `rating` to a numeric (float) column and tidy `comment`.
#
# Ratings are matched on a trimmed, lower-cased text copy, so values such as
# "Um" or " dois " are still recognised. Mapping rules:
#   * "um" .. "cinco"            -> 1 .. 5
#   * text that is already a number (e.g. "4", "4.5") -> that number
#   * anything else, including NULL -> NULL
# Unrecognised values are deliberately left NULL rather than defaulted to 5:
# a silent default would score missing or malformed ratings as the best
# possible grade and inflate any average computed downstream.
rating_text = lower(trim(col("rating").cast("string")))

df_reviews_tratada = (
    df_reviews
    .withColumn(
        "rating",
        when(rating_text == "um", 1)
        .when(rating_text == "dois", 2)
        .when((rating_text == "três") | (rating_text == "tres"), 3)
        .when(rating_text == "quatro", 4)
        .when(rating_text == "cinco", 5)
        # The regex guard ensures the cast only ever sees numeric text, so it
        # cannot raise when ANSI mode is enabled.
        .when(rating_text.rlike(r"^[0-9]+(\.[0-9]+)?$"), rating_text.cast(FloatType()))
        # No otherwise(): rows matching none of the branches yield NULL.
        .cast(FloatType()),
    )
    # Trim the comment, then capitalise the first letter of each word.
    .withColumn("comment", initcap(trim(col("comment"))))
)

In [0]:
# Persist every Silver DataFrame as a Delta table. "overwrite" mode replaces
# the contents of a table that already exists, so the cell can be re-run.
silver_outputs = (
    (df_customers_silver, "workspace.datamart.silver_customers"),
    (df_products_tratada, "workspace.datamart.silver_products"),
    (df_orders_estruturada, "workspace.datamart.silver_orders"),
    (df_payments_tratada, "workspace.datamart.silver_payments"),
    (df_reviews_tratada, "workspace.datamart.silver_reviews"),
)

for silver_df, target_table in silver_outputs:
    silver_df.write.format("delta").mode("overwrite").saveAsTable(target_table)

print("✅ Sucesso! Dados ingeridos e tabelas Silver criadas.")