In [29]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pyspark.sql.functions as func
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.clustering import KMeans
# Start the Spark session for this notebook, or attach to one that is already running.
spark = (
    SparkSession.builder
    .appName('Cars')
    .getOrCreate()
)
from pyspark.sql.types import IntegerType


In [2]:
# Location of the raw CSV, relative to the notebook's working directory.
path = "CIS_Automotive_Kaggle_Sample.csv"

# getOrCreate() returns the session that is already running when there is one;
# otherwise it starts a new session under this application name.
spark = SparkSession.builder.appName("Automotive1").getOrCreate()

# Load the comma-separated file, treating its first line as the column header.
csv_reader = spark.read.options(header='True', delimiter=',')
df = csv_reader.csv(path)

In [3]:
# Preview the first five rows of the raw frame.
df.show(n=5)

+--------------------+--------+----------+----------+-----+--------+-------+-----+-------------------+-------------+----------+--------------+--------+--------+----------------------+------------------------+----------------------+---------------------+----------------------+-------------------+--------------------+--------------------+-----------------------+--------------------+--------------------+-----------------------------------+--------------------+--------+------------+-----------+--------------+---------------+--------------+-------------+----------------+-----------------+---------------+--------------+-----------+--------------+--------------+--------------+---------------+--------------------+--------------------+------------------+------------------+---------------------+------------+--------------+-----------+--------+------------------+---------------+-----------------+--------------+---------------+-----------------------+----------------------+--------------------+---

In [4]:
# Keep only the columns this analysis works with.
columns = ["vin","msrp","askPrice","brandName","modelName"]
selected_df = df.select(*columns)
selected_df.show(n=5)

+--------------------+-----+--------+----------+--------------+
|                 vin| msrp|askPrice| brandName|     modelName|
+--------------------+-----+--------+----------+--------------+
|abc5f0360059cf7b6...| 1498|    1498|MITSUBISHI|Eclipse Spyder|
|e24402cc77f6fd2d6...|10589|   10589|    NISSAN|        Altima|
|1c5a8dc966b3d3b37...|11992|    9940|      FORD|        Escape|
|edce23814c88f5a1d...|12387|   12387| CHEVROLET|         Cruze|
|6b4f8c610d14d864f...|41659|   41659|      FORD|         F-150|
+--------------------+-----+--------+----------+--------------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import col, isnan, when, count

# Count missing values per column of selected_df.
# when() without otherwise() yields NULL for rows that do not match, and count()
# skips NULLs, so each aggregate counts only the NaN / NULL cells of its column.
missing_counts = [
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in selected_df.columns
]
selected_df.select(missing_counts).show()

+---+----+--------+---------+---------+
|vin|msrp|askPrice|brandName|modelName|
+---+----+--------+---------+---------+
|  0|   0|       0|     1260|     5843|
+---+----+--------+---------+---------+



In [6]:
# Drop every row that has no brand or no model name.
cleaned_df = selected_df.na.drop(subset=["brandName", "modelName"])

In [7]:
from pyspark.sql.functions import col, isnan, when, count

# Re-run the per-column NaN / NULL count on the cleaned frame.
# when() without otherwise() yields NULL for rows that do not match, and count()
# skips NULLs, so each aggregate counts only the NaN / NULL cells of its column.
remaining_missing = [
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in cleaned_df.columns
]
cleaned_df.select(remaining_missing).show()

+---+----+--------+---------+---------+
|vin|msrp|askPrice|brandName|modelName|
+---+----+--------+---------+---------+
|  0|   0|       0|        0|        0|
+---+----+--------+---------+---------+



In [8]:
# Preview the cleaned frame (20 rows, long cells truncated: the show() defaults).
cleaned_df.show(n=20, truncate=True)

