In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('z', IntegerType(), True),
])

In [3]:
import os

In [4]:
file_list_filtered = [s for s in os.listdir('HMP_Dataset') if '_' in s]

In [5]:
file_list_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone']

In [16]:
df = None

from pyspark.sql.functions import lit

for category in file_list_filtered[:3]:
    data_files = os.listdir('HMP_Dataset/' + category)
    for data_file in data_files:
        print(data_file)
        temp_df = spark \
        .read \
        .option('header', 'false') \
        .option('delimiter', ' ') \
        .csv('HMP_Dataset/' + category + '/' + data_file, schema=schema)
        
        temp_df = temp_df.withColumn('class', lit(category)) # create column class contains category
        
        temp_df = temp_df.withColumn('source', lit(data_file)) # create column class contains source
        
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt
Accelerometer-2011-03-24-10-24-39-climb_stairs-f1.txt
Accelerometer-2011-03-24-10-25-44-climb_stairs-f1.txt
Accelerometer-2011-03-29-09-55-46-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-21-22-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-32-29-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-44-35-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-57-50-climb_

In [17]:
df.show()

+---+---+---+-----------+--------------------+
|  x|  y|  z|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|

In [18]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [19]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Normalizer


indexer = StringIndexer(inputCol='class', outputCol='label')

vectorAssembler = VectorAssembler(inputCols=['x', 'y', 'z'],
                                 outputCol='features')

normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)

In [20]:
from pyspark.ml.classification import LogisticRegression

In [21]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [22]:
from pyspark.ml import Pipeline

In [23]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [26]:
model = pipeline.fit(df_train)

In [27]:
prediction = model.transform(df_train)

In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [29]:
ev = MulticlassClassificationEvaluator().setMetricName('accuracy').setLabelCol('label').setPredictionCol('prediction')

In [30]:
ev.evaluate(prediction)

0.4563007045260461

In [31]:
model = pipeline.fit(df_test)

In [32]:
prediction = model.transform(df_test)

In [34]:
ev.evaluate(prediction)

0.4466670241861962