In [1]:
import numpy as np
import pandas as pd
import pickle
import pyspark
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier, LinearSVC, RandomForestClassifier, OneVsRest
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
X_train = pickle.load(open('RGB_train_reduce.pickle', 'rb'))
X_test = pickle.load(open('RGB_test_reduce.pickle', 'rb'))
y_train = pickle.load(open('RGB_train_labels.pickle', 'rb'))
y_test = pickle.load(open('RGB_test_labels.pickle', 'rb'))

print(X_train.shape, X_test.shape)

(21600, 120) (5400, 120)


In [3]:
spark = pyspark.sql.SparkSession.builder.master('local[*]').getOrCreate()  # new context

In [38]:
train = pd.concat((pd.DataFrame(y_train, columns = ['label']), pd.DataFrame(X_train)), axis = 1)
test = pd.concat((pd.DataFrame(y_test, columns = ['label']), pd.DataFrame(X_test)), axis = 1)
train.iloc[0:5, 0:5]

Unnamed: 0,label,0,1,2,3
0,AnnualCrop,-1416.319604,-257.642085,382.648921,-446.744537
1,Forest,-4564.012617,232.017194,-172.520511,-265.605766
2,SeaLake,1467.440003,-607.687599,482.082708,1050.886258
3,Residential,5906.549777,-149.513881,92.945077,266.333431
4,Industrial,3428.028145,-1264.509012,-123.286326,245.533198


In [39]:
train = spark.createDataFrame(train)
test = spark.createDataFrame(test)
train.printSchema()

root
 |-- label: string (nullable = true)
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- 21: double (nullable = true)
 |-- 22: double (nullable = true)
 |-- 23: double (nullable = true)
 |-- 24: double (nullable = true)
 |-- 25: double (nullable = true)
 |-- 26: double (nullable = true)
 |-- 27: double (nullable = true)
 |-- 28: double 

In [40]:
def preview(df, n = 5):
    return pd.DataFrame(df.take(n), columns = df.columns).iloc[:, 0:5]

In [41]:
preview(train)

Unnamed: 0,label,0,1,2,3
0,AnnualCrop,-1416.319604,-257.642085,382.648921,-446.744537
1,Forest,-4564.012617,232.017194,-172.520511,-265.605766
2,SeaLake,1467.440003,-607.687599,482.082708,1050.886258
3,Residential,5906.549777,-149.513881,92.945077,266.333431
4,Industrial,3428.028145,-1264.509012,-123.286326,245.533198


In [42]:
train.filter(train.label == 'Forest').select(['label', '0']).show(5)

+------+------------------+
| label|                 0|
+------+------------------+
|Forest|-4564.012617036548|
|Forest| -4618.90154239226|
|Forest|-4449.486284594558|
|Forest|-4390.513730199147|
|Forest|-4143.358269656351|
+------+------------------+
only showing top 5 rows



In [43]:
features = train.schema.names[1:]  # feature columns
assembler = VectorAssembler(inputCols = features, outputCol = 'features')
train_condense = assembler.transform(train)
test_condense = assembler.transform(test)

for field in features:
    train_condense = train_condense.drop(field)
    test_condense  = test_condense.drop(field)

In [44]:
# Convert the labels to floats
si = StringIndexer(inputCol = 'label', outputCol = 'label_float')
si_model = si.fit(train_condense)

train_condense = si_model.transform(train_condense)
test_condense = si_model.transform(test_condense)

# Drop the string column and rename the numerical one to 'label'
train_condense = train_condense.drop('label').withColumnRenamed('label_float', 'label')
test_condense = test_condense.drop('label').withColumnRenamed('label_float', 'label')

In [45]:
preview(train_condense)

Unnamed: 0,features,label
0,"[-1416.3196041218278, -257.6420849554097, 382....",1.0
1,"[-4564.012617036548, 232.01719434779352, -172....",4.0
2,"[1467.4400031629802, -607.6875987440877, 482.0...",2.0
3,"[5906.549776707974, -149.5138807170132, 92.945...",0.0
4,"[3428.0281451156866, -1264.5090124886615, -123...",6.0


