As a Data Engineer at an electronics e-commerce company, Voltmart, you have been requested by a peer Machine Learning team to clean the data containing the information about orders made last year. They are planning to further use this cleaned data to build a demand forecasting model. To achieve this, they have shared their requirements regarding the desired output table format.

An analyst shared a parquet file called `"orders_data.parquet"` for you to clean and preprocess. 

You can see the dataset schema below along with the **cleaning requirements**:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

# Necessary Prerequisites in order to run spark from a Notebook locally

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [5]:
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


# 1.- Addressing required changes on the order_date column

In [6]:
# First I will add an extra column to be able to filter order_date more easily
orders_data_w_date_column = orders_data.withColumn('date', F.date_format(orders_data.order_date, 'HH:mm:ss'))

# Printing to see the result
orders_data_w_date_column.show()

+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+--------+
|         order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|    date|
+-------------------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+--------+
|2023-01-22 21:25:00|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|21:25:00|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|14:15:00|
|2023-01-17 13:33:00|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|       

In [7]:
# Now I will filter out orders placed between 12am and 5am (inclusive)
orders_data_filtered = orders_data_w_date_column.filter(F.col('date') > '05:00:00')

# Converting order_date to date
orders_data_filtered = orders_data_filtered.withColumn('order_date', F.to_date('order_date'))

# Printing to see results
orders_data_filtered.show()
orders_data_filtered.printSchema()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+--------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|    date|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+--------+
|2023-01-22|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|21:25:00|
|2023-01-28|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|14:15:00|
|2023-01-17|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|             5.995|   23.98|   11

# 2.- Creating 'time_of_day' column

In [8]:
# Creating new column and dropping 'date'

orders_data_time = orders_data_filtered.withColumn('time_of_day', 
                                F.when((F.col('date') >= '06:00:00') & (F.col('date') < '12:00:00'), 'morning')
                                 .when((F.col('date') >= '12:00:00') & (F.col('date') < '18:00:00'), 'afternoon')
                                 .otherwise('evening')).drop('date')

# Printing to see results
orders_data_time.show()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|
|2023-01-28|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|
|2023-01-17|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|             5.995

# 3.- Removing 'TV' from 'product' column

In [9]:
# Making all the 'product' column lowercase
orders_data_time = orders_data_time.withColumn('product', F.lower('product'))

# Filtering out TV
orders_no_tv = orders_data_time.filter(~(orders_data_time.product.contains('tv')))

# Printing to see results
orders_no_tv.show()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|  141234|              iphone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|
|2023-01-28|  141235|lightning chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|
|2023-01-17|  141236|    wired headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|             5.995

# 4.- Making all values of 'category' lowercase

In [10]:
orders_no_tv = orders_no_tv.withColumn('category', F.lower('category'))

# Printing results
orders_no_tv.show()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|  141234|              iphone|5.638008983335E12|   vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|
|2023-01-28|  141235|lightning chargin...|5.563319511488E12|alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|
|2023-01-17|  141236|    wired headphones| 2.11397339522E12|   vêtements|538 Adams St, San...|               2|     11.99|             5.995

# 5.- Creating 'purchase_state' column

In [11]:
# Will create new column 'purchase_state' by spliting and trimming to get read of unwanted whitespace
final_df = orders_no_tv.withColumn('purchase_state', F.trim(F.split(orders_no_tv.purchase_address, ',')[2]))

# Will update column 'purchase_state' to only maintain state
final_df = final_df.withColumn('purchase_state', F.split(final_df.purchase_state, ' ')[0])

# Printing the result
final_df.show()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+--------------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|purchase_state|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+--------------+
|2023-01-22|  141234|              iphone|5.638008983335E12|   vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|            MA|
|2023-01-28|  141235|lightning chargin...|5.563319511488E12|alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|            OR|
|2023-01-17|  141236|    wired headphones| 2.11397339522E12|   vê

# 6.- Saving to 'orders_data_clean.parquet' file

In [None]:
final_df.write.mode('overwrite').parquet('orders_data_clean.parquet')