## Starting Spark

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/15 11:03:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


25/02/15 11:03:48 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Reading data with Spark

In [2]:
df_order_items = spark.read.csv("/home/jeanlr/projetos/olist/data/raw/olist_order_items_dataset.csv",
                               header=True,
                               inferSchema=True)

df_order_products = spark.read.csv("/home/jeanlr/projetos/olist/data/raw/olist_products_dataset.csv",
                               header=True,
                               inferSchema=True)

df_orders = spark.read.csv("/home/jeanlr/projetos/olist/data/raw/olist_orders_dataset.csv",
                               header=True,
                               inferSchema=True)

In [3]:
## Habilitando uso do SparkSQL
df_order_items.createOrReplaceTempView("df_order_items")
df_order_products.createOrReplaceTempView("df_order_products")
df_orders.createOrReplaceTempView("df_orders")

In [4]:
df_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

In [5]:
df_order_products.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [6]:
df_orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

## Join table orders_product and orders_items

In [7]:
df_join_order_items = spark.sql("""
SELECT
    t1.*,
    t2.product_category_name,
    t2.product_photos_qty,
    t2.product_weight_g, 
    t2.product_length_cm,
    t2.product_height_cm,   
    t2.product_width_cm                                                                            
FROM df_order_items AS t1
LEFT JOIN df_order_products AS t2 
ON t1.product_id = t2.product_id
ORDER BY order_id;                 
""")
df_join_order_items.createOrReplaceTempView("df_join_order_items")
df_join_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+---------------------+------------------+----------------+-----------------+-----------------+----------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|product_category_name|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+---------------------+------------------+----------------+-----------------+-----------------+----------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|           cool_stuff|                 4|             650|               28|                9|              14|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|20

## Sumarize product_category_name

In [17]:
from pyspark.sql import functions as F

df_category_counts = df_join_order_items.groupBy("product_category_name").count()
df_category_counts.orderBy(F.desc("count")).show(3, False)  # Exibe as 50 categorias mais comuns

+---------------------+-----+
|product_category_name|count|
+---------------------+-----+
|cama_mesa_banho      |11115|
|beleza_saude         |9670 |
|esporte_lazer        |8641 |
+---------------------+-----+
only showing top 3 rows



In [16]:
threshold = 12000  # Ajuste conforme necess�rio

rare_categories = (
    df_category_counts.filter(F.col("count") < threshold)
    .select("product_category_name")
    .rdd.flatMap(lambda x: x)
    .collect()
)

print(rare_categories)  # Lista das categorias menos frequentes


