<a href="https://colab.research.google.com/github/robertoarturomc/ProgramacionConcurrente/blob/main/27_Joins_con_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Programación Concurrente
## Actividad: Joins con Spark
#### Jose Manuel Mora Balderas - 174707


In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp, year, max, sum

In [64]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

Un Join es una operación que permite combinar datos de dos o más tablas en una sola, basándose en una relación entre ellas. Usualmente, esta relación se establece a través de una columna en común entre las tablas.

El Join permite acceder y manipular datos que están distribuidos en varias tablas de manera conjunta, obteniendo así resultados más complejos y completos.

Vamos a leer tres tablas; estas se encuentran en este [enlace de Kaggle](https://www.kaggle.com/datasets/svbstan/sales-product-and-customer-insight-repository)

In [65]:
from google.colab import drive
drive.mount('/content/drive')

customer = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Joins con Spark/customer_profile_dataset.csv', header=True)
customer.show(5)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
+-----------+----------+---------+------+-------------------+--------------------+------------+-------------------+-------------+-----------+-----+--------+
|customer_id|first_name|last_name|gender|      date_of_birth|               email|phone_number|        signup_date|      address|       city|state|zip_code|
+-----------+----------+---------+------+-------------------+--------------------+------------+-------------------+-------------+-----------+-----+--------+
|          1|    Robert|    Smith|Female|1994-06-14 21:40:27|jane.davis1@mail.com|634-106-4981|2016-10-16 17:23:25| 8465 Main St|San Antonio|   CA|   35566|
|          2|     Emily|   Garcia|Female|1989-09-21 17:56:31|robert.williams2@...|386-635-5998|2021-04-04 14:24:06|  305 Main St|   New York|   AZ|   23187|
|          3|   Jessica|    Brown|  Male|1984-01-21 21:43:13|emily.davis3@mail...|627-

In [66]:
products = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Joins con Spark/products_dataset.csv', header=True)
products.show(5)

+----------+------------+--------+--------------+------+--------------------+
|product_id|product_name|category|price_per_unit| brand| product_description|
+----------+------------+--------+--------------+------+--------------------+
|         1|      Butter|   Dairy|         28.58|BrandB|Description for Rice|
|         2|      Butter|   Meats|         22.66|BrandB|Description for B...|
|         3|        Milk|   Meats|         26.52|BrandE|Description for B...|
|         4|      Banana|  Grains|         26.12|BrandB|Description for A...|
|         5|        Rice|  Fruits|         21.94|BrandD|Description for B...|
+----------+------------+--------+--------------+------+--------------------+
only showing top 5 rows



In [67]:
purchase = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/Joins con Spark/purchase_history_dataset.csv', header=True)
purchase.show(5)

+-----------+-----------+----------+-------------------+--------+------------------+
|purchase_id|customer_id|product_id|      purchase_date|quantity|      total_amount|
+-----------+-----------+----------+-------------------+--------+------------------+
|          1|          1|        42|2018-04-15 14:08:01|       3| 37.64207365077783|
|          2|          1|       138|2022-07-10 23:33:47|       4| 70.24710587172727|
|          3|          1|       403|2021-12-31 03:53:33|       3| 89.16889585975464|
|          4|          1|       193|2017-01-14 01:25:11|       2| 59.70505931112876|
|          5|          1|        26|2018-04-06 11:01:06|       3|101.77886387225126|
+-----------+-----------+----------+-------------------+--------+------------------+
only showing top 5 rows



Ejercicios; contesta desarrollando el correspondiente código en Pyspark:

1. Responde, ¿cuántos clientes llamados "Robert" (nota cómo hay *Males* y *Females*), compraron algún producto lácteo (Dairy) en 2022 ?

2. Eres empleado de *BrandB*. ¿En cuáles ciudades has vendido una mayor cantidad? (total_amount)

3. ¿De cuánto es la mayor cantidad (quantity) que ha sido comprado por algún hombre O cuyo producto sea pan (Bread)?

In [68]:
# 1. ¿Cuántos clientes llamados "Robert" (nota cómo hay *Males* y *Females*), compraron algún producto lácteo (Dairy) en 2022 ?

robert_dairy_2022 = purchase.join(customer, purchase.customer_id == customer.customer_id, 'inner') \
    .join(products, purchase.product_id == products.product_id, 'inner') \
    .filter((col("first_name") == "Robert") & (col("category") == "Dairy") & (year(col("purchase_date")) == 2022))

print("Número de clientes llamados 'Robert' que compraron productos lácteos en 2022:", robert_dairy_2022.count())


Número de clientes llamados 'Robert' que compraron productos lácteos en 2022: 44


In [70]:
# 2. Eres empleado de BrandB. ¿En cuáles ciudades has vendido una mayor cantidad? (total_amount)

ciudades_mayor_cantidad= purchase.join(customer, purchase.customer_id == customer.customer_id, 'inner') \
    .groupBy("city") \
    .agg(sum("total_amount").alias("total_sales")) \
    .orderBy(col("total_sales").desc())

print("\nCiudades con mayor cantidad de ventas (total_amount) para BrandB:")
ciudades_mayor_cantidad.show(3)


Ciudades con mayor cantidad de ventas (total_amount) para BrandB:
+--------+------------------+
|    city|       total_sales|
+--------+------------------+
| Chicago| 132182.1816838523|
|New York|118338.72282221109|
| Phoenix|116973.36827142647|
+--------+------------------+
only showing top 3 rows



In [71]:
# 3. ¿De cuánto es la mayor cantidad (quantity) que ha sido comprado por algún hombre O cuyo producto sea pan (Bread)?

max_cantidad_hombre_o_pan = purchase.join(customer, purchase.customer_id == customer.customer_id, 'inner') \
    .join(products, purchase.product_id == products.product_id, 'inner') \
    .filter((col("gender") == "Male") | (col("product_name") == "Bread")) \
    .agg(max("quantity"))

print("\nMayor cantidad (quantity) comprada por un hombre o cuyo producto sea pan:", max_cantidad_hombre_o_pan.collect()[0][0])


Mayor cantidad (quantity) comprada por un hombre o cuyo producto sea pan: 5
