In [40]:
import findspark

# Raw string: in a normal literal "\s" is an invalid escape sequence
# (DeprecationWarning on Python 3.6+, SyntaxWarning on 3.12+). The resulting
# path value is identical.
SPARK_HOME = r'C:\spark\spark-2.4.7-bin-hadoop2.7'
findspark.init(SPARK_HOME)

# pyspark is imported only after findspark.init() has set up the Spark paths.
import pyspark

# Display the Spark home findspark resolved, as a sanity check.
findspark.find()

'C:\\spark\\spark-2.4.7-bin-hadoop2.7'

In [41]:
from pyspark.sql import SparkSession

In [42]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [43]:
from pyspark.ml.classification import RandomForestClassifier

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [45]:
# Create (or reuse) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("Classification with Spark")
    .getOrCreate()
)

In [46]:
# Load the cleaned weather CSV; the first row holds column names and Spark
# infers column types from the data.
dataset = spark.read.csv(
    "data_clean.csv",
    header=True,
    inferSchema=True,
)

In [47]:
# Peek at the first few rows.
dataset.show(n=5)

+-------------+-----------------+------------------------+--------+-----------------+----------------------+------------------+--------------------+
|      Summary|  Temperature (C)|Apparent Temperature (C)|Humidity|Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Pressure (millibars)|
+-------------+-----------------+------------------------+--------+-----------------+----------------------+------------------+--------------------+
|Partly Cloudy|9.472222222222221|       7.388888888888887|    0.89|          14.1197|                 251.0|15.826300000000002|             1015.13|
|Partly Cloudy|9.355555555555558|       7.227777777777777|    0.86|          14.2646|                 259.0|15.826300000000002|             1015.63|
|Mostly Cloudy|9.377777777777778|       9.377777777777778|    0.89|           3.9284|                 204.0|           14.9569|             1015.94|
|Partly Cloudy| 8.28888888888889|       5.944444444444446|    0.83|          14.1036|                 269.

In [48]:
# Inspect the column types produced by inferSchema=True when the CSV was read.
dataset.printSchema()

