In [1]:
from pyspark.sql import SparkSession

In [2]:
##Initiate Spark Session
spark = SparkSession.builder.appName('cars_new').getOrCreate()

In [3]:
spark

In [5]:
##Read the data file from a csv file format
df_cars = spark.read.option('header', 'true').csv('cars.csv', inferSchema=True)

In [6]:
##Show the content of the file
df_cars.show()

+----------+----------+------+------+---+
|       Car|     Model|Volume|Weight|CO2|
+----------+----------+------+------+---+
|    Toyoty|      Aygo|  1000|   790| 99|
|Mitsubishi|Space Star|  1200|  1160| 95|
|     Skoda|    Citigo|  1000|   929| 95|
|      Fiat|       500|   900|   865| 90|
|      Mini|    Cooper|  1500|  1140|105|
|        VW|       Up!|  1000|   929|105|
|     Skoda|     Fabia|  1400|  1109| 90|
|  Mercedes|   A-Class|  1500|  1365| 92|
|      Ford|    Fiesta|  1500|  1112| 98|
|      Audi|        A1|  1600|  1150| 99|
|   Hyundai|       I20|  1100|   980| 99|
|    Suzuki|     Swift|  1300|   990|101|
|      Ford|    Fiesta|  1000|  1112| 99|
|     Honda|     Civic|  1600|  1252| 94|
|    Hundai|       I30|  1600|  1326| 97|
|      Opel|     Astra|  1600|  1330| 97|
|       BMW|         1|  1600|  1365| 99|
|     Mazda|         3|  2200|  1280|104|
|     Skoda|     Rapid|  1600|  1119|104|
|      Ford|     Focus|  2000|  1328|105|
+----------+----------+------+----

In [10]:
##perform describtive statistics on the data 
df_cars.describe().toPandas()

Unnamed: 0,summary,Car,Model,Volume,Weight,CO2
0,count,36,36,36.0,36.0,36.0
1,mean,,145.0,1611.111111111111,1292.2777777777778,102.02777777777776
2,stddev,,218.8412666751863,388.9750471451701,242.12388931656005,7.45457141092868
3,min,Audi,1,900.0,790.0,90.0
4,max,Volvo,Zafira,2500.0,1746.0,120.0


In [15]:
##show the schema
df_cars.printSchema()

root
 |-- Car: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- CO2: integer (nullable = true)



In [17]:
##number of columns
len(df_cars.columns)

5

In [18]:
##Convert string columns to integer to foster the development of the model
from pyspark.ml.feature import StringIndexer

In [37]:
indexer = StringIndexer(inputCol="Car", outputCol="Car_Cate")
indexed = indexer.fit(df_cars).transform(df_cars)

In [38]:
indexer = StringIndexer(inputCol="Model", outputCol="Model_Cate")
indexed = indexer.fit(df_cars).transform(indexed)

In [39]:
##the car and model column were converted to integer
indexed.show()

+----------+----------+------+------+---+--------+----------+
|       Car|     Model|Volume|Weight|CO2|Car_Cate|Model_Cate|
+----------+----------+------+------+---+--------+----------+
|    Toyoty|      Aygo|  1000|   790| 99|    15.0|      11.0|
|Mitsubishi|Space Star|  1200|  1160| 95|    13.0|      29.0|
|     Skoda|    Citigo|  1000|   929| 95|     2.0|      15.0|
|      Fiat|       500|   900|   865| 90|     7.0|       5.0|
|      Mini|    Cooper|  1500|  1140|105|    12.0|      17.0|
|        VW|       Up!|  1000|   929|105|    16.0|      31.0|
|     Skoda|     Fabia|  1400|  1109| 90|     2.0|      19.0|
|  Mercedes|   A-Class|  1500|  1365| 92|     1.0|       6.0|
|      Ford|    Fiesta|  1500|  1112| 98|     0.0|       0.0|
|      Audi|        A1|  1600|  1150| 99|     3.0|       7.0|
|   Hyundai|       I20|  1100|   980| 99|    10.0|      21.0|
|    Suzuki|     Swift|  1300|   990|101|    14.0|      30.0|
|      Ford|    Fiesta|  1000|  1112| 99|     0.0|       0.0|
|     Ho

In [40]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [41]:
indexed.columns

['Car', 'Model', 'Volume', 'Weight', 'CO2', 'Car_Cate', 'Model_Cate']

In [42]:
assembler_cars = VectorAssembler(inputCols=['Volume','Weight','Car_Cate','Model_Cate'], outputCol="Features")

In [43]:
assembler_cars

VectorAssembler_1844e6016947

In [44]:
output = assembler_cars.transform(indexed)

In [45]:
output.show()

