# Consultas propuestas por el enunciado

#### Imports y definición de funciones

In [143]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext

import re

import warnings
# suprimir future warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [144]:
spark = SparkSession.builder\
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [145]:
def parse_to_int(some_value):
    if some_value is None:
        return None
    try:
        return int(some_value)
    except ValueError:
        return None

### 1. ¿Cuál es el Estado que más descuentos tiene en total? ¿y en promedio?

#### Hipótesis
- Los estados con más descuentos son aquellos que tienen más órdenes registradas con un `discount_amount` no nulo y mayor a ceros.
- Se tienen en cuenta los estados de la columna `billing_address` de la tabla `orders`.
- Se considera que el estado está definido por un conjunto de dos letras mayúsculas seguido de un espacio y cinco dígitos (código postal).
- No se consideran los estados que no están definidos por el anterior patrón, ni tampoco las filas con un valor nulo en `billing_address`.

#### Limpieza de datos para la tabla de órdenes

In [146]:
status_dict = {}
with open("status.txt", "r") as f:
    valid_statuses = [line.strip() for line in f.readlines()]
    id = 0
    for status in valid_statuses:
        status_dict[status] = id
        id += 1

statuses = sc.broadcast(status_dict)

In [147]:
state_dict = {}
with open("states.txt", "r") as f:
    valid_states = [line.strip() for line in f.readlines()]
    id = 0
    for state in valid_states:
        state_dict[state] = id
        id += 1

states = sc.broadcast(state_dict)

state_id_to_name = {v: k for k, v in state_dict.items()}

In [148]:
payment_dict = {}
with open("payment_methods.txt", "r") as f:
    valid_payments = [line.strip() for line in f.readlines()]
    id = 0
    for payment in valid_payments:
        payment_dict[payment] = id
        id += 1

payments = sc.broadcast(payment_dict)

payments_id_to_name = sc.broadcast({v: k for k, v in payment_dict.items()})

In [149]:
def state_and_postal_code(address):
    if address is None:
        return "UNDEFINED", None
    pattern = r'([A-Z]{2})\s(\d{5})'
    match = re.search(pattern, address)
    if match:
        return match.group(1), int(match.group(2))
    return "UNDEFINED", None

def retain_orders_columns(row: Row):
    state_str, postal_code = state_and_postal_code(row.billing_address)
    status_str = "UNDEFINED" if row.status is None else row.status.strip().upper()
    payment_str = "UNDEFINED" if row.payment_method is None else row.payment_method.strip().upper()
    return (
        parse_to_int(row.customer_id),
        row.discount_amount,
        statuses.value[status_str],
        states.value[state_str],
        postal_code,
        payments.value[payment_str]
    )
    
ordersIdx = {
    "customer_id": 0,
    "discount_amount": 1,
    "status": 2,
    "state": 3,
    "postal_code": 4,
    "payment_method": 5,
}

In [150]:
orders = sqlContext.read.csv(
    'data/orders.csv',
    header=True, inferSchema=True
)
ordersRDD = orders.rdd.map(retain_orders_columns).cache()

                                                                                

#### Resolución

In [151]:
discountAndTotalOrdersByState = ordersRDD \
    .filter(lambda x: x[ordersIdx["state"]] != states.value["UNDEFINED"]) \
    .map(
        lambda row: (
            row[ordersIdx["state"]], 
            (1 if (row[ordersIdx["discount_amount"]] is not None and row[ordersIdx["discount_amount"]] > 0) else 0, 1)
        )) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .cache()
# (estado, (#ordenes con descuentos, #ordenes totales))

In [152]:
highestCount = discountAndTotalOrdersByState\
    .reduce(lambda a, b: a if a[1][0] > b[1][0] else b)

highestAvg = discountAndTotalOrdersByState\
    .reduce(lambda a, b: a if a[1][0]/a[1][1] > b[1][0]/b[1][1] else b)

25/10/06 15:46:39 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_id, customer_id, order_date, status, payment_method, shipping_address, billing_address, discount_amount, tax_amount, shipping_cost, total_amount, currency, created_at, updated_at, subtotal
 Schema: _c0, order_id, customer_id, order_date, status, payment_method, shipping_address, billing_address, discount_amount, tax_amount, shipping_cost, total_amount, currency, created_at, updated_at, subtotal
Expected: _c0 but found: 
CSV file: file:///home/pat/Documents/GitHub/datos-tp2/data/orders.csv
                                                                                

In [153]:
most_discounted_state = state_id_to_name[highestCount[0]]
print(f"El Estado con más órdenes con descuentos es {most_discounted_state} con {highestCount[1][0]} órdenes con descuentos.")

