In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(
	'Read CSV File into DataFrame').getOrCreate()

df = spark.read.csv('C:/Users/jorda/Documents/GitHub/SDSC-BigData/daily_weather/daily_weather.csv', sep=',',
						inferSchema=True, header=True)

In [3]:
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [4]:
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [5]:
df = df.drop('number')

In [6]:
df = df.na.drop()

In [7]:
df.count(), len(df.columns)

(1064, 10)

In [8]:
binarizer = Binarizer(threshold=24.99999, inputCol='relative_humidity_3pm', outputCol='label')
binarizedDF = binarizer.transform(df)

In [9]:
binarizedDF.select('relative_humidity_3pm', 'label').show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [10]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol='features')
assembled = assembler.transform(binarizedDF)

In [11]:
(trainingData, testData) = assembled.randomSplit([0.8,0.2], seed=13234)

In [12]:
trainingData.count(), testData.count()

(846, 218)

In [13]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5, minInstancesPerNode=20, impurity='gini')

In [14]:
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [15]:
predictions = model.transform(testData)

In [16]:
predictions.select('prediction', 'label').show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precision")

In [20]:
accuracy = 0.1

In [21]:
print ("Accuracy = %.2g" % (accuracy * 100))

Accuracy = 10


In [22]:
print ("Accuracy = %100g" % (accuracy))

Accuracy =                                                                                                  0.1


In [23]:
print ("Accuracy = %100.2g" % (accuracy))

Accuracy =                                                                                                  0.1
