In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    DoubleType,
)

In [14]:
spark = SparkSession.builder.appName("BMW Sales Analysis").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [17]:
schema = StructType(
    [
        StructField("Year", IntegerType(), True),
        StructField("Month", IntegerType(), True),
        StructField("Model", StringType(), True),
        StructField("Units_Sold", IntegerType(), True),
        StructField("Price", DoubleType(), True),
        StructField("Revenue", DoubleType(), True),
        StructField("Dealer", StringType(), True),
        StructField("Region", StringType(), True),
        StructField("Fuel_Type", StringType(), True),
        StructField("Transmission", StringType(), True),
    ]
)

df = spark.read.csv(
    path="dataset/bmw_car_sales_2010_2024.csv",
    schema=schema,
    header=True,
    mode="PERMISSIVE",
)

In [19]:
df.show(10)
df.printSchema()

+----+-----+--------+----------+---------+------------------+---------+-------------+---------+------------+
|Year|Month|   Model|Units_Sold|    Price|           Revenue|   Dealer|       Region|Fuel_Type|Transmission|
+----+-----+--------+----------+---------+------------------+---------+-------------+---------+------------+
|2020|    2|3 Series|        24| 47143.54|        1131444.96|Dealer_16|       Europe|   Diesel|      Manual|
|2020|   12|      i8|        16|129651.43|        2074422.88|Dealer_28|North America|   Petrol|      Manual|
|2013|    4|      i8|        22|151284.79|3328265.3800000004|Dealer_36|       Europe| Electric|      Manual|
|2017|   10|      X3|        26|  54855.8|         1426250.8| Dealer_1|       Europe| Electric|   Automatic|
|2014|    3|      X1|        16| 41550.74|         664811.84|Dealer_22|North America|   Petrol|   Automatic|
|2011|    6|      X5|        18| 70272.66|1264907.8800000001| Dealer_3|       Africa| Electric|      Manual|
|2024|    7|5 Serie

In [25]:
df.createOrReplaceTempView("BMW")

In [26]:
sales_per_year = spark.sql(
    """
    select year, sum(units_sold) as units_per_year from bmw 
    group by year 
    order by year desc
    """
)
sales_per_year.show()

+----+--------------+
|year|units_per_year|
+----+--------------+
|2024|          8151|
|2023|          7250|
|2022|          8074|
|2021|          7868|
|2020|          8031|
|2019|          6922|
|2018|          7562|
|2017|          7737|
|2016|          7710|
|2015|          7278|
|2014|          7621|
|2013|          7311|
|2012|          7657|
|2011|          8261|
|2010|          7983|
+----+--------------+



In [39]:
revenue_region_year = spark.sql(
    """
    select year, region, round(sum(revenue)) as total_revenue_per_region 
    from BMW
    group by year, region
    order by year
    """
)
revenue_region_year.show()

+----+-------------+------------------------+
|year|       region|total_revenue_per_region|
+----+-------------+------------------------+
|2010|       Europe|             9.2958486E7|
|2010|North America|             9.3113978E7|
|2010|       Africa|             9.9904804E7|
|2010|         Asia|             7.9502164E7|
|2010|South America|             8.1351226E7|
|2010|  Middle East|            1.01642761E8|
|2011|  Middle East|             8.6705063E7|
|2011|North America|             9.5960755E7|
|2011|       Africa|             8.4748241E7|
|2011|       Europe|            1.04979648E8|
|2011|South America|            1.12666376E8|
|2011|         Asia|             9.7598354E7|
|2012|South America|             9.4403674E7|
|2012|  Middle East|             8.1509936E7|
|2012|         Asia|             9.2250921E7|
|2012|       Africa|             9.6003338E7|
|2012|North America|            1.02305952E8|
|2012|       Europe|             8.3969034E7|
|2013|       Europe|             6

In [43]:
models_units = spark.sql(
    """
    select model, sum(units_sold) units_sold from BMW
    group by model
    order by units_sold desc
    limit 5
    """
)
models_units.show()

+--------+----------+
|   model|units_sold|
+--------+----------+
|      M3|     10203|
|      X6|      9933|
|      X3|      9714|
|      i8|      9713|
|5 Series|      9662|
+--------+----------+



In [45]:
monthly_season_check = spark.sql(
    """
    select month, year, sum(units_sold) as units_sold
    from bmw
    group by month, year
    order by units_sold desc
    """
)
monthly_season_check.show()

+-----+----+----------+
|month|year|units_sold|
+-----+----+----------+
|   12|2020|       975|
|   10|2022|       944|
|    9|2011|       934|
|    8|2022|       893|
|    6|2018|       891|
|   10|2016|       852|
|    8|2021|       850|
|    8|2024|       847|
|    1|2024|       841|
|    4|2012|       836|
|    4|2010|       823|
|   12|2018|       817|
|   11|2016|       814|
|    3|2023|       806|
|    3|2011|       797|
|   10|2024|       794|
|    8|2014|       794|
|    7|2011|       790|
|    3|2017|       781|
|    8|2011|       780|
+-----+----+----------+
only showing top 20 rows


In [51]:
avg_price_per_year = spark.sql(
    """
    select year, round(avg(price)) as avg_price 
    from bmw
    group by year
    order by year
    """
)
avg_price_per_year.show()

+----+---------+
|year|avg_price|
+----+---------+
|2010|  69186.0|
|2011|  70760.0|
|2012|  71607.0|
|2013|  73974.0|
|2014|  71214.0|
|2015|  69887.0|
|2016|  69444.0|
|2017|  70845.0|
|2018|  70872.0|
|2019|  72052.0|
|2020|  71094.0|
|2021|  72136.0|
|2022|  72139.0|
|2023|  69374.0|
|2024|  72584.0|
+----+---------+

