## Query 8

Productos desastrosos post venta. Contar la cantidad de productos desastrosos (>30% unidades retornadas) y calcular por marca el porcentaje de productos desastrosos sobre el total de productos distintos vendidos de esa marca. Mostrar las marcas con más productos desastrosos en orden descendente.

Para que la query funcione correctamente es necesario que "orders" y "order_items" se puedan joinear.

Como el primero tiene ids desde 1 a 4700000 y el segundo desde 19900399 a 20000000 podemos restarle 19900398 a los ids de order_items para que al menos matcheen.

- 1 -> 19900399
- 2 -> 19900400
- ...

1. Importar el dataset de ordenes (data/raw/orders.csv)
2. Filtrar las ordenes con datos inválidos (order_id, status) y quedarnos solo con aquellas con status "RETURNED".
3. Mapear a (order_id, 1) para usar en el join.
4. Importar el dataset de order_items (data/raw/order_items.csv)
5. Filtrar los items con datos inválidos (order_id, product_id, quantity)
6. Normalizar order_id (aplicar offset) y mapear a pares (order_id, (product_id, quantity))
7. Hacer leftOuterJoin entre items y orders_returned por order_id (items como base) para que los no retornados aporten 0 al numerador.
8. Mapear a pares (product_id, (returned_quantity, total_quantity)).
9. Agrupar por product_id y sumar las cantidades retornadas y totales.
10. Convertir en (product_id, (1, is_disastrous)) donde is_disastrous es 1 si returned/total > 0.3.
11. Importar el dataset de products (data/raw/products.csv).
12. Filtrar los productos con datos inválidos (product_id, brand).
13. Mapear a pares (product_id, brand).
14. Hacer join entre ambos datasets por product_id.
15. Mapear a pares (brand, (1, is_disastrous)).
16. Agrupar por brand y sumar las cantidades de productos y desastrosos.
17. Calcular el porcentaje de productos desastrosos por marca.
18. Ordenar las marcas por porcentaje de productos desastrosos en orden descendente.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ProductosDesastrososPostVenta").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [2]:
RETURN_STATUS = "RETURNED"
OFFSET = 19900398
DISASTER_THRESHOLD = 0.3

In [None]:
orders_df = spark.read.csv("../../data/raw/orders.csv", header=True, inferSchema=True)
orders_rdd = orders_df.rdd

orders_returned = orders_rdd.filter(
    lambda r: r.order_id and r.status and str(r.status).strip().upper() == RETURN_STATUS
).map(
    lambda r: (r.order_id, 1)
)  # (order_id: int, 1)

In [4]:
items_df = spark.read.csv(
    "../../data/raw/order_items.csv", header=True, inferSchema=True
)
items_rdd = items_df.rdd

items_clean = (
    items_rdd.filter(lambda r: r.order_id and r.product_id and r.quantity is not None)
    .map(
        lambda r: (
            r.order_id - OFFSET if r.order_id > OFFSET else r.order_id,
            (r.product_id, r.quantity),
        )
    )  # (order_id, (product_id, quantity))
    .filter(lambda kv: kv[0] > 0)
)

In [5]:
joined_items = items_clean.leftOuterJoin(
    orders_returned
)  # (order_id, ((product_id, qty), returned_or_None))

by_product_qties = joined_items.map(
    lambda kv: (
        kv[1][0][0],
        (kv[1][0][1] if kv[1][1] else 0, kv[1][0][1]),
    )  # (product_id, (returned_qty, total_qty))
)

agg_by_product = by_product_qties.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)  # (product_id, (total_returned, total_qty))

product_disastrous = agg_by_product.map(
    lambda kv: (
        kv[0],
        (1, 1 if kv[1][1] > 0 and (kv[1][0] / kv[1][1]) > DISASTER_THRESHOLD else 0),
    )  # (product_id: int, (1, is_disastrous))
)

In [None]:
products_df = spark.read.csv(
    "../../data/raw/products.csv", header=True, inferSchema=True
)
products_rdd = products_df.rdd

products_clean = products_rdd.filter(lambda r: r.product_id and r.brand).map(
    lambda r: (int(r.product_id), r.brand.strip().upper())
)  # (product_id, brand)

joined_brand = product_disastrous.join(
    products_clean
)  # (product_id, ((1, is_disastrous), brand))
by_brand = joined_brand.map(
    lambda kv: (kv[1][1], kv[1][0])
)  # (brand, (1, is_disastrous))
agg_by_brand = by_brand.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)  # (brand, (total_products, total_disastrous))

results = agg_by_brand.map(
    lambda kv: {
        "marca": kv[0],
        "productos_totales": kv[1][0],
        "productos_desastrosos": kv[1][1],
        "porcentaje_desastrosos": (
            round(kv[1][1] / kv[1][0], 4) if kv[1][0] > 0 else 0.0
        ),
    }
).collect()

In [7]:
import pandas as pd

if results:
    df = pd.DataFrame(results)
    df["porcentaje_desastrosos_%"] = df["porcentaje_desastrosos"].map(
        lambda x: f"{x*100:.2f}%"
    )
    df = df.sort_values(
        by=["porcentaje_desastrosos", "productos_desastrosos", "marca"],
        ascending=[False, False, True],
    )
    df = df[
        [
            "marca",
            "productos_totales",
            "productos_desastrosos",
            "porcentaje_desastrosos_%",
        ]
    ]
    display(df)

Unnamed: 0,marca,productos_totales,productos_desastrosos,porcentaje_desastrosos_%
20,ASHLEY FURNITURE,378,17,4.50%
138,PENGUIN,414,17,4.11%
43,BLACK+DECKER,546,22,4.03%
29,LE CREUSET,550,22,4.00%
119,ETSY SHOPS,830,33,3.98%
...,...,...,...,...
150,PAPER MATE,481,6,1.25%
109,STAPLES,507,6,1.18%
14,SHIMANO,449,5,1.11%
120,BOSE,395,4,1.01%