+--------------------+-----+--------+----------+---------------+
|                 vin| msrp|askPrice| brandName|      modelName|
+--------------------+-----+--------+----------+---------------+
|abc5f0360059cf7b6...| 1498|    1498|MITSUBISHI| Eclipse Spyder|
|e24402cc77f6fd2d6...|10589|   10589|    NISSAN|         Altima|
|1c5a8dc966b3d3b37...|11992|    9940|      FORD|         Escape|
|edce23814c88f5a1d...|12387|   12387| CHEVROLET|          Cruze|
|6b4f8c610d14d864f...|41659|   41659|      FORD|          F-150|
|7a7752d920ebee107...|12970|   12970|      FORD|         Fusion|
|ec634358248242175...|15218|   15218|      FORD|         Escape|
|f64e8bdce175d66b9...|18755|   18755|      FORD|           Flex|
|45c4fc09de25f741e...|36999|   36999|      FORD|       Explorer|
|d1795f984bd6e4364...|18276|   18276|      FORD|         Escape|
|6e2afe5b217193bb3...|22140|   22140|      FORD|         Escape|
|45c4fc09de25f741e...|    0|       0|      FORD|       Explorer|
|f24a5358ce834ef52...|144

In [9]:
# Fit a string -> numeric index mapping for the brand column.
# NOTE(review): the output column name differs from the input name only by
# letter case. The next cell's displayed result shows the indexed column in
# place of the original one, which presumably relies on Spark's default
# case-insensitive column resolution -- confirm before changing that setting.
index1 = (
    StringIndexer()
    .setInputCol("brandName")
    .setOutputCol("brandname")
)
indexerModel = index1.fit(cleaned_df)


In [10]:
# Apply the fitted brand indexer to the cleaned frame and preview the result.
indexed_df = indexerModel.transform(cleaned_df)
indexed_df.show(n=20, truncate=True)

+--------------------+-----+--------+---------+---------------+
|                 vin| msrp|askPrice|brandname|      modelName|
+--------------------+-----+--------+---------+---------------+
|abc5f0360059cf7b6...| 1498|    1498|     25.0| Eclipse Spyder|
|e24402cc77f6fd2d6...|10589|   10589|      4.0|         Altima|
|1c5a8dc966b3d3b37...|11992|    9940|      1.0|         Escape|
|edce23814c88f5a1d...|12387|   12387|      0.0|          Cruze|
|6b4f8c610d14d864f...|41659|   41659|      1.0|          F-150|
|7a7752d920ebee107...|12970|   12970|      1.0|         Fusion|
|ec634358248242175...|15218|   15218|      1.0|         Escape|
|f64e8bdce175d66b9...|18755|   18755|      1.0|           Flex|
|45c4fc09de25f741e...|36999|   36999|      1.0|       Explorer|
|d1795f984bd6e4364...|18276|   18276|      1.0|         Escape|
|6e2afe5b217193bb3...|22140|   22140|      1.0|         Escape|
|45c4fc09de25f741e...|    0|       0|      1.0|       Explorer|
|f24a5358ce834ef52...|14496|   14496|   

In [11]:
# Fit a string -> numeric index mapping for the model-name column.
# Note that `indexerModel` is rebound here: from this point on it holds the
# model-name indexer, not the brand indexer fitted earlier.
index2 = (
    StringIndexer()
    .setInputCol("modelName")
    .setOutputCol("modelname")
)
indexerModel = index2.fit(indexed_df)


In [12]:
# Apply the fitted model-name indexer and preview the result.
# NOTE(review): this cell rebinds indexed_df to a transform of itself, so
# re-running it feeds the already-indexed frame back into the indexer.
indexed_df = indexerModel.transform(indexed_df)
indexed_df.show(n=20, truncate=True)

+--------------------+-----+--------+---------+---------+
|                 vin| msrp|askPrice|brandname|modelname|
+--------------------+-----+--------+---------+---------+
|abc5f0360059cf7b6...| 1498|    1498|     25.0|    528.0|
|e24402cc77f6fd2d6...|10589|   10589|      4.0|     19.0|
|1c5a8dc966b3d3b37...|11992|    9940|      1.0|      3.0|
|edce23814c88f5a1d...|12387|   12387|      0.0|     16.0|
|6b4f8c610d14d864f...|41659|   41659|      1.0|      0.0|
|7a7752d920ebee107...|12970|   12970|      1.0|      9.0|
|ec634358248242175...|15218|   15218|      1.0|      3.0|
|f64e8bdce175d66b9...|18755|   18755|      1.0|    131.0|
|45c4fc09de25f741e...|36999|   36999|      1.0|      7.0|
|d1795f984bd6e4364...|18276|   18276|      1.0|      3.0|
|6e2afe5b217193bb3...|22140|   22140|      1.0|      3.0|
|45c4fc09de25f741e...|    0|       0|      1.0|      7.0|
|f24a5358ce834ef52...|14496|   14496|      3.0|     49.0|
|d719f73e9f670abb6...|10222|   10222|      1.0|    102.0|
|92c36195ba5a4

