In [0]:
#Quais os produtos que mais tiveram carrinhos abandonados?
#Segue abaixo Script

In [0]:
#Importando bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, least, greatest,month, year, col, lag, when,min,sum, struct,count, concat_ws
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

In [0]:
#Carrega as tabelas
carts = spark.table("default.tb_carts")
cart_entries = spark.table("default.tb_cartentries")

In [0]:
# Filtra carrinhos que consideramos como "abandonados"
# (Ajuste este filtro conforme sua definição de carrinho abandonado)
abandoned_carts = carts.filter(carts.p_deliverystatus.isNull())
abandoned_carts.count()

16047042

In [0]:
# Junta carrinhos abandonados com suas entradas
abandoned_products = abandoned_carts.join(cart_entries, carts.PK == cart_entries.p_order)

In [0]:
# Agrupa por produto e contar
product_counts = abandoned_products.groupBy("p_product").count().orderBy("count", ascending=False)

In [0]:
# Resposta
# Mostra os produtos que mais tiveram carrinhos abandonados
product_counts.show()

+-----------------+-----+
|        p_product|count|
+-----------------+-----+
|8.797231284225E12| 1773|
|8.803303194625E12| 1137|
|8.806227116033E12| 1095|
|8.806415695873E12|  997|
|8.810544365569E12|  658|
|8.803803037697E12|  594|
|8.802941566977E12|  562|
|8.797239410689E12|  544|
|8.804189536257E12|  538|
|8.797220405249E12|  530|
|8.810874339329E12|  495|
|8.797576757249E12|  476|
|8.810511007745E12|  470|
|8.804192780289E12|  456|
|8.805407326209E12|  396|
|8.798170808321E12|  382|
|8.806483427329E12|  347|
|8.803303227393E12|  334|
|8.808349663233E12|  327|
|8.802941796353E12|  318|
+-----------------+-----+
only showing top 20 rows



In [0]:
# Quais as duplas de produtos em conjunto que mais tiveram carrinhos abandonados?
# Segue script abaixo

In [0]:
# Filtrar carrinhos que consideramos como "abandonados"
abandoned_carts_df = carts.filter(carts.p_deliverystatus.isNull())

In [0]:
# Juntar carrinhos abandonados com suas entradas
entries_in_abandoned_carts = abandoned_carts_df.join(cart_entries, carts.PK == cart_entries.p_order)

In [0]:
#join para encontrar pares de produtos no mesmo carrinho
product_pairs = entries_in_abandoned_carts.alias("a").join(entries_in_abandoned_carts.alias("b"), 
                                                  (col("a.p_order") == col("b.p_order")) & 
                                                  (col("a.p_product") != col("b.p_product")))

In [0]:
# Para evitar duplicatas (produto X com produto Y é o mesmo que produto Y com produto X), 
# ordenando os pares
product_pairs = product_pairs.withColumn("product1", least(col("a.p_product"), col("b.p_product")))
product_pairs = product_pairs.withColumn("product2", greatest(col("a.p_product"), col("b.p_product")))

In [0]:
# Agrupar por duplas de produtos e contar
pair_counts = product_pairs.groupBy("product1", "product2").count().orderBy("count", ascending=False)

In [0]:
# Mostrar as duplas de produtos que mais tiveram carrinhos abandonados
pair_counts.show()

+-----------------+-----------------+-----+
|         product1|         product2|count|
+-----------------+-----------------+-----+
|8.797983080449E12|8.800160120833E12| 2702|
|8.800160153601E12|8.801372176385E12| 2288|
|8.797234200577E12|8.799375851521E12| 1934|
|8.797164437505E12|8.800160120833E12| 1444|
|8.800160186369E12|8.802909716481E12| 1444|
|8.797304815617E12|8.800160284673E12| 1412|
|8.797329063937E12|8.800160120833E12| 1152|
|8.797220864001E12|8.799375851521E12| 1024|
|8.797220962305E12|8.797277388801E12| 1006|
|8.797231284225E12|8.797272408065E12|  994|
|8.797277388801E12|8.801370537985E12|  978|
|8.797982916609E12|8.800160153601E12|  968|
|8.797164437505E12|8.797983080449E12|  928|
|8.801370636289E12|8.802941698049E12|  928|
|8.797329293313E12|8.801371226113E12|  896|
|8.797983014913E12|8.797983080449E12|  888|
|8.797145726977E12|8.801372176385E12|  886|
|8.797246619649E12|8.800160153601E12|  876|
|8.800160120833E12|8.800160284673E12|  874|
|8.797234200577E12|8.80016087449

