* (1) Load features file with Spark
* (2) Use VectorAssembler and StringIndexer to preprocess features and annotation labels
* (3) Run Decision Tree Classifier

In [27]:
# findspark.init() runs before any pyspark import below; keep this order.
# NOTE(review): presumably this is what makes the local Spark installation importable — confirm for your environment.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# getOrCreate() hands back the session for this application, creating it if needed.
# NOTE(review): "YourAppName" looks like a template placeholder — consider a descriptive application name.
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

In [3]:
# Read the pre-computed feature table (Parquet) into a Spark DataFrame
features_path = "../outputs/df_features_all.parquet"
df = spark.read.parquet(features_path)

                                                                                

In [4]:
# Quick look at the DataFrame: echoing it lists only the column names and their types, not the rows
df

DataFrame[subject: string, walk_or_not: bigint, fea_acc_x_mean: double, fea_acc_x_std: double, fea_acc_x_min: double, fea_acc_x_max: double, fea_acc_y_mean: double, fea_acc_y_std: double, fea_acc_y_min: double, fea_acc_y_max: double, fea_acc_z_mean: double, fea_acc_z_std: double, fea_acc_z_min: double, fea_acc_z_max: double, fea_smv_mean: double, fea_smv_std: double, fea_smv_min: double, fea_smv_max: double, __index_level_0__: string]

In [5]:
# Fetch the first row (returned as a list of Row objects) to inspect actual values
df.take(1)

[Row(subject='id650857ca', walk_or_not=1, fea_acc_x_mean=-0.01985191347753744, fea_acc_x_std=0.7305734007458594, fea_acc_x_min=-2.879, fea_acc_x_max=3.035, fea_acc_y_mean=1.2901231281198005, fea_acc_y_std=0.564906663897679, fea_acc_y_min=0.109, fea_acc_y_max=2.793, fea_acc_z_mean=0.18915640599001665, fea_acc_z_std=0.43573244950368045, fea_acc_z_min=-1.8360000000000003, fea_acc_z_max=1.352, fea_smv_mean=1.5075437010309913, fea_smv_std=0.6860202037338495, fea_smv_min=0.40363349712331853, fea_smv_max=3.4315750028230476, __index_level_0__='id650857ca_AnkleL_1_Walking_100_1')]

In [6]:
# Print the first 3 rows as a text table
df.show(3)

+----------+-----------+--------------------+------------------+-------------+-------------+------------------+------------------+-------------+-------------+-------------------+-------------------+-------------------+-------------+------------------+------------------+-------------------+------------------+--------------------+
|   subject|walk_or_not|      fea_acc_x_mean|     fea_acc_x_std|fea_acc_x_min|fea_acc_x_max|    fea_acc_y_mean|     fea_acc_y_std|fea_acc_y_min|fea_acc_y_max|     fea_acc_z_mean|      fea_acc_z_std|      fea_acc_z_min|fea_acc_z_max|      fea_smv_mean|       fea_smv_std|        fea_smv_min|       fea_smv_max|   __index_level_0__|
+----------+-----------+--------------------+------------------+-------------+-------------+------------------+------------------+-------------+-------------+-------------------+-------------------+-------------------+-------------+------------------+------------------+-------------------+------------------+--------------------+
|id6508

In [7]:
# List all column names as plain Python strings
df.columns

['subject',
 'walk_or_not',
 'fea_acc_x_mean',
 'fea_acc_x_std',
 'fea_acc_x_min',
 'fea_acc_x_max',
 'fea_acc_y_mean',
 'fea_acc_y_std',
 'fea_acc_y_min',
 'fea_acc_y_max',
 'fea_acc_z_mean',
 'fea_acc_z_std',
 'fea_acc_z_min',
 'fea_acc_z_max',
 'fea_smv_mean',
 'fea_smv_std',
 'fea_smv_min',
 'fea_smv_max',
 '__index_level_0__']

In [15]:
# Select feature columns: keep every column except the ones listed in columns_to_remove
all_columns = df.columns
columns_to_remove = ['subject', 'walk_or_not', '__index_level_0__']
features_selected = [name for name in all_columns if name not in columns_to_remove]

# Echo the selection so the cell output shows exactly which columns feed the model
features_selected

['fea_acc_x_mean',
 'fea_acc_x_std',
 'fea_acc_x_min',
 'fea_acc_x_max',
 'fea_acc_y_mean',
 'fea_acc_y_std',
 'fea_acc_y_min',
 'fea_acc_y_max',
 'fea_acc_z_mean',
 'fea_acc_z_std',
 'fea_acc_z_min',
 'fea_acc_z_max',
 'fea_smv_mean',
 'fea_smv_std',
 'fea_smv_min',
 'fea_smv_max']

In [35]:
# Preprocess: assemble the feature vector and index the target column

