# Data Engineer Technical Assigment
### Author: Konrad Wronski
Description: 
Technical assignment description
You are a data engineer working for a retail company. The company’s database handles large
volumes of data. You are tasked with creating a datalake in ADLS for reporting purposes.
For simplicity, instead of tables, assume files are in a source folder “Sales_Data” in csv format and
instead of ADLS, target location is “cleansed” folder (Create it yourself).
Source folder contains monthly sales data files.
1. Design the process to read all the files from source folder using PySpark, combine them
as a single file, and write it to the cleansed folder.
2. Which file format would you choose to write in the cleansed folder and why?
3. Mention data partitioning strategy you would propose for this table and justify your choice
of partitioning method.
4. Additionally, outline the steps you would take to implement this partitioning strategy,
considering both the technical aspects and potential challenges.
Note: Process all the files in a single run. Ensure that there is no data duplication.
[Hint: Make use of Window function for deduplication]
### Expected results of assignment:
- A notebook or python script.
- A separate documentation file with a brief explanation of the approach, data exploration,
assumptions/considerations, and instructions on how to run the application (if any).
- Output dataset in cleansed folder in your preferred file format.
- Data quality checks (like input/output dataset validation)
### Metyis development guidelines
- We appreciate a combination of Software and Data Engineering good practices.
- Proper logging and exception handling
### Evaluation criteria for results of technical assignment
We use following criteria to evaluate results:
- Well-structured code: we expect maintainability, readability.
- Scalability: Should be able to handle high volumes of data.
- Documentation

## 1. Data Ingestion 

### 1.1 Importing packages

In [55]:
from pyspark.sql import SparkSession
import pandas as pd 
import numpy as np 
import seaborn as sns 
import matplotlib.pyplot as plt
import os 
from pyspark.sql.types import *
from pyspark.sql.functions import col,isnan, when, count, to_timestamp, to_date, date_format, row_number, col, year, month
from pyspark.sql.window import Window


### 1.2 Initializing Session

In [34]:
spark = SparkSession.builder.appName("SalesETL").getOrCreate()

In [35]:
source = "/Users/konradwronski/Desktop/Projects/Grind/DataScienceJungle/MetyisTask/Sales_Data"
endpoint = "/Users/konradwronski/Desktop/Projects/Grind/DataScienceJungle/MetyisTask/Cleansed"

### 1.2.1 Checking the file 

In [36]:
df = spark.read.options(header = True, inferSchema = True).csv("/Users/konradwronski/Desktop/Projects/Grind/DataScienceJungle/MetyisTask/Sales_Data/Sales_April_2019.csv")

In [37]:
df.show()

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    NULL|                NULL|            NULL|      NULL|          NULL|                NULL|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|         

In [38]:
df.printSchema()

