In [1]:

import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('air_quality').getOrCreate()

In [2]:
df = spark.read.csv('Pollution.csv', inferSchema=True, header=True)
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Measurement date: string (nullable = true)
 |-- Station code: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- PM10: integer (nullable = true)
 |-- PM2.5: integer (nullable = true)



In [3]:
df.show()

+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+-----+
| ID|Measurement date|Station code|             Address|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|PM2.5|
+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+-----+
|  1|   2019/1/1 1:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.004|0.061|0.002|1.1|  37|   22|
|  2|   2019/1/1 2:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.054|0.002|0.9|  38|   21|
|  3|   2019/1/1 3:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.053|0.002|1.0|  36|   23|
|  4|   2019/1/1 4:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003| 0.05|0.002|0.9|  41|   26|
|  5|   2019/1/1 5:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.046|0.003|0.8|  41|   25|
|  6|   2019/1/1 6:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.038|0.007

In [4]:
df_renamed = df.withColumnRenamed("PM2.5","Target")
df_renamed.columns

['ID',
 'Measurement date',
 'Station code',
 'Address',
 'Latitude',
 'Longitude',
 'SO2',
 'NO2',
 'O3',
 'CO',
 'PM10',
 'Target']

In [5]:
df_renamed.describe([
'ID',
 'Measurement date',
 'Station code',
 'Address',
 'Latitude',
 'Longitude',
]).show()

+-------+------------------+----------------+------------------+--------------------+-------------------+-------------------+
|summary|                ID|Measurement date|      Station code|             Address|           Latitude|          Longitude|
+-------+------------------+----------------+------------------+--------------------+-------------------+-------------------+
|  count|            209511|          209511|            209511|              209511|             209511|             209511|
|   mean|          104756.0|            null|113.00068254172812|                null| 37.553479045609905| 126.98934192508555|
| stddev|60480.760461488906|            null|  7.21175942625844|                null|0.05327004022581489|0.07879484593384273|
|    min|                 1|  2019/1/1 10:00|               101|10, Poeun-ro 6-gi...|         37.4523569|        126.8351506|
|    max|            209511|   2019/9/9 9:00|               125|71, Gangseo-ro 45...|         37.6587743|        127.1

In [6]:
df_renamed.describe([
'SO2',
 'NO2',
 'O3',
 'CO',
 'PM10',
 'Target'
]).show()

+-------+--------------------+--------------------+-------------------+-------------------+-----------------+------------------+
|summary|                 SO2|                 NO2|                 O3|                 CO|             PM10|            Target|
+-------+--------------------+--------------------+-------------------+-------------------+-----------------+------------------+
|  count|              206380|              206368|             206337|             206527|           204549|            204511|
|   mean|0.004051216203123259|0.028063517599629547|0.02496623484882006| 0.5351242210460029|46.14175087631814|28.322378747353444|
| stddev|0.004333254861274011|0.015935317603238443|0.01990250617013541|0.43011460506200494|79.23508759996433|   59.866435268257|
|    min|               0.001|               0.001|              0.001|                0.1|                1|                 1|
|    max|               0.378|                0.31|              1.346|               36.8|      

In [7]:
df_renamed.show()
df_renamed.count()

+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+------+
| ID|Measurement date|Station code|             Address|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|Target|
+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+------+
|  1|   2019/1/1 1:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.004|0.061|0.002|1.1|  37|    22|
|  2|   2019/1/1 2:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.054|0.002|0.9|  38|    21|
|  3|   2019/1/1 3:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.053|0.002|1.0|  36|    23|
|  4|   2019/1/1 4:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003| 0.05|0.002|0.9|  41|    26|
|  5|   2019/1/1 5:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.046|0.003|0.8|  41|    25|
|  6|   2019/1/1 6:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.0

209511

In [8]:
df_new=df_renamed.na.drop(thresh=11)
df_new.show()
df_new.count()

