In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
spark = SparkSession.builder.getOrCreate()

In [2]:
#!wget https://github.com/IBM/coursera/blob/master/coursera_ml/shake.parquet?raw=true
#!move shake.parquet?raw=true shake.parquet

In [3]:
df = spark.read.parquet('shake.parquet')
df.show(5)

+-----+---------+-----+-----+-----+
|CLASS| SENSORID|    X|    Y|    Z|
+-----+---------+-----+-----+-----+
|    2| qqqqqqqq| 0.12| 0.12| 0.12|
|    2|aUniqueID| 0.03| 0.03| 0.03|
|    2| qqqqqqqq|-3.84|-3.84|-3.84|
|    2| 12345678| -0.1| -0.1| -0.1|
|    2| 12345678|-0.15|-0.15|-0.15|
+-----+---------+-----+-----+-----+
only showing top 5 rows



In [4]:
#!pip install pixiedust

In [10]:
import pixiedust
display(df)

CLASS,SENSORID,X,Y,Z
2,12345678,0.47,0.47,0.47
2,bcbcbcbc,0.58,0.58,0.58
2,qqqqqqqq,-0.9,-0.9,-0.9
2,pj123456,-0.14,-0.14,-0.14
2,gholi,7.56,7.56,7.56
2,Bbbbbbb,-0.1,-0.1,-0.1
2,qqqqqqqq,-0.29,-0.29,-0.29
2,Bbbbbbb,-0.49,-0.49,-0.49
2,Bbbbbbb,-0.02,-0.02,-0.02
2,12345,0.18,0.18,0.18


In [5]:
# Now We’ll use Apache SystemML to implement Discrete Fourier Transformation (DFF)
from systemml import MLContext, dml

df.createOrReplaceTempView('df')
ml = MLContext(spark)

In [6]:
# dml script for systemml DFF

dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

Now it’s time to create a function which takes a single row Apache Spark data frame as argument (the one containing the accelerometer measurement time series for one axis) and returns the Fourier transformation of it. In addition, we are adding an index column for later joining all axis together and renaming the columns to appropriate names. The result of this function is an Apache Spark DataFrame containing the Fourier Transformation of its input in two columns.

In [7]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal,name):
    prog = dml(dml_script).input('signal', signal).output('DFT')
    
    return (

    #execute the script inside the SystemML engine running on top of Apache Spark
    ml.execute(prog) 
     
         #read result from SystemML execution back as SystemML Matrix
        .get('DFT') 
     
         #convert SystemML Matrix to ApacheSpark DataFrame 
        .toDF() 
     
         #rename default column names
        .selectExpr('C1 as %sa' % (name), 'C2 as %sb' % (name)) 
     
         #add unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )

Now it’s time to create individual DataFrames containing only a subset of the data. We filter simultaneously for accelerometer each sensor axis and one for each class. This means you’ll get 6 DataFrames. Please implement this using the relational API of DataFrames or SparkSQL. Please use class 1 and 2

In [8]:
#use sql API
x1 = spark.sql('Select X from df where class==1')
y1 = spark.sql('Select Y from df where class==1')
z1 = spark.sql('Select Z from df where class==1')
x2 = spark.sql('Select X from df where class==2')
y2 = spark.sql('Select Y from df where class==2')
z2 = spark.sql('Select Z from df where class==2')

Since we’ve created this cool DFT function before, we can just call it for each of the 6 DataFrames now. And since the result of this function call is a DataFrame again we can use the pyspark best practice in simply calling methods on it sequentially. So what we are doing is the following:

Calling DFT for each class and accelerometer sensor axis.
Joining them together on the ID column.
Re-adding a column containing the class index.
Stacking both Dataframes for each classes together

In [9]:
from pyspark.sql.functions import lit

df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_2 = dft_systemml(x2,'x') \
    .join(dft_systemml(y2,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z2,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_1.union(df_class_2)


SystemML Statistics:
Total execution time:		0.525 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.103 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.093 sec.
Number of executed Spark inst:	0.


[Stage 17:>                                                         (0 + 8) / 8]
                                                                                
[Stage 19:>                                                         (0 + 8) / 8]
SystemML Statistics:
Total execution time:		7.161 sec.
Number of executed Spark inst:	6.

                                                                                

[Stage 25:>                                                         (0 + 8) / 8]
[Stage 27:>                                                         (0 + 8) / 8]
SystemML Statistics:
Total execution time:		6.674 sec.
Number of executed Spark inst:	6.

                                                            

In [11]:
display(df_dft)

id,xa,xb,ya,yb,za,zb,class
51539607553.0,0.0662358667786167,-0.0192202494209549,0.0662358667786167,-0.0192202494209549,0.0662358667786167,-0.0192202494209549,0.0
42949672967.0,-0.0399786943338758,-0.0317286648233335,-0.0399786943338758,-0.0317286648233335,-0.0399786943338758,-0.0317286648233335,0.0
42949673632.0,8.89677287014402,-216.54129947668335,8.89677287014402,-216.54129947668335,8.89677287014402,-216.54129947668335,1.0
8589935056.0,1187.9539805064012,563.6584273901925,1187.9539805064012,563.6584273901925,1187.9539805064012,563.6584273901925,1.0
34359739502.0,-953.6465018650664,-1491.9519172762805,-953.6465018650664,-1491.9519172762805,-953.6465018650664,-1491.9519172762805,1.0
42949673244.0,120.40213878935188,803.4354520952852,120.40213878935188,803.4354520952852,120.40213878935188,803.4354520952852,1.0
42949673319.0,713.2374606778315,-158.9697345577231,713.2374606778315,-158.9697345577231,713.2374606778315,-158.9697345577231,1.0
17179869241.0,738.5296749656382,-856.8975782132882,738.5296749656382,-856.8975782132882,738.5296749656382,-856.8975782132882,1.0
25769804504.0,480.5459018054239,-382.4621761449661,480.5459018054239,-382.4621761449661,480.5459018054239,-382.4621761449661,1.0
243.0,1371.328195253221,1029.6495340552196,1371.328195253221,1029.6495340552196,1371.328195253221,1029.6495340552196,1.0


Now use machine learning classification

In [27]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

assemb = VectorAssembler(inputCols=['xa','xb','ya','yb','za','zb'],outputCol='features')
gtbc = GBTClassifier(labelCol='class',featuresCol='features',maxIter=10)
pipeline = Pipeline(stages=[assemb,gtbc])
model = pipeline.fit(df_dft)
prediction = model.transform(df_dft)
prediction.show(5)

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|         id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|       rawPrediction|         probability|prediction|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|17179869191|-0.03750866686245739|-0.00607638770223...|-0.03750866686245739|-0.00607638770223...|-0.03750866686245739|-0.00607638770223...|    0|[-0.0375086668624...|[0.83002855223973...|[0.84024566850429...|       0.0|
| 8589934592| -0.0213952544564988| -0.1156058071676539| -0.0213952544564988| -0.1156058071676539| -0.0213952544564988| -

In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("class")
    
binEval.evaluate(prediction)

0.9981761070017225