In [7]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import os

# Create the Spark session for this notebook, or reuse the one already
# attached to the kernel (getOrCreate).
spark = SparkSession.builder.appName("UsedCarPricePrediction").getOrCreate()
sc = spark.sparkContext

# Location of the raw listings CSV. Defaults to the original local path but can
# be overridden with the VEHICLES_CSV_PATH environment variable so the notebook
# also runs on other machines.
VEHICLES_CSV_PATH = os.environ.get(
    "VEHICLES_CSV_PATH",
    r"D:\DBDA\FINAL_PROJECT\v1\vehicles.csv",
)

# Read the CSV into a DataFrame, using the first line as header and asking
# Spark to infer column types.
# NOTE(review): the recorded printSchema output lists every column as string
# even though inferSchema=True; possibly multi-line quoted fields are not being
# parsed as intended (multiLine/escape reader options) - TODO confirm.
df = spark.read.csv(VEHICLES_CSV_PATH, header=True, inferSchema=True)

df.printSchema()
# Print the row count: a bare `df.count()` in the middle of a cell runs a full
# job and then throws the result away.
print("rows loaded:", df.count())

# Columns that are removed before any further processing.
columns_to_delete = [
    'id', 'url', 'region_url', 'VIN', 'image_url', 'description',
    'county', 'lat', 'long', 'posting_date', 'size', 'state',
]
df1 = df.drop(*columns_to_delete)

# Column names left after the drop.
df1.columns

# Remove rows that are exact duplicates of another row
# (dropDuplicates() without a subset is the same operation as distinct()).
df2 = df1.dropDuplicates()
df2.count()

# Percentage of null values per column of df2.
# `sum` and `col` below are the pyspark.sql.functions versions pulled in by the
# star import at the top of the cell, not Python's built-in sum.
total_rows = df2.count()

# One aggregate per column: number of rows where that column is null.
null_flag_totals = [
    sum(col(name).isNull().cast('int')).alias(name)
    for name in df2.columns
]
null_counts = df2.select(null_flag_totals)

# Turn each count into a percentage of all rows.
percentage_exprs = [
    ((col(name) / total_rows) * 100).alias(name + "_null_percentage")
    for name in df2.columns
]
null_percentages = null_counts.select(percentage_exprs)
null_percentages.show()  # one row: null % for every column

# Rows missing any of these columns are discarded outright.
required_columns = [
    'region', 'price', 'year', 'model', 'odometer',
    'manufacturer', 'transmission', 'title_status', 'fuel',
]
df3 = df2.dropna(subset=required_columns)
df3.count()

# Every remaining null in a string column is replaced by the literal
# category 'unknown'.
df4 = df3.na.fill('unknown')

# Handling columns: for manufacturer, region and model keep only a fixed set of
# values and collapse every other value into the single category 'others'.


def _top_values(frame, column, n):
    """Return the `n` most frequent values of `column` in `frame` as a list.

    Ties on the count are broken by the value itself so that the selection is
    the same on every run.
    """
    ranked = (
        frame.groupBy(column)
        .count()
        .orderBy(col('count').desc(), col(column).asc())
        .limit(n)
    )
    return [row[column] for row in ranked.collect()]


def _bucket_rare_values(frame, column, keep_values, other_label='others'):
    """Return `frame` with `column` values outside `keep_values` set to `other_label`."""
    return frame.withColumn(
        column,
        when(col(column).isin(keep_values), col(column)).otherwise(other_label),
    )


# manufacturer: hard-coded list of the 20 manufacturers that are kept as-is.
manufacturer_values = ['nissan','honda','chevrolet','mercedes-benz','ram','dodge','ford','jeep','toyota','bmw','subaru','volkswagen','kia','cadillac','hyundai','lexus','audi','chrysler','acura','buick']
df5 = _bucket_rare_values(df4, 'manufacturer', manufacturer_values)

# region: keep the 50 most frequent regions.
region_values = _top_values(df5, 'region', 50)
df6 = _bucket_rare_values(df5, 'region', region_values)

# model: keep the 50 most frequent models.
model_values = _top_values(df6, 'model', 50)
df7 = _bucket_rare_values(df6, 'model', model_values)


# transmission: keep only rows whose value is one of the listed categories.
names_to_match = ['automatic', 'manual', 'other', 'unknown']
df8 = df7.where(col("transmission").isin(names_to_match))

# Convert year, odometer and price (in that order) to integer columns,
# one DataFrame per step.
df9 = df8.withColumn("year", df8["year"].cast("int"))
df10 = df9.withColumn("odometer", df9["odometer"].cast("int"))
df11 = df10.withColumn("price", df10["price"].cast("int"))
df11.printSchema()

# Handling outliers - price.
# Approximate the 15th, 25th and 75th percentiles of price (relative error 0.01).
# The 25th percentile is requested so that the IQR below is a real
# interquartile range (Q3 - Q1), matching the odometer outlier step; it was
# previously computed as p75 - p15.
price_percentiles = df11.approxQuantile("price", [0.15, 0.25, 0.75], 0.01)
price_percentile15 = price_percentiles[0]
price_percentile25 = price_percentiles[1]
price_percentile75 = price_percentiles[2]

# Upper limit follows the 1.5 * IQR rule; the lower limit is the 15th percentile.
price_iqr = price_percentile75 - price_percentile25
price_upper_limit = price_percentile75 + 1.5 * price_iqr
price_lower_limit = price_percentile15

# Keep rows strictly between the two limits.
df12 = df11.filter((col("price") < price_upper_limit) & (col("price") > price_lower_limit))