In [None]:
tree = DecisionTreeClassifier(maxDepth = 5, minInstancesPerNode = 5).fit(train_condense)
tree_pred = tree.transform(test_condense)

In [47]:
preview(tree_pred)

Unnamed: 0,features,label,rawPrediction,probability,prediction
0,"[3127.221905557597, -1098.5030441349777, 1070....",6.0,"[11.0, 31.0, 5.0, 10.0, 0.0, 10.0, 602.0, 10.0...","[0.015942028985507246, 0.04492753623188406, 0....",6.0
1,"[-5202.2362898898045, 116.5073389911345, -291....",4.0,"[0.0, 4.0, 6.0, 7.0, 37.0, 10.0, 0.0, 70.0, 0....","[0.0, 0.026143790849673203, 0.0392156862745098...",7.0
2,"[-1498.8714233595265, 679.1006007327401, 111.6...",5.0,"[141.0, 41.0, 186.0, 204.0, 103.0, 327.0, 0.0,...","[0.07863915225878416, 0.02286670384829894, 0.1...",7.0
3,"[-5118.9629494324145, 185.18077070886073, -117...",4.0,"[0.0, 11.0, 192.0, 20.0, 1924.0, 36.0, 0.0, 10...","[0.0, 0.00466893039049236, 0.08149405772495756...",4.0
4,"[-1875.9314698926842, 160.99124884573897, -601...",9.0,"[6.0, 164.0, 218.0, 72.0, 127.0, 144.0, 1.0, 1...","[0.003424657534246575, 0.09360730593607305, 0....",9.0


In [48]:
evaluator = MulticlassClassificationEvaluator(metricName = 'accuracy')
accuracy = evaluator.evaluate(tree_pred)
accuracy

0.4653703703703704

In [49]:
SVC = LinearSVC(regParam = 1, standardization = False)
OVR = OneVsRest(classifier = SVC, parallelism = 8)
OVR_model = OVR.fit(train_condense)
OVR_pred = OVR_model.transform(test_condense)

In [50]:
preview(OVR_pred)

Unnamed: 0,features,label,prediction
0,"[3127.221905557597, -1098.5030441349777, 1070....",6.0,6.0
1,"[-5202.2362898898045, 116.5073389911345, -291....",4.0,4.0
2,"[-1498.8714233595265, 679.1006007327401, 111.6...",5.0,0.0
3,"[-5118.9629494324145, 185.18077070886073, -117...",4.0,4.0
4,"[-1875.9314698926842, 160.99124884573897, -601...",9.0,8.0


In [51]:
SVC_acc = evaluator.evaluate(OVR_pred)
SVC_acc

0.35814814814814816

In [53]:
dsift_feat_train = pickle.load(open('dsift_train_reduce.pickle', 'rb'))
dsift_feat_test = pickle.load(open('dsift_test_reduce.pickle', 'rb'))
dsift_labels_train = pickle.load(open('dsift_train_labels.pickle', 'rb'))
dsift_labels_test = pickle.load(open('dsift_test_labels.pickle', 'rb'))

dsift_train = pd.concat((pd.DataFrame(dsift_labels_train, columns = ['label']), 
                         pd.DataFrame(dsift_feat_train)), axis = 1)
dsift_test = pd.concat((pd.DataFrame(dsift_labels_test, columns = ['label']), 
                        pd.DataFrame(dsift_feat_test)), axis = 1)

dsift_train = spark.createDataFrame(dsift_train)
dsift_test = spark.createDataFrame(dsift_test)

In [54]:
dsift_features = dsift_train.schema.names[1:]  # feature columns
dsift_assembler = VectorAssembler(inputCols = dsift_features, outputCol = 'features')
dsift_train_condense = dsift_assembler.transform(dsift_train)
dsift_test_condense = dsift_assembler.transform(dsift_test)

for field in dsift_features:
    dsift_train_condense = dsift_train_condense.drop(field)
    dsift_test_condense  = dsift_test_condense.drop(field)

