In [7]:
import pyspark
import os
import glob

In [3]:
!ls hmp_data/Brush_teeth/

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


In [4]:
# first define the schema
from pyspark.sql.types import StructField, StructType, IntegerType

In [5]:
schema = StructType([
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('z', IntegerType(), True)
])

In [6]:
schema

StructType(List(StructField(x,IntegerType,true),StructField(y,IntegerType,true),StructField(z,IntegerType,true)))

In [13]:
class_folders = glob.glob('hmp_data/*_*')
class_folders

['hmp_data/Comb_hair',
 'hmp_data/Brush_teeth',
 'hmp_data/Climb_stairs',
 'hmp_data/Standup_chair',
 'hmp_data/Eat_soup',
 'hmp_data/Use_telephone',
 'hmp_data/Sitdown_chair',
 'hmp_data/Descend_stairs',
 'hmp_data/Getup_bed',
 'hmp_data/Eat_meat',
 'hmp_data/Pour_water',
 'hmp_data/Drink_glass',
 'hmp_data/Liedown_bed']

In [19]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [38]:
# we can import a literal to deal with data type for class
from pyspark.sql.functions import lit

In [55]:
def read_df(list_of_files, class_name):
    df = spark.read \
            .option("header", "false") \
            .option("delimiter", " ") \
            .csv(list_of_files, schema=schema)
    
    df = df.withColumn("class", lit(class_name)).withColumn("source", lit(list_of_files[0]))
    
    return df

df = None

for cat in class_folders:
    print(cat)
    
    tempdf = read_df(glob.glob(cat), class_name=cat.split("/")[1])
    
    if df == None:
        df = tempdf
    else:
        df = df.union(tempdf)

hmp_data/Comb_hair
hmp_data/Brush_teeth
hmp_data/Climb_stairs
hmp_data/Standup_chair
hmp_data/Eat_soup
hmp_data/Use_telephone
hmp_data/Sitdown_chair
hmp_data/Descend_stairs
hmp_data/Getup_bed
hmp_data/Eat_meat
hmp_data/Pour_water
hmp_data/Drink_glass
hmp_data/Liedown_bed


In [56]:
df.count()

354275

In [58]:
df.groupBy("class").count().show()

+--------------+-----+
|         class|count|
+--------------+-----+
| Use_telephone|15225|
| Standup_chair|25417|
|      Eat_meat|31236|
|     Getup_bed|45801|
|   Drink_glass|42792|
|    Pour_water|41673|
|     Comb_hair|23504|
|  Climb_stairs|40258|
| Sitdown_chair|25036|
|   Liedown_bed|11446|
|Descend_stairs|15375|
|   Brush_teeth|29829|
|      Eat_soup| 6683|
+--------------+-----+



In [59]:
df.show(5)

+---+---+---+---------+------------------+
|  x|  y|  z|    class|            source|
+---+---+---+---------+------------------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|
+---+---+---+---------+------------------+
only showing top 5 rows



In [60]:
# string index the class column

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="class", outputCol="classIndex")

indexed_df = indexer.fit(df).transform(df)

indexed_df.show()

+---+---+---+---------+------------------+----------+
|  x|  y|  z|    class|            source|classIndex|
+---+---+---+---------+------------------+----------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 33| 38| 51|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|
| 33| 39| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 33| 37| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 37| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|
| 34| 38| 52|Comb_hair|hmp_d

In [62]:
indexed_df.groupBy("classIndex").count().show()

+----------+-----+
|classIndex|count|
+----------+-----+
|       8.0|23504|
|       0.0|45801|
|       7.0|25036|
|       1.0|42792|
|       4.0|31236|
|      11.0|11446|
|       3.0|40258|
|       2.0|41673|
|      10.0|15225|
|       6.0|25417|
|       5.0|29829|
|       9.0|15375|
|      12.0| 6683|
+----------+-----+



In [64]:
# perform one hot encoding (vec field) in spark

from pyspark.ml.feature import OneHotEncoder

In [66]:
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")

encoded = encoder.transform(indexed_df)

encoded.show()

+---+---+---+---------+------------------+----------+--------------+
|  x|  y|  z|    class|            source|classIndex|   categoryVec|
+---+---+---+---------+------------------+----------+--------------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 33| 38| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 33| 39| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 33| 37| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|
| 33| 37| 53|Comb_hair|hmp_data/Co

In [67]:
encoded.groupBy("categoryVec").count().show()

+---------------+-----+
|    categoryVec|count|
+---------------+-----+
| (12,[0],[1.0])|45801|
| (12,[8],[1.0])|23504|
| (12,[9],[1.0])|15375|
|(12,[10],[1.0])|15225|
| (12,[1],[1.0])|42792|
| (12,[4],[1.0])|31236|
| (12,[3],[1.0])|40258|
| (12,[6],[1.0])|25417|
| (12,[5],[1.0])|29829|
| (12,[2],[1.0])|41673|
| (12,[7],[1.0])|25036|
|(12,[11],[1.0])|11446|
|     (12,[],[])| 6683|
+---------------+-----+



