# Procesamiento y Análisis de Datos con Apache Spark

En este proyecto, se utiliza Apache Spark para resolver una serie de tareas de análisis de datos relacionadas con una empresa global del sector retail, que tiene tanto tiendas físicas como ventas online.

El análisis se implementa utilizando [Apache Spark][Apache Spark], un motor de procesamiento de datos distribuido y de alto rendimiento, particularmente mediante [PySpark][PySpark], la API de Apache Spark para el entorno de Python.

<div align="center">
<img src="../assets/logos/tools_apache_spark.svg" height="225" width="225"/>
</div>

[Apache Spark]: https://spark.apache.org
[PySpark]: https://spark.apache.org/docs/latest/api/python/index.html

## Descripción de la tarea

Habéis sido contratados por una empresa perteneciente al sector del Retail.

Es una empresa con presencia a nivel mundial con sede en España. Tiene tanto tiendas físicas, como venta on-line.

Todos los días recibe un archivo llamado purchases.json con compras realizadas en todo el mundo.

Cada línea del fichero es una compra de una unidad del producto correspondiente.

<div align="center">
<img src="../assets/images/purchases.png" height="600" width="600"/>
</div>

La plataforma logística envía todos los días un archivo stock.csv con el stock de cada producto:

<div align="center">
<img src="../assets/images/stocks.png" height="600" width="600"/>
</div>

**IMPORTANTE**

Los datos se han generado de forma aleatoria.

## Cargando Spark

Primeramente, importamos las librerías necesarias para ejecutar nuestro código, principalmente aquellas relacionadas con la API de PySpark.

In [1]:
# Importing libraries

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

Luego, inicializamos una sesión de Spark utilizando la API de PySpark. La función `SparkSession.builder` se usa para configurar y crear una nueva instancia de Spark.

El parámetro `appName` establece el nombre de la aplicación, que será útil para identificar la sesión de Spark en el monitor de Spark. Finalmente, el método `getOrCreate` se asegura de que se obtenga una sesión existente si ya está en ejecución, o se cree una nueva si no existe.

Esta sesión de Spark será el punto de entrada para trabajar con los datos y ejecutar las operaciones de análisis de datos.

In [22]:
spark = (
    SparkSession.builder
    .appName("RetailAnalytics")
    .getOrCreate()
)

spark

## Cargando la data

In [3]:
# Setting parameters, data file paths
PURCHASES_DATA_PATH = "../data/purchases.json"
STOCKS_DATA_PATH = "../data/stocks.csv"

# Loading the purchases json into a dataframe
purchases_df = spark.read.json(PURCHASES_DATA_PATH)
# Loading the stocks csv into a dataframe
stock_df = spark.read.csv(STOCKS_DATA_PATH, header=True, inferSchema=True)

In [4]:
purchases_df.show(5)

+---------+--------------------+------------+-------+----------+-------+-----+
|item_type|            location|payment_type|  price|product_id|shop_id|  way|
+---------+--------------------+------------+-------+----------+-------+-----+
|     shoe|   {7.0814, 56.5147}|      paypal|58.6024|        12|     36|store|
|    shirt| {20.7594, 169.8936}|        card|29.6226|        14|     15|store|
|     shoe| {93.5029, 159.9378}|        cash|33.0585|        11|     37|store|
|     shoe|{59.1923, -178.1721}|      paypal|97.9282|        81|     90|store|
|     jean| {62.8714, 128.6758}|      paypal| 1.9822|        14|     61|  web|
+---------+--------------------+------------+-------+----------+-------+-----+
only showing top 5 rows



In [5]:
stock_df.show(5)

+----------+--------+
|product_id|quantity|
+----------+--------+
|         1|      34|
|         2|      97|
|         3|      70|
|         4|      66|
|         5|      75|
+----------+--------+
only showing top 5 rows



## Solución de la tarea

Debéis crear un programa Spark 2.x utilizando el lenguaje Python y resolver las siguientes tareas (usando tanto del DataFrame API como Spark SQL):

### 1. Los 10 productos más comprados.

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### SQL:

Primeramente, debemos crear una vista temporal:

In [6]:
purchases_df.createOrReplaceTempView("purchases")

Una vez hecho esto, ejecutamos la consulta en SQL nativo:

In [7]:
spark.sql("""
    SELECT product_id, COUNT(*) as count
    FROM purchases
    GROUP BY product_id
    ORDER BY count DESC
    LIMIT 10
""").show()

+----------+-----+
|product_id|count|
+----------+-----+
|        64|   50|
|        81|   45|
|         3|   44|
|        61|   43|
|        31|   43|
|        36|   43|
|        60|   43|
|        69|   42|
|        30|   42|
|         7|   41|
+----------+-----+



### 2. Porcentaje de compra de cada tipo de producto (`item_type`).

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### SQL:

In [8]:
spark.sql("""
    SELECT item_type,
           (COUNT(*) / (SELECT COUNT(*) FROM purchases)) * 100 as percentage
    FROM purchases
    GROUP BY item_type
    ORDER BY percentage DESC
""").show()

+---------+------------------+
|item_type|        percentage|
+---------+------------------+
|     shoe| 20.87978306718891|
|     jean| 20.75926483880687|
|    shirt|19.584212112081953|
|  trouser| 19.55408255498644|
|   jacket|19.222657426935825|
+---------+------------------+



### 3. Obtener los 3 productos más comprados por cada tipo de producto.

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### Spark SQL Nativo:

