In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
)

In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("BMW_Sales_Analysis_DEV")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
# Tip to reader: use WARN for development, ERROR in prod

In [4]:
schema = StructType(
    [
        StructField("model", StringType(), nullable=False),
        StructField("year", IntegerType(), nullable=False),
        StructField("region", StringType(), nullable=False),
        StructField("color", StringType(), nullable=True),
        StructField("fuel_type", StringType(), nullable=True),
        StructField("transmission", StringType(), nullable=True),
        StructField("engine_size_l", DoubleType(), nullable=True),
        StructField("mileage_km", IntegerType(), nullable=True),
        StructField("price_usd", DoubleType(), nullable=False),
        StructField("sales_volume", IntegerType(), nullable=True),
        StructField("sales_classification", StringType(), nullable=True),
    ]
)

In [5]:
df = spark.read.csv(
    path="dataset/BMW sales data (2010-2024).xls",
    schema=schema,
    header=True,
    mode="PERMISSIVE",
)

In [6]:
df.count()  # number of rows.

50000

In [7]:
df.printSchema()

root
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- color: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- engine_size_l: double (nullable = true)
 |-- mileage_km: integer (nullable = true)
 |-- price_usd: double (nullable = true)
 |-- sales_volume: integer (nullable = true)
 |-- sales_classification: string (nullable = true)



In [8]:
df.createOrReplaceTempView("BMW_dataset")

In [9]:
avg_price_per_region = spark.sql(
    """
    select region, 
           round(avg(price_usd), 2) as avg_price,
           count(*) as num_cars
    from bmw_dataset
    group by region
    order by avg_price desc
    """
)
avg_price_per_region.show()

+-------------+---------+--------+
|       region|avg_price|num_cars|
+-------------+---------+--------+
|         Asia| 75554.93|    8454|
|North America| 75070.05|    8335|
|       Europe| 74988.36|    8334|
|South America|  74973.6|    8251|
|       Africa| 74885.77|    8253|
|  Middle East| 74726.79|    8373|
+-------------+---------+--------+



In [10]:
top_selling_models = spark.sql(
    """
    select model, 
           sum(sales_volume) as sales_volume,
           rank() over(order by sum(sales_volume) desc) as sales_rank
    from bmw_dataset
    group by model
    order by sales_rank
    """
)
top_selling_models.show()

+--------+------------+----------+
|   model|sales_volume|sales_rank|
+--------+------------+----------+
|7 Series|    23786466|         1|
|      i8|    23423891|         2|
|      X1|    23406060|         3|
|3 Series|    23281303|         4|
|      i3|    23133849|         5|
|5 Series|    23097519|         6|
|      M5|    22779688|         7|
|      X3|    22745529|         8|
|      X5|    22709749|         9|
|      X6|    22661986|        10|
|      M3|    22349694|        11|
+--------+------------+----------+



In [11]:
count_by_fuel_type = spark.sql(
    """
    select fuel_type,
           count(model) as no_of_cars
    from bmw_dataset
    where fuel_type is not null
    group by fuel_type
    order by no_of_cars desc
    """
)
count_by_fuel_type.show()

+---------+----------+
|fuel_type|no_of_cars|
+---------+----------+
|   Hybrid|     12716|
|   Petrol|     12550|
| Electric|     12471|
|   Diesel|     12263|
+---------+----------+



In [12]:
high_mil_low_price = spark.sql(
    """
    select *, 
           rank() over(order by price_usd desc) as price_rank
    from bmw_dataset
    where mileage_km >= 100000 
          and
          price_usd <= 40000 
    order by price_usd desc
    """
)
high_mil_low_price.show(5)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+----------+
|   model|year|       region| color|fuel_type|transmission|engine_size_l|mileage_km|price_usd|sales_volume|sales_classification|price_rank|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+----------+
|      X3|2021|  Middle East| Black|   Diesel|      Manual|          3.8|    169642|  39999.0|        5319|                 Low|         1|
|5 Series|2011|       Europe| White| Electric|      Manual|          4.7|    103523|  39997.0|        8048|                High|         2|
|      X5|2013|       Europe|  Blue|   Hybrid|      Manual|          3.7|    132002|  39993.0|        9070|                High|         3|
|7 Series|2017|  Middle East|  Blue|   Diesel|   Automatic|          4.5|    135422|  39992.0|        6711|                 Low|         4|
|7 Series|2010|South