In [68]:
from pyspark.ml.linalg import Vector, Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler

In [69]:
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"], 
                                 outputCol="features")

features = vectorAssembler.transform(encoded)

In [70]:
features.show()

+---+---+---+---------+------------------+----------+--------------+----------------+
|  x|  y|  z|    class|            source|classIndex|   categoryVec|        features|
+---+---+---+---------+------------------+----------+--------------+----------------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,36.0,52.0]|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,37.0,53.0]|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,37.0,51.0]|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,38.0,52.0]|
| 33| 38| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,38.0,51.0]|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,38.0,52.0]|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(

In [71]:
# normalize the data

In [92]:
from pyspark.ml.feature import Normalizer, StandardScaler

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

normalized_data = normalizer.transform(features)

normalized_data.show()

+---+---+---+---------+------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|    class|            source|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+---------+------------------+----------+--------------+----------------+--------------------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|[0.26829268292682...|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|[0.26829268292682...|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,36.0,52.0]|[0.28455284552845...|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,37.0,53.0]|[0.27419354838709...|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,37.0,51.0]|[0.28455284552845...|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,38.0,52.0]|[0.27419354838709...|
| 33| 38| 51|Comb_hair|hmp_data/Comb_

In [93]:
normalized_data.select("features_norm").rdd.take(5)

[Row(features_norm=DenseVector([0.2683, 0.3008, 0.4309])),
 Row(features_norm=DenseVector([0.2683, 0.3008, 0.4309])),
 Row(features_norm=DenseVector([0.2846, 0.2927, 0.4228])),
 Row(features_norm=DenseVector([0.2742, 0.2984, 0.4274])),
 Row(features_norm=DenseVector([0.2846, 0.3008, 0.4146]))]

In [94]:
x = normalized_data.select("features_norm").rdd.map(lambda x: x[0][0] + x[0][1] + x[0][2])

In [115]:
standScaler = StandardScaler(withMean=True, inputCol="features", outputCol="standardized")

standardized_data = standScaler.fit(features).transform(features)

In [116]:
standardized_data.select("standardized").rdd.take(5)

[Row(standardized=DenseVector([0.4693, -0.1552, 1.145])),
 Row(standardized=DenseVector([0.4693, -0.1552, 1.145])),
 Row(standardized=DenseVector([0.6402, -0.274, 1.0249])),
 Row(standardized=DenseVector([0.5547, -0.1552, 1.145])),
 Row(standardized=DenseVector([0.6402, -0.1552, 0.9048]))]

In [125]:
standardized_data.show()

+---+---+---+---------+------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|    class|            source|classIndex|   categoryVec|        features|        standardized|
+---+---+---+---------+------------------+----------+--------------+----------------+--------------------+
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|[0.46928836329523...|
| 33| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[33.0,37.0,53.0]|[0.46928836329523...|
| 35| 36| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,36.0,52.0]|[0.64019088509803...|
| 34| 37| 53|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,37.0,53.0]|[0.55473962419663...|
| 35| 37| 51|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[35.0,37.0,51.0]|[0.64019088509803...|
| 34| 38| 52|Comb_hair|hmp_data/Comb_hair|       8.0|(12,[8],[1.0])|[34.0,38.0,52.0]|[0.55473962419663...|
| 33| 38| 51|Comb_hair|hmp_data/Comb_

# ETL

In [126]:
standardized_data.write.parquet("processed_hmp_data.parquet")

In [127]:
!ls -lt

total 1884
drwxr-xr-x  2 xuren xuren  16384 Apr 26 11:25 processed_hmp_data.parquet
-rw-r--r--  1 xuren xuren  25253 Apr 26 11:23 03_01_spark_ml_pipeline.ipynb
drwxr-xr-x 17 xuren xuren   4096 Apr 26 08:49 hmp_data
-rw-r--r--  1 xuren xuren 167239 Apr 25 00:17 a0_m4_exercice3.2_spark2.3_python3.5_cos.ipynb
-rw-r--r--  1 xuren xuren 112048 Apr 25 00:12 washing.parquet
-rw-r--r--  1 xuren xuren 342175 Apr 25 00:07 02_plotting.ipynb
-rw-r--r--  1 xuren xuren 160079 Apr 24 23:03 a0_m4_exercice3.1_spark2.3_python3.5_cos.ipynb
-rw-r--r--  1 xuren xuren   9805 Apr 24 22:35 02_scratch.ipynb
-rw-r--r--  1 xuren xuren  73468 Apr 24 22:24 a6_w2_ex1.ipynb
-rw-r--r--  1 xuren xuren 932997 Apr 24 22:18 hmp.parquet
-rw-r--r--  1 xuren xuren  46391 Apr 24 21:52 02_statistical_moments.ipynb
-rw-r--r--  1 xuren xuren   2724 Apr 24 17:55 a6_w1_ex1.ipynb
-rw-r--r--  1 xuren xuren   5579 Apr 24 17:46 a6_w1_ex3.ipynb
-rw-r--r--  1 xuren xuren   3788 Apr 24 17:46 pyspark0.ipynb
drwxr-xr-x  2 x