In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, sum
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, lit, to_timestamp
from pyspark.sql.functions import desc

In [52]:
# Create (or reuse) a local Spark session that uses every available core.
# NOTE(review): spark.executor.instances is a static-allocation setting for cluster
# managers; with master "local[*]" it is presumably ignored — confirm before relying on it.
# The LEGACY time-parser policy makes to_timestamp/date patterns use the pre-Spark-3.0
# (java.text.SimpleDateFormat) parsing behaviour.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.instances", "5")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

In [53]:
# Load the silver-layer Parquet tables: customer and product dimensions plus the
# transaction fact table. (df_silver_dim_cliente is not used in the cells shown here.)
df_silver_dim_cliente = spark.read.format("parquet").load("spark-warehouse/silver/dim_cliente")
df_silver_dim_produto = spark.read.format("parquet").load("spark-warehouse/silver/dim_produto")
df_silver_fato_transacao = spark.read.format("parquet").load("spark-warehouse/silver/fato_transacao")

In [54]:
# Combine the year, month, day, hour and minute columns into a single timestamp column,
# "data_hora", then drop month/day/hour/minute (year is kept).
#
# Every component after the year is cast to string and zero-padded to two characters
# (2 -> "02") so the text matches "yyyy-MM-dd HH:mm" exactly. Unpadded text such as
# "2019-2-5 17:6" only parses because the session uses the LEGACY time-parser policy.
# NOTE(review): assumes these columns hold whole numbers (ints or digit strings). F.lpad
# truncates inputs longer than 2 characters, which can only happen for out-of-range
# values (>= 100) that are not valid date/time components anyway.


def _pad2(col_name):
    """Return column `col_name` cast to string and left-padded with "0" to 2 characters."""
    return F.lpad(F.col(col_name).cast("string"), 2, "0")


# Guard so the cell can be re-run: after the first run month/day/hour/minute no longer
# exist, and repeating the withColumn/drop would fail on the missing columns.
if "data_hora" not in df_silver_fato_transacao.columns:
    df_silver_fato_transacao = df_silver_fato_transacao.withColumn(
        "data_hora",
        to_timestamp(
            concat(
                F.col("year").cast("string"),
                lit("-"),
                _pad2("month"),
                lit("-"),
                _pad2("day"),
                lit(" "),
                _pad2("hour"),
                lit(":"),
                _pad2("minute"),
            ),
            "yyyy-MM-dd HH:mm",
        ),
    )

    # The individual components are now redundant with data_hora.
    df_silver_fato_transacao = df_silver_fato_transacao.drop("month", "day", "hour", "minute")

In [55]:
df_silver_fato_transacao.show(n=20, truncate=True)  # preview the first 20 rows

+-----------+----------+-----------+--------+-------------+--------+-----------+----+-------------------+
| invoice_id|product_id|customer_id|quantity|tax_5_percent|   total|    payment|year|          data_hora|
+-----------+----------+-----------+--------+-------------+--------+-----------+----+-------------------+
|308-81-0538|       286|          3|       4|        14.61|  306.81|Credit card|2019|2019-02-25 17:16:00|
|834-83-1826|       797|          5|       5|        20.51|  430.71|Credit card|2019|2019-02-25 17:16:00|
|873-95-4984|       633|          5|       7|       26.915| 565.215|       Cash|2019|2019-02-15 20:21:00|
|400-80-4065|       616|         10|       4|        13.71|  287.91|Credit card|2019|2019-02-15 20:21:00|
|188-55-0967|       614|          6|      10|       33.235| 697.935|Credit card|2019|2019-01-15 15:01:00|
|660-29-7083|        90|         12|      10|       27.935| 586.635|       Cash|2019|2019-01-15 15:01:00|
|346-84-3103|         8|          5|       5| 

In [56]:
# Add the month number of each transaction as "mes".
# NOTE(review): only the month is used for grouping, so the same month of different
# years would be merged; the output shown above contains only 2019 — confirm that holds.
df_silver_fato_transacao = df_silver_fato_transacao.withColumn("mes", F.month("data_hora"))