['pcs', 'bebes', 'artes', 'cine_foto', 'moveis_decoracao', 'pc_gamer', 'construcao_ferramentas_construcao', 'tablets_impressao_imagem', 'artigos_de_festas', 'fashion_roupa_masculina', 'artigos_de_natal', 'la_cuisine', 'flores', 'livros_tecnicos', None, 'telefonia_fixa', 'construcao_ferramentas_seguranca', 'cool_stuff', 'eletrodomesticos', 'livros_importados', 'pet_shop', 'casa_construcao', 'livros_interesse_geral', 'instrumentos_musicais', 'moveis_colchao_e_estofado', 'fashion_roupa_feminina', 'portateis_cozinha_e_preparadores_de_alimentos', 'industria_comercio_e_negocios', 'beleza_saude', 'fraldas_higiene', 'fashion_roupa_infanto_juvenil', 'fashion_underwear_e_moda_praia', 'artes_e_artesanato', 'moveis_escritorio', 'eletronicos', 'malas_acessorios', 'informatica_acessorios', 'moveis_cozinha_area_de_servico_jantar_e_jardim', 'moveis_quarto', 'automotivo', 'agro_industria_e_comercio', 'brinquedos', 'construcao_ferramentas_jardim', 'audio', 'seguros_e_servicos', 'market_place', 'relogios

In [19]:
from pyspark.sql import functions as F

category_mapping = {
    "pcs": "tecnologia",
    "pc_gamer": "tecnologia",
    "tablets_impressao_imagem": "tecnologia",
    "telefonia": "tecnologia",
    "telefonia_fixa": "tecnologia",
    "eletronicos": "tecnologia",
    "informatica_acessorios": "tecnologia",
    "audio": "tecnologia",
    "cine_foto": "tecnologia",
    "consoles_games": "tecnologia",
    "cds_dvds_musicais": "tecnologia",
    "dvds_blu_ray": "tecnologia",
    
    "moveis_decoracao": "casa_construcao",
    "moveis_quarto": "casa_construcao",
    "moveis_sala": "casa_construcao",
    "moveis_cozinha_area_de_servico_jantar_e_jardim": "casa_construcao",
    "moveis_colchao_e_estofado": "casa_construcao",
    "moveis_escritorio": "casa_construcao",
    "casa_construcao": "casa_construcao",
    "construcao_ferramentas_construcao": "casa_construcao",
    "construcao_ferramentas_seguranca": "casa_construcao",
    "construcao_ferramentas_jardim": "casa_construcao",
    "construcao_ferramentas_iluminacao": "casa_construcao",
    "construcao_ferramentas_ferramentas": "casa_construcao",
    "casa_conforto": "casa_construcao",
    "casa_conforto_2": "casa_construcao",
    "sinalizacao_e_seguranca": "casa_construcao",
    
    "eletrodomesticos": "eletrodomesticos",
    "eletrodomesticos_2": "eletrodomesticos",
    "portateis_cozinha_e_preparadores_de_alimentos": "eletrodomesticos",
    "portateis_casa_forno_e_cafe": "eletrodomesticos",
    "eletroportateis": "eletrodomesticos",
    "climatizacao": "eletrodomesticos",
    "utilidades_domesticas": "eletrodomesticos",

    "fashion_roupa_feminina": "moda",
    "fashion_roupa_masculina": "moda",
    "fashion_roupa_infanto_juvenil": "moda",
    "fashion_calcados": "moda",
    "fashion_bolsas_e_acessorios": "moda",
    "fashion_underwear_e_moda_praia": "moda",
    "fashion_esporte": "moda",
    "malas_acessorios": "moda",
    "relogios_presentes": "moda",

    "beleza_saude": "beleza_saude",
    "perfumaria": "beleza_saude",
    "fraldas_higiene": "beleza_saude",

    "esporte_lazer": "esporte_lazer",
    "brinquedos": "esporte_lazer",
    "instrumentos_musicais": "esporte_lazer",
    "musica": "esporte_lazer",
    "agro_industria_e_comercio": "esporte_lazer",

    "alimentos": "alimentos_bebidas",
    "alimentos_bebidas": "alimentos_bebidas",
    "bebidas": "alimentos_bebidas",
    "la_cuisine": "alimentos_bebidas",

    "pet_shop": "pets_bebes",
    "bebes": "pets_bebes",

    "livros_tecnicos": "papelaria_cultura",
    "livros_importados": "papelaria_cultura",
    "livros_interesse_geral": "papelaria_cultura",
    "papelaria": "papelaria_cultura",
    "artes": "papelaria_cultura",
    "artes_e_artesanato": "papelaria_cultura",

    "seguros_e_servicos": "outros",
    "market_place": "outros",
    "cool_stuff": "outros",
    "artigos_de_festas": "outros",
    "artigos_de_natal": "outros",
    "industria_comercio_e_negocios": "outros",
    "flores": "outros",
    None: "outros"
}

# Criar um dicion�rio Spark com F.create_map
category_map_expr = F.create_map(
    *sum([[F.lit(k), F.lit(v)] for k, v in category_mapping.items()], [])
)

# Aplicar a transforma��o
df_join_order_items = df_join_order_items.withColumn(
    "product_category_group",
    F.coalesce(category_map_expr[F.col("product_category_name")], F.lit("outros"))
)

# Verificar os grupos criados
df_join_order_items.groupBy("product_category_group").count().orderBy(F.desc("count")).show()


+----------------------+-----+
|product_category_group|count|
+----------------------+-----+
|                outros|25906|
|            tecnologia|17349|
|       casa_construcao|13991|
|         esporte_lazer|13688|
|          beleza_saude|13128|
|                  moda| 9725|
|      eletrodomesticos| 9040|
|            pets_bebes| 5012|
|     papelaria_cultura| 3630|
|     alimentos_bebidas| 1181|
+----------------------+-----+



In [20]:
df_join_order_items.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+---------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|product_category_name|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_group|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+---------------------+------------------+----------------+-----------------+-----------------+----------------+----------------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|           cool_stuff|                 4|             650|               28|                9|              14|            

In [24]:
df_join_order_items.createOrReplaceTempView("df_join_order_items")

25/02/15 11:44:00 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [25]:
df_orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

## Join order_items and orders

In [38]:
df_join_order_items_and_orders = spark.sql("""
SELECT
    t1.*,
    t2.seller_id,                                      
    t2.product_category_group,
    t2.product_photos_qty,
    ROUND((t2.price + t2.freight_value), 2) AS total_price,
    ROUND((t2.product_weight_g / NULLIF((t2.product_length_cm * t2.product_height_cm * t2.product_width_cm), 0)), 2) AS density,
    ROUND((t2.price / NULLIF((t2.product_weight_g / 1000), 0)), 2) AS price_per_kg,
    ROUND((t2.product_length_cm * t2.product_height_cm * t2.product_width_cm), 2) AS volume_cm3,
    ROUND((t2.freight_value / NULLIF(t2.price, 0)), 2) AS freight_ratio,
    DATEDIFF(t1.order_delivered_customer_date, t1.order_approved_at) AS days_to_delivery,
    DATEDIFF(t1.order_estimated_delivery_date, t1.order_purchase_timestamp) AS days_to_estimated_delivery,
    DATEDIFF(t1.order_delivered_customer_date, t1.order_estimated_delivery_date) AS delivery_delay                                                                                    
FROM df_orders AS t1
LEFT JOIN df_join_order_items AS t2 
ON t1.order_id = t2.order_id
WHERE order_status = 'delivered'
ORDER BY order_id;                 
""")

df_join_order_items_and_orders.createOrReplaceTempView("df_join_order_items_and_orders")
df_join_order_items_and_orders.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|           seller_id|product_category_group|product_photos_qty|total_price|density|price_per_kg|volume_cm3|freight_ratio|days_to_delivery|days_to_estimated_delivery|delivery_delay|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+----------------------+------------------+-----

In [161]:
df_seller_orders = spark.sql("""
SELECT
    seller_id,
    to_date(order_purchase_timestamp) AS order_purchase_date, 
    product_category_group,
    product_photos_qty, 
    total_price,
    density,
    price_per_kg,
    volume_cm3,
    freight_ratio,
    days_to_delivery,
    days_to_estimated_delivery,
    delivery_delay,
    date_format(to_date(order_purchase_timestamp), 'yyyyMM') as PK_DATREF,
    CURRENT_DATE as PK_DAT_PROC                                                                                                                                                                                                                                                                                                                                                                                                                
FROM df_join_order_items_and_orders
ORDER BY 1,2;                 
""")

df_seller_orders.createOrReplaceTempView("df_seller_orders")
df_seller_orders.show(5)

+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+-----------+
|           seller_id|order_purchase_date|product_category_group|product_photos_qty|total_price|density|price_per_kg|volume_cm3|freight_ratio|days_to_delivery|days_to_estimated_delivery|delivery_delay|PK_DATREF|PK_DAT_PROC|
+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+-----------+
|0015a82c2db000af6...|         2017-09-26|      eletrodomesticos|                 2|     916.02|   0.19|       75.85|     61920|         0.02|              10|                        34|           -23|   201709| 2025-02-15|
|0015a82c2db000af6...|         2017-10-12|      eletrodomesticos|                 2|     916.02|   0.19|

In [None]:
from pyspark.sql.functions import max, min
df_seller_orders.select(
    min("PK_DATREF").alias("min_PK_DATREF"),  
    max("PK_DATREF").alias("max_PK_DATREF")
).show()


+-------------+-------------+
|min_PK_DATREF|max_PK_DATREF|
+-------------+-------------+
|       201609|       201808|
+-------------+-------------+



In [304]:
# Definir as datas de referência
datref_min = 201802  
datref = 201808  

df_safra_01 = spark.sql(f"""
SELECT
    seller_id,
    order_purchase_date, 
    product_category_group,
    product_photos_qty, 
    total_price,
    density,
    price_per_kg,
    volume_cm3,
    freight_ratio,
    days_to_delivery,
    days_to_estimated_delivery,
    delivery_delay,
    PK_DATREF
FROM df_seller_orders
WHERE PK_DATREF BETWEEN {datref_min} AND {datref} 
AND seller_id IN (
    SELECT DISTINCT seller_id 
    FROM df_seller_orders 
    WHERE PK_DATREF = {datref}
)
ORDER BY PK_DATREF DESC, seller_id ASC;
""")

df_safra_01.createOrReplaceTempView("df_safra_01")
df_safra_01.show(5)


+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+
|           seller_id|order_purchase_date|product_category_group|product_photos_qty|total_price|density|price_per_kg|volume_cm3|freight_ratio|days_to_delivery|days_to_estimated_delivery|delivery_delay|PK_DATREF|
+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+
|00720abe85ba08598...|         2018-08-03|      eletrodomesticos|                 2|     141.67|   0.04|       220.0|     13875|         0.07|              10|                         5|             5|   201808|
|00ee68308b45bc5e2...|         2018-08-09|                outros|                 1|     115.96|   0.03|        82.5|     34639|         0.17|          

## Creating window flags for history:

In [305]:
df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN order_purchase_date BETWEEN DATE_ADD(MAX(order_purchase_date) OVER (PARTITION BY seller_id), -30) AND MAX(order_purchase_date) OVER (PARTITION BY seller_id) THEN 1
        ELSE 0
    END AS U1M,                       
      CASE
        WHEN order_purchase_date BETWEEN DATE_ADD(MAX(order_purchase_date) OVER (PARTITION BY seller_id), -90) AND MAX(order_purchase_date) OVER (PARTITION BY seller_id) THEN 1
        ELSE 0
    END AS U3M,
    CASE
        WHEN order_purchase_date BETWEEN DATE_ADD(MAX(order_purchase_date) OVER (PARTITION BY seller_id), -180) AND MAX(order_purchase_date) OVER (PARTITION BY seller_id) THEN 1
        ELSE 0
    END AS U6M
FROM df_safra_01
ORDER BY seller_id, order_purchase_date;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.count()


45068

In [306]:
df_temp_01.show(5)

+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+---+---+---+
|           seller_id|order_purchase_date|product_category_group|product_photos_qty|total_price|density|price_per_kg|volume_cm3|freight_ratio|days_to_delivery|days_to_estimated_delivery|delivery_delay|PK_DATREF|U1M|U3M|U6M|
+--------------------+-------------------+----------------------+------------------+-----------+-------+------------+----------+-------------+----------------+--------------------------+--------------+---------+---+---+---+
|00720abe85ba08598...|         2018-02-06|      eletrodomesticos|                 3|      87.36|   0.06|       91.76|     13500|         0.12|               9|                        16|            -7|   201802|  0|  0|  1|
|00720abe85ba08598...|         2018-03-02|      eletrodomesticos|                 2|       28.6|   0.03|

## Creating first-layer explanatory variables

In [307]:
df_temp_02 = spark.sql("""
SELECT
    seller_id,
    max(date_format(to_date(order_purchase_date), 'yyyyMM')) as PK_DATREF,                   
    round(sum(total_price),2) as VL_TOT_PRICE,
    round(avg(total_price),2) as VL_MED_PRICE,
    round(max(total_price),2) as VL_MAX_PRICE,
    round(min(total_price),2) as VL_MIN_PRICE,
    round(sum(product_photos_qty),2) as VL_TOT_QTD_PHOTOS,
    round(avg(product_photos_qty),2) as VL_MED_QTD_PHOTOS,
    round(max(product_photos_qty),2) as VL_MAX_QTD_PHOTOS,
    round(min(product_photos_qty),2) as VL_MIN_QTD_PHOTOS,
    round(sum(density),2) as VL_TOT_DENSITY,
    round(avg(density),2) as VL_MED_DENSITY,
    round(max(density),2) as VL_MAX_DENSITY,
    round(min(density),2) as VL_MIN_DENSITY,
    round(sum(price_per_kg),2) as VL_TOT_PRC_PER_KG,
    round(avg(price_per_kg),2) as VL_MED_PRC_PER_KG,
    round(max(price_per_kg),2) as VL_MAX_PRC_PER_KG,
    round(min(price_per_kg),2) as VL_MIN_PRC_PER_KG, 
    round(sum(volume_cm3),2) as VL_TOT_VOL_CM3,
    round(avg(volume_cm3),2) as VL_MED_VOL_CM3,
    round(max(volume_cm3),2) as VL_MAX_VOL_CM3,
    round(min(volume_cm3),2) as VL_MIN_VOL_CM3,
    round(sum(freight_ratio),2) as VL_TOT_COST_REL_FRETE,
    round(avg(freight_ratio),2) as VL_MED_COST_REL_FRETE,
    round(max(freight_ratio),2) as VL_MAX_COST_REL_FRETE,
    round(min(freight_ratio),2) as VL_MIN_COST_REL_FRETE,  
    round(sum(days_to_delivery),2) as VL_TOT_DAYS_DELIVERY,
    round(avg(days_to_delivery),2) as VL_MED_DAYS_DELIVERY,
    round(max(days_to_delivery),2) as VL_MAX_DAYS_DELIVERY,
    round(min(days_to_delivery),2) as VL_MIN_DAYS_DELIVERY,    
    round(sum(days_to_estimated_delivery),2) as VL_TOT_DAYS_ESTIMATED_DELIVERY,
    round(avg(days_to_estimated_delivery),2) as VL_MED_DAYS_ESTIMATED_DELIVERY,
    round(max(days_to_estimated_delivery),2) as VL_MAX_DAYS_ESTIMATED_DELIVERY,
    round(min(days_to_estimated_delivery),2) as VL_MIN_DAYS_ESTIMATED_DELIVERY,  
    round(sum(delivery_delay),2) as VL_TOT_DELIVERY_DELAY,
    round(avg(delivery_delay),2) as VL_MED_DELIVERY_DELAY,
    round(max(delivery_delay),2) as VL_MAX_DELIVERY_DELAY,
    round(min(delivery_delay),2) as VL_MIN_DELIVERY_DELAY,                                                                                                                                                                                              
    round(sum(case when product_category_group = 'outros' then total_price else 0 end),2) as VL_TOT_CONS_OUTROS,
    round(avg(case when product_category_group = 'outros' then total_price else NULL end),2) as VL_MED_CONS_OUTROS,
    round(avg(case when product_category_group = 'outros' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_OUTROS,                   
    round(avg(case when product_category_group = 'outros' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_OUTROS,
    round(avg(case when product_category_group = 'outros' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_OUTROS,
    round(sum(case when product_category_group = 'tecnologia' then total_price else 0 end),2) as VL_TOT_CONS_TECN,
    round(avg(case when product_category_group = 'tecnologia' then total_price else NULL end),2) as VL_MED_CONS_TECN,
    round(avg(case when product_category_group = 'tecnologia' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_TECN,                   
    round(avg(case when product_category_group = 'tecnologia' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_TECN,
    round(avg(case when product_category_group = 'tecnologia' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_TECN,
    round(sum(case when product_category_group = 'casa_construcao' then total_price else 0 end),2) as VL_TOT_CONS_CASA_CONSTR,
    round(avg(case when product_category_group = 'casa_construcao' then total_price else NULL end),2) as VL_MED_CONS_CASA_CONSTR,
    round(avg(case when product_category_group = 'casa_construcao' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_CASA_CONSTR,                   
    round(avg(case when product_category_group = 'casa_construcao' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_CASA_CONSTR,
    round(avg(case when product_category_group = 'casa_construcao' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_CASA_CONSTR,
    round(sum(case when product_category_group = 'esporte_lazer' then total_price else 0 end),2) as VL_TOT_CONS_ESP_LZR,
    round(avg(case when product_category_group = 'esporte_lazer' then total_price else NULL end),2) as VL_MED_CONS_ESP_LZR,
    round(avg(case when product_category_group = 'esporte_lazer' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_ESP_LZR,                   
    round(avg(case when product_category_group = 'esporte_lazer' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_ESP_LZR,
    round(avg(case when product_category_group = 'esporte_lazer' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_ESP_LZR, 
    round(sum(case when product_category_group = 'beleza_saude' then total_price else 0 end),2) as VL_TOT_CONS_BLZ_SAUDE,
    round(avg(case when product_category_group = 'beleza_saude' then total_price else NULL end),2) as VL_MED_CONS_BLZ_SAUDE,
    round(avg(case when product_category_group = 'beleza_saude' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_BLZ_SAUDE,                   
    round(avg(case when product_category_group = 'beleza_saude' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_BLZ_SAUDE,
    round(avg(case when product_category_group = 'beleza_saude' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_BLZ_SAUDE, 
    round(sum(case when product_category_group = 'moda' then total_price else 0 end),2) as VL_TOT_CONS_MODA,
    round(avg(case when product_category_group = 'moda' then total_price else NULL end),2) as VL_MED_CONS_MODA,
    round(avg(case when product_category_group = 'moda' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_MODA,                   
    round(avg(case when product_category_group = 'moda' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_MODA,
    round(avg(case when product_category_group = 'moda' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_MODA,
    round(sum(case when product_category_group = 'eletrodomesticos' then total_price else 0 end),2) as VL_TOT_CONS_ELETRODOMESTC,
    round(avg(case when product_category_group = 'eletrodomesticos' then total_price else NULL end),2) as VL_MED_CONS_ELETRODOMESTC,
    round(avg(case when product_category_group = 'eletrodomesticos' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_ELETRODOMESTC,                   
    round(avg(case when product_category_group = 'eletrodomesticos' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_ELETRODOMESTC,
    round(avg(case when product_category_group = 'eletrodomesticos' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_ELETRODOMESTC,
    round(sum(case when product_category_group = 'pets_bebes' then total_price else 0 end),2) as VL_TOT_CONS_PETS_AND_BEBES,
    round(avg(case when product_category_group = 'pets_bebes' then total_price else NULL end),2) as VL_MED_CONS_PETS_AND_BEBES,
    round(avg(case when product_category_group = 'pets_bebes' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_PETS_AND_BEBES,                   
    round(avg(case when product_category_group = 'pets_bebes' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_PETS_AND_BEBES,
    round(avg(case when product_category_group = 'pets_bebes' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_PETS_AND_BEBES,
    round(sum(case when product_category_group = 'papelaria_cultura' then total_price else 0 end),2) as VL_TOT_CONS_PAPEL_AND_CULT,
    round(avg(case when product_category_group = 'papelaria_cultura' then total_price else NULL end),2) as VL_MED_CONS_PAPEL_AND_CULT,
    round(avg(case when product_category_group = 'papelaria_cultura' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_PAPEL_AND_CULT,                   
    round(avg(case when product_category_group = 'papelaria_cultura' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_PAPEL_AND_CULT,
    round(avg(case when product_category_group = 'papelaria_cultura' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_PAPEL_AND_CULT, 
    round(sum(case when product_category_group = 'alimentos_bebidas' then total_price else 0 end),2) as VL_TOT_CONS_ALIM_AND_BEBIDAS,
    round(avg(case when product_category_group = 'alimentos_bebidas' then total_price else NULL end),2) as VL_MED_CONS_ALIM_AND_BEBIDAS,
    round(avg(case when product_category_group = 'alimentos_bebidas' and U1M = 1 then total_price else NULL end),2) as VL_MED_U1M_CONS_ALIM_AND_BEBIDAS,                   
    round(avg(case when product_category_group = 'alimentos_bebidas' and U3M = 1 then total_price else NULL end),2) as VL_MED_U3M_CONS_ALIM_AND_BEBIDAS,
    round(avg(case when product_category_group = 'alimentos_bebidas' and U6M = 1 then total_price else NULL end),2) as VL_MED_U6M_CONS_ALIM_AND_BEBIDAS                                                                                                                                                                                                                           
FROM df_temp_01
GROUP BY seller_id
ORDER BY seller_id
""")
df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.count()

1261

In [308]:
df_temp_02.show(5)

+--------------------+---------+------------+------------+------------+------------+-----------------+-----------------+-----------------+-----------------+--------------+--------------+--------------+--------------+-----------------+-----------------+-----------------+-----------------+--------------+--------------+--------------+--------------+---------------------+---------------------+---------------------+---------------------+--------------------+--------------------+--------------------+--------------------+------------------------------+------------------------------+------------------------------+------------------------------+---------------------+---------------------+---------------------+---------------------+------------------+------------------+----------------------+----------------------+----------------------+----------------+----------------+--------------------+--------------------+--------------------+-----------------------+-----------------------+----------------

## Creating second-layer explanatory variables

In [309]:
df_temp_03 = spark.sql("""
SELECT
*,
round(VL_MED_U1M_CONS_OUTROS/VL_MED_U3M_CONS_OUTROS,2) as VL_RAZ_MED_U1M_U3M_CONS_OUTROS,
round(VL_MED_U3M_CONS_OUTROS/VL_MED_U6M_CONS_OUTROS,2) as VL_RAZ_MED_U3M_U6M_CONS_OUTROS,
round(VL_MED_U1M_CONS_TECN/VL_MED_U3M_CONS_TECN,2) as VL_RAZ_MED_U1M_U3M_CONS_TECN,
round(VL_MED_U3M_CONS_TECN/VL_MED_U6M_CONS_TECN,2) as VL_RAZ_MED_U3M_U6M_CONS_TECN,  
round(VL_MED_U1M_CONS_CASA_CONSTR/VL_MED_U3M_CONS_CASA_CONSTR,2) as VL_RAZ_MED_U1M_U3M_CONS_CASA_CONSTR,
round(VL_MED_U3M_CONS_CASA_CONSTR/VL_MED_U6M_CONS_CASA_CONSTR,2) as VL_RAZ_MED_U3M_U6M_CONS_CASA_CONSTR,     
round(VL_MED_U1M_CONS_ESP_LZR/VL_MED_U3M_CONS_ESP_LZR,2) as VL_RAZ_MED_U1M_U3M_CONS_ESP_LZR,
round(VL_MED_U3M_CONS_ESP_LZR/VL_MED_U6M_CONS_ESP_LZR,2) as VL_RAZ_MED_U3M_U6M_CONS_ESP_LZR, 
round(VL_MED_U1M_CONS_BLZ_SAUDE/VL_MED_U3M_CONS_BLZ_SAUDE,2) as VL_RAZ_MED_U1M_U3M_CONS_BLZ_SAUDE,
round(VL_MED_U3M_CONS_BLZ_SAUDE/VL_MED_U6M_CONS_BLZ_SAUDE,2) as VL_RAZ_MED_U3M_U6M_CONS_BLZ_SAUDE, 
round(VL_MED_U1M_CONS_MODA/VL_MED_U3M_CONS_MODA,2) as VL_RAZ_MED_U1M_U3M_CONS_MODA,
round(VL_MED_U3M_CONS_MODA/VL_MED_U6M_CONS_MODA,2) as VL_RAZ_MED_U3M_U6M_CONS_MODA,     
round(VL_MED_U1M_CONS_ELETRODOMESTC/VL_MED_U3M_CONS_ELETRODOMESTC,2) as VL_RAZ_MED_U1M_U3M_CONS_ELETRODOMESTC,
round(VL_MED_U3M_CONS_ELETRODOMESTC/VL_MED_U6M_CONS_ELETRODOMESTC,2) as VL_RAZ_MED_U3M_U6M_CONS_ELETRODOMESTC,   
round(VL_MED_U1M_CONS_PETS_AND_BEBES/VL_MED_U3M_CONS_PETS_AND_BEBES,2) as VL_RAZ_MED_U1M_U3M_CONS_PETS_AND_BEBES,
round(VL_MED_U3M_CONS_PETS_AND_BEBES/VL_MED_U6M_CONS_PETS_AND_BEBES,2) as VL_RAZ_MED_U3M_U6M_CONS_PETS_AND_BEBES, 
round(VL_MED_U1M_CONS_PAPEL_AND_CULT/VL_MED_U3M_CONS_PAPEL_AND_CULT,2) as VL_RAZ_MED_U1M_U3M_CONS_PAPEL_AND_CULT,
round(VL_MED_U3M_CONS_PAPEL_AND_CULT/VL_MED_U6M_CONS_PAPEL_AND_CULT,2) as VL_RAZ_MED_U3M_U6M_CONS_PAPEL_AND_CULT,  
round(VL_MED_U1M_CONS_ALIM_AND_BEBIDAS/VL_MED_U3M_CONS_ALIM_AND_BEBIDAS,2) as VL_RAZ_MED_U1M_U3M_CONS_ALIM_AND_BEBIDAS,
round(VL_MED_U3M_CONS_ALIM_AND_BEBIDAS/VL_MED_U6M_CONS_ALIM_AND_BEBIDAS,2) as VL_RAZ_MED_U3M_U6M_CONS_ALIM_AND_BEBIDAS                                                                                                                                                                                                           
FROM df_temp_02

""")

df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.count()

1261

In [310]:
df_temp_03.show(5)

+--------------------+---------+------------+------------+------------+------------+-----------------+-----------------+-----------------+-----------------+--------------+--------------+--------------+--------------+-----------------+-----------------+-----------------+-----------------+--------------+--------------+--------------+--------------+---------------------+---------------------+---------------------+---------------------+--------------------+--------------------+--------------------+--------------------+------------------------------+------------------------------+------------------------------+------------------------------+---------------------+---------------------+---------------------+---------------------+------------------+------------------+----------------------+----------------------+----------------------+----------------+----------------+--------------------+--------------------+--------------------+-----------------------+-----------------------+----------------

## Save Book Seller

In [311]:
import sqlite3

# Conectar ao banco SQLite (cria o arquivo se não existir)
conn = sqlite3.connect('../data/processed/tb_book_seller.db')
cursor = conn.cursor()

# Gerar a estrutura da tabela a partir do schema do DataFrame
columns = ', '.join([f"{col.name} {col.dataType.simpleString().upper()}" for col in df_temp_03.schema])
create_table_sql = f"CREATE TABLE IF NOT EXISTS tb_book_seller ({columns})"

# Criar a tabela
cursor.execute(create_table_sql)

# Inserir os dados
# Convertendo o DataFrame PySpark para Pandas DataFrame
pandas_df_temp_03 = df_temp_03.toPandas()

# Inserir os dados na tabela SQLite
for index, row in pandas_df_temp_03.iterrows():
    cursor.execute(f"INSERT INTO tb_book_seller ({', '.join(pandas_df_temp_03.columns)}) VALUES ({', '.join(['?'] * len(row))})", tuple(row))

# Commit e fechar a conexão
conn.commit()
conn.close()