In [11]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

In [3]:
spark = SparkSession.builder.appName("Practise").getOrCreate()
spark

23/06/21 20:04:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Read Dataset

In [19]:
df = spark.read.csv("ford.csv", header=True, inferSchema=True)
df.show(5)

+-------+----+-----+------------+-------+--------+---+----+----------+
|  model|year|price|transmission|mileage|fuelType|tax| mpg|engineSize|
+-------+----+-----+------------+-------+--------+---+----+----------+
| Fiesta|2017|12000|   Automatic|  15944|  Petrol|150|57.7|       1.0|
|  Focus|2018|14000|      Manual|   9083|  Petrol|150|57.7|       1.0|
|  Focus|2017|13000|      Manual|  12456|  Petrol|150|57.7|       1.0|
| Fiesta|2019|17500|      Manual|  10460|  Petrol|145|40.3|       1.5|
| Fiesta|2019|16500|   Automatic|   1482|  Petrol|145|48.7|       1.0|
+-------+----+-----+------------+-------+--------+---+----+----------+
only showing top 5 rows



In [10]:
df.describe().show()

+-------+------+------------------+------------------+------------+------------------+--------+------------------+------------------+-------------------+
|summary| model|              year|             price|transmission|           mileage|fuelType|               tax|               mpg|         engineSize|
+-------+------+------------------+------------------+------------+------------------+--------+------------------+------------------+-------------------+
|  count| 17966|             17966|             17966|       17966|             17966|   17966|             17966|             17966|              17966|
|   mean|  null|2016.8664699988867|12279.534843593454|        null|23362.608760992985|    null|113.32945563842814| 57.90697985082969| 1.3508070800400782|
| stddev|  null|2.0503359784876998| 4741.343657354183|        null| 19472.05434910513|    null|62.012456193801725|10.125695709164098|0.43236727240069733|
|    min| B-MAX|              1996|               495|   Automatic|         

In [29]:
#Get Data Type
df.printSchema()

root
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- transmission: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- fuelType: string (nullable = true)
 |-- tax: integer (nullable = true)
 |-- mpg: double (nullable = true)
 |-- engineSize: double (nullable = true)



In [18]:
#Get null values
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----+----+-----+------------+-------+--------+---+---+----------+
|model|year|price|transmission|mileage|fuelType|tax|mpg|engineSize|
+-----+----+-----+------------+-------+--------+---+---+----------+
|    0|   0|    0|           0|      0|       0|  0|  0|         0|
+-----+----+-----+------------+-------+--------+---+---+----------+



In [20]:
df.columns

['model',
 'year',
 'price',
 'transmission',
 'mileage',
 'fuelType',
 'tax',
 'mpg',
 'engineSize']

In [23]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['model','transmission','fuelType', 'mpg', 'engineSize'], 
                          outputCols=['modelIndexed','transmissionIndexed','fuelTypeIndexed', 'mpgIndexed', 'engineSizeIndexed'])

dfIndexed = indexer.fit(df).transform(df)
dfIndexed.show()

+---------+----+-----+------------+-------+--------+---+----+----------+------------+-------------------+---------------+----------+-----------------+
|    model|year|price|transmission|mileage|fuelType|tax| mpg|engineSize|modelIndexed|transmissionIndexed|fuelTypeIndexed|mpgIndexed|engineSizeIndexed|
+---------+----+-----+------------+-------+--------+---+----+----------+------------+-------------------+---------------+----------+-----------------+
|   Fiesta|2017|12000|   Automatic|  15944|  Petrol|150|57.7|       1.0|         0.0|                1.0|            0.0|       4.0|              0.0|
|    Focus|2018|14000|      Manual|   9083|  Petrol|150|57.7|       1.0|         1.0|                0.0|            0.0|       4.0|              0.0|
|    Focus|2017|13000|      Manual|  12456|  Petrol|150|57.7|       1.0|         1.0|                0.0|            0.0|       4.0|              0.0|
|   Fiesta|2019|17500|      Manual|  10460|  Petrol|145|40.3|       1.5|         0.0|         

In [36]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['modelIndexed','transmissionIndexed','fuelTypeIndexed', 'mpgIndexed', 'engineSizeIndexed'],
                            outputCol='IndependentVariable')
output = assembler.transform(dfIndexed)
output.show()

from pyspark.ml.regression import LinearRegression
selectedFeatures = output.select(["IndependentVariable", "price"])
#split dataset 
trainData, testData = selectedFeatures.randomSplit([0.8,0.2])
regModel = LinearRegression(
    featuresCol="IndependentVariable", labelCol="price"
)
regressor = regModel.fit(trainData)

+---------+----+-----+------------+-------+--------+---+----+----------+------------+-------------------+---------------+----------+-----------------+--------------------+
|    model|year|price|transmission|mileage|fuelType|tax| mpg|engineSize|modelIndexed|transmissionIndexed|fuelTypeIndexed|mpgIndexed|engineSizeIndexed| IndependentVariable|
+---------+----+-----+------------+-------+--------+---+----+----------+------------+-------------------+---------------+----------+-----------------+--------------------+
|   Fiesta|2017|12000|   Automatic|  15944|  Petrol|150|57.7|       1.0|         0.0|                1.0|            0.0|       4.0|              0.0| (5,[1,3],[1.0,4.0])|
|    Focus|2018|14000|      Manual|   9083|  Petrol|150|57.7|       1.0|         1.0|                0.0|            0.0|       4.0|              0.0| (5,[0,3],[1.0,4.0])|
|    Focus|2017|13000|      Manual|  12456|  Petrol|150|57.7|       1.0|         1.0|                0.0|            0.0|       4.0|        

23/06/21 20:35:43 WARN Instrumentation: [058cd304] regParam is zero, which might cause numerical instability and overfitting.


In [31]:
regressor.coefficients

DenseVector([192.0953, 1361.2582, 1676.1643, 100.651, -601.7099])

In [32]:
regressor.intercept

10902.759266345774

In [33]:
y_pred = regressor.evaluate(testData)

In [34]:
y_pred.predictions.show()

+-------------------+-----+------------------+
|IndependentVariable|price|        prediction|
+-------------------+-----+------------------+
|          (5,[],[])| 4000|10902.759266345774|
|          (5,[],[])| 4640|10902.759266345774|
|          (5,[],[])| 4799|10902.759266345774|
|          (5,[],[])| 5599|10902.759266345774|
|          (5,[],[])| 5795|10902.759266345774|
|          (5,[],[])| 5989|10902.759266345774|
|          (5,[],[])| 5999|10902.759266345774|
|          (5,[],[])| 6080|10902.759266345774|
|          (5,[],[])| 6150|10902.759266345774|
|          (5,[],[])| 6270|10902.759266345774|
|          (5,[],[])| 6298|10902.759266345774|
|          (5,[],[])| 6400|10902.759266345774|
|          (5,[],[])| 6490|10902.759266345774|
|          (5,[],[])| 6495|10902.759266345774|
|          (5,[],[])| 6495|10902.759266345774|
|          (5,[],[])| 6498|10902.759266345774|
|          (5,[],[])| 6500|10902.759266345774|
|          (5,[],[])| 6590|10902.759266345774|
|          (5