In [0]:
# Quais produtos tiveram um aumento de abandono?
#Segue abaixo


In [0]:
# Extrai mês e ano da data de criação do carrinho
entries_with_month_year = entries_in_abandoned_carts.withColumn("year", year(col("tb_carts.createdTS"))).withColumn("month", month(col("tb_carts.createdTS")))


In [0]:
# Conta abandonos por produto e mês/ano
monthly_abandon_counts = entries_with_month_year.groupBy("p_product", "year", "month").count()

In [0]:
# Ordena e calcular a diferença de abandono entre os meses
windowSpec = Window.partitionBy("p_product").orderBy("year", "month")
monthly_abandon_counts = monthly_abandon_counts.withColumn("prev_month_count", lag("count").over(windowSpec))
monthly_abandon_counts = monthly_abandon_counts.withColumn("diff", col("count") - col("prev_month_count"))

In [0]:
# Filtra produtos que tiveram aumento no abandono
increased_abandon = monthly_abandon_counts.filter(col("diff") > 0)

In [0]:
# Mostra resultados
increased_abandon.show()

+-----------------+----+-----+-----+----------------+----+
|        p_product|year|month|count|prev_month_count|diff|
+-----------------+----+-----+-----+----------------+----+
|8.797141630977E12|2021|   10|    6|               1|   5|
|8.797141630977E12|2021|   11|   33|               6|  27|
|8.797141630977E12|2021|   12|  202|              33| 169|
|8.797141630977E12|2022|    1|  208|             202|   6|
|8.797141630977E12|2022|    5|  134|              17| 117|
|8.797141630977E12|2022|    6|  231|             134|  97|
|8.797142351873E12|2020|   12|    2|               1|   1|
|8.797142351873E12|2021|    2|    3|               2|   1|
|8.797142351873E12|2021|    4|   12|               2|  10|
|8.797142351873E12|2021|    5|   18|              12|   6|
|8.797143531521E12|2020|    5|    2|               1|   1|
|8.797143531521E12|2020|    9|    4|               1|   3|
|8.797143531521E12|2020|   11|    5|               1|   4|
|8.797143531521E12|2020|   12|   16|               5|  1

In [0]:
# Quais produtos tiveram um aumento de abandono?
#Segue abaixo

In [0]:
# Determina o mês de lançamento de cada produto
product_launch_month = entries_in_abandoned_carts.groupBy("p_product").agg(min(col("tb_cartentries.createdTS")).alias("launch_date"))
product_launch_month = product_launch_month.withColumn("launch_year", year(col("launch_date"))).withColumn("launch_month", month(col("launch_date")))


In [0]:
# Junta com as entradas do carrinho para contar quantos carrinhos incluíram o produto durante seu mês de lançamento
launch_month_entries = entries_in_abandoned_carts.join(product_launch_month, "p_product")
launch_month_entries = launch_month_entries.filter((col("launch_year") == year(col("tb_cartentries.createdTS"))) & (col("launch_month") == month(col("tb_cartentries.createdTS"))))


In [0]:
# Conta carrinhos por produto durante seu mês de lançamento
product_counts = launch_month_entries.groupBy("p_product").count().orderBy("count", ascending=False)

In [0]:
# Mostra resultados
product_counts.show()

+-----------------+-----+
|        p_product|count|
+-----------------+-----+
|8.797231284225E12| 1773|
|8.803303194625E12| 1137|
|8.806227116033E12| 1095|
|8.806415695873E12|  997|
|8.810544365569E12|  658|
|8.803803037697E12|  594|
|8.802941566977E12|  562|
|8.797239410689E12|  544|
|8.804189536257E12|  538|
|8.797220405249E12|  530|
|8.810874339329E12|  495|
|8.797576757249E12|  476|
|8.810511007745E12|  470|
|8.804192780289E12|  456|
|8.805407326209E12|  396|
|8.798170808321E12|  382|
|8.806483427329E12|  347|
|8.803303227393E12|  334|
|8.808349663233E12|  327|
|8.802941796353E12|  318|
+-----------------+-----+
only showing top 20 rows