+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+------+
| ID|Measurement date|Station code|             Address|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|Target|
+---+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+------+
|  1|   2019/1/1 1:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.004|0.061|0.002|1.1|  37|    22|
|  2|   2019/1/1 2:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.054|0.002|0.9|  38|    21|
|  3|   2019/1/1 3:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.053|0.002|1.0|  36|    23|
|  4|   2019/1/1 4:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003| 0.05|0.002|0.9|  41|    26|
|  5|   2019/1/1 5:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.046|0.003|0.8|  41|    25|
|  6|   2019/1/1 6:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.003|0.0

204298

In [9]:
df_new.describe([
'SO2',
 'NO2',
 'O3',
 'CO',
 'PM10',
 'Target'
]).show()

+-------+--------------------+--------------------+--------------------+------------------+-----------------+------------------+
|summary|                 SO2|                 NO2|                  O3|                CO|             PM10|            Target|
+-------+--------------------+--------------------+--------------------+------------------+-----------------+------------------+
|  count|              204205|              204119|              204134|            204213|           204178|            204138|
|   mean|0.004055125976350049|0.028122237518311888| 0.02498874268863059|0.5350839564570707|46.12689418056794|28.299846182484398|
| stddev| 0.00398967741067353|0.015964390116414356|0.019875844609903836|0.3988179695695515|79.21405839995822|  59.7636372395751|
|    min|               0.001|               0.001|               0.001|               0.1|                1|                 1|
|    max|               0.378|                0.31|               1.346|              36.8|      

In [11]:
df_filter=df_new.drop('ID',
 'Measurement date',
 'Address',)
df_filter.show()

+------------+----------+-----------+-----+-----+-----+---+----+------+
|Station code|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|Target|
+------------+----------+-----------+-----+-----+-----+---+----+------+
|         101|37.5720164|127.0050075|0.004|0.061|0.002|1.1|  37|    22|
|         101|37.5720164|127.0050075|0.003|0.054|0.002|0.9|  38|    21|
|         101|37.5720164|127.0050075|0.003|0.053|0.002|1.0|  36|    23|
|         101|37.5720164|127.0050075|0.003| 0.05|0.002|0.9|  41|    26|
|         101|37.5720164|127.0050075|0.003|0.046|0.003|0.8|  41|    25|
|         101|37.5720164|127.0050075|0.003|0.038|0.007|0.7|  38|    27|
|         101|37.5720164|127.0050075|0.004|0.052|0.002|1.1|  35|    23|
|         101|37.5720164|127.0050075|0.004|0.054|0.003|1.1|  40|    25|
|         101|37.5720164|127.0050075|0.004|0.041|0.012|0.8|  38|    25|
|         101|37.5720164|127.0050075|0.004|0.015|0.028|0.5|  30|    20|
|         101|37.5720164|127.0050075|0.005|0.013|0.029|0.5|  28|

In [12]:
from pyspark.sql.functions  import mean
mean_Stationcode = df_filter.select(mean(df_filter['Station code'])).collect()
mean_Stationcode
mean_Stationcode_val = mean_Stationcode[0][0]
a = df_filter.na.fill(mean_Stationcode_val, subset=['Station code'])

mean_Latitude = a.select(mean(a['Latitude'])).collect()
mean_Latitude
mean_Latitude_val = mean_Latitude[0][0]
b = a.na.fill(mean_Latitude_val, subset=['Latitude'])

mean_Longitude = b.select(mean(b['Longitude'])).collect()
mean_Longitude
mean_Longitude_val = mean_Latitude[0][0]
c = b.na.fill(mean_Longitude_val, subset=['Longitude'])

mean_SO2 = c.select(mean(c['SO2'])).collect()
mean_SO2
mean_SO2_val = mean_SO2[0][0]
d = c.na.fill(mean_SO2_val, subset=['SO2'])