# Handling outliers - odometer.
# Approximate the 5th, 25th and 75th percentiles (relative error 0.01).
odometer_percentiles = df12.approxQuantile("odometer", [0.05, 0.25, 0.75], 0.01)
odometer_percentile05, odometer_percentile25, odometer_percentile75 = (
    odometer_percentiles[0],
    odometer_percentiles[1],
    odometer_percentiles[2],
)

# Upper limit: Q3 + 1.5 * IQR. Lower limit: the 5th percentile.
odometer_iqr = odometer_percentile75 - odometer_percentile25
odometer_upper_limit = odometer_percentile75 + 1.5 * odometer_iqr
odometer_lower_limit = odometer_percentile05

# Keep rows strictly inside (lower, upper).
odometer_below_upper = col("odometer") < odometer_upper_limit
odometer_above_lower = col("odometer") > odometer_lower_limit
df13 = df12.where(odometer_below_upper & odometer_above_lower)

# year: keep only rows with year strictly greater than 1996
# (cut-off chosen from the bar-plot distribution of year).
df14 = df13.where(col('year') > 1996)

# Drop records where year is 2022. The condition is built on the current
# frame's column; it previously referenced `df['year']`, i.e. the raw,
# uncast column of the DataFrame read from the CSV.
df15 = df14.filter(col('year') != 2022)

# Year that car_age is measured against (car_age = reference year - year).
CAR_AGE_REFERENCE_YEAR = 2024
df16 = df15.withColumn('car_age', CAR_AGE_REFERENCE_YEAR - col('year'))

# year is now redundant with car_age, so drop it.
df17 = df16.drop('year')

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- posting_date: string (nu

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Fixed seed so that the train/test split, and every metric computed from it,
# does not change from run to run.
SPLIT_SEED = 42

# 75% / 25% train/test split. df17 is used (car_age kept, year dropped): it is
# the final frame of the preprocessing cell, and the recorded output of this
# cell contains no `year` column.
train, test = df17.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Numeric inputs are packed into one vector column so they can be scaled together.
numerical = ["odometer", "car_age"]

numerical_vector_assembler = VectorAssembler(inputCols=numerical,
                                             outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

# Standardise the numeric vector (centre on the mean, scale by the standard
# deviation). The scaler is fitted on the training split only and then applied
# unchanged to both splits.
scaler = StandardScaler(inputCol='numerical_feature_vector',
                        outputCol='scaled_numerical_feature_vector',
                        withStd=True, withMean=True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)


# Categorical inputs, the names of their index columns, and the names of
# their one-hot columns (index column name + '_h').
categorical_columns = ['manufacturer', 'model', 'condition', 'cylinders', 'fuel',
                       'title_status', 'transmission', 'drive', 'type',
                       'paint_color', 'region']
index_columns = ['manufacturer_index', 'm_i', 'co_i', 'cy_i', 'f_i', 'ts_i',
                 'tr_i', 'd_i', 'ty_i', 'p_i', 'r_i']
one_hot_columns = [name + '_h' for name in index_columns]

# String -> numeric index, fitted on the training split only;
# handleInvalid="keep" is set for labels the fitted indexer does not know.
indexer = StringIndexer(inputCols=categorical_columns,
                        outputCols=index_columns,
                        handleInvalid="keep").fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

# Index -> one-hot vector, again fitted on the training split only.
one_hot_encoder = OneHotEncoder(inputCols=index_columns,
                                outputCols=one_hot_columns).fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

# Final model input: the scaled numeric vector followed by every one-hot vector.
assembler = VectorAssembler(
    inputCols=['scaled_numerical_feature_vector'] + one_hot_columns,
    outputCol='final_feature_vector',
)
train = assembler.transform(train)
test = assembler.transform(test)



# Fit a linear regression of price on the assembled feature vector.
# After this line `lr` is the fitted model, not the estimator.
lr = LinearRegression(labelCol='price',
                      featuresCol='final_feature_vector').fit(train)

# Predictions on the training split, with the prediction column renamed.
train_scored = lr.transform(train)
pred_train_df = train_scored.withColumnRenamed('prediction', 'predicted_vehicle_value')
pred_train_df.show(5)

# Same for the held-out split.
test_scored = lr.transform(test)
pred_test_df = test_scored.withColumnRenamed('prediction', 'predicted_vehicle_value')

# R^2 of the fitted model on the test split.
result = lr.evaluate(test)
print(result.r2)

# Score the test rows again using only their feature vector (no label column).
unlabeled_data = test.select("final_feature_vector")
predictions = lr.transform(unlabeled_data)
predictions.show(20)

+-----------+-----+-------------+------+---------+-----------+----+--------+------------+------------+-------+-------+-----------+-------+------------------------+-------------------------------+------------------+---+----+----+---+----+----+---+----+---+----+--------------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------+---------------+--------------------+-----------------------+
|     region|price| manufacturer| model|condition|  cylinders|fuel|odometer|title_status|transmission|  drive|   type|paint_color|car_age|numerical_feature_vector|scaled_numerical_feature_vector|manufacturer_index|m_i|co_i|cy_i|f_i|ts_i|tr_i|d_i|ty_i|p_i| r_i|manufacturer_index_h|         m_i_h|       co_i_h|       cy_i_h|        f_i_h|       ts_i_h|       tr_i_h|        d_i_h|        ty_i_h|         p_i_h|          r_i_h|final_feature_vector|predicted_vehicle_value|
+-----------+-----+-------------+------+---------+------