In [13]:
transmission_distr_per_region = spark.sql(
    """
    select region, 
           sum(case when transmission = 'Manual' then 1 else 0 end) as manuals,
           sum(case when transmission = 'Automatic' then 1 else 0 end) as automatics,
           count(*) as total_cars
    from bmw_dataset
    group by region    
    """
)
transmission_distr_per_region.show()

+-------------+-------+----------+----------+
|       region|manuals|automatics|total_cars|
+-------------+-------+----------+----------+
|       Europe|   4217|      4117|      8334|
|       Africa|   4132|      4121|      8253|
|North America|   4163|      4172|      8335|
|South America|   4218|      4033|      8251|
|  Middle East|   4228|      4145|      8373|
|         Asia|   4196|      4258|      8454|
+-------------+-------+----------+----------+



In [14]:
engine_size_analysis = spark.sql(
    """
    select fuel_type,
        cast(year as int) as year,
        round(avg(cast(engine_size_l as double)), 2) as avg_engine_size,
        count(*) as num_cars
    from bmw_dataset
    where cast(year as int) >= 2015
    group by fuel_type, cast(year as int)
    order by year asc, fuel_type
    """
)
engine_size_analysis.show(5)

+---------+----+---------------+--------+
|fuel_type|year|avg_engine_size|num_cars|
+---------+----+---------------+--------+
|   Diesel|2015|           3.31|     803|
| Electric|2015|           3.18|     826|
|   Hybrid|2015|           3.27|     869|
|   Petrol|2015|           3.23|     860|
|   Diesel|2016|           3.22|     842|
+---------+----+---------------+--------+
only showing top 5 rows


Since I group by both fuel_type and year, I am expected to get multiple rows per year (one per fuel type). If you want only one row per year, you need to drop `fuel_type` from group by.

Besides this, I also have casted them to double and integers. I had already provided schema, but who knows what can happen?:) 

In [15]:
popular_color_per_region = spark.sql(
    """
    select region, color, count(model) no_of_models
    from bmw_dataset
    group by region, color
    order by no_of_models desc
"""
)
popular_color_per_region.show(5)

+-------------+------+------------+
|       region| color|no_of_models|
+-------------+------+------------+
|       Europe| Black|        1473|
|North America|   Red|        1461|
|         Asia| Black|        1460|
|North America|Silver|        1435|
|  Middle East|  Grey|        1429|
+-------------+------+------------+
only showing top 5 rows


In [16]:
yearly_sales = spark.sql(
    """
    select year, sum(sales_volume) as sales_volume
    from bmw_dataset
    group by year
    order by sales_volume desc
    """
)
yearly_sales.show(5)

+----+------------+
|year|sales_volume|
+----+------------+
|2022|    17920946|
|2024|    17527854|
|2019|    17191956|
|2015|    17010207|
|2014|    16958960|
+----+------------+
only showing top 5 rows


At this point, I finally understood that the data is **synthetic/unrealistic.** You can still analyze relative trends (which models sell more vs less in your dataset, how averages change by year, etc.), but don’t treat the absolute numbers as real-world BMW sales.

In [17]:
price_vs_mileage_corr = spark.sql(
    """
    select model, mileage_km, price_usd
    from bmw_dataset
    order by price_usd desc
    """
)
price_vs_mileage_corr.show(10)

+--------+----------+---------+
|   model|mileage_km|price_usd|
+--------+----------+---------+
|      i8|    115320| 119998.0|
|      i8|    163849| 119997.0|
|      X6|    142419| 119997.0|
|      X1|    172950| 119996.0|
|3 Series|     12264| 119994.0|
|      i8|     26622| 119992.0|
|      X6|     27540| 119988.0|
|5 Series|    181043| 119988.0|
|      X1|    146281| 119988.0|
|      X6|     95648| 119986.0|
+--------+----------+---------+
only showing top 10 rows


