# Caso 1
- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para contruir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [0]:
df = spark.read.parquet('dbfs:/FileStore/pyspark-exam/case_1/dataset/orders_data.parquet')
df.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


In [0]:
# Mostrar la tabla, para ver como esta antes de realizar la limpieza.
df.show(truncate=5)

+----------+--------+-------+----------+--------+----------------+----------------+----------+----------+--------+------+
|order_date|order_id|product|product_id|category|purchase_address|quantity_ordered|price_each|cost_price|turnover|margin|
+----------+--------+-------+----------+--------+----------------+----------------+----------+----------+--------+------+
|     20...|   14...|  iP...|     5....|   Vê...|           94...|               1|     700.0|     231.0|   700.0| 469.0|
|     20...|   14...|  Li...|     5....|   Al...|           18...|               1|     14.95|     7.475|   14.95| 7.475|
|     20...|   14...|  Wi...|     2....|   Vê...|           53...|               2|     11.99|     5.995|   23.98| 11.99|
|     20...|   14...|  27...|     3....|   Sp...|           73...|               1|     14...|     97...|   14...| 52...|
|     20...|   14...|  Wi...|     9....|   Él...|           38...|               1|     11.99|     5.995|   11.99| 5.995|
|     20...|   14...|  A

### Respuestas:

**1. Modificar:** Eliminar pedidos realizados entre las 12 a.m. (00:00) y las 5 a.m. (inclusive); convertir de marca de tiempo a fecha

In [0]:
from pyspark.sql.functions import col, hour, to_date, hour

# Separar la columna 'order_date' en dos columnas: 'date' y 'hour'
df_filter = df.withColumn('hour', hour(col('order_date'))) \
                .withColumn('order_date', to_date(col('order_date')))

# Filtrar los registros que están entre las 00:00 y las 05:00 (inclusive)
df_filtered = df_filter.filter(~((col('hour') >= 0) & (col('hour') <= 5)))

# Ver el dataframe resultante después del filtrado
df_filtered.select('order_date', 'hour').show()


+----------+----+
|order_date|hour|
+----------+----+
|2023-01-22|  21|
|2023-01-28|  14|
|2023-01-17|  13|
|2023-01-05|  20|
|2023-01-25|  11|
|2023-01-29|  20|
|2023-01-26|  12|
|2023-01-05|  12|
|2023-01-01|  10|
|2023-01-22|  21|
|2023-01-07|  11|
|2023-01-31|  10|
|2023-01-09|  18|
|2023-01-25|  19|
|2023-01-03|  21|
|2023-01-05|  17|
|2023-01-10|  11|
|2023-01-24|   8|
|2023-01-30|   9|
|2023-01-08|  11|
+----------+----+
only showing top 20 rows



Para facilitar la eliminacion por horas, agregamos una columna con horas, y transformamos `order_date` en formato `date`, asi al momento de eliminar solo evaluamos la columna `hour` que nos facilita la eliminación de los registros por horas.

**2. Nueva columna** que contiene (límite inferior inclusivo, límite superior exclusivo):
- "morning" para pedidos realizados de 5 a 12 a.m.,
- "afternoon" para pedidos realizados de 12 a 6 p.m., y
- "evening" para pedidos realizados de 6 a 12 a.m.


In [0]:
from pyspark.sql.functions import col, when

# Crear la nueva columna 'time_of_day' basada en la hora de la orden
df_filtered = df_filter.withColumn(
  'time_of_day',
    when((col('hour') >= 5) & (col('hour') < 12), 'morning')  # 5am a 12pm -> "morning"
    .when((col('hour') >= 12) & (col('hour') < 18), 'afternoon')  # 12pm a 6pm -> "afternoon"
    .when((col('hour') >= 18) & (col('hour') < 24), 'evening')  # 6pm a 12am -> "evening"
    .otherwise('night')  # Si no cae en ninguno de los rangos anteriores, asignar "night"
)

# Mostrar el DataFrame resultante con la nueva columna
df_filtered.select('order_date', 'hour', 'time_of_day').show()

+----------+----+-----------+
|order_date|hour|time_of_day|
+----------+----+-----------+
|2023-01-22|  21|    evening|
|2023-01-28|  14|  afternoon|
|2023-01-17|  13|  afternoon|
|2023-01-05|  20|    evening|
|2023-01-25|  11|    morning|
|2023-01-29|  20|    evening|
|2023-01-26|  12|  afternoon|
|2023-01-05|  12|  afternoon|
|2023-01-01|  10|    morning|
|2023-01-22|  21|    evening|
|2023-01-07|  11|    morning|
|2023-01-31|  10|    morning|
|2023-01-09|  18|    evening|
|2023-01-25|  19|    evening|
|2023-01-03|  21|    evening|
|2023-01-05|  17|  afternoon|
|2023-01-10|  11|    morning|
|2023-01-24|   8|    morning|
|2023-01-30|   9|    morning|
|2023-01-17|   0|      night|
+----------+----+-----------+
only showing top 20 rows



**3. Elimina filas:** que contengan "TV" ya que la empresa ha dejado de vender este producto; asegurarse de que todos los valores esten en ninúsculas.

In [0]:
from pyspark.sql.functions import col, lower

# Cambiar a minúsculas la columna 'product' en el DataFrame df_filtered
df_filtered = df_filtered.withColumn("product", lower(col("product")))

# Contar los productos que contienen 'tv' antes de filtrar
count_tv_before = df_filtered.filter(col("product").like("%tv%")).count()
print(f"Productos con TV antes de filtrar: {count_tv_before}")

# Filtrar las filas que contienen 'tv' en la columna 'product' (eliminar las que contienen 'tv')
df_filtered = df_filtered.filter(~col("product").like("%tv%"))