+----------+----------+------+------+---+--------+----------+--------------------+
|       Car|     Model|Volume|Weight|CO2|Car_Cate|Model_Cate|            Features|
+----------+----------+------+------+---+--------+----------+--------------------+
|    Toyoty|      Aygo|  1000|   790| 99|    15.0|      11.0|[1000.0,790.0,15....|
|Mitsubishi|Space Star|  1200|  1160| 95|    13.0|      29.0|[1200.0,1160.0,13...|
|     Skoda|    Citigo|  1000|   929| 95|     2.0|      15.0|[1000.0,929.0,2.0...|
|      Fiat|       500|   900|   865| 90|     7.0|       5.0|[900.0,865.0,7.0,...|
|      Mini|    Cooper|  1500|  1140|105|    12.0|      17.0|[1500.0,1140.0,12...|
|        VW|       Up!|  1000|   929|105|    16.0|      31.0|[1000.0,929.0,16....|
|     Skoda|     Fabia|  1400|  1109| 90|     2.0|      19.0|[1400.0,1109.0,2....|
|  Mercedes|   A-Class|  1500|  1365| 92|     1.0|       6.0|[1500.0,1365.0,1....|
|      Ford|    Fiesta|  1500|  1112| 98|     0.0|       0.0|[1500.0,1112.0,0....|
|   

In [48]:
output.select('features', 'CO2')

DataFrame[features: vector, CO2: int]

In [54]:
final_data_cars = output.select('features','CO2')

In [55]:
train_data, test_data = final_data_cars.randomSplit([0.7,0.3])

In [57]:
train_data.show()

+--------------------+---+
|            features|CO2|
+--------------------+---+
|[900.0,865.0,7.0,...| 90|
|[1000.0,790.0,15....| 99|
|[1000.0,929.0,16....|105|
|[1000.0,1112.0,0....| 99|
|[1100.0,980.0,10....| 99|
|[1300.0,990.0,14....|101|
|[1400.0,1109.0,2....| 90|
|[1500.0,1112.0,0....| 98|
|[1500.0,1140.0,12...|105|
|[1500.0,1365.0,1....| 92|
|[1500.0,1465.0,1....|102|
|[1600.0,1119.0,2....|104|
|[1600.0,1150.0,3....| 99|
|[1600.0,1235.0,0....|104|
|[1600.0,1252.0,8....| 94|
|[1600.0,1326.0,9....| 97|
|[1600.0,1330.0,5....| 97|
|[1600.0,1365.0,4....| 99|
|[1600.0,1405.0,5....|109|
|[1600.0,1523.0,6....|109|
+--------------------+---+
only showing top 20 rows



In [58]:
test_data.show()

+--------------------+---+
|            features|CO2|
+--------------------+---+
|[1000.0,929.0,2.0...| 95|
|[1200.0,1160.0,13...| 95|
|[1600.0,1390.0,4....|108|
|[1600.0,1415.0,2....| 99|
|[2000.0,1725.0,3....|114|
|[2100.0,1365.0,1....| 99|
|[2100.0,1605.0,1....|115|
|[2500.0,1395.0,1....|120|
+--------------------+---+



In [59]:
##import the pyspark linear regression
from pyspark.ml.regression import LinearRegression

In [60]:
lr = LinearRegression(featuresCol='features', labelCol='CO2')

In [61]:
trained_model = lr.fit(train_data)

In [63]:
trained_model

LinearRegressionModel: uid=LinearRegression_a64a8a95f431, numFeatures=4

In [68]:
results = trained_model.evaluate(train_data)

In [69]:
##The r2 (coefficeint of determination is less than 0.5 in this model) therefore we might want to use a different or bigger data
print(results.r2)

0.3751113225281494


In [70]:
print(results.meanSquaredError)

25.174086721008848


In [71]:
print(results.meanAbsoluteError)

4.280712772912348


In [72]:
unlabeled_data = test_data.select('features')

In [73]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[1000.0,929.0,2.0...|
|[1200.0,1160.0,13...|
|[1600.0,1390.0,4....|
|[1600.0,1415.0,2....|
|[2000.0,1725.0,3....|
|[2100.0,1365.0,1....|
|[2100.0,1605.0,1....|
|[2500.0,1395.0,1....|
+--------------------+



In [76]:
predictions = trained_model.transform(unlabeled_data)

In [77]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[1000.0,929.0,2.0...| 93.39008920526553|
|[1200.0,1160.0,13...|102.99531693715284|
|[1600.0,1390.0,4....|100.66242523026231|
|[1600.0,1415.0,2....|102.21401511637279|
|[2000.0,1725.0,3....| 106.2801049518824|
|[2100.0,1365.0,1....|101.43122508101547|
|[2100.0,1605.0,1....|104.96875297673243|
|[2500.0,1395.0,1....|104.30296062602164|
+--------------------+------------------+

