In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year
from pyspark.sql.functions import month
from pyspark.sql.functions import col, sum, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import date_format
from pyspark.sql import functions as F

In [5]:
spark = SparkSession.builder.appName("MiApp").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [6]:
spark = SparkSession.builder \
    .appName("Cargar Archivos CSV") \
    .config("spark.sql.warehouse.dir", "file:///C:/tmp/spark-warehouse") \
    .config("spark.security.manager.enabled", "false") \
    .config("spark.hadoop.security.authentication", "none") \
    .getOrCreate()

In [7]:
# Ruta a los archivos
sales_path = "../data/raw/sales.csv"
product_hierarchy_path = "../data/raw/product_hierarchy.csv"
store_cities_path = "../data/raw/store_cities.csv"

# Cargar los archivos
df_sales = spark.read.csv(sales_path, header=True, inferSchema=True)
df_product_hierarchy = spark.read.csv(product_hierarchy_path, header=True, inferSchema=True)
df_store_cities = spark.read.csv(store_cities_path, header=True, inferSchema=True)

                                                                                

In [8]:
df_sales.show(5)

+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|product_id|store_id|      date|sales|revenue|stock|price|promo_type_1|promo_bin_1|promo_type_2|promo_bin_2|promo_discount_2|promo_discount_type_2|
+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|     P0001|   S0002|2017-01-02|  0.0|    0.0|  8.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0012|2017-01-02|  1.0|    5.3|  0.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0013|2017-01-02|  2.0|  10.59|  0.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0023|2017-01-02|  0.0|    0.0|  6.0| 6.25|        PR14|       NULL|        PR03|       NULL|    

In [9]:
df_product_hierarchy.show(5)

+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|product_id|product_length|product_depth|product_width|cluster_id|hierarchy1_id|hierarchy2_id|hierarchy3_id|hierarchy4_id|hierarchy5_id|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|     P0000|           5.0|         20.0|         12.0|      NULL|          H00|        H0004|      H000401|    H00040105|  H0004010534|
|     P0001|          13.5|         22.0|         20.0| cluster_5|          H01|        H0105|      H010501|    H01050100|  H0105010006|
|     P0002|          22.0|         40.0|         22.0| cluster_0|          H03|        H0315|      H031508|    H03150800|  H0315080028|
|     P0004|           2.0|         13.0|          4.0| cluster_3|          H03|        H0314|      H031405|    H03140500|  H0314050003|
|     P0005|          16.0|         30.0|

In [10]:
df_store_cities.show(5)

+--------+------------+----------+-------+
|store_id|storetype_id|store_size|city_id|
+--------+------------+----------+-------+
|   S0091|        ST04|        19|   C013|
|   S0012|        ST04|        28|   C005|
|   S0045|        ST04|        17|   C008|
|   S0032|        ST03|        14|   C019|
|   S0027|        ST04|        24|   C022|
+--------+------------+----------+-------+
only showing top 5 rows



## **Productos más populares cada mes durante el año 2018**

In [20]:
#Filtrar año deseado
df_sales_2018 = df_sales.filter(year(df_sales["date"]) == 2018)

In [34]:
#Extraer el mes
df_sales_2018 = df_sales_2018.withColumn("month_num", month(df_sales_2018["date"]))

# Añadir el nombre del mes
df_sales_2018 = df_sales_2018.withColumn("month", date_format(df_sales_2018["date"], "MMMM"))

In [52]:
# Agrupar por mes y producto, sumar las ventas
sales_group_2018 = df_sales_2018.groupBy("month", "month_num", "product_id").agg(sum("sales").alias("total_sales"))

In [63]:
# Definir una ventana para clasificar los productos
window = Window.partitionBy("month").orderBy(col("total_sales").desc())

# Añadir una columna de ranking
sales_ranked = sales_group_2018.withColumn("rank", row_number().over(window))

# Filtrar solo los productos más populares (5)
top_5_products_by_month = sales_ranked.filter(col("rank") <= 5).select("month", "month_num", "product_id", "total_sales")\
.orderBy("month_num", F.col("total_sales").desc())

In [64]:
top_5_products_by_month.select("month", "product_id", "total_sales").show()



+--------+----------+-----------+
|   month|product_id|total_sales|
+--------+----------+-----------+
| January|     P0438|    21326.0|
| January|     P0103|    19046.0|
| January|     P0364|     7984.0|
| January|     P0051|     7754.0|
| January|     P0590|     5692.0|
|February|     P0438|    21485.0|
|February|     P0103|    16474.0|
|February|     P0051|     7555.0|
|February|     P0388|     6212.0|
|February|     P0590|     5952.0|
|   March|     P0438|    23154.0|
|   March|     P0103|    17923.0|
|   March|     P0388|    11840.0|
|   March|     P0364|    10505.0|
|   March|     P0051|     9635.0|
|   April|     P0438|    24392.0|
|   April|     P0103|    19166.0|
|   April|     P0388|    10965.0|
|   April|     P0051|     7448.0|
|   April|     P0131|     6788.0|
+--------+----------+-----------+
only showing top 20 rows



                                                                                