highest_avg_state = state_id_to_name[highestAvg[0]]
print(f"\nEl Estado con el mayor promedio de descuentos es {highest_avg_state} con un promedio de {highestAvg[1][0]/highestAvg[1][1]:.2f}.")

El Estado con más órdenes con descuentos es AE con 30858 órdenes con descuentos.

El Estado con el mayor promedio de descuentos es KY con un promedio de 0.22.


### 2. ¿Cuáles son los 5 códigos postales más comunes para las órdenes con estado 'Refunded'? 
###    ¿Y cuál es el nombre más frecuente entre los clientes de esas direcciones?

#### Hipótesis
- Se tienen en cuenta los códgios postales de la columna `billing_address` de la tabla `orders`.
- Se considera que el código postal está definido por cinco dígitos precedido de un espacio y un conjunto de dos letras mayúsculas (estado).
- Se buscan los códigos postales con más apariciones entre las órdenes con estado `Refunded`
- Las órdenes con estado `Refunded` son todas aquellas que tienen como valor de la columna `status` el string `REFUNDED`, sin diferenciar mayúsculas de minúsculas.
- Para la búsqueda de nombres, se consideran a todos los usuarios de la tabla `customers` que tienen como valor de la columna `postal_code` alguno de los códigos postales encontrados.

#### Limpieza de datos para la tabla de usuarios

In [154]:
segments_dict = {}
with open("segments.txt", "r") as f:
    valid_segments = [line.strip() for line in f.readlines()]
    id = 0
    for segments in valid_segments:
        segments_dict[segments] = id
        id += 1

segments = sc.broadcast(segments_dict)

segment_id_to_name = sc.broadcast({v: k for k, v in segments_dict.items()})

In [155]:
def retain_customers_columns(row: Row):
    first_name = "UNDEFINED" if row.first_name is None else row.first_name.strip().upper()
    segment_str = "UNDEFINED" if row.customer_segment is None else row.customer_segment.strip().upper()
    is_active = False if row.is_active is None else row.is_active
    marketing_consent = False if row.marketing_consent is None else row.marketing_consent
    return (
        parse_to_int(row.customer_id),
        first_name,
        parse_to_int(row.postal_code),
        segments.value[segment_str],
        is_active,
        marketing_consent,
    )
    
customersIdx = {
    "id": 0,
    "name": 1,
    "postal_code": 2,
    "segment": 3,
    "is_active": 4,
    "consent": 5,
}

In [156]:
customers = sqlContext.read.csv(
    'data/customers.csv',
    header=True, inferSchema=True
)
customersRDD = customers.rdd.map(retain_customers_columns).cache()

                                                                                

#### Resolución

