The purpose of this assignment is to learn how feature engineering improves model performance. 

Stpes:  
1) Apply Discrete Fourier Transformation on the accelerometer sensor time series.    
2) Transform the dataset from the time to the frequency domain.  
3) Use a classification algorithm to create a model and submit the new predictions.  


In [1]:
#your cloudant credentials go here
# @hidden_cell
# @hidden_cell
credentials_1 = {
  'password':"""""",
  'custom_url':'',
  'username':'',
  'url':'https://undefined'
}


Let's create a SparkSession object and put the Cloudant credentials into it

In [2]:
spark = SparkSession\
    .builder\
    .appName("Cloudant Spark SQL Example in Python using temp tables")\
    .config("cloudant.host",credentials_1['custom_url'].split('@')[1])\
    .config("cloudant.username", credentials_1['username'])\
    .config("cloudant.password",credentials_1['password'])\
    .config("jsonstore.rdd.partitions", 1)\
    .getOrCreate()

Now it’s time to read the sensor data and create a temporary query table.

In [3]:
df=spark.read.load('shake_classification', "org.apache.bahir.cloudant")
df.createOrReplaceTempView("df")

We need to make sure SystemML is installed.


In [4]:
!pip install systemml

Collecting systemml
[?25l  Downloading https://files.pythonhosted.org/packages/b1/94/62104cb8c526b462cd501c7319926fb81ac9a5668574a0b3407658a506ab/systemml-1.2.0.tar.gz (9.7MB)
[K    100% |████████████████████████████████| 9.7MB 3.5MB/s eta 0:00:01
Building wheels for collected packages: systemml
  Running setup.py bdist_wheel for systemml ... [?25ldone
[?25h  Stored in directory: /gpfs/fs01/user/s2f3-1c0783c216973a-3e2ac3e59e61/.cache/pip/wheels/cf/07/79/b3ed6f12afe06b2ab55d60dcfd62e66240f5d8c6088a518177
Successfully built systemml
[31mnotebook 5.0.0 requires nbconvert, which is not installed.[0m
[31mipywidgets 6.0.0 requires widgetsnbextension~=2.0.0, which is not installed.[0m
[31mtensorflow 1.3.0 requires tensorflow-tensorboard<0.2.0,>=0.1.0, which is not installed.[0m
Installing collected packages: systemml
Successfully installed systemml-1.2.0


We’ll use Apache SystemML to implement Discrete Fourier Transformation. This way all computation continues to happen on the Apache Spark cluster for advanced scalability and performance.

In [5]:
from systemml import MLContext, dml
ml = MLContext(spark)

As you’ve learned from the lecture, implementing Discrete Fourier Transformation in a linear algebra programming language is simple. Apache SystemML DML is such a language and as you can see the implementation is straightforward and doesn’t differ too much from the mathematical definition (Just note that the sum operator has been swapped with a vector dot product using the %*% syntax borrowed from R
):

<img style="float: left;" src="https://wikimedia.org/api/rest_v1/media/math/render/svg/1af0a78dc50bbf118ab6bd4c4dcc3c4ff8502223">



In [6]:
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

Now it’s time to create a function which takes a single row Apache Spark data frame as argument (the one containing the accelerometer measurement time series for one axis) and returns the Fourier transformation of it. In addition, we are adding an index column for later joining all axis together and renaming the columns to appropriate names. The result of this function is an Apache Spark DataFrame containing the Fourier Transformation of its input in two columns. 


In [7]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal,name):
    prog = dml(dml_script).input('signal', signal).output('DFT')
    
    return (

    #execute the script inside the SystemML engine running on top of Apache Spark
    ml.execute(prog) 
     
         #read result from SystemML execution back as SystemML Matrix
        .get('DFT') 
     
         #convert SystemML Matrix to ApacheSpark DataFrame 
        .toDF() 
     
         #rename default column names
        .selectExpr('C1 as %sa' % (name), 'C2 as %sb' % (name)) 
     
         #add unique ID per row for later joining
        .withColumn("id", monotonically_increasing_id())
    )
        




In [11]:
df.show()

+-----+--------+-----+-----+-----+--------------------+--------------------+
|CLASS|SENSORID|    X|    Y|    Z|                 _id|                _rev|
+-----+--------+-----+-----+-----+--------------------+--------------------+
|    0|velqwert|  0.0|  0.0|  0.0|020533cb000ab3de5...|1-f5e2391b0458648...|
|    0|velqwert|  0.0|  0.0|  0.0|020533cb000ab3de5...|1-f5e2391b0458648...|
|    0|velqwert|-0.01|-0.01|-0.01|020533cb000ab3de5...|1-df37baeaa4c3926...|
|    0|velqwert|  0.0|  0.0|  0.0|020533cb000ab3de5...|1-f5e2391b0458648...|
|    0|velqwert|  0.0|  0.0|  0.0|020533cb000ab3de5...|1-f5e2391b0458648...|
|    0|velqwert| 0.01| 0.01| 0.01|020533cb000ab3de5...|1-3e653d871d6c029...|
|    0|velqwert|-0.02|-0.02|-0.02|020533cb000ab3de5...|1-f3902329a0f1765...|
|    0|velqwert|  0.0|  0.0|  0.0|020533cb000ab3de5...|1-f5e2391b0458648...|
|    0|velqwert| 0.01| 0.01| 0.01|020533cb000ab3de5...|1-3e653d871d6c029...|
|    0|velqwert|-0.01|-0.01|-0.01|020533cb000ab3de5...|1-df37baeaa4c3926...|

Now it’s time to create DataFrames containing for each accelerometer sensor axis and one for each class. This means you’ll get 6 DataFrames. Please implement this using the relational API of DataFrames or SparkSQL.


In [23]:
# spark.sql("SELECT class,X from df where class = 1").show()
x0 = spark.sql("SELECT class,X from df where class = 0")
y0 = spark.sql("SELECT class,Y from df where class = 0")
z0 = spark.sql("SELECT class,Z from df where class = 0")
x1 = spark.sql("SELECT class,X from df where class = 1")
y1 = spark.sql("SELECT class,Y from df where class = 1")
z1 = spark.sql("SELECT class,Z from df where class = 1")

Since we’ve created this cool DFT function before, we can just call it for each of the 6 DataFrames now. And since the result of this function call is a DataFrame again we can use the pyspark best practice in simply calling methods on it sequentially. So what we are doing is the following:

- Calling DFT for each class and accelerometer sensor axis.
- Joining them together on the ID column. 
- Re-adding a column containing the class index.
- Stacking both Dataframes for each classes together



In [24]:
from pyspark.sql.functions import lit

df_class_0 = dft_systemml(x0,'x') \
    .join(dft_systemml(y0,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z0,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(0))
    
df_class_1 = dft_systemml(x1,'x') \
    .join(dft_systemml(y1,'y'), on=['id'], how='inner') \
    .join(dft_systemml(z1,'z'), on=['id'], how='inner') \
    .withColumn('class', lit(1))

df_dft = df_class_0.union(df_class_1)

df_dft.show()

SystemML Statistics:
Total execution time:		1.873 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		1.062 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.976 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		0.980 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		1.044 sec.
Number of executed Spark inst:	0.


SystemML Statistics:
Total execution time:		1.018 sec.
Number of executed Spark inst:	0.


+----------+---+--------------------+---+--------------------+---+--------------------+-----+
|        id| xa|                  xb| ya|                  yb| za|                  zb|class|
+----------+---+--------------------+---+--------------------+---+--------------------+-----+
|        26|0.0|  3.4659961767723746|0.0|  3.4659961767723746|0.0|  3.4659961767723746|    0|
|        29|0.0|  -6.067001873042779|0.0|  -6.067001873042779|0.0|  -6.0670018

Please create a VectorAssembler which consumes the newly created DFT columns and produces a column “features”


In [26]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["xa","xb","ya","yb","za","zb"],
                                  outputCol="features")

Please insatiate a classifier from the SparkML package and assign it to the classifier variable. Make sure to set the “class” column as target.


In [30]:
from pyspark.ml.classification import GBTClassifier
classifier = GBTClassifier(labelCol="class", featuresCol="features",maxIter=10)

Let’s train and evaluate…


In [31]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, classifier])

In [32]:
model = pipeline.fit(df_dft)

In [33]:
prediction = model.transform(df_dft)

In [34]:
prediction.show()

+----------+---+--------------------+---+--------------------+---+--------------------+-----+--------------------+----------+
|        id| xa|                  xb| ya|                  yb| za|                  zb|class|            features|prediction|
+----------+---+--------------------+---+--------------------+---+--------------------+-----+--------------------+----------+
|        26|0.0|  3.4659961767723746|0.0|  3.4659961767723746|0.0|  3.4659961767723746|    0|[0.0,3.4659961767...|       0.0|
|        29|0.0|  -6.067001873042779|0.0|  -6.067001873042779|0.0|  -6.067001873042779|    0|[0.0,-6.067001873...|       0.0|
|       474|0.0|  -4.773707646336872|0.0|  -4.773707646336872|0.0|  -4.773707646336872|    0|[0.0,-4.773707646...|       0.0|
|       964|0.0|  -1.668821675169658|0.0|  -1.668821675169658|0.0|  -1.668821675169658|    0|[0.0,-1.668821675...|       0.0|
|8589934658|0.0| 0.12590268819396355|0.0| 0.12590268819396355|0.0| 0.12590268819396355|    0|[0.0,0.1259026881...|    

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("class")
    
binEval.evaluate(prediction) 

1.0

Prediction value is 1.0 which is better than the last two assignment predictions, feature engineering improves the model performance.


# Assignment submission  
If the value is greater than 0.8, please submit your solution to the grader by executing the following cells, please don’t forget to obtain an assignment submission token (secret) from the Courera’s graders web page and paste it to the “secret” variable below, including your email address you’ve used for Coursera.

In [36]:
!rm -Rf a2_m4.json

In [37]:
prediction = prediction.repartition(1)
prediction.write.json('a2_m4.json')

In [38]:
!rm -f rklib.py
!wget https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py

--2018-10-18 16:58:32--  https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2029 (2.0K) [text/plain]
Saving to: ‘rklib.py’


2018-10-18 16:58:32 (16.0 MB/s) - ‘rklib.py’ saved [2029/2029]



In [39]:
!zip -r a2_m4.json.zip a2_m4.json

  adding: a2_m4.json/ (stored 0%)
  adding: a2_m4.json/_SUCCESS (stored 0%)
  adding: a2_m4.json/._SUCCESS.crc (stored 0%)
  adding: a2_m4.json/part-00000-edc82fd7-d3af-40af-9901-0b86cc89303c.json (deflated 87%)
  adding: a2_m4.json/.part-00000-edc82fd7-d3af-40af-9901-0b86cc89303c.json.crc (stored 0%)


In [40]:
!base64 a2_m4.json.zip > a2_m4.json.zip.base64

In [41]:
from rklib import submit
key = "-fBiYHYDEeiR4QqiFhAvkA"
part = "IjtJk"
email = ""
secret = "jFvyHgc6EQLgn8wC"

with open('a2_m4.json.zip.base64', 'r') as myfile:
    data=myfile.read()
submit(email, secret, key, part, [part], data)

Submission successful, please check on the coursera grader page for the status