In [0]:
# Quais estados tiveram mais abandonos?
#Segue abaixo

In [0]:
# Carrega as tabelas
carts = spark.table("default.tb_carts")
addresses = spark.table("default.tb_addresses")
regions = spark.table("default.tb_regions")  # Supondo que esta tabela contém os nomes dos estados

In [0]:
# Junta carrinhos abandonados com endereços para obter informações do estado
abandoned_with_address = abandoned_carts_df.join(addresses, abandoned_carts_df.p_deliveryaddress == addresses.PK)


In [0]:
# Junta com a tabela de regiões para obter o nome do estado
abandoned_with_state = abandoned_with_address.join(regions, abandoned_with_address.p_region == regions.PK)


In [0]:
# Agrupa por estado e contar abandonos
state_abandon_counts = abandoned_with_state.groupBy(regions.p_isocode).count().orderBy("count", ascending=False)  # Supondo que p_isocode seja o nome do estado


In [0]:
# Mostra os estados com mais abandonos
state_abandon_counts.show()

+---------+-----+
|p_isocode|count|
+---------+-----+
|    BR-SP|34489|
|    BR-MG|23590|
|    BR-RJ|12139|
|    BR-BA| 9361|
|    BR-RS| 8311|
|    BR-PR| 8238|
|    BR-SC| 7282|
|    BR-GO| 6932|
|    BR-ES| 6041|
|    BR-PE| 4471|
|    BR-CE| 4323|
|    BR-DF| 3466|
|    BR-RN| 2456|
|    BR-PB| 2184|
|    BR-MT| 2000|
|    BR-AL| 1634|
|    BR-MA| 1591|
|    BR-SE| 1387|
|    BR-MS| 1386|
|    BR-PI| 1339|
+---------+-----+
only showing top 20 rows



In [0]:
# Relatorios

# 1-Gere um relatório dos produtos, mês a mês informando a quantidade de carrinhos abandonados, 
# quantidade de itens abandonados e o valor não faturado;

# 2- Gere também, um relatório por data informando a quantidade de carrinhos abandonados, 
#quantidade de itens abandonados e o valor não faturado.

In [0]:
# Carrega as tabelas
carts = spark.table("default.tb_carts")
cart_entries = spark.table("default.tb_cartentries")

In [0]:
# Filtra carrinhos que consideramos como "abandonados"
abandoned_carts_df = carts.filter(carts.p_deliverystatus.isNull())

In [0]:
# Junta carrinhos com suas entradas
entries_in_abandoned_carts = abandoned_carts_df.join(cart_entries, carts.PK == cart_entries.p_order)

In [0]:
# Extrai mês e ano da data de criação do carrinho
entries_with_month_year = entries_in_abandoned_carts.withColumn("year", year(col("tb_carts.createdTS"))).withColumn("month", month(col("tb_carts.createdTS")))


In [0]:
# Agrupa por produto, mês e ano e calcular as métricas desejadas
report_1 = entries_with_month_year.groupBy("p_product", "year", "month") \
                                .agg(sum("tb_cartentries.p_quantity").alias("total_items_abandoned"),
                                     countDistinct("tb_carts.PK").alias("total_carts_abandoned"),
                                     sum("tb_cartentries.p_totalprice").alias("unbilled_amount")) \
                                .orderBy("p_product", "year", "month")

In [0]:
# Mostra o relatório 1
report_1.show()

+-----------------+----+-----+---------------------+---------------------+------------------+
|        p_product|year|month|total_items_abandoned|total_carts_abandoned|   unbilled_amount|
+-----------------+----+-----+---------------------+---------------------+------------------+
|             NULL|2021|   11|                  2.0|                    1|           3399.03|
|8.797141630977E12|NULL| NULL|                 17.0|                   17|         25489.845|
|8.797141630977E12|2021|    7|                  3.0|                    3|           4227.39|
|8.797141630977E12|2021|    8|                  1.0|                    1|           1303.38|
|8.797141630977E12|2021|    9|                  1.0|                    1|1499.4900000000002|
|8.797141630977E12|2021|   10|                  6.0|                    6|           8408.61|
|8.797141630977E12|2021|   11|                 33.0|                   33|43629.795000000006|
|8.797141630977E12|2021|   12|                202.0|        