mean_NO2 = d.select(mean(d['NO2'])).collect()
mean_NO2
mean_NO2_val = mean_NO2[0][0]
e = d.na.fill(mean_NO2_val, subset=['NO2'])

mean_O3 = e.select(mean(e['O3'])).collect()
mean_O3
mean_O3_val = mean_O3[0][0]
f = e.na.fill(mean_NO2_val, subset=['O3'])

mean_CO = f.select(mean(f['CO'])).collect()
mean_CO
mean_CO_val = mean_CO[0][0]
g = f.na.fill(mean_CO_val, subset=['CO'])

mean_PM10 = g.select(mean(g['PM10'])).collect()
mean_PM10
mean_PM10_val = mean_PM10[0][0]
h = g.na.fill(mean_PM10_val, subset=['PM10'])

mean_Target = h.select(mean(h['Target'])).collect()
mean_Target
mean_Target_val = mean_Target[0][0]
i = h.na.fill(mean_Target_val, subset=['Target'])



i.show()

+------------+----------+-----------+-----+-----+-----+---+----+------+
|Station code|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|Target|
+------------+----------+-----------+-----+-----+-----+---+----+------+
|         101|37.5720164|127.0050075|0.004|0.061|0.002|1.1|  37|    22|
|         101|37.5720164|127.0050075|0.003|0.054|0.002|0.9|  38|    21|
|         101|37.5720164|127.0050075|0.003|0.053|0.002|1.0|  36|    23|
|         101|37.5720164|127.0050075|0.003| 0.05|0.002|0.9|  41|    26|
|         101|37.5720164|127.0050075|0.003|0.046|0.003|0.8|  41|    25|
|         101|37.5720164|127.0050075|0.003|0.038|0.007|0.7|  38|    27|
|         101|37.5720164|127.0050075|0.004|0.052|0.002|1.1|  35|    23|
|         101|37.5720164|127.0050075|0.004|0.054|0.003|1.1|  40|    25|
|         101|37.5720164|127.0050075|0.004|0.041|0.012|0.8|  38|    25|
|         101|37.5720164|127.0050075|0.004|0.015|0.028|0.5|  30|    20|
|         101|37.5720164|127.0050075|0.005|0.013|0.029|0.5|  28|

In [13]:
i.count()

204298

In [14]:

df_final=i.na.drop( )
df_final.count()

204298

In [15]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [16]:
assembler = VectorAssembler(
    inputCols=[ 
 'Station code',
 'Latitude',
 'Longitude',
 'SO2',
 'NO2',
 'O3',
 'CO',
 'PM10',
 'Target'],
    outputCol="features")

In [17]:
output = assembler.transform(df_final)
output.printSchema()
output.head(1)

root
 |-- Station code: integer (nullable = true)
 |-- Latitude: double (nullable = false)
 |-- Longitude: double (nullable = false)
 |-- SO2: double (nullable = false)
 |-- NO2: double (nullable = false)
 |-- O3: double (nullable = false)
 |-- CO: double (nullable = false)
 |-- PM10: integer (nullable = true)
 |-- Target: integer (nullable = true)
 |-- features: vector (nullable = true)



[Row(Station code=101, Latitude=37.5720164, Longitude=127.0050075, SO2=0.004, NO2=0.061, O3=0.002, CO=1.1, PM10=37, Target=22, features=DenseVector([101.0, 37.572, 127.005, 0.004, 0.061, 0.002, 1.1, 37.0, 22.0]))]

In [18]:
final_data = output.select("features",'Target')
final_data.show()