# Pack the selected feature columns into a single vector column named "features"
vectorAssembler = VectorAssembler(
    inputCols=features_selected,
    outputCol="features",
)
v_df = vectorAssembler.transform(df)

# Encode "walk_or_not" into a double-typed "label" column.
# NOTE(review): StringIndexer orders indices by descending frequency by default, so
# label 0.0 is the most frequent class, not necessarily walk_or_not == 0 — confirm
# the mapping before interpreting per-label metrics.
indexer = StringIndexer(inputCol="walk_or_not", outputCol="label")
indexer_model = indexer.fit(df)
iv_df = indexer_model.transform(v_df)

In [36]:
# Confirm that the "features" (vector) and "label" (double) columns were appended
iv_df

DataFrame[subject: string, walk_or_not: bigint, fea_acc_x_mean: double, fea_acc_x_std: double, fea_acc_x_min: double, fea_acc_x_max: double, fea_acc_y_mean: double, fea_acc_y_std: double, fea_acc_y_min: double, fea_acc_y_max: double, fea_acc_z_mean: double, fea_acc_z_std: double, fea_acc_z_min: double, fea_acc_z_max: double, fea_smv_mean: double, fea_smv_std: double, fea_smv_min: double, fea_smv_max: double, __index_level_0__: string, features: vector, label: double]

In [37]:
# Split data into two groups, weighted 60% (train) and 40% (test); seed = 1
splits = iv_df.randomSplit([0.6, 0.4], seed=1)
train_df, test_df = splits

# Row counts, in order: full data set, training split, test split
for frame in (iv_df, train_df, test_df):
    print(frame.count())

13730
8245
5485


# Run with Decision Tree

In [38]:
# Decision tree estimator reading the "features" and "label" columns; no other parameters are set here
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [39]:
# Fit the decision tree on the training split only
dt_model = dt.fit(train_df)

In [40]:
# Apply the fitted model to the held-out test split to obtain predictions
dt_predictions = dt_model.transform(test_df)

In [52]:
def dt_evaluator_metricName(metric_name, predictions=None):
    """Evaluate decision-tree predictions with one metric and print the result.

    Args:
        metric_name: Name of a metric accepted by
            ``MulticlassClassificationEvaluator`` (e.g. ``"f1"``, ``"accuracy"``).
        predictions: DataFrame holding the ``label`` and ``prediction`` columns.
            When omitted, the notebook-level ``dt_predictions`` is evaluated,
            which is what the one-argument call has always done.

    Returns:
        None. The line ``<metric_name> : <value>`` is printed instead.
    """
    if predictions is None:
        # Resolved at call time, so the most recently computed predictions are used.
        predictions = dt_predictions
    dt_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )
    dt_value = dt_evaluator.evaluate(predictions)
    print(metric_name, ':', dt_value)

In [70]:
# Metrics to report, in print order. Each inner list is one group; a dashed
# rule is printed between consecutive groups.
metric_groups = [
    [
        "f1",        # F1 score (harmonic mean of precision and recall); prints the same value as "weightedFMeasure"
        "accuracy",  # accuracy of the classification
    ],
    [
        "weightedPrecision",          # precision of each class, weighted by the number of true instances of that class
        "weightedRecall",             # recall of each class, weighted by the number of true instances of that class
        "weightedTruePositiveRate",   # true positive rate, weighted by the number of true instances of each label
        "weightedFalsePositiveRate",  # false positive rate, weighted by the number of true instances of each label
        "weightedFMeasure",           # weighted F-measure, balancing precision and recall under class imbalance
    ],
    [
        # NOTE(review): each *ByLabel metric prints a single number, not one per label —
        # presumably for the evaluator's default metricLabel; confirm which class that is.
        "truePositiveRateByLabel",    # true positive rate for one label
        "falsePositiveRateByLabel",   # false positive rate for one label
        "precisionByLabel",           # precision for one label
        "recallByLabel",              # recall for one label
        "fMeasureByLabel",            # F-measure for one label
    ],
    [
        "logLoss",      # logarithmic loss
        "hammingLoss",  # hamming loss
    ],
]

separator = "-----------------------------------"
for group_index, group in enumerate(metric_groups):
    if group_index > 0:
        print(separator)
    for metric_name in group:
        dt_evaluator_metricName(metric_name)

f1 : 0.9018715455734343
accuracy : 0.897538742023701
-----------------------------------
weightedPrecision : 0.9117710819436946
weightedRecall : 0.897538742023701
weightedTruePositiveRate : 0.897538742023701
weightedFalsePositiveRate : 0.12875021665622827
weightedFMeasure : 0.9018715455734343
-----------------------------------
truePositiveRateByLabel : 0.9056306306306307
falsePositiveRateByLabel : 0.1368421052631579
precisionByLabel : 0.9656580211335255
recallByLabel : 0.9056306306306307
fMeasureByLabel : 0.9346815434681544
-----------------------------------
logLoss : 0.2821881718530204
hammingLoss : 0.102461257976299