In [18]:
price_vs_mileage_corr = df.stat.corr("mileage_km", "price_usd")
print("Correlation between mileage and price:", price_vs_mileage_corr)

Correlation between mileage and price: -0.00423819457462334


- Close to -1 --> strong negative correlation (higher mileage --> lower price) which is quite **realistic**.
- Close to 0 --> no relationship.
- Close to +1 --> strong positive correlation (rare for price vs mileage).

In [19]:
ranking_expensiv_cars = spark.sql(
    """
    select *
    from (
        select region,
               model,
               price_usd,
               dense_rank() over (partition by region order by price_usd desc) as ranking
        from bmw_dataset
    ) t
    where ranking <= 3
    order by region, ranking
    """
)
ranking_expensiv_cars.show()

+-------------+--------+---------+-------+
|       region|   model|price_usd|ranking|
+-------------+--------+---------+-------+
|       Africa|      i8| 119997.0|      1|
|       Africa|      X1| 119996.0|      2|
|       Africa|      X6| 119988.0|      3|
|       Africa|      X1| 119988.0|      3|
|         Asia|      X6| 119997.0|      1|
|         Asia|5 Series| 119988.0|      2|
|         Asia|7 Series| 119978.0|      3|
|       Europe|      i8| 119985.0|      1|
|       Europe|5 Series| 119981.0|      2|
|       Europe|      X1| 119961.0|      3|
|  Middle East|      i8| 119998.0|      1|
|  Middle East|3 Series| 119994.0|      2|
|  Middle East|5 Series| 119978.0|      3|
|North America|      i8| 119992.0|      1|
|North America|3 Series| 119970.0|      2|
|North America|5 Series| 119963.0|      3|
|South America|      X6| 119986.0|      1|
|South America|3 Series| 119982.0|      2|
|South America|      X6| 119981.0|      3|
+-------------+--------+---------+-------+



In [20]:
recent_car_per_model = spark.sql(
    """
    select * from (
        select distinct model, 
            year,
            rank() over(partition by model order by year desc) 
                            as year_ranking
        from bmw_dataset) t
    where year_ranking = 1
    """
)
recent_car_per_model.show()

+--------+----+------------+
|   model|year|year_ranking|
+--------+----+------------+
|3 Series|2024|           1|
|5 Series|2024|           1|
|7 Series|2024|           1|
|      M3|2024|           1|
|      M5|2024|           1|
|      X1|2024|           1|
|      X3|2024|           1|
|      X5|2024|           1|
|      X6|2024|           1|
|      i3|2024|           1|
|      i8|2024|           1|
+--------+----+------------+



Now, I will try to find all cars whose `Price_USD` is above the overall average price of all cars.

In [21]:
above_average = spark.sql(
    """
    select model, year, price_usd
    from (
           select *, 
                avg(price_usd) over(partition by model) as avg_price
           from bmw_dataset 
    ) t
    where price_usd > avg_price
    """
)
above_average.show()

+--------+----+---------+
|   model|year|price_usd|
+--------+----+---------+
|3 Series|2012| 117995.0|
|3 Series|2023|  86402.0|
|3 Series|2010|  86660.0|
|3 Series|2020|  78509.0|
|3 Series|2017|  86684.0|
|3 Series|2020|  79774.0|
|3 Series|2024| 113482.0|
|3 Series|2019| 108637.0|
|3 Series|2024|  79650.0|
|3 Series|2012|  96429.0|
|3 Series|2013| 101189.0|
|3 Series|2022| 107968.0|
|3 Series|2023|  84635.0|
|3 Series|2015|  89156.0|
|3 Series|2021|  87186.0|
|3 Series|2017| 100678.0|
|3 Series|2023| 101460.0|
|3 Series|2021| 112265.0|
|3 Series|2012|  99478.0|
|3 Series|2017| 115369.0|
+--------+----+---------+
only showing top 20 rows


In [22]:
second_most_popular_color = spark.sql(
    """
    select color
    from (
        select color,
            rank() over(order by count(model) desc) as rank
        from bmw_dataset
        group by color
    ) t
    where rank = 2
    """
)
second_most_popular_color.show()

+------+
| color|
+------+
|Silver|
+------+