## **En cuántas tiendas se implementaron promociones del tipo 1 en el canal 1 durante el año 2019**

In [66]:
# Filtrar las filas donde promo_type_1 no es nulo y la fecha corresponde a 2019
promotions_filtered = df_sales.filter(
    (F.col("promo_type_1").isNotNull()) &  # promo_type_1 no debe ser nulo
    (F.year(F.col("date")) == 2019)        # El año de la fecha debe ser 2019
)

In [69]:
# Contar cuántas tiendas únicas implementaron esas promociones
stores_count = promotions_filtered.select("store_id").distinct().count()

print(f"Cantidad de tiendas que implementaron promociones del tipo 1 en 2019: {stores_count}")



Cantidad de tiendas que implementaron promociones del tipo 1 en 2019: 144


                                                                                

## **Cuál fue la ciudad que tuvo el mayor volumen de ventas en general durante todo el periodo de 2017 a 2019**

In [70]:
# Filtrar el DataFrame de ventas para el periodo 2017-2019
sales_filtered = df_sales.filter(
    (F.year(F.col("date")) >= 2017) & (F.year(F.col("date")) <= 2019)
)

In [71]:
# Agrupar por store_id y sumar el total de ventas 
sales_by_store = sales_filtered.groupBy("store_id").agg(
    F.sum("sales").alias("total_sales")
)

In [72]:
# Eliminar duplicados de df_store_cities
df_store_cities_unique = df_store_cities.dropDuplicates(["store_id"])

In [73]:
# Unir el DataFrame de ventas con el DataFrame de ciudades usando store_id
sales_with_cities = sales_by_store.join(df_store_cities_unique, on="store_id", how="left")

In [74]:
# Agrupar por city_id y calcular el total de ventas por ciudad
sales_by_city = sales_with_cities.groupBy("city_id").agg(
    F.sum("total_sales").alias("total_sales_by_city")
)

In [75]:
# Ordenar las ciudades por el total de ventas de forma descendente y seleccionar la ciudad con más ventas
top_city = sales_by_city.orderBy(F.col("total_sales_by_city").desc()).limit(1)

# Mostrar el resultado
top_city.show()

                                                                                

+-------+-------------------+
|city_id|total_sales_by_city|
+-------+-------------------+
|   C014|        2573439.552|
+-------+-------------------+



In [8]:
df_sales.show(5)

+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|product_id|store_id|      date|sales|revenue|stock|price|promo_type_1|promo_bin_1|promo_type_2|promo_bin_2|promo_discount_2|promo_discount_type_2|
+----------+--------+----------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|     P0001|   S0002|2017-01-02|  0.0|    0.0|  8.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0012|2017-01-02|  1.0|    5.3|  0.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0013|2017-01-02|  2.0|  10.59|  0.0| 6.25|        PR14|       NULL|        PR03|       NULL|            NULL|                 NULL|
|     P0001|   S0023|2017-01-02|  0.0|    0.0|  6.0| 6.25|        PR14|       NULL|        PR03|       NULL|    

## **Cuál fue el promedio de stock disponible en todas las tiendas durante el año 2017**

In [9]:
# Filtrar el año 2017
sales_2017 = df_sales.filter(F.year(F.col("date")) == 2017)

In [19]:
# Calcular el promedio de stock por tienda
avg_stock_per_store = sales_2017.groupBy("store_id").agg(
    F.avg("stock").alias("avg_stock")
)

In [20]:
# Calcular el promedio de stock por tienda
overall_avg_stock = avg_stock_per_store.agg(
    F.round(F.avg("avg_stock"), 2).alias("overall_avg_stock")
)

In [21]:
overall_avg_stock.show()



+-----------------+
|overall_avg_stock|
+-----------------+
|            15.85|
+-----------------+



                                                                                

## **Cuántas veces se aplicaron descuentos en el canal 2 durante el mes de diciembre de 2018**

In [22]:
# Filtrar el DataFrame para el mes de diciembre de 2018 y condiciones de descuentos
discounts_diciembre_2018 = df_sales.filter(
    (F.year(F.col("date")) == 2018) & 
    (F.month(F.col("date")) == 12) & 
    (F.col("promo_type_2").isNotNull()) & 
    (F.col("promo_discount_2").isNotNull())
)

In [23]:
# Contar cuántas veces se aplicaron descuentos
num_descuentos = discounts_diciembre_2018.count()

# Generar el texto con el resultado
texto_resultado = f"Se aplicaron descuentos en el canal 2 en diciembre 2018 {num_descuentos} veces"
print(texto_resultado)