root
 |-- Order ID: integer (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: integer (nullable = true)
 |-- Price Each: double (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



### 1.3 Loading the data 

Creating an empty Data Frame to store merged files.

In [39]:
salesSchema = StructType([StructField('Order ID',
                                  IntegerType(), True),
                    StructField('Product',
                                StringType(), True),
                    StructField('Quantity Ordered',
                                IntegerType(), True),
                    StructField('Price Each',
                                DoubleType(), True),
                    StructField('Order Date',
                                StringType(), True),
                    StructField('Purchase Address',
                                StringType(), True)
                    ])
sales = spark.createDataFrame(data = [], schema = salesSchema)

In [40]:
def load(file, base):
    '''Function loading and merging all of the datasets/tables'''
    dfFile = spark.read.options(header = True).schema(salesSchema).csv(file)
    base = base.unionByName(dfFile)
    return base

Looping through all of the files in the Sales Data folder to merge them into one.

In [41]:
directory = os.fsencode(source)

for file in os.listdir(directory): 
    filename = os.path.join(source, os.fsdecode(file))
    sales = load(filename, sales)
    print(f"File {file} has been loaded and appended to the main file. Current Length of the file {sales.count()}")

File b'Sales_December_2019.csv' has been loaded and appended to the main file. Current Length of the file 25117
File b'Sales_April_2019.csv' has been loaded and appended to the main file. Current Length of the file 43500
File b'Sales_February_2019.csv' has been loaded and appended to the main file. Current Length of the file 55536
File b'Sales_March_2019.csv' has been loaded and appended to the main file. Current Length of the file 70762
File b'Sales_August_2019.csv' has been loaded and appended to the main file. Current Length of the file 82773
File b'Sales_May_2019.csv' has been loaded and appended to the main file. Current Length of the file 99408
File b'Sales_November_2019.csv' has been loaded and appended to the main file. Current Length of the file 117069
File b'Sales_October_2019.csv' has been loaded and appended to the main file. Current Length of the file 137448
File b'Sales_January_2019.csv' has been loaded and appended to the main file. Current Length of the file 147171
File

## 2. Data Exploration & Transformation

In [42]:
sales.show()

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

### 2.1 Handling Missing Values

In [14]:
sales.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()



+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|     900|    545|             900|       900|       545|             545|
+--------+-------+----------------+----------+----------+----------------+



                                                                                

In [15]:
missing_data_rows = sales.filter(
    (col("Order ID").isNull()) | (col("Product").isNull()) | 
    (col("Quantity Ordered").isNull()) | (col("Price Each").isNull()) |
    (col("Order Date").isNull()) | (col("Purchase Address").isNull()) |
    (isnan("Order ID")) | (isnan("Product")) | 
    (isnan("Quantity Ordered")) | (isnan("Price Each")) |
    (isnan("Order Date")) | (isnan("Purchase Address"))
)
missing_data_rows.show(truncate=False)

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |Product|NULL   

                                                                                

In [18]:
sales = sales.dropna(how="any")

In [19]:
sales.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()



+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|       0|      0|               0|         0|         0|               0|
+--------+-------+----------------+----------+----------+----------------+



                                                                                

### 2.2 Checking for duplicates 

In [46]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define window specification
window_spec = Window.partitionBy("Order ID", "Product", "Quantity Ordered", "Price Each", "Purchase Address") \
                    .orderBy(col("Order Date").desc())

# Apply row_number for deduplication
sales_no_dup = sales.withColumn("row_num", row_number().over(window_spec))

# Keep only the first row for each partition
sales_no_dup = sales_no_dup.filter(col("row_num") == 1).drop("row_num")

sales_no_dup.show()

sales_no_dup.count()


                                                                                

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  141250|     Vareebadd Phone|               1|     400.0|01/10/19 11:20|471 Center St, Lo...|
|  141254|AAA Batteries (4-...|               1|      2.99|01/08/19 11:51|238 Sunset St, Se...|
|  141265|Apple Airpods Hea...|               1|     150.0|01/01/19 16:52|853 Ridge St, Bos...|
|  141270|    Wired Headphones|               1|     11.99|01/27/19 23:10|469 Hill St, San ...|
|  141275|    Wired Headphones|               1|     11.99|01/07/19 16:06|610 Walnut St, Au...|
|  141321|Bose SoundSport H...|               1|     99.99|01/10/19 09:07|207 8th St, Los A...|
|  141334|USB-C Charging Cable|               1|     11.95|01/15/19 02:05|459 4th St, San F...|
|  141338|AAA Batteries (4-...|         

                                                                                

185688

### 2.3 Transforming Order Date column 

Changing column data type to timestamp then spliting into two columns: Timestamp and Date

In [49]:
sales = sales.withColumn("Order Date", to_timestamp("Order Date", "MM/dd/yy HH:mm"))
sales = sales.withColumn("Order Timestamp", date_format("Order Date", "HH:mm"))
sales = sales.withColumn("Order Date", to_date("Order Date"))


In [51]:
sales.show()

+--------+--------------------+----------------+----------+----------+--------------------+---------------+
|Order ID|             Product|Quantity Ordered|Price Each|Order Date|    Purchase Address|Order Timestamp|
+--------+--------------------+----------------+----------+----------+--------------------+---------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30|136 Church St, Ne...|          00:01|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29|562 2nd St, New Y...|          07:03|
|  295667|USB-C Charging Cable|               1|     11.95|2019-12-12|277 Main St, New ...|          18:21|
|  295668|    27in FHD Monitor|               1|    149.99|2019-12-22|410 6th St, San F...|          15:13|
|  295669|USB-C Charging Cable|               1|     11.95|2019-12-18|43 Hill St, Atlan...|          12:38|
|  295670|AA Batteries (4-p...|               1|      3.84|2019-12-31|200 Jefferson St,...|          22:58|
|  295671|USB-C Charging Cab

## 3. Loading Data into new folder

In [57]:
sales = sales.withColumn("year", year(col("Order date")))
sales = sales.withColumn("month", month(col("Order date")))

In [58]:
sales.write.mode("overwrite").partitionBy("year", "month").parquet(endpoint)

25/02/17 18:53:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/02/17 18:53:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
25/02/17 18:53:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/02/17 18:53:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/02/17 18:53:46 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
25/02/17 18:53:47 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
25/02/17 18:53:47 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014

In [59]:
input_count = sales.count()
output_count = spark.read.parquet(endpoint).count()
assert input_count == output_count, "Data loss occurred!"

## 4. Data Visualization

Ideas:
- Top Selling Products
- Sales trend - line graph of sales in each month
- Top buying spots
- Day of the week - when sales are the best