In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('basics').getOrCreate()
df = spark.read.options(header="True", inferSchema="True").csv('cleaned data.csv')
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Vehicle ID: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Class: string (nullable = true)
 |-- Drive: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Engine Cylinders: integer (nullable = true)
 |-- Engine Displacement: double (nullable = true)
 |-- Turbocharger: integer (nullable = true)
 |-- Supercharger: integer (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Economy Score: integer (nullable = true)
 |-- Gasoline/Electricity Blended: boolean (nullable = true)



In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [3]:
assembler = VectorAssembler(
  inputCols=['Year',
             'Engine Cylinders',
             'Engine Displacement',
             'Turbocharger',
             'Supercharger'],
              outputCol="features")

In [4]:
output = assembler.transform(df)

In [5]:
from pyspark.ml.feature import StringIndexer

In [6]:
indexer = StringIndexer(inputCol="Class", outputCol="ClassIndex")
output_fixed = indexer.fit(output).transform(output)

In [7]:
indexer = StringIndexer(inputCol="Drive", outputCol="DriveIndex")
output_fixed = indexer.fit(output_fixed).transform(output_fixed)

In [8]:
indexer = StringIndexer(inputCol="Transmission", outputCol="TransmissionIndex")
output_fixed = indexer.fit(output_fixed).transform(output_fixed)


In [9]:
indexer = StringIndexer(inputCol="Fuel Type", outputCol="Fuel_TypeIndex")
output_fixed = indexer.fit(output_fixed).transform(output_fixed)


In [10]:
output_fixed.columns

['_c0',
 'Vehicle ID',
 'Year',
 'Class',
 'Drive',
 'Transmission',
 'Engine Cylinders',
 'Engine Displacement',
 'Turbocharger',
 'Supercharger',
 'Fuel Type',
 'Fuel Economy Score',
 'Gasoline/Electricity Blended',
 'features',
 'ClassIndex',
 'DriveIndex',
 'TransmissionIndex',
 'Fuel_TypeIndex']

In [11]:
assembler = VectorAssembler(
  inputCols=['Year',
             'Engine Cylinders',
             'Engine Displacement',
             'Turbocharger',
             'Supercharger',
             'ClassIndex',
             'DriveIndex',
             'TransmissionIndex',
             'Fuel_TypeIndex'],
              outputCol="features_fixed")
output_fixed = assembler.transform(output_fixed)

In [12]:
final_data = output_fixed.select("features_fixed",'Fuel Economy Score')
#final_data.toPandas().to_csv('mycsv.csv')

In [13]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [14]:
#train_data.count(), test_data.count()

In [15]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [16]:
rfc = RandomForestClassifier(labelCol='Fuel Economy Score',featuresCol='features_fixed', numTrees=100, maxDepth=10)

In [17]:
rfc_model = rfc.fit(train_data)

In [18]:
rfc_predictions = rfc_model.transform(test_data)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'Fuel Economy Score')

In [20]:
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

RFC
1.0


In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [22]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="Fuel Economy Score", predictionCol="prediction", metricName="accuracy")

In [23]:
rfc_acc = acc_evaluator.evaluate(rfc_predictions)

In [24]:
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))

----------------------------------------
A random forest ensemble has an accuracy of: 67.54%


In [25]:
dtc = DecisionTreeClassifier(labelCol='Fuel Economy Score',featuresCol='features_fixed', maxDepth=10)

In [26]:
dtc_model = dtc.fit(train_data)

In [27]:
dtc_predictions = dtc_model.transform(test_data)

In [28]:
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

DTC
1.0


In [29]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)

In [30]:
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

----------------------------------------
A single decision tree has an accuracy of: 67.60%
