# Vehicle Sales Data Analysis
This notebook outlines the code and results of the ten data analysis tasks for the BS3220 Parallel Programming assignment. 


### Import libraries and load data

In [181]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, max, min, avg, col, rank, to_date, date_format, regexp_extract, row_number, first, last, format_number, substring, year, expr, quarter
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create (or reuse) the Spark session for this notebook.
# NOTE(review): the LEGACY time-parser policy is presumably required so that
# the "EEE MMM dd yyyy" pattern used during cleaning can be parsed — confirm.
spark = (
    SparkSession.builder
    .appName("VehicleSalesCleaning")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Read the sales CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("car_prices.csv")
)


### Data cleaning

In [182]:
# Preview the data exactly as loaded, before any cleaning is applied.
print("Original Data:")
df.show(5)

# Discard rows whose selling price is exactly 1.
# NOTE(review): presumably these are placeholder/invalid prices — confirm against the dataset.
# Rows with a null sellingprice are discarded too, because the comparison evaluates to null.
df = df.filter(df.sellingprice != 1)

# Discard rows that are missing any of the descriptive fields used by the analysis tasks.
df = df.dropna(subset=["make", "model", "trim", "body", "vin", "state", "condition", "odometer", "color", "interior", "seller", "mmr"])

# Extract the date part excluding time and timezone using regular expressions
# (e.g. "Tue Dec 16 2014" from the full saledate string). regexp_extract returns an
# empty string when the pattern does not match, so such rows end up without a usable date.
df = df.withColumn("date_part", regexp_extract(col("saledate"), r"(\w+\s\w+\s\d+\s\d+)", 1))

print("Extracted Date Part:")
df.select("saledate", "date_part").show(5)

# Convert extracted string to date and format it to MMddyyyy
df = df.withColumn("formatted_saledate", date_format(to_date(col("date_part"), "EEE MMM dd yyyy"), "MMddyyyy"))

# Slice the MMddyyyy string (1-based positions): characters 5-8 are the year,
# characters 1-2 are the month. Both derived columns are strings.
df = df.withColumn("sale_year", expr("substring(formatted_saledate, 5, 4)"))
df = df.withColumn("sale_month", expr("substring(formatted_saledate, 1, 2)"))

# The label previously said "with Quarters", but no quarter column is derived
# or displayed here; describe what is actually shown.
print("Transformed Data with Sale Year and Month:")
df.select("saledate", "formatted_saledate", "sale_year", "sale_month").show(5)

Original Data:
+----+-----+-------------------+----------+-----+------------+-----------------+-----+---------+--------+-----+--------+--------------------+-----+------------+--------------------+
|year| make|              model|      trim| body|transmission|              vin|state|condition|odometer|color|interior|              seller|  mmr|sellingprice|            saledate|
+----+-----+-------------------+----------+-----+------------+-----------------+-----+---------+--------+-----+--------+--------------------+-----+------------+--------------------+
|2015|  Kia|            Sorento|        LX|  SUV|   automatic|5xyktca69fg566472|   ca|        5|   16639|white|   black|kia motors americ...|20500|       21500|Tue Dec 16 2014 1...|
|2015|  Kia|            Sorento|        LX|  SUV|   automatic|5xyktca69fg561319|   ca|        5|    9393|white|   beige|kia motors americ...|20800|       21500|Tue Dec 16 2014 1...|
|2014|  BMW|           3 Series|328i SULEV|Sedan|   automatic|wba3c1c51ek11

### Task 1: Find the total sales for each item, both the number of units and the total price/cost

In [183]:
# df.groupBy("make", "model").agg(
#     count("vin").alias("units_sold"),
#     sum("sellingprice").alias("total_revenue")
# ).orderBy(col("units_sold").desc()).show(20)

### Task 2: Summarise the total sales of all items at each location

In [184]:
# salesPerState = df.groupBy("state").agg(
#     sum("sellingprice").alias("total_revenue"), count("vin").alias("total_sales")
# ).orderBy("total_revenue", ascending=False)

# salesPerState.show(55)

### Task 3: List all products and their combined sales, grouped by their location of sale.

In [185]:
# combined_sales_by_state_and_product = df.groupBy("state", "make", "model").agg(
#     count("vin").alias("total_sales"), sum("sellingprice").alias("totalrevenue")
# ).orderBy("state")

# combined_sales_by_state_and_product.show()

### Task 4: Show the sales numbers for the item which sold the most units at each location

In [186]:
# item_sales_by_state = df.groupBy("state", "make", "model", "trim").agg(
#     count("vin").alias("units_sold"),
#     sum("sellingprice").alias("total_revenue")
# )

# windowSpec = Window.partitionBy("state").orderBy(col("units_sold").desc())

# # Rank items within each state and select top item
# top_item_by_state = item_sales_by_state.withColumn("rank", rank().over(windowSpec)).filter(col("rank") == 1)
# top_item_by_state.show(55)

### Task 5: List all items that were sold within two months of your choosing

In [187]:
# Month of sale = first two characters of the MMddyyyy-formatted sale date.
sale_month_expr = col("formatted_saledate").substr(1, 2)