root
 |-- Summary: string (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Apparent Temperature (C): double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed (km/h): double (nullable = true)
 |-- Wind Bearing (degrees): double (nullable = true)
 |-- Visibility (km): double (nullable = true)
 |-- Pressure (millibars): double (nullable = true)



In [49]:
from pyspark.sql.functions import col, count, isnan, when

# Check every column for missing values: nulls in all columns, plus NaN in
# floating-point columns. isnan is only meaningful for double/float columns, so
# it is not applied to string columns such as Summary.
float_cols = {name for name, dtype in dataset.dtypes if dtype in ("double", "float")}


def _missing_count(name):
    """Return an aggregate counting rows where column `name` is null (or NaN, for float columns)."""
    is_missing = col(name).isNull()
    if name in float_cols:
        is_missing = is_missing | isnan(col(name))
    return count(when(is_missing, name)).alias(name)


dataset.select([_missing_count(c) for c in dataset.columns]).show()

+-------+---------------+------------------------+--------+-----------------+----------------------+---------------+--------------------+
|Summary|Temperature (C)|Apparent Temperature (C)|Humidity|Wind Speed (km/h)|Wind Bearing (degrees)|Visibility (km)|Pressure (millibars)|
+-------+---------------+------------------------+--------+-----------------+----------------------+---------------+--------------------+
|      0|              0|                       0|       0|                0|                     0|              0|                   0|
+-------+---------------+------------------------+--------+-----------------+----------------------+---------------+--------------------+



In [18]:
#train, test = data.randomSplit([0.7, 0.3])

In [50]:
#train.show(5)

In [51]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    """Build a (indexCol, features, label) frame from categorical + continuous columns.

    Each categorical column is string-indexed into "<col>_indexed" and then
    one-hot encoded into "<col>_indexed_encoded". The encoded columns, followed
    by the continuous columns, are assembled into a single "features" vector.
    `labelCol` is copied unchanged into a column named "label".

    Args:
        df: input Spark DataFrame.
        indexCol: column carried through to the output (e.g. a row id).
        categoricalCols: list of categorical column names to index and encode.
        continuousCols: list of numeric column names appended to the features.
        labelCol: column copied into "label".

    Returns:
        DataFrame with columns indexCol, "features", "label".
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    index_stages = [
        StringIndexer(inputCol=name, outputCol="{0}_indexed".format(name))
        for name in categoricalCols
    ]

    # OneHotEncoder keeps its default dropLast=True.
    encode_stages = [
        OneHotEncoder(inputCol=stage.getOutputCol(),
                      outputCol="{0}_encoded".format(stage.getOutputCol()))
        for stage in index_stages
    ]

    feature_inputs = [stage.getOutputCol() for stage in encode_stages] + continuousCols
    assemble_stage = VectorAssembler(inputCols=feature_inputs, outputCol="features")

    fitted = Pipeline(stages=index_stages + encode_stages + [assemble_stage]).fit(df)

    return (
        fitted.transform(df)
        .withColumn('label', col(labelCol))
        .select(indexCol, 'features', 'label')
    )

In [52]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [55]:
def transData(data):
    """Reshape `data` into the (features, label) layout expected by Spark ML.

    The first column of `data` becomes `label` and is kept as-is (here, the
    string weather Summary). All remaining columns are packed, in their existing
    order, into a single vector column `features`. They must be numeric.

    This uses VectorAssembler instead of `data.rdd.map(...).toDF(...)`. The
    original approach serialises every row to Python workers and re-infers the
    schema. The assembler stays in the JVM and raises on null features instead
    of silently turning them into NaN.

    Returns:
        DataFrame with columns "features" (vector) and "label".
    """
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql.functions import col

    label_col = data.columns[0]
    feature_cols = data.columns[1:]

    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    return assembler.transform(data).select('features', col(label_col).alias('label'))

In [57]:
# Pack the numeric columns into a feature vector; the first column (Summary)
# becomes the label.
transformed = transData(dataset)
transformed.show(n=5)

+--------------------+-------------+
|            features|        label|
+--------------------+-------------+
|[9.47222222222222...|Partly Cloudy|
|[9.35555555555555...|Partly Cloudy|
|[9.37777777777777...|Mostly Cloudy|
|[8.28888888888889...|Partly Cloudy|
|[8.75555555555555...|Mostly Cloudy|
+--------------------+-------------+
only showing top 5 rows



In [69]:
# Map each label string to a numeric index (stored in 'indexedLabel' with label
# metadata). It is fitted on the full dataset, before the train/test split, so
# every class present in the data receives an index.
labelIndexer = StringIndexer(
    inputCol='label',
    outputCol='indexedLabel',
).fit(transformed)
labelIndexer.transform(transformed).show(n=5, truncate=True)

+--------------------+-------------+------------+
|            features|        label|indexedLabel|
+--------------------+-------------+------------+
|[9.47222222222222...|Partly Cloudy|         0.0|
|[9.35555555555555...|Partly Cloudy|         0.0|
|[9.37777777777777...|Mostly Cloudy|         1.0|
|[8.28888888888889...|Partly Cloudy|         0.0|
|[8.75555555555555...|Mostly Cloudy|         1.0|
+--------------------+-------------+------------+
only showing top 5 rows



In [72]:
# Index -> class-name mapping learned by labelIndexer. The IndexToString
# converter (labelConverter) that turns predictions back into these names is
# defined once, in the cell just before the Pipeline is assembled.
list(enumerate(labelIndexer.labels))

In [76]:
# Let VectorIndexer decide which vector slots are categorical: a feature with at
# most maxCategories (4) distinct values is indexed as categorical, and anything
# with more is treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(transformed)
featureIndexer.transform(transformed).show(n=5, truncate=True)

+--------------------+-------------+--------------------+
|            features|        label|     indexedFeatures|
+--------------------+-------------+--------------------+
|[9.47222222222222...|Partly Cloudy|[9.47222222222222...|
|[9.35555555555555...|Partly Cloudy|[9.35555555555555...|
|[9.37777777777777...|Mostly Cloudy|[9.37777777777777...|
|[8.28888888888889...|Partly Cloudy|[8.28888888888889...|
|[8.75555555555555...|Mostly Cloudy|[8.75555555555555...|
+--------------------+-------------+--------------------+
only showing top 5 rows



In [77]:
# Split the data into training and test sets (40% held out for testing).
# A fixed seed makes the split, and therefore the reported test error,
# reproducible across kernel restarts.
SPLIT_SEED = 42
(trainingData, testData) = transformed.randomSplit([0.6, 0.4], seed=SPLIT_SEED)

trainingData.show(5)
testData.show(5)

+--------------------+-------------+
|            features|        label|
+--------------------+-------------+
|[-16.666666666666...|        Clear|
|[-15.555555555555...|        Clear|
|[-15.483333333333...|Partly Cloudy|
|[-15.0,-22.738888...|        Clear|
|[-14.977777777777...|        Foggy|
+--------------------+-------------+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-16.666666666666...|Clear|
|[-15.733333333333...|Clear|
|[-15.466666666666...|Clear|
|[-15.0,-15.0,0.0,...|Clear|
|[-15.0,-15.0,0.83...|Foggy|
+--------------------+-----+
only showing top 5 rows



In [78]:
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest on the indexed label/feature columns produced by the
# indexer stages that precede it in the pipeline. Random forest training is
# randomized (bootstrap sampling, feature subsets), so the seed is pinned to
# make results reproducible.
RF_SEED = 42
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=100, seed=RF_SEED)

In [79]:
# Turn the forest's numeric 'prediction' back into a class name
# ('predictedLabel'), using the label order learned by labelIndexer.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

In [80]:
# Chain indexers and tree in a Pipeline
# Stage order matters: index the label and features, train the forest, then map
# its numeric predictions back to class names.
pipeline = Pipeline(stages=[
    labelIndexer,
    featureIndexer,
    rf,
    labelConverter,
])

In [81]:
# Train model.  This also runs the indexers.
# Fit the whole pipeline on the training split. The indexer stages are already
# fitted models and pass through as-is; the forest is trained here.
model = pipeline.fit(dataset=trainingData)

In [82]:
# Make predictions.
# Score the held-out split and compare a few predictions with the true labels.
predictions = model.transform(testData)
predictions.select("features", "label", "predictedLabel").show(n=5)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[-16.666666666666...|Clear| Mostly Cloudy|
|[-15.733333333333...|Clear| Mostly Cloudy|
|[-15.466666666666...|Clear| Mostly Cloudy|
|[-15.0,-15.0,0.0,...|Clear| Partly Cloudy|
|[-15.0,-15.0,0.83...|Foggy|         Foggy|
+--------------------+-----+--------------+
only showing top 5 rows



In [83]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy is measured on the indexed labels; report its complement as the
# test error.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# In the fitted pipeline, stage -2 is the trained forest (the last stage is the
# IndexToString converter).
rfModel = model.stages[-2]
print(rfModel)  # summary only

Test Error = 0.508185
RandomForestClassificationModel (uid=RandomForestClassifier_73100c9abb9c) with 100 trees