In [9]:
spark.sql("""
    WITH product_counts AS (
        SELECT
            product_id,
            item_type,
            COUNT(*) AS purchase_count
        FROM
            purchases
        GROUP BY
            item_type, product_id
    ),
    ranked_products AS (
        SELECT
            product_id,
            item_type,
            purchase_count,
            ROW_NUMBER() OVER (PARTITION BY item_type ORDER BY purchase_count DESC) AS rank
        FROM
            product_counts
    )
    SELECT
        product_id,
        item_type,
        purchase_count,
        rank
    FROM
        ranked_products
    WHERE
        rank <= 3
    ORDER BY
        item_type, rank
""").show()

+----------+---------+--------------+----+
|product_id|item_type|purchase_count|rank|
+----------+---------+--------------+----+
|        64|   jacket|            12|   1|
|        12|   jacket|            12|   2|
|        92|   jacket|            11|   3|
|        76|     jean|            14|   1|
|        90|     jean|            12|   2|
|        47|     jean|            11|   3|
|         3|    shirt|            13|   1|
|        59|    shirt|            12|   2|
|        54|    shirt|            12|   3|
|        85|     shoe|            16|   1|
|        96|     shoe|            15|   2|
|        69|     shoe|            15|   3|
|        23|  trouser|            13|   1|
|        25|  trouser|            12|   2|
|        68|  trouser|            12|   3|
+----------+---------+--------------+----+



### 4. Obtener los productos que son más caros que la media del precio de los productos.

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### Spark SQL Nativo:

In [10]:
spark.sql("""
    WITH average_price AS (
        SELECT AVG(price) AS avg_price FROM purchases
    )
    SELECT
        DISTINCT p.product_id,
        p.item_type,
        p.price,
        ROUND(a.avg_price, 2) AS average_price
    FROM
        purchases p
    CROSS JOIN
        average_price a
    WHERE
        p.price > a.avg_price
    ORDER BY
        p.price ASC
""").show()

+----------+---------+-------+-------------+
|product_id|item_type|  price|average_price|
+----------+---------+-------+-------------+
|         1|     shoe|49.7916|        49.78|
|        94|  trouser|49.7948|        49.78|
|        10|     jean|49.8007|        49.78|
|        86|  trouser|49.8073|        49.78|
|        54|  trouser|49.8356|        49.78|
|        94|  trouser|49.8365|        49.78|
|        40|     shoe|49.8797|        49.78|
|        51|   jacket|49.8858|        49.78|
|        52|    shirt|49.8867|        49.78|
|        60|  trouser|49.9317|        49.78|
|         6|    shirt|49.9319|        49.78|
|        92|   jacket|49.9374|        49.78|
|        30|     shoe| 49.963|        49.78|
|        85|   jacket|49.9758|        49.78|
|        15|    shirt|50.0076|        49.78|
|        71|    shirt|50.0145|        49.78|
|         8|    shirt|50.0415|        49.78|
|        38|  trouser|  50.06|        49.78|
|        46|   jacket|50.0828|        49.78|
|        9


### 5. Indicar la tienda que ha vendido más productos.

#### Spark DataFrame:

In [13]:
(
    purchases_df.groupBy("shop_id")
    .count()
    .orderBy(F.col("count").desc())
    .limit(1)
    .show()
)

+-------+-----+
|shop_id|count|
+-------+-----+
|     69|   47|
+-------+-----+



#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark

### 6. Indicar la tienda que ha facturado más dinero.

#### Spark DataFrame:

In [14]:
(
    purchases_df.groupBy("shop_id")
    .agg(F.sum("price").alias("revenue"))
    .orderBy(F.col("revenue").desc())
    .limit(1)
    .show()
)

+-------+------------------+
|shop_id|           revenue|
+-------+------------------+
|     69|2444.8898000000013|
+-------+------------------+



#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark

### 7. Dividir el mundo en 5 áreas geográficas iguales según la longitud (location.lon) y agregar una columna con el nombre del área geográfica (Area1: - 180 a - 108, Area2: - 108 a - 36, Area3: - 36 a 36, Area4: 36 a 108, Area5: 108 a 180), ...

In [None]:
## TODO : Solve this using Spark

#### 7.1. ¿En qué área se utiliza más PayPal?

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark

#### 7.2. ¿Cuáles son los 3 productos más comprados en cada área?

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark

#### 7.3. ¿Qué área ha facturado menos dinero?

#### Spark DataFrame:

In [None]:
## TODO : Solve this using Spark

#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark

### 8. Indicar los productos que no tienen stock suficiente para las compras realizadas.

#### Spark DataFrame:

In [21]:
purchases_count_df = (
    purchases_df
    .groupBy("product_id")
    .count()
    .withColumnRenamed("count", "purchases_made")
)

insufficient_stock_df = (
    stock_df
    .join(purchases_count_df, "product_id")
    .filter(F.col("quantity") < F.col("purchases_made"))
    .withColumnRenamed("quantity", "quantity_in_stock")
)

insufficient_stock_df.show()

+----------+-----------------+--------------+
|product_id|quantity_in_stock|purchases_made|
+----------+-----------------+--------------+
|        29|               25|            38|
|         1|               34|            39|
|        37|               22|            38|
+----------+-----------------+--------------+



#### Spark SQL Nativo:

In [None]:
## TODO : Solve this using Spark