Se aplicaron descuentos en el canal 2 en diciembre 2018 0 veces


                                                                                

## **Cuál fue el producto con el precio de venta más alto en cada tienda durante el tercer trimestre de 2019**

In [24]:
# Filtrar las ventas para el tercer trimestre de 2019
sales_q3_2019 = df_sales.filter(
    (F.year(F.col("date")) == 2019) &
    (F.month(F.col("date")).between(7, 9))
)

In [25]:
# Definir una ventana para ordenar
window = Window.partitionBy("store_id").orderBy(F.col("price").desc())

# Añadir una columna para el ranking 
sales_ranked = sales_q3_2019.withColumn("rank", F.row_number().over(window))

In [26]:
# Filtrar solo el producto con el precio más alto por tienda
top_product_per_store = sales_ranked.filter(F.col("rank") == 1).select("store_id", "product_id", "price")

top_product_per_store.show()



+--------+----------+------+
|store_id|product_id| price|
+--------+----------+------+
|   S0001|     P0498| 299.9|
|   S0002|     P0708|1599.0|
|   S0003|     P0498| 299.9|
|   S0004|     P0498| 299.9|
|   S0005|     P0498| 299.9|
|   S0006|     P0680|139.95|
|   S0007|     P0391| 179.9|
|   S0008|     P0498| 299.9|
|   S0009|     P0680|139.95|
|   S0010|     P0680|139.95|
|   S0011|     P0391| 179.9|
|   S0012|     P0708|1349.0|
|   S0013|     P0708|1499.0|
|   S0014|     P0391| 179.9|
|   S0015|     P0391| 179.9|
|   S0016|     P0498| 299.9|
|   S0017|     P0517|199.95|
|   S0018|     P0391| 179.9|
|   S0019|     P0680|139.95|
|   S0020|     P0474| 499.0|
+--------+----------+------+
only showing top 20 rows



                                                                                

## **Cuántas tiendas experimentaron un aumento de ventas del 20% o más en comparación con el año anterior**

In [29]:
# Filtrar los datos para los años 2017, 2018 y 2019
df_2017 = df_sales.filter(F.year(F.col("date")) == 2017)
df_2018 = df_sales.filter(F.year(F.col("date")) == 2018)
df_2019 = df_sales.filter(F.year(F.col("date")) == 2019)

In [30]:
# Agrupar por tienda y calcular las ventas totales por tienda para cada año
ventas_2017 = df_2017.groupBy("store_id").agg(F.sum("sales").alias("ventas_2017"))
ventas_2018 = df_2018.groupBy("store_id").agg(F.sum("sales").alias("ventas_2018"))
ventas_2019 = df_2019.groupBy("store_id").agg(F.sum("sales").alias("ventas_2019"))

In [33]:
# Hacer un left join entre los DataFrames 
ventas_comparadas = ventas_2017.join(ventas_2018, on="store_id", how="left") \
    .join(ventas_2019, on="store_id", how="left")

In [36]:
# Calcular el porcentaje de cambio de las ventas entre 2017 y 2018, y 2018 y 2019
ventas_comparadas = ventas_comparadas.withColumn(
    "cambio_2017_2018", 
    F.round((F.col("ventas_2018") - F.col("ventas_2017")) / F.col("ventas_2017") * 100, 2)
).withColumn(
    "cambio_2018_2019", 
    F.round((F.col("ventas_2019") - F.col("ventas_2018")) / F.col("ventas_2018") * 100, 2)
).withColumn(
    "ventas_2017", F.round(F.col("ventas_2017"), 2)
).withColumn(
    "ventas_2018", F.round(F.col("ventas_2018"), 2)
).withColumn(
    "ventas_2019", F.round(F.col("ventas_2019"), 2)
)

In [38]:
ventas_comparadas.show(5)



+--------+-----------+-----------+-----------+----------------+----------------+
|store_id|ventas_2017|ventas_2018|ventas_2019|cambio_2017_2018|cambio_2018_2019|
+--------+-----------+-----------+-----------+----------------+----------------+
|   S0049|   23754.11|   24944.66|   19910.79|            5.01|          -20.18|
|   S0010|   27075.43|   27598.37|   19261.51|            1.93|          -30.21|
|   S0030|     5153.0|     4267.0|     4552.0|          -17.19|            6.68|
|   S0120|    5090.52|     4957.0|     3971.0|           -2.62|          -19.89|
|   S0086|    4279.26|    4451.45|    4212.03|            4.02|           -5.38|
+--------+-----------+-----------+-----------+----------------+----------------+
only showing top 5 rows



                                                                                