# Keep only vehicles sold in the two chosen months: December ("12") and June ("06").
df_filtered = df.filter(sale_month_expr.isin("12", "06"))

# Number of sales in each of the two chosen months, ordered by month.
salesByMonth = (
    df_filtered
    .groupBy(sale_month_expr.alias("month"))
    .agg(count("vin").alias("total_sales"))
    .orderBy("month")
)

# List the matching vehicles first, then the per-month totals.
df_filtered.select("make", "model", "saledate").show()
salesByMonth.show()

+---------+-------------------+--------------------+
|     make|              model|            saledate|
+---------+-------------------+--------------------+
|      Kia|            Sorento|Tue Dec 16 2014 1...|
|      Kia|            Sorento|Tue Dec 16 2014 1...|
|      BMW|6 Series Gran Coupe|Thu Dec 18 2014 1...|
|   Nissan|             Altima|Tue Dec 30 2014 1...|
|      BMW|                 M5|Wed Dec 17 2014 1...|
|Chevrolet|              Cruze|Tue Dec 16 2014 1...|
|     Audi|                 A4|Thu Dec 18 2014 1...|
|     Audi|                 A6|Tue Dec 16 2014 1...|
|      Kia|             Optima|Tue Dec 16 2014 1...|
|      Kia|            Sorento|Tue Dec 16 2014 1...|
|   Nissan|             Altima|Tue Dec 23 2014 1...|
|     Audi|                 Q5|Thu Dec 18 2014 1...|
|Chevrolet|             Camaro|Tue Dec 30 2014 1...|
|      BMW|           6 Series|Wed Dec 17 2014 1...|
|     Audi|                 A3|Thu Dec 18 2014 1...|
|Chevrolet|             Camaro|Thu Dec 18 2014

### Task 6: Identify the item which has the lowest overall sales, both for the dataset as a whole and for each sales location

In [188]:
# salesPerCar = df.groupBy("make", "model").agg(
#     count("*").alias("unitsSold")
# )
# salesPerCarPerState = df.groupBy("state", "make", "model").agg(
#     count("*").alias("unitsSold")
# )
# lowestCarSale = salesPerCar.orderBy("unitsSold").first()

# windowSpec = Window.partitionBy("state").orderBy("unitsSold")
# lowestCarSaleByState = salesPerCarPerState.withColumn("row_number", row_number().over(windowSpec)).filter(col("row_number") == 1)

# print("Lowest selling car:")
# print(lowestCarSale)
# print("Lowest Selling Car by State:")
# lowestCarSaleByState.show(55)

### Task 7: Find the most expensive and least expensive item for each location where sales occurred

In [189]:
# windowExpensive = Window.partitionBy("state").orderBy(col("sellingprice").desc())
# windowCheap = Window.partitionBy("state").orderBy(col("sellingprice").asc())

# dfRanked = df.withColumn("rank_desc", row_number().over(windowExpensive)).withColumn("rank_asc", row_number().over(windowCheap))

# mostExpensive = dfRanked.filter(col("rank_desc") == 1).select(
#     "state", "make", "model", "sellingprice"
# )
# leastExpensive = dfRanked.filter(col("rank_asc") == 1).select(
#     "state", "make", "model", "sellingprice"
# )

# print("Most Expensive Car Sale by State:")
# mostExpensive.show(10)
# print("Least Expensive Car Sale by State:")
# leastExpensive.show(10)




### Task 8: Calculate the average cost of an item at each location within your dataset

In [190]:
# averagePriceByState = df.groupBy("state").agg(
#     format_number(avg("sellingprice"), 2).alias("average_selling_price")
# )
# averagePriceByState = averagePriceByState.orderBy("state")

# print("Average Selling Price by State:")
# averagePriceByState.show(55)

### Task 9: Based on your individual dataset, create a set of variables which can be used as broadcast variables.

### Task 10: Complete one other query to analyse the data, based on your individual dataset.

In [191]:
# stateAverage = df.groupBy("state").agg(avg("sellingprice").alias("averageRevenuePerSale")).orderBy(col("averageRevenuePerSale").desc())

# # Format the revenue to 2 d.p. only after ordering: format_number returns a string, so ordering by the formatted column sorts the values as text rather than numerically.
# formattedBestStateForAverageRevenue = stateAverage.withColumn(
#     "formattedAverageRevenue", format_number(col("averageRevenuePerSale"), 2)
# )
# print("Best average revenue per sale:")
# formattedBestStateForAverageRevenue.select("state", "formattedAverageRevenue").show(55)

### Machine Learning

Monthly Sales Data for 2015:
+-----+-----+
|month|sales|
+-----+-----+
+-----+-----+

+--------+-----+
|features|sales|
+--------+-----+
+--------+-----+



Py4JJavaError: An error occurred while calling o3301.fit.
: java.lang.AssertionError: assertion failed: Training dataset is empty.
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.ml.optim.WeightedLeastSquares$Aggregator.validate(WeightedLeastSquares.scala:425)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:108)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:456)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:354)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:329)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:186)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:78)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
