**Resolución Transitiva de Claves 1:N**

In [0]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window

In [0]:
# Reuse the active Spark session if one already exists; otherwise create one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [0]:
# Sales fact keyed by sku_a and month. Every column, including the `venta`
# amount, is a string; the selection step casts it to a number before comparing.
columns = ["sku_a", "mes", "venta"]
data = [
    ("A1", "202401", "100"),
    ("A2", "202401", "200"),
]

ventas = spark.createDataFrame(data, schema=columns)
ventas.show()

+-----+------+-----+
|sku_a|   mes|venta|
+-----+------+-----+
|   A1|202401|  100|
|   A2|202401|  200|
+-----+------+-----+



In [0]:
# sku_a -> sku_b mapping. It is 1:N: A1 maps to two sku_b codes (B10, B11),
# which is the ambiguity the rest of the notebook resolves.
columns = ["sku_a", "sku_b"]
data = [
    ("A1", "B10"),
    ("A1", "B11"),
    ("A2", "B20"),
]

productos = spark.createDataFrame(data, schema=columns)
productos.show()

+-----+-----+
|sku_a|sku_b|
+-----+-----+
|   A1|  B10|
|   A1|  B11|
|   A2|  B20|
+-----+-----+



In [0]:
# Forecast keyed by sku_b and month. The `forecast` amount is a string, just
# like `venta`.
columns = ["sku_b", "mes", "forecast"]
data = [
    ("B10", "202401", "90"),
    ("B11", "202401", "110"),
    ("B20", "202401", "210"),
]

forecast = spark.createDataFrame(data, schema=columns)
forecast.show()

+-----+------+--------+
|sku_b|   mes|forecast|
+-----+------+--------+
|  B10|202401|      90|
|  B11|202401|     110|
|  B20|202401|     210|
+-----+------+--------+



In [0]:
# Tag every sales row with a surrogate key. After the 1→N join fans a row out
# into several candidates, the candidates can then be regrouped per original row.
# The ids are unique but not consecutive (Spark packs the partition id into
# the upper bits), so treat them only as opaque keys.
ventas = ventas.withColumn(
    "id_mono",
    F.monotonically_increasing_id(),
)
ventas.show()

+-----+------+-----+-------+
|sku_a|   mes|venta|id_mono|
+-----+------+-----+-------+
|   A1|202401|  100|      0|
|   A2|202401|  200|      1|
+-----+------+-----+-------+



In [0]:
# Fan each sale out to every sku_b mapped to its sku_a (1→N). The left join
# keeps sales whose sku_a has no mapping; those rows get sku_b = null.
ventas_transitiva = ventas.join(
    productos,
    on="sku_a",
    how="left",
)
ventas_transitiva.show()

+-----+------+-----+-------+-----+
|sku_a|   mes|venta|id_mono|sku_b|
+-----+------+-----+-------+-----+
|   A1|202401|  100|      0|  B11|
|   A1|202401|  100|      0|  B10|
|   A2|202401|  200|      1|  B20|
+-----+------+-----+-------+-----+



In [0]:
# Attach the forecast of each candidate sku_b for the same month. With a left
# join, candidates that have no forecast row keep forecast = null.
joined_df = ventas_transitiva.join(
    forecast,
    on=["sku_b", "mes"],
    how="left",
)
joined_df.show()

+-----+------+-----+-----+-------+--------+
|sku_b|   mes|sku_a|venta|id_mono|forecast|
+-----+------+-----+-----+-------+--------+
|  B11|202401|   A1|  100|      0|     110|
|  B10|202401|   A1|  100|      0|      90|
|  B20|202401|   A2|  200|      1|     210|
+-----+------+-----+-----+-------+--------+



In [0]:
# Collapse the candidates back to one row per original sale. Each row carries
# its (sku_b, forecast) options as an array of structs. collect_list gives no
# ordering guarantee, so do not rely on an option's position in the array.
grouped_df = (
    joined_df
    .groupBy("id_mono", "sku_a", "mes", "venta")
    .agg(
        F.collect_list(F.struct("sku_b", "forecast")).alias("opciones")
    )
)

grouped_df.show(truncate=False)

+-------+-----+------+-----+-----------------------+
|id_mono|sku_a|mes   |venta|opciones               |
+-------+-----+------+-----+-----------------------+
|0      |A1   |202401|100  |[{B11, 110}, {B10, 90}]|
|1      |A2   |202401|200  |[{B20, 210}]           |
+-------+-----+------+-----+-----------------------+



In [0]:
# Show the schema: `opciones` is an array of (sku_b, forecast) structs.
grouped_df.printSchema()

root
 |-- id_mono: long (nullable = false)
 |-- sku_a: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- venta: string (nullable = true)
 |-- opciones: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- sku_b: string (nullable = true)
 |    |    |-- forecast: string (nullable = true)



**Supongamos la siguiente regla:**<br>
_Elegir el forecast más cercano a la venta; en caso de empate, elegir el forecast mayor._<br>
> Para A1:<br>
Venta = 100<br>
Opciones = 90 y 110<br>
Ambas están a distancia 10 (empate)<br>
Por la regla de desempate, elegimos el mayor: 110 (B11).

In [0]:
# Turn the options array back into one row per candidate, so that each option
# can be ranked with a window function.
expanded_df = grouped_df.select(
    "id_mono",
    "sku_a",
    "mes",
    "venta",
    F.explode("opciones").alias("opcion"),
)
expanded_df.show(truncate=False)

+-------+-----+------+-----+----------+
|id_mono|sku_a|mes   |venta|opcion    |
+-------+-----+------+-----+----------+
|0      |A1   |202401|100  |{B11, 110}|
|0      |A1   |202401|100  |{B10, 90} |
|1      |A2   |202401|200  |{B20, 210}|
+-------+-----+------+-----+----------+



In [0]:
# Show the schema after the explode: `opcion` is a single (sku_b, forecast) struct per row.
expanded_df.printSchema()

root
 |-- id_mono: long (nullable = false)
 |-- sku_a: string (nullable = true)
 |-- mes: string (nullable = true)
 |-- venta: string (nullable = true)
 |-- opcion: struct (nullable = false)
 |    |-- sku_b: string (nullable = true)
 |    |-- forecast: string (nullable = true)



In [0]:
# Numeric views of the string amounts. "double" keeps decimal values intact;
# an "int" cast would truncate them and distort the distance.
forecast_num = F.col("opcion.forecast").cast("double")
venta_num = F.col("venta").cast("double")

# One window per original sales row (id_mono). The ordering encodes the rule:
#   1. smallest |forecast - venta| first;
#   2. on a distance tie, the larger forecast wins;
#   3. sku_b as a final tie-breaker, so row_number() is deterministic.
# Spark sorts NULLs FIRST for ascending keys by default. Without the explicit
# *_nulls_last, an option with no forecast (left-join miss or a non-numeric
# string) would get rn = 1 ahead of real forecasts.
window = Window.partitionBy("id_mono").orderBy(
    F.abs(forecast_num - venta_num).asc_nulls_last(),
    forecast_num.desc_nulls_last(),
    F.col("opcion.sku_b").asc_nulls_last(),
)

# Keep only the best option per sale, so there is one row per original sales row.
final_df = (
    expanded_df
    .withColumn("rn", F.row_number().over(window))
    .filter(F.col("rn") == 1)
    .drop("rn", "id_mono")
)

final_df.show()

+-----+------+-----+----------+
|sku_a|   mes|venta|    opcion|
+-----+------+-----+----------+
|   A1|202401|  100|{B11, 110}|
|   A2|202401|  200|{B20, 210}|
+-----+------+-----+----------+



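> ¿Qué resolvimos? <br>

- La apertura 1→N (un `sku_a` mapeado a varios `sku_b`) <br>
- La ambigüedad del mapeo, eligiendo una única opción por venta <br>
- Sin duplicar métricas: cada venta aparece una sola vez <br>
- Conservando la granularidad original: una fila por cada fila de `ventas`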