In [39]:
# Filtrar las tiendas que tienen un aumento de ventas del 20%
tiendas_aumento_20_2017_2018 = ventas_comparadas.filter(F.col("cambio_2017_2018") >= 20)
tiendas_aumento_20_2018_2019 = ventas_comparadas.filter(F.col("cambio_2018_2019") >= 20)

In [40]:
# Contar el número de tiendas que cumplen con la condición
numero_tiendas_aumento_20_2017_2018 = tiendas_aumento_20_2017_2018.count()
numero_tiendas_aumento_20_2018_2019 = tiendas_aumento_20_2018_2019.count()

print(f"El número de tiendas que experimentaron un aumento de ventas del 20% o más entre 2017 y 2018 es: {numero_tiendas_aumento_20_2017_2018}")
print(f"El número de tiendas que experimentaron un aumento de ventas del 20% o más entre 2018 y 2019 es: {numero_tiendas_aumento_20_2018_2019}")



El número de tiendas que experimentaron un aumento de ventas del 20% o más entre 2017 y 2018 es: 24
El número de tiendas que experimentaron un aumento de ventas del 20% o más entre 2018 y 2019 es: 1


                                                                                

## **Cuál fue el producto que experimentó la mayor variación de precio a lo largo del periodo de 2017 a 2019**

In [41]:
# Filtrar los datos para los años especificados
df_sales_filtered = df_sales.filter((F.year("date") >= 2017) & (F.year("date") <= 2019))

In [42]:
# Agrupar por producto y calcular el precio máximo y mínimo
price_variation = df_sales_filtered.groupBy("product_id").agg(
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price")
)

In [43]:
# Calcular la variación de precio
price_variation = price_variation.withColumn("price_variation", F.col("max_price") - F.col("min_price"))

In [46]:
# Ordenar por la variación de precio en orden descendente y obtener el producto con la mayor variación
max_price_variation_product = price_variation.orderBy(F.col("price_variation").desc()).limit(1)

# Mostrar el producto con la mayor variación de precio
max_price_variation_product.show()



+----------+---------+---------+---------------+
|product_id|min_price|max_price|price_variation|
+----------+---------+---------+---------------+
|     P0632|    549.9|    849.9|          300.0|
+----------+---------+---------+---------------+



                                                                                

## **En qué mes y año se registró la mayor cantidad de ingresos generados en todas las tiendas**

In [58]:
df_sales = df_sales.withColumn("year_month", F.date_format("date", "yyyy-MM"))

In [64]:
# Agrupar por 'year_month' y sumar las ventas
revenue_per_month = df_sales.groupBy("year_month").agg(F.sum("revenue").alias("total_revenue"))

In [66]:
# Ordenar ingresos
month_max_revenue = revenue_per_month.orderBy(F.col("total_revenue").desc()).first()

                                                                                

In [67]:
 print(f"El mes y año con mayor cantidad de ingresos fue {month_max_revenue['year_month']} con un total de ingresos de {month_max_revenue['total_revenue']}.")

El mes y año con mayor cantidad de ingresos fue 2019-08 con un total de ingresos de 2619132.638999991.


## **Cuál fue el tamaño promedio de las tiendas en cada ciudad durante el año 2018**

In [72]:
# Filtrar las ventas del año 2018 y obtener los datos únicos de tiendas
df_sales_2018_unique_stores = df_sales.filter(year(df_sales['date']) == 2018).select('store_id').distinct()

In [73]:
# Unir df_sales_2018 con df_store_cities
df_stores_with_size = df_sales_2018_unique_stores.join(df_store_cities, on="store_id", how="inner").select('store_id', 'city_id', 'store_size')

In [74]:
# Agrupar por city_id y calcular el tamaño promedio de las tiendas en cada ciudad
avg_store_size_per_city = df_stores_with_size.groupBy("city_id").agg(F.avg("store_size").alias("avg_store_size"))

In [81]:
avg_store_size_per_city_sorted = avg_store_size_per_city.orderBy("city_id")
avg_store_size_per_city_rounded = avg_store_size_per_city_sorted.withColumn("avg_store_size", F.round("avg_store_size", 2))
avg_store_size_per_city_rounded.show()

[Stage 162:>                                                        (0 + 1) / 1]

+-------+--------------+
|city_id|avg_store_size|
+-------+--------------+
|   C001|          20.0|
|   C002|          47.0|
|   C003|          13.0|
|   C004|          63.0|
|   C005|          19.0|
|   C006|         29.67|
|   C007|          39.0|
|   C008|         23.67|
|   C009|          20.0|
|   C010|          23.0|
|   C011|          31.0|
|   C012|          22.0|
|   C013|          13.5|
|   C014|         23.65|
|   C015|          42.0|
|   C016|          35.0|
|   C017|          34.0|
|   C018|          27.0|
|   C019|          24.5|
|   C020|          22.0|
+-------+--------------+
only showing top 20 rows



                                                                                