+--------------------+------+
|            features|Target|
+--------------------+------+
|[101.0,37.5720164...|    22|
|[101.0,37.5720164...|    21|
|[101.0,37.5720164...|    23|
|[101.0,37.5720164...|    26|
|[101.0,37.5720164...|    25|
|[101.0,37.5720164...|    27|
|[101.0,37.5720164...|    23|
|[101.0,37.5720164...|    25|
|[101.0,37.5720164...|    25|
|[101.0,37.5720164...|    20|
|[101.0,37.5720164...|    17|
|[101.0,37.5720164...|    24|
|[101.0,37.5720164...|    26|
|[101.0,37.5720164...|    27|
|[101.0,37.5720164...|    21|
|[101.0,37.5720164...|    19|
|[101.0,37.5720164...|    21|
|[101.0,37.5720164...|    19|
|[101.0,37.5720164...|    18|
|[101.0,37.5720164...|    15|
+--------------------+------+
only showing top 20 rows



In [19]:
train_data,test_data = final_data.randomSplit([0.7,0.3])
train_data.describe().show()
test_data.describe().show()

+-------+------------------+
|summary|            Target|
+-------+------------------+
|  count|            143003|
|   mean|28.389572246736083|
| stddev| 60.52001989658753|
|    min|                 1|
|    max|               995|
+-------+------------------+

+-------+------------------+
|summary|            Target|
+-------+------------------+
|  count|             61295|
|   mean| 28.08972999428991|
| stddev|57.880069387432755|
|    min|                 1|
|    max|               985|
+-------+------------------+



In [20]:
lr = LinearRegression(labelCol='Target')
# Fit the model to the data.
lrModel = lr.fit(train_data)
# Print the coefficients and intercept for linear regression.
print("Intercept: {}".format(lrModel.intercept))
print("Coefficients: {}".format(lrModel.coefficients))

Intercept: -1.3849809734900415e-13
Coefficients: [-6.422266490177323e-17,-1.7632715725109426e-15,1.6587714986079982e-15,5.656284407565022e-14,-7.154821114094772e-14,-3.4227811544443536e-14,-2.3130821222296508e-15,-4.270914508240403e-17,1.0000000000000002]


In [34]:
prediction_lr.select("prediction", "features").show()

+----------+--------------------+
|prediction|            features|
+----------+--------------------+
|      28.0|[101.0,37.5720164...|
|       1.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      12.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       7.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      14.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       8.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
+----------+--------------------+
only showing top 20 rows



In [21]:
test_results = lrModel.evaluate(test_data)
print ('Model: Linear Regression')
# evaluation metrics 
print("R2: {}".format(test_results.r2))
print("MAE: {}".format(test_results.meanAbsoluteError))
print("MSE: {}".format(test_results.meanSquaredError))
print("RSME: {}".format(test_results.rootMeanSquaredError))
# This shows the difference between the predicted value and the test data.
test_results.residuals.show()

Model: Linear Regression
R2: 1.0
MAE: 3.5763943310950586e-15
MSE: 1.808145026667448e-28
RSME: 1.3446728325758083e-14
+--------------------+
|           residuals|
+--------------------+
|1.776356839400250...|
|1.332267629550187...|
|1.776356839400250...|
|2.775557561562891...|
|1.776356839400250...|
|1.776356839400250...|
|1.776356839400250...|
|5.329070518200751...|
|1.776356839400250...|
|2.220446049250313...|
|1.776356839400250...|
|2.109423746787797...|
|                 0.0|
|1.776356839400250...|
|2.220446049250313...|
|2.664535259100375...|
|1.776356839400250...|
|1.776356839400250...|
|2.664535259100375...|
|1.776356839400250...|
+--------------------+
only showing top 20 rows



In [22]:
from pyspark.ml.classification import (RandomForestClassifier,LogisticRegression,DecisionTreeClassifier)
from pyspark.ml import Pipeline

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Target", outputCol="Target_index").fit(final_data)
df_end = indexer.transform(final_data)
df_end.show()

+--------------------+------+------------+
|            features|Target|Target_index|
+--------------------+------+------------+
|[101.0,37.5720164...|    22|        13.0|
|[101.0,37.5720164...|    21|        11.0|
|[101.0,37.5720164...|    23|        15.0|
|[101.0,37.5720164...|    26|        21.0|
|[101.0,37.5720164...|    25|        18.0|
|[101.0,37.5720164...|    27|        20.0|
|[101.0,37.5720164...|    23|        15.0|
|[101.0,37.5720164...|    25|        18.0|
|[101.0,37.5720164...|    25|        18.0|
|[101.0,37.5720164...|    20|        10.0|
|[101.0,37.5720164...|    17|         0.0|
|[101.0,37.5720164...|    24|        17.0|
|[101.0,37.5720164...|    26|        21.0|
|[101.0,37.5720164...|    27|        20.0|
|[101.0,37.5720164...|    21|        11.0|
|[101.0,37.5720164...|    19|         7.0|
|[101.0,37.5720164...|    21|        11.0|
|[101.0,37.5720164...|    19|         7.0|
|[101.0,37.5720164...|    18|         6.0|
|[101.0,37.5720164...|    15|         2.0|
+----------

In [23]:
(trainingData, testData) = df_end.randomSplit([0.7, 0.3])

In [24]:
rf = RandomForestClassifier(labelCol="Target_index", featuresCol="features", numTrees=20)
model_rf = rf.fit(trainingData)

In [25]:
prediction_rf = model_rf.transform(testData)
prediction_rf.show()

+--------------------+------+------------+--------------------+--------------------+----------+
|            features|Target|Target_index|       rawPrediction|         probability|prediction|
+--------------------+------+------------+--------------------+--------------------+----------+
|[101.0,37.5720164...|     1|        28.0|[0.00119012198750...|[5.95060993751859...|      28.0|
|[101.0,37.5720164...|    16|         1.0|[1.49339423844797...|[0.07466971192239...|       1.0|
|[101.0,37.5720164...|     7|        16.0|[0.05768621431409...|[0.00288431071570...|      16.0|
|[101.0,37.5720164...|     3|        29.0|[0.00119012198750...|[5.95060993751859...|      28.0|
|[101.0,37.5720164...|     9|        12.0|[0.30854490920759...|[0.01542724546037...|      12.0|
|[101.0,37.5720164...|    11|         8.0|[0.27973784140325...|[0.01398689207016...|       9.0|
|[101.0,37.5720164...|     5|        23.0|[0.00768282842301...|[3.84141421150554...|      23.0|
|[101.0,37.5720164...|     1|        28.

In [26]:
prediction_rf.select("prediction", "features").show()

+----------+--------------------+
|prediction|            features|
+----------+--------------------+
|      28.0|[101.0,37.5720164...|
|       1.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      12.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       7.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      14.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       8.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
+----------+--------------------+
only showing top 20 rows



In [28]:
lr = LogisticRegression(labelCol="Target_index", featuresCol="features")
model_lr = lr.fit(trainingData)

In [29]:
prediction_lr = model_rf.transform(testData)
prediction_lr.show()

+--------------------+------+------------+--------------------+--------------------+----------+
|            features|Target|Target_index|       rawPrediction|         probability|prediction|
+--------------------+------+------------+--------------------+--------------------+----------+
|[101.0,37.5720164...|     1|        28.0|[0.00119012198750...|[5.95060993751859...|      28.0|
|[101.0,37.5720164...|    16|         1.0|[1.49339423844797...|[0.07466971192239...|       1.0|
|[101.0,37.5720164...|     7|        16.0|[0.05768621431409...|[0.00288431071570...|      16.0|
|[101.0,37.5720164...|     3|        29.0|[0.00119012198750...|[5.95060993751859...|      28.0|
|[101.0,37.5720164...|     9|        12.0|[0.30854490920759...|[0.01542724546037...|      12.0|
|[101.0,37.5720164...|    11|         8.0|[0.27973784140325...|[0.01398689207016...|       9.0|
|[101.0,37.5720164...|     5|        23.0|[0.00768282842301...|[3.84141421150554...|      23.0|
|[101.0,37.5720164...|     1|        28.

In [30]:
prediction_lr.select("prediction", "features").show()

+----------+--------------------+
|prediction|            features|
+----------+--------------------+
|      28.0|[101.0,37.5720164...|
|       1.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      12.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       7.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      14.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
|      28.0|[101.0,37.5720164...|
|       8.0|[101.0,37.5720164...|
|      23.0|[101.0,37.5720164...|
|      29.0|[101.0,37.5720164...|
|      16.0|[101.0,37.5720164...|
|      19.0|[101.0,37.5720164...|
+----------+--------------------+
only showing top 20 rows



In [31]:
dt = DecisionTreeClassifier(labelCol="Target_index", featuresCol="features")
model_dt = dt.fit(trainingData)

In [32]:
prediction_dt = model_dt.transform(testData)
prediction_dt.show()

+--------------------+------+------------+--------------------+--------------------+----------+
|            features|Target|Target_index|       rawPrediction|         probability|prediction|
+--------------------+------+------------+--------------------+--------------------+----------+
|[101.0,37.5720164...|     1|        28.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[101.0,37.5720164...|    16|         1.0|[0.0,5020.0,0.0,0...|[0.0,1.0,0.0,0.0,...|       1.0|
|[101.0,37.5720164...|     7|        16.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[101.0,37.5720164...|     3|        29.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[101.0,37.5720164...|     9|        12.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[101.0,37.5720164...|    11|         8.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       8.0|
|[101.0,37.5720164...|     5|        23.0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|       9.0|
|[101.0,37.5720164...|     1|        28.

In [33]:
prediction_dt.select("prediction", "features").show()

+----------+--------------------+
|prediction|            features|
+----------+--------------------+
|       9.0|[101.0,37.5720164...|
|       1.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       8.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       7.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       8.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
|       9.0|[101.0,37.5720164...|
+----------+--------------------+
only showing top 20 rows



In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [36]:
rf_accuracy = MulticlassClassificationEvaluator(labelCol='Target_index', metricName='accuracy').evaluate(prediction_rf)
print("RF's accuracy is %f"%rf_accuracy)
lr_accuracy = MulticlassClassificationEvaluator(labelCol='Target_index', metricName='accuracy').evaluate(prediction_lr)
print("LR's accuracy is %f"%lr_accuracy)
dt_accuracy= MulticlassClassificationEvaluator(labelCol='Target_index', metricName='accuracy').evaluate(prediction_dt)
print("DT's accuracy is %f"%dt_accuracy)

RF's accuracy is 0.542311
LR's accuracy is 0.542311
DT's accuracy is 0.337936


In [37]:
rf_precision = MulticlassClassificationEvaluator(labelCol='Target_index', metricName='weightedPrecision').evaluate(prediction_rf)
print("RF's precision is %f"%rf_precision)
lr_precision = MulticlassClassificationEvaluator(labelCol='Target_index', metricName='weightedPrecision').evaluate(prediction_lr)
print("LR's precision is %f"%lr_precision)
dt_precision= MulticlassClassificationEvaluator(labelCol='Target_index', metricName='weightedPrecision').evaluate(prediction_dt)
print("DT's precision is %f"%dt_precision)

RF's precision is 0.496601
LR's precision is 0.496601
DT's precision is 0.280801


In [38]:
rf_auc = BinaryClassificationEvaluator(labelCol='Target_index').evaluate(prediction_rf)
print("RF's precision is %f"%rf_auc)
lr_auc = BinaryClassificationEvaluator(labelCol='Target_index').evaluate(prediction_lr)
print("LR's precision is %f"%lr_auc)
dt_auc= BinaryClassificationEvaluator(labelCol='Target_index').evaluate(prediction_dt)
print("DT's precision is %f"%dt_auc)

RF's precision is 0.100470
LR's precision is 0.100470
DT's precision is 0.518355