# Contar los productos que contienen 'tv' después de filtrar
count_tv_after = df_filtered.filter(col("product").like("%tv%")).count()
print(f"Productos con TV después de filtrar: {count_tv_after}")
df_filtered.select('product').show()


Productos con TV antes de filtrar: 0
Productos con TV después de filtrar: 0
+--------------------+
|             product|
+--------------------+
|              iphone|
|lightning chargin...|
|    wired headphones|
|    27in fhd monitor|
|    wired headphones|
|aaa batteries (4-...|
|27in 4k gaming mo...|
|usb-c charging cable|
|bose soundsport h...|
|apple airpods hea...|
|apple airpods hea...|
|  macbook pro laptop|
|aaa batteries (4-...|
|    27in fhd monitor|
|    27in fhd monitor|
|     vareebadd phone|
|apple airpods hea...|
|usb-c charging cable|
|aa batteries (4-p...|
|aaa batteries (4-...|
+--------------------+
only showing top 20 rows



4. Asegurate que todos los valores sean minúsculas, de la columna `category`.

In [0]:
from pyspark.sql.functions import col, lower

# Cambiamos a minúscula todo el dataset.
df_filtered = df_filtered.withColumn("category", lower(col("category")))

df_filtered.select('category').show()

+------------+
|    category|
+------------+
|   vêtements|
|alimentation|
|   vêtements|
|      sports|
|électronique|
|alimentation|
|   vêtements|
|   vêtements|
|électronique|
|électronique|
|   vêtements|
|   vêtements|
|   vêtements|
|   vêtements|
|alimentation|
|alimentation|
|alimentation|
|      sports|
|alimentation|
|électronique|
+------------+
only showing top 20 rows



**5. Nueva columna que contine:** El estado desde el cual se realizó la compra `purchase_state` a partir de mi `purchase_address`

In [0]:
from pyspark.sql.functions import regexp_extract, col

# Crear la nueva columna 'purchase_state' extrayendo el estado de la dirección
df_filtered = df_filtered.withColumn(
    "purchase_state", 
    regexp_extract(col("purchase_address"), r",\s*([^,]+)\s+\d{5}$", 1)
)

# Mostrar el resultado para verificar
df_filtered.select('purchase_address', 'purchase_state').show()


+--------------------+--------------+
|    purchase_address|purchase_state|
+--------------------+--------------+
|944 Walnut St, Bo...|            MA|
|185 Maple St, Por...|            OR|
|538 Adams St, San...|            CA|
|738 10th St, Los ...|            CA|
|387 10th St, Aust...|            TX|
|775 Willow St, Sa...|            CA|
|979 Park St, Los ...|            CA|
|181 6th St, San F...|            CA|
|867 Willow St, Lo...|            CA|
|657 Johnson St, S...|            CA|
|492 Walnut St, Sa...|            CA|
|322 6th St, San F...|            CA|
|618 7th St, Los A...|            CA|
|512 Wilson St, Sa...|            CA|
|440 Cedar St, Por...|            OR|
|471 Center St, Lo...|            CA|
|414 Walnut St, Bo...|            MA|
|220 9th St, Los A...|            CA|
|385 11th St, Atla...|            GA|
|238 Sunset St, Se...|            WA|
+--------------------+--------------+
only showing top 20 rows



##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
# Guardar el archivo en formato parquet
df_filtered.coalesce(1).write.mode('overwrite').parquet('dbfs:/FileStore/pyspark-exam/case_1/dataset/orders_data_clean.parquet')

Cargamos el archivo desde el parquet, para verificar que se ha cargado bien los datos.

In [0]:
df_loaded = spark.read.parquet('dbfs:/FileStore/pyspark-exam/case_1/dataset/orders_data_clean.parquet')
df_loaded.select('product', 'category', 'purchase_state').show(5)

+--------------------+------------+--------------+
|             product|    category|purchase_state|
+--------------------+------------+--------------+
|              iphone|   vêtements|            MA|
|lightning chargin...|alimentation|            OR|
|    wired headphones|   vêtements|            CA|
|    27in fhd monitor|      sports|            CA|
|    wired headphones|électronique|            TX|
+--------------------+------------+--------------+
only showing top 5 rows



##### 7. Exportar archivo limpio en formato CSV 

Imprimimos todo el dataframe: `df_filtered` para verificar que ha ido todo bien.

In [0]:
# He comentado el comando para evitar que pese tanto
# df_filtered.display()
df_filtered.select('order_date', 'product', 'category', 'purchase_state').show()

+----------+--------------------+------------+--------------+
|order_date|             product|    category|purchase_state|
+----------+--------------------+------------+--------------+
|2023-01-22|              iphone|   vêtements|            MA|
|2023-01-28|lightning chargin...|alimentation|            OR|
|2023-01-17|    wired headphones|   vêtements|            CA|
|2023-01-05|    27in fhd monitor|      sports|            CA|
|2023-01-25|    wired headphones|électronique|            TX|
|2023-01-29|aaa batteries (4-...|alimentation|            CA|
|2023-01-26|27in 4k gaming mo...|   vêtements|            CA|
|2023-01-05|usb-c charging cable|   vêtements|            CA|
|2023-01-01|bose soundsport h...|électronique|            CA|
|2023-01-22|apple airpods hea...|électronique|            CA|
|2023-01-07|apple airpods hea...|   vêtements|            CA|
|2023-01-31|  macbook pro laptop|   vêtements|            CA|
|2023-01-09|aaa batteries (4-...|   vêtements|            CA|
|2023-01

Para realizar la exportación del `csv`, hacemos un display y luego descargamos todo.