# Monthly totals per product (quantity and value). The result is enriched with product
# attributes via a left join, which keeps totals for product_ids missing from the
# dimension. It is then sorted by month, product line and descending sales value.
df_total_vendas = (
    df_silver_fato_transacao
    .groupBy("mes", "product_id")
    .agg(
        F.sum("quantity").alias("total_vendas_quantidade"),
        F.sum("total").alias("total_vendas_valor"),
    )
    .join(df_silver_dim_produto, "product_id", "left")
    .select("mes", "product_line", "unit_price", "total_vendas_quantidade", "total_vendas_valor")
    .orderBy("mes", "product_line", F.desc("total_vendas_valor"))
)

# Export sales per product to the gold layer, replacing any previous output.
df_total_vendas.write.mode("overwrite").parquet("spark-warehouse/gold/vendas_por_produto")

In [57]:
df_total_vendas.show(n=20, truncate=True)  # preview the first 20 rows of the sales table

+---+--------------------+----------+-----------------------+------------------+
|mes|        product_line|unit_price|total_vendas_quantidade|total_vendas_valor|
+---+--------------------+----------+-----------------------+------------------+
|  1|Electronic access...|     88.67|                     10|           931.035|
|  1|Electronic access...|     87.87|                      9|          830.3715|
|  1|Electronic access...|     74.22|                     10|            779.31|
|  1|Electronic access...|     91.56|                      8|           769.104|
|  1|Electronic access...|     72.13|                     10|           757.365|
|  1|Electronic access...|     96.37|                      7|          708.3195|
|  1|Electronic access...|     93.88|                      7|           690.018|
|  1|Electronic access...|     87.08|                      7|           640.038|
|  1|Electronic access...|     66.65|                      9|          629.8425|
|  1|Electronic access...|  

In [58]:
# Missing-data handling: fillna(0) replaces nulls with 0 in numeric columns only; string
# columns are left untouched.
# NOTE(review): if quantity is numeric, null quantities now count as 0 in the average
# below, lowering it — confirm that is intended. Note that df_total_vendas was computed
# before this fill.
df_silver_fato_transacao = df_silver_fato_transacao.fillna(0)

# Average quantity sold per product, enriched with product attributes via a left join
# and sorted by product line and unit price.
avg_quantity_per_product = (
    df_silver_fato_transacao
    .groupBy("product_id")
    .agg(F.avg("quantity").alias("avg_quantity"))
    .join(df_silver_dim_produto, "product_id", "left")
    .select("product_line", "unit_price", "avg_quantity")
    .orderBy("product_line", "unit_price")
)

# Export the per-product averages to the gold layer. This must write
# avg_quantity_per_product, not df_total_vendas (the monthly sales totals).
avg_quantity_per_product.write.mode("overwrite").parquet("spark-warehouse/gold/media_por_produto")

In [59]:
avg_quantity_per_product.show(n=20, truncate=True)  # preview the first 20 per-product averages

+--------------------+----------+------------+
|        product_line|unit_price|avg_quantity|
+--------------------+----------+------------+
|Electronic access...|     10.56|         8.0|
|Electronic access...|     10.59|         3.0|
|Electronic access...|     11.81|         5.0|
|Electronic access...|     11.94|         3.0|
|Electronic access...|     12.05|         5.0|
|Electronic access...|      12.1|         8.0|
|Electronic access...|     12.45|         6.0|
|Electronic access...|     13.22|         5.0|
|Electronic access...|     13.78|         4.0|
|Electronic access...|     14.96|         8.0|
|Electronic access...|     15.28|         5.0|
|Electronic access...|     15.69|         3.0|
|Electronic access...|     17.42|        10.0|
|Electronic access...|     18.77|         6.0|
|Electronic access...|     18.93|         6.0|
|Electronic access...|     19.24|         9.0|
|Electronic access...|     19.32|         7.0|
|Electronic access...|     20.77|         4.0|
|Electronic a