In [157]:
zipCodesWithMostRefundedOrdersCount = ordersRDD \
    .filter(
        lambda row: (
            row[ordersIdx["postal_code"]] is not None
            and row[ordersIdx["status"]] == statuses.value["REFUNDED"]
        )
    ) \
    .map(lambda row: (row[ordersIdx["postal_code"]], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(5, key=lambda x: -x[1])

topRefundedZipCodes = [postal_code for postal_code, _ in zipCodesWithMostRefundedOrdersCount]

                                                                                

In [158]:
mostFrecuentNameInTopRefundedZipCodes = customersRDD \
    .filter(
        lambda row: (
            row[customersIdx["postal_code"]] in topRefundedZipCodes
            and row[customersIdx["name"]] != "UNDEFINED"
        )
    ) \
    .map(lambda row: (row[customersIdx["name"]], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .reduce(lambda a, b: a if a[1] > b[1] else b)

25/10/06 15:47:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , customer_id, email, first_name, last_name, phone, date_of_birth, gender, country, city, postal_code, address, registration_date, last_login, is_active, customer_segment, marketing_consent
 Schema: _c0, customer_id, email, first_name, last_name, phone, date_of_birth, gender, country, city, postal_code, address, registration_date, last_login, is_active, customer_segment, marketing_consent
Expected: _c0 but found: 
CSV file: file:///home/pat/Documents/GitHub/datos-tp2/data/customers.csv
                                                                                

In [159]:
print("Los 5 códigos postales con más órdenes reembolsadas son:")
print("\tCodigo\tÓrdenes reembolsadas")
for postal_code, count in zipCodesWithMostRefundedOrdersCount:
    print(f"\t{postal_code}\t{count}")

print(f"\nEl nombre más frecuente entre los clientes de esos códigos postales es: {mostFrecuentNameInTopRefundedZipCodes[0]}")

Los 5 códigos postales con más órdenes reembolsadas son:
	Codigo	Órdenes reembolsadas
	31571	6
	14396	5
	9045	5
	38151	5
	91623	5

El nombre más frecuente entre los clientes de esos códigos postales es: MICHAEL


### 3. Para cada tipo de pago y segmento de cliente, <br>devolver la suma y el promedio expresado como porcentaje <br>de clientes activos y de consentimiento de marketing

#### Hipótesis
- Se consideran valores únicos por combinación (usuario, método de pago). De esta forma no contamos dos compras del mismo usuario con el mismo método de pago.

#### Resolución

In [160]:
clients_formated = customersRDD.map(lambda row: (row[customersIdx["id"]], (row[customersIdx["segment"]], row[customersIdx["is_active"]], row[customersIdx["consent"]])))
orders_formated = ordersRDD.map(lambda row: (row[ordersIdx["customer_id"]], row[ordersIdx["payment_method"]]))

clients_orders = clients_formated.join(orders_formated)\
    .map(
        lambda x: (
            (x[0], x[1][1]), # KEY = (customer_id, payment_method)
            (x[1][0][1], x[1][0][2], x[1][0][0])     # VALUE = (is_active, consent, segment)
        )
    )
# (customer_id, ((segment, is_active, consent), payment_method))

# Me quedo con filas únicas por combinación de método de pago, customer_id
# para no contar dos veces a un mismo cliente que hizo varias órdenes con el mismo método de pago
# para un mismo customer_id, el segment is_active y consent siempre son los mismos
clients_orders_unique = clients_orders.reduceByKey(lambda a, _: a)

result = clients_orders_unique.map(
        lambda x: (
            (x[0][1], x[1][2]), # KEY = (payment_method, segment)
            (1 if x[1][0] else 0, 1 if x[1][1] else 0, 1)     # VALUE = (is_active, consent, count)
        )
    ).reduceByKey(
        lambda a, b: (
            a[0] + b[0], 
            a[1] + b[1], 
            a[2] + b[2]
        )
    ).map(
        lambda x: Row(
            payment_method=payments_id_to_name.value[x[0][0]],
            customer_segment=segment_id_to_name.value[x[0][1]],
            active_count=x[1][0],
            consent_count=x[1][1],
            active_percentage=float(f"{x[1][0]/x[1][2] * 100:.2f}"), # función round() falla por alguna razón
            consent_percentage=float(f"{x[1][1]/x[1][2] * 100:.2f}"),
        )
    )


In [161]:
df_result = spark.createDataFrame(result)
df_result.show()

                                                                                

+----------------+----------------+------------+-------------+-----------------+------------------+
|  payment_method|customer_segment|active_count|consent_count|active_percentage|consent_percentage|
+----------------+----------------+------------+-------------+-----------------+------------------+
|   BANK TRANSFER|          BUDGET|       16041|        12451|            89.78|             69.69|
|     CREDIT CARD|         PREMIUM|       16434|        12807|            89.87|             70.04|
|      DEBIT CARD|         REGULAR|       49130|        38295|            89.97|             70.13|
|      DEBIT CARD|          BUDGET|       16034|        12441|            89.78|             69.66|
|CASH ON DELIVERY|         REGULAR|       49140|        38307|            89.97|             70.13|
|       UNDEFINED|          BUDGET|       15806|        12263|            89.82|             69.68|
|  DIGITAL WALLET|       UNDEFINED|        8231|         6365|            89.97|             69.57|


### 4. Para los productos que contienen en su descripción la palabra "stuff", <br> calcular el peso total de su inventario agrupado por marca, <br> mostrar sólo la marca y el peso total de las 5 más pesadas

#### Hipótesis
- Los productos que tienen un valor nulo en el peso no son considerados (peso nulo = 0kg).
- No se consideran los productos sin una marca definida

#### Limpieza de datos para la tabla de productos

In [162]:
def retain_products_columns(row: Row):
    brand = "UNDEFINED" if row.brand is None else row.brand.strip().upper()
    weight = 0.0 if row.weight_kg is None else row.weight_kg
    stock = 0 if row.stock_quantity is None else row.stock_quantity
    is_stuff = False if row.description is None else ("STUFF" in row.description.upper())
    return (
        brand,
        weight,
        stock,
        is_stuff,
    )
    
productsIdx = {
    "brand": 0,
    "weight": 1,
    "stock": 2,
    "is_stuff": 3,
}

In [163]:
products = sqlContext.read.csv(
    'data/products.csv',
    header=True, inferSchema=True
)
productsRDD = products.rdd.map(retain_products_columns).cache()

                                                                                

#### Resolución

In [164]:
stuff_products_weight_by_brand = productsRDD \
    .filter(lambda x: x[productsIdx["is_stuff"]] and x[productsIdx["brand"]] != "UNDEFINED") \
    .map(lambda x: (x[productsIdx["brand"]], x[productsIdx["weight"]])) \
    .reduceByKey(lambda a, b: a + b)
    
heaviest_brands = stuff_products_weight_by_brand.takeOrdered(5, key=lambda x: -x[1])

25/10/06 15:47:22 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , product_id, product_name, category_id, brand, price, cost, stock_quantity, weight_kg, dimensions, description, is_active, created_at
 Schema: _c0, product_id, product_name, category_id, brand, price, cost, stock_quantity, weight_kg, dimensions, description, is_active, created_at
Expected: _c0 but found: 
CSV file: file:///home/pat/Documents/GitHub/datos-tp2/data/products.csv
                                                                                

In [165]:
print("Las 5 marcas con mayor peso total en productos cuya descripción contiene 'Stuff' son:")
for brand in heaviest_brands:
    print(f"Marca: {brand[0]}, Peso total: {brand[1]}")

Las 5 marcas con mayor peso total en productos cuya descripción contiene 'Stuff' son:
Marca: 3M, Peso total: 4250.86
Marca: WAYFAIR, Peso total: 4080.17
Marca: ADIDAS, Peso total: 4057.34
Marca: NIKE, Peso total: 3614.96
Marca: HASBRO, Peso total: 3338.5799999999995


### 5. Calcular el porcentaje de productos cuyo stock es al menos 20% <br> más alto que el stock promedio de su marca

#### Hipótesis
- Solo se consideran los productos con una marca definida

#### Resolución

In [None]:
avg_stock_by_brand = productsRDD \
    .filter(lambda x: x[productsIdx["brand"]] != "UNDEFINED") \
    .map(lambda x: (x[productsIdx["brand"]], (x[productsIdx["stock"]], 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
    .mapValues(lambda x: x[0]/x[1])

high_stock_prods_and_total = productsRDD\
    .map(lambda x: (x[productsIdx["brand"]], x[productsIdx["stock"]])) \
    .join(avg_stock_by_brand) \
    .map(lambda x: ( # (brand, (stock, avg_stock))
        1 if x[1][0] > x[1][1]*1.2 else 0,  # productos con stock > 20% del promedio
        1
    ))\
    .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))

result = high_stock_prods_and_total[0] / high_stock_prods_and_total[1] * 100

                                                                                

In [176]:
print(f"El porcentaje de productos con stock mayor al 20% del promedio de su marca es: {result:.2f}%")

El porcentaje de productos con stock mayor al 20% del promedio de su marca es: 36.05%


### 6. Obtener la cantidad de órdenes que no hayan comprado ninguno de los 10 productos más vendidos

#### Hipótesis
- Los productos más vendidos son aquellos que tienen mayor cantidad vendida (`quantity`) entre todas las órdenes que aperece

#### Limpieza y preparación de columnas de items

In [168]:
def retain_items_columns(row: Row):
    quantity = 0 if row.quantity is None else row.quantity
    return (
        parse_to_int(row.order_id),
        parse_to_int(row.product_id),
        quantity,
    )
    
itemsIdx = {
    "order_id": 0,
    "product_id": 1,
    "quantity": 2,
}

In [169]:
items = sqlContext.read.csv(
    'data/order_items.csv',
    header=True, inferSchema=True
)
itemsRDD = items.rdd.map(retain_items_columns).cache()

#### Resolución

In [170]:
top_products_counts = itemsRDD \
    .map(lambda x: (x[itemsIdx["product_id"]], x[itemsIdx["quantity"]])) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda x: -x[1])
top_products_ids = [product_id for product_id, _ in top_products_counts] # son solo 10 elementos

not_top_products_orders = itemsRDD \
    .map(lambda x: (x[itemsIdx["order_id"]], True if x[itemsIdx["product_id"]] in top_products_ids else False)) \
    .reduceByKey(lambda a, b: a or b) \
    .filter(lambda x: not x[1]) \
    .count()


25/10/06 15:47:32 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , order_item_id, order_id, product_id, quantity, unit_price, line_total, discount_amount
 Schema: _c0, order_item_id, order_id, product_id, quantity, unit_price, line_total, discount_amount
Expected: _c0 but found: 
CSV file: file:///home/pat/Documents/GitHub/datos-tp2/data/order_items.csv
                                                                                

In [171]:
print(f"La cantidad de órdenes que no contienen ninguno de los 10 productos más vendidos es: {not_top_products_orders}")

La cantidad de órdenes que no contienen ninguno de los 10 productos más vendidos es: 99507
