In [4]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [5]:
# initialize spark session
# One session for the whole notebook, named "Car Price Analysis", with the
# driver memory setting fixed at 4g.
spark = (
    SparkSession.builder
    .appName("Car Price Analysis")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [6]:
# define paths
# Every lab file lives in one directory. It defaults to the original location
# but can be redirected by setting the LAB4_DATA_DIR environment variable, so
# the notebook is not tied to one machine's folder layout.
data_dir = os.environ.get(
    "LAB4_DATA_DIR", "C:/Users/acdsa/Desktop/BDA/lab2/Lab2/lab4"
).rstrip("/\\")  # tolerate a trailing separator in the override

# Input dataset and the three artefacts produced by this notebook.
input_path = f"{data_dir}/car_price_dataset.csv"
output_csv = f"{data_dir}/filtered_car_prices.csv"
output_json = f"{data_dir}/top_brands.json"
output_txt = f"{data_dir}/summary.txt"

In [7]:
# delete old output files
# Clear out artefacts left by a previous run so each output is rewritten fresh.
for path in (output_csv, output_json, output_txt):
    if not os.path.exists(path):
        continue  # nothing to remove for this output
    try:
        os.remove(path)
    except PermissionError:
        # Deletion was refused; warn and carry on rather than abort the run.
        print(f"Warning: Cannot delete {path}. It may be open in another program.")

In [8]:

# load dataset
# Read the CSV with its first row as the header and with schema inference
# enabled, so column types come from the data rather than defaulting to string.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(input_path)
)

In [9]:
# preprocessing
# Keep only the columns used downstream, expose each under its lowercase name,
# and drop every row that has a null in any of them.
source_columns = (
    "Brand",
    "Model",
    "Year",
    "Price",
    "Mileage",
    "Fuel_Type",
    "Transmission",
)
df = df.select(*[col(name).alias(name.lower()) for name in source_columns]).dropna()

# Force "year" to an integer column regardless of what the reader inferred.
df = df.withColumn("year", col("year").cast("int"))


In [10]:
# filtering
# Keep only listings priced strictly above 5000.
filtered_df = df.where(col("price") > 5000)

# SQL Queries
# NOTE(review): the view is registered on df, not filtered_df, so the SQL
# queries see every preprocessed row, including prices <= 5000 — confirm
# that this is intended.
df.createOrReplaceTempView("car_data")

In [11]:
# extract top 5 most Frequent Car Brands
# Counts listings per brand in the car_data view and keeps the five largest.
# Ties on the count are broken alphabetically by brand: this DataFrame is
# evaluated more than once later (for the JSON and the TXT outputs), and
# without a secondary sort key brands with equal counts can come back in a
# different order, or a different set at the LIMIT boundary, per evaluation.
top_brands = spark.sql("""
    SELECT brand, COUNT(*) AS count
    FROM car_data
    GROUP BY brand
    ORDER BY count DESC, brand ASC
    LIMIT 5
""")

In [12]:
# average price per brand
# Mean price of each brand in the car_data view, highest average first.
avg_price_query = """
    SELECT brand, AVG(price) as avg_price 
    FROM car_data 
    GROUP BY brand 
    ORDER BY avg_price DESC
"""
avg_price_per_brand = spark.sql(avg_price_query)

In [13]:
# save results as CSV file
# The filtered rows are converted to a pandas DataFrame and written from there
# as one CSV file without the index column. Any failure is reported and the
# notebook keeps running.
try:
    filtered_pd = filtered_df.toPandas()
    filtered_pd.to_csv(output_csv, index=False)
    print(f" CSV saved: {output_csv}")
except Exception as e:
    print(f" Error saving CSV: {e}")

 CSV saved: C:/Users/acdsa/Desktop/BDA/lab2/Lab2/lab4/filtered_car_prices.csv


In [14]:
# save SQL Query results as JSON
# The top-brands result is converted to pandas and written as a JSON array of
# records, indented by four spaces. Any failure is reported and the notebook
# keeps running.
try:
    top_brands.toPandas().to_json(
        output_json,
        orient="records",
        indent=4,
    )
    print(f" JSON saved: {output_json}")
except Exception as e:
    print(f" Error saving JSON: {e}")

 JSON saved: C:/Users/acdsa/Desktop/BDA/lab2/Lab2/lab4/top_brands.json


In [15]:
# save summary as TXT
# Writes a plain-text report: a title, one line per top brand with its listing
# count, then one line per brand with its average price rounded to two
# decimals. Any failure is reported and the notebook keeps running.
try:
    with open(output_txt, "w", encoding="utf-8") as f:
        f.write("Lab 4 Analysis - Car Data\n\n")

        f.write("- Top 5 Most Frequent Car Brands:\n")
        f.writelines(
            f"{row['brand']}: {row['count']} listings\n"
            for row in top_brands.collect()
        )

        f.write("\n- Average Price Per Brand:\n")
        f.writelines(
            f"{row['brand']}: ${round(row['avg_price'], 2)}\n"
            for row in avg_price_per_brand.collect()
        )
    print(f" TXT Summary saved: {output_txt}")
except Exception as e:
    print(f" Error saving TXT: {e}")

 TXT Summary saved: C:/Users/acdsa/Desktop/BDA/lab2/Lab2/lab4/summary.txt


In [16]:
# Shut down the SparkSession; this is the last cell, after all three outputs
# have been written.
spark.stop()