In [0]:
# Agrupar por data e calcular as métricas desejadas
report_2 = entries_in_abandoned_carts.groupBy("tb_carts.createdTS") \
                                           .agg(sum("tb_cartentries.p_quantity").alias("total_items_abandoned"),
                                                countDistinct("tb_carts.PK").alias("total_carts_abandoned"),
                                                sum("tb_cartentries.p_totalprice").alias("unbilled_amount")) \
                                           .orderBy("tb_carts.createdTS")


In [0]:
# Mostra o relatório
report_2.show()

+--------------------+---------------------+---------------------+--------------------+
|           createdTS|total_items_abandoned|total_carts_abandoned|     unbilled_amount|
+--------------------+---------------------+---------------------+--------------------+
|                NULL|              78529.0|                28758|2.1040910370000228E8|
|2019-12-16 21:19:...|                  4.0|                    1|             6125.76|
|2019-12-18 07:47:...|                  4.0|                    1|   8647.199999999999|
|2019-12-18 20:51:...|                  8.0|                    1|             43847.1|
|2019-12-22 22:09:...|                  4.0|                    1|            15317.28|
|2019-12-23 16:23:...|                  4.0|                    1|             9189.54|
|2019-12-23 16:57:...|                  6.0|                    1|            19327.05|
|2019-12-25 10:01:...|                 21.0|                    1|          141883.515|
|2019-12-27 09:48:...|          

In [0]:
# No final, exporte um arquivo .txt com os 50 carrinhos com os maiores carts.p_totalprice no seguinte  layout:

##  carts.PK|carts.createdTS|carts.p_totalprice|user.p_uid|paymentmodes.p_code|paymentinfos.p_installments|cmss
##  itelp.p_name|addresses.p_postalcode|sum(cartentries.p_quantity)|count(cartentries.PK)
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  carts.PK|carts.createdTS|carts.p_totalprice|user.p_uid|paymentmodes.p_code|paymentinfos.p_installments|cmss
##  itelp.p_name|addresses.p_postalcode
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  carts.PK|carts.createdTS|carts.p_totalprice|user.p_uid|paymentmodes.p_code|paymentinfos.p_installments|cmss
##  itelp.p_name|addresses.p_postalcode
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|
##  cartentries.p_product|cartentries.p_quantity|cartentries.p_totalprice|


In [0]:
# Carrega as tabelas
carts = spark.table("default.tb_carts")
cart_entries = spark.table("default.tb_cartentries")
users = spark.table("default.tb_users")
paymentmodes = spark.table("default.tb_paymentmodes")
paymentinfos = spark.table("default.tb_paymentinfos")
cmssitelp = spark.table("default.tb_cmssitelp")
addresses = spark.table("default.tb_addresses")

In [0]:
# Junta as tabelas
joined_data = (carts.join(cart_entries, carts.PK == cart_entries.p_order)
               .join(users, carts.OwnerPkString == users.PK)
               .join(paymentmodes, carts.PK == paymentmodes.PK)
               .join(paymentinfos, carts.PK == paymentinfos.PK)
               .join(cmssitelp, carts.PK == cmssitelp.ITEMPK)
               .join(addresses, carts.p_deliveryaddress == addresses.PK))

In [0]:
# Agrega e formata os dados
aggregated_data = (joined_data.groupBy(carts.PK, carts.createdTS, carts.p_totalprice, users.p_uid, paymentmodes.p_code, paymentinfos.p_installments, cmssitelp.p_name, addresses.p_postalcode)
                  .agg(sum(cart_entries.p_quantity).alias("total_quantity"),
                       count(cart_entries.PK).alias("count_entries"),
                       collect_list(struct(cart_entries.p_product, cart_entries.p_quantity, cart_entries.p_totalprice)).alias("entries_data"))
                  .orderBy(carts.p_totalprice, ascending=False)
                  .limit(50))

In [0]:
# Converte a coluna entries_data para uma string formatada
def format_entries(entries):
    return "|".join([f"{entry.p_product}|{entry.p_quantity}|{entry.p_totalprice}" for entry in entries])

format_entries_udf = udf(format_entries, StringType())

formatted_data = aggregated_data.withColumn("formatted_entries", format_entries_udf(col("entries_data"))).drop("entries_data")


In [0]:
# Exporta para um arquivo .txt
(formatted_data.write
 .option("delimiter", "|")
 .option("header", "false")
 .csv("/path/to/output.txt"))