<a href="https://colab.research.google.com/github/mridul-eecs/signal-processing-apachesparkml-apachesystemml/blob/master/KmeansSparkML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Preprocessing to work with Spark

In [1]:

# spark dependencies:
# citation: http://medium.com/@rmache/big-data-with-spark-in-google-colab-7c046e24b3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apachemirror.wuchna.com/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 61kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 49.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=509b28298180f529865da2c474e39e54e07fd3ea8833dc8947c7232be7542392
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [2]:
!git clone https://github.com/mridul-eecs/signal-processing-apachesparkml-apachesystemml.git

Cloning into 'signal-processing-apachesparkml-apachesystemml'...
remote: Enumerating objects: 44, done.[K
remote: Counting objects:   2% (1/44)[Kremote: Counting objects:   4% (2/44)[Kremote: Counting objects:   6% (3/44)[Kremote: Counting objects:   9% (4/44)[Kremote: Counting objects:  11% (5/44)[Kremote: Counting objects:  13% (6/44)[Kremote: Counting objects:  15% (7/44)[Kremote: Counting objects:  18% (8/44)[Kremote: Counting objects:  20% (9/44)[Kremote: Counting objects:  22% (10/44)[Kremote: Counting objects:  25% (11/44)[Kremote: Counting objects:  27% (12/44)[Kremote: Counting objects:  29% (13/44)[Kremote: Counting objects:  31% (14/44)[Kremote: Counting objects:  34% (15/44)[Kremote: Counting objects:  36% (16/44)[Kremote: Counting objects:  38% (17/44)[Kremote: Counting objects:  40% (18/44)[Kremote: Counting objects:  43% (19/44)[Kremote: Counting objects:  45% (20/44)[Kremote: Counting objects:  47% (21/44)[Kremote: Counting obj

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType
from pyspark.sql.functions import lit
import os
from tqdm import tqdm_notebook as tqdm

# Environment variables for the JVM and the Spark distribution.
# The Spark path matches the tarball unpacked by the setup cell; the JDK path
# is presumably where the openjdk-8 apt package installs (confirm on the runtime).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"


import findspark
# Called with no arguments, after SPARK_HOME has been set above.
findspark.init()

# Spark application name and master URL used when building the session.
APP_NAME = "Human Motion Primitives"  # typo fixed (was "Premitives")
SPARK_URL = "local[*]"

# NOTE(review): none of the constants below are referenced in the visible
# cells (the KMeans stages use setSeed(1) directly). The RF_* names look like
# random-forest settings; kept in case other cells rely on them.
RANDOM_SEED = 141109
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 8
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [0]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master(SPARK_URL)
    .appName(APP_NAME)
    .getOrCreate()
)

### Main program

In [0]:
# Parquet dataset shipped inside the cloned repository.
datapath = '/content/signal-processing-apachesparkml-apachesystemml/df.parquet'

# Load it into a Spark DataFrame through the generic reader interface.
df = spark.read.format('parquet').load(datapath)

In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer

# Stage 1: pack the x / y / z columns into a single 'features' vector column.
vectorizer = (
    VectorAssembler()
    .setInputCols(['x', 'y', 'z'])
    .setOutputCol('features')
)

# Stage 2: k-means with 13 clusters and a fixed seed.
kmeans = KMeans(k=13, seed=1)

# Chain both stages and fit them on the full DataFrame.
pipe = Pipeline().setStages([vectorizer, kmeans])
model = pipe.fit(df)


In [15]:
# Run the fitted pipeline on df. As a bare last expression this only displays
# the resulting DataFrame's schema repr (it gains 'features' and 'prediction'
# columns); the result is not stored in a variable.
model.transform(df)

DataFrame[x: int, y: int, z: int, class: string, features: vector, prediction: int]

In [0]:
# Within-set sum of squared errors for the fitted k-means stage (the last
# pipeline stage).
# KMeansModel.computeCost is deprecated in Spark 2.4 and removed in 3.0, so
# use it only when present. Otherwise read the training cost from the model
# summary, which is the replacement the deprecation notice points to for the
# training dataset.
kmeans_model = model.stages[-1]
if hasattr(kmeans_model, 'computeCost'):
    wssse = kmeans_model.computeCost(vectorizer.transform(df))
else:
    wssse = kmeans_model.summary.trainingCost

In [18]:
# Show the clustering cost computed in the previous cell.
print(wssse)

15827497.774103818


In [24]:
# Considering only 2 classes.
# createOrReplaceTempView() returns None, so its result must not be assigned
# back to `df`; doing so would destroy the DataFrame and make this cell (and
# any later use of `df`) fail on re-run.
df.createOrReplaceTempView('df')

# Keep only the rows of the two activities of interest.
bindf = spark.sql("""select * from df where df.class in ('Use_telephone', 'Standup_chair')""")
bindf.show()

+---+---+---+-------------+
|  x|  y|  z|        class|
+---+---+---+-------------+
| 14| 46| 31|Standup_chair|
| 49| 24| 40|Use_telephone|
|  7| 30| 17|Standup_chair|
| 16| 41| 44|Standup_chair|
| 34| 43| 44|Use_telephone|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 14| 40| 33|Standup_chair|
| 44| 31| 50|Use_telephone|
| 44| 31| 50|Use_telephone|
| 12| 30| 33|Standup_chair|
| 29| 41| 51|Standup_chair|
| 29| 41| 51|Standup_chair|
| 29| 41| 51|Standup_chair|
| 29| 41| 51|Standup_chair|
| 25| 36| 44|Use_telephone|
+---+---+---+-------------+
only showing top 20 rows



In [25]:
# Same two-stage pipeline as before, now with 2 clusters on the two-class subset.
vectorizer = VectorAssembler(inputCols=['x', 'y', 'z'], outputCol='features')
kmeans = KMeans().setK(2).setSeed(1)
pipe = Pipeline(stages=[vectorizer, kmeans])

model = pipe.fit(bindf)
pred = model.transform(bindf)

# Clustering cost of the 2-cluster model.
# KMeansModel.computeCost is deprecated in Spark 2.4 and removed in 3.0, so
# use it only when present; otherwise read the training cost from the summary.
kmeans_model = model.stages[-1]
if hasattr(kmeans_model, 'computeCost'):
    bin_wssse = kmeans_model.computeCost(vectorizer.transform(bindf))
else:
    bin_wssse = kmeans_model.summary.trainingCost
bin_wssse

4478194.572658991