In [15]:
# Cast the two price columns to integers so they can be used as numeric features.
# The columns are resolved by name on `indexed_df` itself instead of being taken
# from the ancestor frame `df`. Borrowing a column object from another DataFrame
# only resolves while both frames share lineage, and it hides which frame the
# data actually comes from.
# Values that cannot be parsed as integers become NULL after the cast.
indexed_df = (
    indexed_df
    .withColumn('msrp', func.col('msrp').cast(IntegerType()))
    .withColumn('askPrice', func.col('askPrice').cast(IntegerType()))
)

+---------+------+
|brandname| count|
+---------+------+
|      8.0|218559|
|      0.0|889692|
|      7.0|231989|
|     49.0|   363|
|     29.0| 16609|
|     47.0|   401|
|     42.0|  1200|
|     44.0|   850|
|     35.0|  3427|
|     18.0| 97385|
|      1.0|781769|
|     39.0|  1811|
|     34.0|  3897|
|     37.0|  2645|
|     25.0| 39089|
|     36.0|  2882|
|      4.0|312876|
|     23.0| 41361|
|     50.0|   251|
|     45.0|   800|
+---------+------+
only showing top 20 rows



In [16]:
# Display the (column name, type) pairs of the indexed frame to check the casts.
indexed_df.dtypes

[('vin', 'string'),
 ('msrp', 'int'),
 ('askPrice', 'int'),
 ('brandname', 'double'),
 ('modelname', 'double')]

In [22]:
# Pack the numeric predictors into a single vector column named "features".
feature_cols = ["askPrice", "modelname", 'msrp']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Keep only the label (brand index) and the assembled feature vector.
assembled_df = assembler.transform(indexed_df)
train = assembled_df.select('brandname', 'features')
train.show(n=2, truncate=False)

+---------+----------------------+
|brandname|features              |
+---------+----------------------+
|25.0     |[1498.0,528.0,1498.0] |
|4.0      |[10589.0,19.0,10589.0]|
+---------+----------------------+
only showing top 2 rows



In [23]:
# 80/20 train/test split with a fixed seed.
splits = train.randomSplit(weights=[0.8, 0.2], seed=42)
train_df, test_df = splits

In [25]:
# Fit a Naive Bayes classifier that predicts the brand index from the feature vector.
# NOTE(review): the features are raw prices plus an ordinal model-name index, and
# the accuracy recorded further down is about 1.2%, so this feature encoding is
# likely a poor fit for Naive Bayes -- confirm and revisit.
nv = NaiveBayes(featuresCol='features', labelCol='brandname')
model = nv.fit(train_df)

In [28]:
# Score the held-out split and preview features, true label and predicted label.
predictions_nv = model.transform(test_df)
preview_cols = ["features", "brandname", "prediction"]
predictions_nv.select(*preview_cols).show()

+-------------+---------+----------+
|     features|brandname|prediction|
+-------------+---------+----------+
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
|[0.0,1.0,0.0]|      0.0|      11.0|
+-------------+---------+----------+
only showing top 20 rows



In [31]:

# Overall accuracy of the Naive Bayes predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="brandname",
    predictionCol="prediction",
)
accuracy_nv = evaluator.evaluate(predictions_nv)
print(f"Test Accuracy Naive Bayes : {accuracy_nv}")

Test Accuracy Naive Bayes : 0.012497936909824522


In [32]:
# Build one evaluator per metric; they differ only in metricName.
precision_evaluator, recall_evaluator, f1_evaluator = (
    MulticlassClassificationEvaluator(
        labelCol="brandname",
        predictionCol="prediction",
        metricName=metric,
    )
    for metric in ("weightedPrecision", "weightedRecall", "f1")
)

# Evaluate each metric on the Naive Bayes test predictions.
precision = precision_evaluator.evaluate(predictions_nv)
recall = recall_evaluator.evaluate(predictions_nv)
f1 = f1_evaluator.evaluate(predictions_nv)

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")


Precision: 0.07138777992704061
Recall: 0.012497936909824533
F1 Score: 0.0062099811892053275


TypeError: cannot pickle '_thread.RLock' object