dsift_si = StringIndexer(inputCol = 'label', outputCol = 'label_float')
dsift_si_model = dsift_si.fit(dsift_train_condense)
dsift_train_condense = dsift_si_model.transform(dsift_train_condense)
dsift_test_condense = dsift_si_model.transform(dsift_test_condense)
dsift_train_condense = dsift_train_condense.drop('label').withColumnRenamed('label_float', 'label')
dsift_test_condense = dsift_test_condense.drop('label').withColumnRenamed('label_float', 'label')

In [55]:
preview(dsift_train_condense)

Unnamed: 0,features,label
0,"[317.81890869140625, 997.9087524414062, -400.8...",1.0
1,"[-340.1216125488281, -42.97116470336914, 177.8...",4.0
2,"[-83.17477416992188, -468.1846008300781, 117.6...",2.0
3,"[-226.61183166503906, -193.66929626464844, -53...",0.0
4,"[249.69735717773438, 397.8346252441406, -909.8...",6.0


In [56]:
tree2 = DecisionTreeClassifier(maxDepth = 5, minInstancesPerNode = 5).fit(dsift_train_condense)
tree2_pred = tree2.transform(dsift_test_condense)
dsift_accuracy = evaluator.evaluate(tree2_pred)
dsift_accuracy

0.3131481481481482

In [57]:
RF = RandomForestClassifier(maxDepth = 5, numTrees = 100).fit(train_condense)
RF_pred = RF.transform(test_condense)
RF_acc = evaluator.evaluate(RF_pred)
RF_acc

0.5187037037037037

In [58]:
RF2 = RandomForestClassifier(maxDepth = 5, numTrees = 100).fit(dsift_train_condense)
RF2_pred = RF.transform(dsift_test_condense)
RF2_acc = evaluator.evaluate(RF2_pred)
RF2_acc

0.16203703703703703

In [59]:
accuracy = MulticlassClassificationEvaluator(metricName = 'accuracy')
precision = MulticlassClassificationEvaluator(metricName = 'weightedPrecision')
recall = MulticlassClassificationEvaluator(metricName = 'weightedRecall')
f1 = MulticlassClassificationEvaluator(metricName = 'f1')

metrics = [accuracy, precision, recall, f1]
metric_labels = ['accuracy', 'precision', 'recall', 'f1']

eval_list = list()

# the predictions from each model
predictions = [tree_pred, OVR_pred, RF_pred, tree2_pred, RF2_pred]
model_labels = ['tree', 'SVM', 'RF', 'tree_dsift', 'RF_dsift']

# for each model's predictions, calculate error metrics and add to a Pandas Series
for pred in zip(model_labels, predictions):
    name = pred[0]
    predict = pred[1]
    
    metric_vals = pd.Series(dict([(x[0], x[1].evaluate(predict)) 
                                  for x in zip(metric_labels, metrics)]),
                            name = name)
    eval_list.append(metric_vals)
    
# combine all Series into a DataFrame
eval_df = pd.concat(eval_list, axis = 1).T
eval_df = eval_df[metric_labels]
eval_df

Unnamed: 0,accuracy,precision,recall,f1
tree,0.46537,0.487072,0.46537,0.456837
SVM,0.358148,0.300723,0.358148,0.302938
RF,0.518704,0.547267,0.518704,0.46237
tree_dsift,0.313148,0.262925,0.313148,0.281069
RF_dsift,0.162037,0.099598,0.162037,0.09277


In [None]:
RF_model = RandomForestClassifier(numTrees = 100)

paramgrid = (ParamGridBuilder().addGrid(RF_model.subsamplingRate, [0.5, 1])\
                               .addGrid(RF_model.maxDepth, [5, 10])\
                               .addGrid(RF_model.minInstancesPerNode, [1, 5]).build())

# use 5-fold cross validation 
crossval = CrossValidator(estimator = RF_model, estimatorParamMaps = paramgrid, 
                          evaluator = accuracy, numFolds = 3)

RF_model_cv = crossval.fit(train_condense)

In [4]:
spark.stop()