- Preparar el fichero `orders_data.parquet` de modo que pueda ser usado para contruir un 'forecasting model'.  
- Limpiar la dataset para que cumpla los requerimientos del equipo de Data y Machine Learning.  
- Guardar el archivo actualizado (limpio) como `orders_data_clean.parquet`

  
![](https://community.cloud.databricks.com/files/EP_3/1.png)

Como ingeniero de datos de una empresa de comercio electrónico llamada Voltmart, un equipo de aprendizaje automático le ha pedido que limpie los datos que contienen información sobre los pedidos realizados el año pasado. Tienen previsto utilizar estos datos depurados para crear un modelo de previsión de la demanda (Forecasting Model). Para ello, han compartido sus requisitos sobre el formato de tabla de salida deseado.

Un analista ha compartido un archivo parquet llamado `orders_data.parquet` para que usted los limpie y los preprocese.

A continuación puede ver el esquema del conjunto de datos junto con los requisitos de limpieza de los perezosos analistas de datos:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [0]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [0]:
df = spark.read.parquet('dbfs:/FileStore/tables/orders_data.parquet')
df.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


### Respuestas:

##### 1. Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date 

In [0]:
from pyspark.sql.functions import hour, col

df_filtered_hours = df.filter(~((hour(col("order_date")) >= 0) & (hour(col("order_date")) <= 5)))

df_export = df_filtered_hours

df_filtered_hours.show(5)

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+
|2023-01-22 21:25:00|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|     231.0|   700.0|  469.0|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|     7.475|   14.95|  7.475|
|2023-01-17 13:33:00|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|     5.995|   23.98|  11.99|
|2023-01-05 20:33:00|  141237|    27in F

##### 2. New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

El problema principal en tu código está en cómo se están sobreescribiendo las columnas dentro del DataFrame. Estás creando una columna time_of_day dos veces, pero al hacerlo en secuencia, la segunda sobreescribe el valor asignado por la primera. Específicamente, la segunda asignación de valores para time_of_day está reemplazando la que habías hecho para la "mañana" (morning) con la lógica para la "tarde" (afternoon), y en consecuencia solo ves las filas con "afternoon" (siempre que estén en ese rango horario).


````python
df_orders = df.withColumn(
    'time_of_day',
    F.when((hour(F.col("order_date")) >= 0) & (hour(F.col("order_date")) <= 5), "morning")
    .otherwise(None)  
).withColumn(
    'time_of_day',
    F.when((hour(F.col("order_date")) >= 15) & (hour(F.col("order_date")) < 18), "afternoon")
    .otherwise(None)  
)
````

In [0]:
from pyspark.sql import functions as F

df_time_of_day = df.withColumn(
    'time_of_day',
    F.when((hour(F.col("order_date")) >= 0) & (hour(F.col("order_date")) <= 5), "morning")
    .when((hour(F.col("order_date")) >= 15) & (hour(F.col("order_date")) < 18), "afternoon")
    .otherwise(None)
)

df_export = df_export.withColumn(
    'time_of_day',
    F.when((hour(F.col("order_date")) >= 0) & (hour(F.col("order_date")) <= 5), "morning")
    .when((hour(F.col("order_date")) >= 15) & (hour(F.col("order_date")) < 18), "afternoon")
    .otherwise(None)
)

#df_export.filter(df_export['time_of_day'] == 'morning').show(5)
#df_export.filter(df_export['time_of_day'] == 'afternoon').show(5)

df_time_of_day.filter(df_time_of_day['time_of_day'] == 'morning').show(5)
df_time_of_day.filter(df_time_of_day['time_of_day'] == 'afternoon').show(5)

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+------+-----------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover|margin|time_of_day|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+------+-----------+
|2023-01-17 00:09:00|  141253|AA Batteries (4-p...|6.741495725758E12|Alimentation|385 11th St, Atla...|               1|      3.84|      1.92|    3.84|  1.92|    morning|
|2023-01-20 00:21:00|  141296|USB-C Charging Cable|9.557772918548E12|Électronique|889 Cedar St, Atl...|               1|     11.95|     5.975|   11.95| 5.975|    morning|
|2023-01-10 01:32:00|  141315|USB-C Charging Cable|1.997185248353E12|Électronique|842 8th St, Seatt...|               1|     11.95|     5.975|   

##### 3. Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase

In [0]:
df_no_tv = df.filter(~(F.lower(F.col('product')).contains('tv')))
df_with_tv = df.filter((F.lower(F.col('product')).contains('tv')))

df_export = df_export.filter(~(F.lower(F.col('product')).contains('tv')))

print('Eliminado las columnas que contengan tv')
df_no_tv.show(3)

print('Filas eliminadas')
df_with_tv.show(3)

Eliminado las columnas que contengan tv
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover|margin|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+------+
|2023-01-22 21:25:00|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|     231.0|   700.0| 469.0|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|     7.475|   14.95| 7.475|
|2023-01-17 13:33:00|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|     5.995|   23.98| 11.99|
+-----

##### 4. Ensure all values are lowercase

In [0]:
from pyspark.sql import functions as F

for col_name in df.columns:
    # Si la columna es de tipo string, aplica la transformación
    if dict(df.dtypes)[col_name] == 'string':
        df_lowercase = df.withColumn(col_name, F.lower(F.col(col_name)))

for col_name in df_export.columns:
    if dict(df_export.dtypes)[col_name] == 'string':
        df_export = df_export.withColumn(col_name, F.lower(F.col(col_name)))

df_lowercase.show(5)

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+
|2023-01-22 21:25:00|  141234|              iPhone|5.638008983335E12|   Vêtements|944 walnut st, bo...|               1|     700.0|     231.0|   700.0|  469.0|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 maple st, por...|               1|     14.95|     7.475|   14.95|  7.475|
|2023-01-17 13:33:00|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 adams st, san...|               2|     11.99|     5.995|   23.98|  11.99|
|2023-01-05 20:33:00|  141237|    27in F

##### 5. New column containing: the State that the purchase was ordered from

In [0]:
df_state_column = df.withColumn("State", F.split(F.split(df["purchase_address"], ",")[2], "\s")[1])

df_export = df_export.withColumn("State", F.split(F.split(df_export["purchase_address"], ",")[2], "\s")[1])

df_state_column.show(5)

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+-----+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|State|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+-----+
|2023-01-22 21:25:00|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|     231.0|   700.0|  469.0|   MA|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|     7.475|   14.95|  7.475|   OR|
|2023-01-17 13:33:00|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|     5.995|   23.98|  11.99|   CA|
|202

##### 6. Guardar archivo final limpio con nombre `orders_data_clean.parquet` 

In [0]:
df.coalesce(1).write.parquet("/output/orders_data_clean.parquet")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4207689823290210>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mcoalesce[49m[43m([49m[38;5;241;43m1[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mparquet[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43m/output/orders_data_clean.parquet[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[4

##### 7. Exportar archivo limpio en formato CSV 

In [0]:
df.coalesce(1).write.csv("/output/orders_data_clean.csv")

In [0]:
df_export.display()

order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day,State
2023-01-22T21:25:00.000+0000,141234,iphone,5638008983335.0,vêtements,"944 walnut st, boston, ma 02215",1,700.0,231.0,700.0,469.0,,ma
2023-01-28T14:15:00.000+0000,141235,lightning charging cable,5563319511488.0,alimentation,"185 maple st, portland, or 97035",1,14.95,7.475,14.95,7.475,,or
2023-01-17T13:33:00.000+0000,141236,wired headphones,2113973395220.0,vêtements,"538 adams st, san francisco, ca 94016",2,11.99,5.995,23.98,11.99,,ca
2023-01-05T20:33:00.000+0000,141237,27in fhd monitor,3069156759167.0,sports,"738 10th st, los angeles, ca 90001",1,149.99,97.4935,149.99,52.4965,,ca
2023-01-25T11:59:00.000+0000,141238,wired headphones,9692680938163.0,électronique,"387 10th st, austin, tx 73301",1,11.99,5.995,11.99,5.995,,tx
2023-01-29T20:22:00.000+0000,141239,aaa batteries (4-pack),2953868554188.0,alimentation,"775 willow st, san francisco, ca 94016",1,2.99,1.495,2.99,1.495,,ca
2023-01-26T12:16:00.000+0000,141240,27in 4k gaming monitor,5173670800988.0,vêtements,"979 park st, los angeles, ca 90001",1,389.99,128.69670000000002,389.99,261.2933,,ca
2023-01-05T12:04:00.000+0000,141241,usb-c charging cable,8051736777568.0,vêtements,"181 6th st, san francisco, ca 94016",1,11.95,5.975,11.95,5.975,,ca
2023-01-01T10:30:00.000+0000,141242,bose soundsport headphones,1508418177978.0,électronique,"867 willow st, los angeles, ca 90001",1,99.99,49.995,99.99,49.995,,ca
2023-01-22T21:20:00.000+0000,141243,apple airpods headphones,1386344211590.0,électronique,"657 johnson st, san francisco, ca 94016",1,150.0,97.5,150.0,52.5,,ca
