# Data Engineer Assessment
## UC01: TTD_DE_UC01_EDA: Perform `Exploratory Data Analysis (EDA)` on provided CSV data

## Summary
This assessments evaluate the capabilities of the candidate in solving data engineering use cases. The candidate is required to solve the below assessment questions using a Jupyter notebook and post the solutions in the notebook in the assessment section.  

Each assessment is structured as a collection of one or more scenarios that need to be addressed by the data engineer.


* __Problem Statement__ - Business users have asked the data engineers to assist with exploratory data analysis to enable business make informed decisions.
* __Description__ - Business would like to perform `Exploratory Data Analysis` on the dataset as part of reporting and also to prepare data for Machine Learning purposes.  
The business user has recently joined the organization and is unfamiliar with the data and has asked the data engineer to just assist with the review of the data so that they generate reports together.

The business user would first like to explore the data and see if there are any patterns in the data that can be used for reporting.


## Code Complexity
- Low / Medium


## `Diagram - Also refer PDF in folder`

![Exploratory Data Analysis](./TTD_UC01_EDA.png "Exploratory Data")



## Datasets:

`File Location`: Refer to the attached `data` folder for information. 
The `data` folder is at the following location. - [data.zip]( 
https://drive.google.com/file/d/1NBXP1nFhyuZGgO8YHLL6yQqAPuM1zNca/view?usp=sharing)

* Vehicles (vehicles.csv)  at the plants (plants.csv) are built to order (orders.csv) placed - order_number
* Vehicles are manufactured at different Company plants (plants.csv)-  (plant_code_id)
* Customer (customers.csv) provides reviews(welcome_call.csv) 60 to 80 days after the vehicles are delivered(vin)
* Orders (orders.csv) are logged by sales_rep_number at various BMW dealerships.
* Sales (sales_rep.csv) representatives are linked to dealership (dealers.csv) and have dealership names




## Perform the following joins:

* Link all the data based on the statements made above to create a larger dataset that answers the below questions.
* Identify any duplicates in the data and perform cleanup of the duplicates. Just drop the duplicates columns.
* The Dataset must contain vehicles linked to the order, sentiments, sales people, plants
* Provide the name of the sales person (first_name, last_name and sales_number the dealership)



## Questions: `Exploratory Data Analysis - Provide graphs for options below and document your observations in markdown. `

1. Perform `Exploratory data analysis` and provide insights into the data.
2.  Provide the distribution by brand, model, iso_country.
3. Provide the percentage of customers that have purchased more than 1 car.
4.  Provide the distribution of the vehicles manufactured by the plants and provide information brand, model  etc.
5.  Provide the top sales peoples per dealership - 10 top sales people
6.  Indicate the total sales per dealership.
7.  Get the models of the cars that had the most positive reviews (reviews greater than 3.5)
8.  Provide a distribution of the vehicles by different status.
9.  List all the dealerships that have sold the Rolls-Royce brand.




## Libraries or Options used
* Jupyter Notebook - Install and run locally on your laptop or device.
* PySpark, Pandas and matplot lib or similar plotting libraries.
* Other Python libraries required for Exploratory Data Analysis



## `Acceptance Criteria`
The following acceptance criteria must be met:

1. Perform Exploratory data Analysis and present your results as observations.
2. Python Graph libraries must be used to plot graphs to support your findings.
3. Comment your notebook file with markdown indicating observations: and write statements to indicate your observations.
4. Perform Analysis fo the Data using Spark or Pandas

# Implementation

Provide all the implementation steps in the sections that follow. Ensure that you provide detailed explanations of the approach.


### Import the libraries that you need for EDA

In [None]:
# Import any relevant libraries
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import when as when
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

In [None]:

# initialize SparkSession object
spark = SparkSession.builder.appName("de_assessment_eda").getOrCreate()

#### List of expected dataframes to be loaded


  * Vehicles (vehicles.csv) at the plants (plants.csv) are built to order (orders.csv) placed - order_number
  * Vehicles are manufactured at different Company plants (plants.csv)- (plant_code_id)
  * Customer (customers.csv) provides reviews(welcome_call.csv) 60 to 80 days after the vehicles are delivered(vin)
  * Orders (orders.csv) are logged by sales_rep_number at various BMW dealerships.
  * Sales (sales_rep.csv) representatives are linked to dealership (dealers.csv) and have dealership names



### Load the data from the data folder into the data frame.

In [None]:
# Write your code below to load the relevant data into a data frame
# Perform any Clean up operations if required. remove duplicates etc.


## Customers Spark DataFrame converted to a TempView in order to enable SparkSQL

spark_customers_df = spark.read.option("delimiter",'|').option("header","true").csv("data/customers.csv").dropDuplicates().dropna(how='all')

spark_customers_df.createOrReplaceTempView("customerTempView")
spark_customers_df_filtered = spark.sql(sqlQuery="SELECT * FROM customerTempView WHERE left(customer_id,3) like 'CNI%'") #filtering invalid CustomerID values

## Vehicle Orders Spark DataFrame
spark_orders_df = spark.read.option("delimiter",'|').option("header","true").csv("data/orders.csv")

## Dealership Spark DataFrame converted to a TempView in order to enable SparkSQL
spark_dealers_df = spark.read.option("delimiter",'|').option("header","true").csv("data/dealers.csv").dropDuplicates()
spark_dealers_df.createOrReplaceTempView("dealershipTempView")
spark_dealers_df_filtered = spark.sql(sqlQuery="SELECT * FROM dealershipTempView WHERE left(dealer_code,3) like 'DNI%'") #filtering invalid DealershipNumber values

## BMW Plant Spark DataFrame
spark_plants_df = spark.read.option("delimiter",'|').option("header","true").csv("data/plants.csv")


## SalesRep Spark DataFrame converted to a TempView in order to enable SparkSQL
spark_sales_person_df = spark.read.option("delimiter",'|').option("header","true").csv("data/sales_person.csv").dropDuplicates()
spark_sales_person_df.createOrReplaceTempView("salesTempView")
spark_sales_df_filtered = spark.sql(sqlQuery="SELECT * FROM salesTempView WHERE left(sales_number,3) like 'SNI%'") #filtering invalid SalesNumber values

## Vehicle Make Spark DataFrame
spark_vehicle_make_df  = spark.read.option("delimiter",'|').option("header","true").csv("data/vehicle_make.csv").dropDuplicates()

## Vehicle Spark DataFrame with a slight valiue correction on some bad data on "fuel_type"
spark_vehicles_df = spark.read.option("delimiter",'|').option("header","true").csv("data/vehicles.csv")
spark_vehicles_df = spark_vehicles_df.withColumn("fuel_type", when(spark_vehicles_df["fuel_type"] == "hyrdogen", "hydrogen").otherwise(spark_vehicles_df["fuel_type"])) #corrected fuel_type value that is bad data .i.e "hyrdogen" and not "hydrogen" 

## Welcome call Spark DataFrame
spark_welcome_call_df = spark.read.option("delimiter",'|').option("header","true").csv("data/welcome_call.csv")

## Welcome Call (Deltas) Spark DataFrame
spark_welcome_call_deltas_df = spark.read.option("delimiter",'|').option("header","true").csv("data/welcome_call_deltas.csv")





#### Question: Did you need to perform any clean up on the dataframes. If yes. What cleanup operations did you perform

#### *Answer*: 


# 1  Removing rows with the invalid customer ID values 
>> spark_customers_df.createOrReplaceTempView("customerTempView")
>> spark_customers_df = spark.sql(sqlQuery="SELECT * FROM customerTempView WHERE left(customer_id,3) like 'CNI%'")


# 2 Removing rows with the invalid Dealer Code values
>> spark_dealers_df.createOrReplaceTempView("dealershipTempView")
>> spark_dealers_df_filtered = spark.sql(sqlQuery="SELECT * FROM dealershipTempView WHERE left(dealer_code,3) like 'DNI%'") #filtering invalid DealershipNumber values

# 3 Removing rows with the invalid SalesID values
>> spark_sales_person_df.createOrReplaceTempView("salesTempView")
>> spark_sales_df_filtered = spark.sql(sqlQuery="SELECT * FROM salesTempView WHERE left(sales_number,3) like 'SNI%'") #filtering invalid SalesNumber values

# 4 Making sure that the fuel_type column values are void of typos
>> spark_vehicles_df = spark_vehicles_df.withColumn("fuel_type", when(spark_vehicles_df["fuel_type"] == "hyrdogen", "hydrogen").otherwise(spark_vehicles_df["fuel_type"]))

# 5 Dropped Duplicates on 4 Datasets, namely:
 
>> "dealership"
   - Overall Count:	2598  
   - Without Duplicates:	2590
>> sales_person"
   - Overall Count:	30827 
   - Without Duplicates:	30602
>> vehicle_make"
   - Overall Count:	481 
   - Without Duplicates:	448
>> customer"
   - Overall Count:	288251 
   - Without Duplicates:	280989

# 6 Dropped Rows that were bad (specifically only having missing fields) for the customers Dataframe

   Overall Count:	288251 
   DropNa('Any') function:	15544
   DropNa('All') function:	288251




### Provide some statistical information about the data you just loaded


In [None]:
# Write code to provide statistical information about each dataframe that you just loaded.

# Using the Pandas.describe() function
spark_customers_df_filtered.toPandas().describe()
spark_welcome_call_df.toPandas().describe()
spark_vehicles_df.toPandas().describe()
spark_vehicle_make_df.toPandas().describe()
spark_sales_df_filtered.toPandas().describe()
spark_dealers_df_filtered.toPandas().describe()
spark_orders_df.toPandas().describe()
spark_plants_df.toPandas().describe()




### Perform all the relevant join operations between the datasets.

Hint! - Relationship between the datasets is mentioned above


In [None]:
# Perform the queries to perform the relevant dataframe join operations.
vehicles_list = ["order_number","vin","fuel_type","transmission_type","prod_plant_id","production_country","dealer_number","export_country","steering_wheel_position","cabin_door_count","engine_series","drive_type","price","currency"]

#Joining plant table to orders table

vehicle_plant_df = spark_vehicles_df[vehicles_list].join(spark_plants_df["plant_id","plant_name","country","iso_country_code"],spark_vehicles_df['prod_plant_id'] == spark_plants_df['plant_id'],"leftouter" ).drop('prod_plant_id','brand','production_country')

#  #Joining vehicle table to orders table
vehicle_plant_order_df = spark_vehicles_df.join(spark_orders_df,vehicle_plant_df['order_number']==spark_orders_df['order_number'],"inner" ).drop('iso_country_code','order_number')

#  #Joining orders table to welcome_call table
vehicle_order_reviews = spark_welcome_call_df["review_id","vin","order_number","review","ratings"].join(spark_orders_df,spark_welcome_call_df['order_number']==spark_orders_df['order_number'],"left").drop('order_number')
# vehicle_order_reviews.sort('ratings').show()

vehicle_order_reviews.createOrReplaceTempView("dealer_sentiment")
sentiment_per_dealership = spark.sql(sqlQuery="SELECT avg(ratings) over (partition by dealer_number order by review_id) average_rating_dealer, dealer_number  FROM dealer_sentiment group by dealer_number, ratings ,review_id")

# #Joining SalesRep Dataframe to Orders Dataframe based on the sales_rep reference ID
order_sales_rep = spark_orders_df.join(spark_sales_df_filtered['sales_number',"dealer_number","sex","employment_type"],spark_sales_df_filtered['sales_number']==spark_orders_df['sales_rep_number'],"left").drop('sales_number','dealer_number').drop('iso_country_code')

# ##Joining SalesRep data to dealership data
sales_rep_dealership = spark_sales_df_filtered["sales_number","first_name","last_name","personnel_number","employment_type","department_code","dealer_number","sex","date_of_birth","customer_address"].join(spark_dealers_df_filtered["dealer_code","dealer_company","dealer_address"],spark_sales_df_filtered['dealer_number']==spark_dealers_df_filtered['dealer_code'],"inner").drop('dealer_code')


#### Question: Did you perform any joins on the datasets. If yes, what joins. Also what information was available after the joins were performed.

#### *Answer*: 

# Joined Vehicle Dataframe with Plant Dataframe in order to map the Production Plant ID to a country and also to understand a plant's vehicle production patterns

# Joined Vehicle Dataframe with Orders Dataframe in order to get a Vehicle's Order status, CustomerID in order

# Joined Welcome Call Dataframe with Dealership Dataframe in order to get more info on customer sentiment at dealerships 

# The JOINS that were used were all LEFT JOINS, with the bigger table on the left side of the JOINS (in case Spark's Cost Based Optimization does not kick in as it should)

### Perform All the standard Exploratory Data Analysis in the sections that follow to provide information to the Business users about the data. Report your findings in the form of Graphs or Response statements

In [None]:
# Example: Write down the distribution of Vehicles by plant and iso_country and plot a bar graph

vehicle_plant_df.createOrReplaceTempView("vehicle_by_plant_dist")
vehicle_by_plant_dist = spark.sql("SELECT plant_id, iso_country_code, count(vin) cars_sold FROM vehicle_by_plant_dist group by 1,2 order by 1,2") #VIN distrbution by plant_id and iso_country_code

pandas_vehicle_by_plant_dist = vehicle_by_plant_dist.toPandas()

pivot_df = pandas_vehicle_by_plant_dist.pivot( columns=["iso_country_code","plant_id"], values="cars_sold").fillna(0)

# Plotting the g bar chart
pivot_df.plot(kind='bar', figsize=(12, 8), width=0.8)

# # Set labels and title
plt.xlabel("ID")
plt.ylabel("Units")
plt.title("Vehicles Sold by plant and country")
# Show the plot
plt.xticks(rotation=0)  # Keep x-axis labels horizontal
plt.legend(title='Vehicles Sold', bbox_to_anchor=(1.05, 1), loc='upper left')  # Position the legend outside the plot
plt.tight_layout()  # Adjust the plot to ensure everything fits
plt.show()

vehicle_order_reviews.createOrReplaceTempView("dealer_sentiment")
sentiment_per_dealership = spark.sql(sqlQuery="SELECT avg(ratings) over (partition by dealer_number order by review_id) average_rating_dealer, dealer_number, ratings, review_id  FROM dealer_sentiment group by dealer_number, ratings ,review_id")


sentiment_per_dealership_pandas = sentiment_per_dealership.toPandas()

pivot_df = sentiment_per_dealership_pandas.pivot(index="review_id" ,columns=["dealer_number"], values="ratings").fillna(0)

# Plotting the g bar chart
pivot_df.plot(kind='bar', figsize=(12, 8), width=0.8)

# # Set labels and title
plt.xlabel("ID")
plt.ylabel("Units")
plt.title("Vehicles Sold by plant and country")
# Show the plot
plt.xticks(rotation=0)  # Keep x-axis labels horizontal
plt.legend(title='Vehicles Sold', bbox_to_anchor=(1.05, 1), loc='upper left')  # Position the legend outside the plot
plt.tight_layout()  # Adjust the plot to ensure everything fits
plt.show()


In [None]:
# Write your own exploratory data analysis on the ingested dataframe and report on the different findings.
# also provide visual aids for each finding.


#  fixing matplotlib

In [None]:
# Use the matplotlib libraries or other graphing libraries and create charts to support your findings
import matplotlib

#  fixing matplotlib


### Report all your Findings:

Report your findings in bullet points.
Example: For illustration purpose only - replace below with your own findings and support with Evidence
1. The US plant manufactured the most number of vehicles in 2023 etc. There were 30,000 vehicles manufactured at the plant etc.

## `Acceptance Criteria`
The following acceptance criteria must be met:

1. Perform Exploratory data Analysis and present your results as observations.
2. Python Graph libraries must be used to plot graphs to support your findings.
3. Comment your notebook file with markdown indicating observations: and write statements to indicate your observations.
4. Perform Analysis fo the Data using Spark or Pandas