In [1]:
# The code was removed by Watson Studio for sharing.
# NOTE(review): later cells rely on names this removed cell presumably provided:
#   - `SparkSession` (used to build `spark` in the next cell)
#   - `credentials_1`, a mapping with 'custom_url', 'username' and 'password'
#     keys (read when configuring the Cloudant connector)
# Recreate these locally before running the notebook top to bottom.

In [2]:
# Create (or reuse) the SparkSession with the Cloudant connector settings taken
# from `credentials_1`.
# NOTE(review): the host is the text between the first and second '@' of
# custom_url, presumably a URL shaped like https://user:password@host — confirm.
spark = (
    SparkSession.builder
    .appName("Cloudant Spark SQL Example in Python using temp tables")
    .config("cloudant.host", credentials_1['custom_url'].split('@')[1])
    .config("cloudant.username", credentials_1['username'])
    .config("cloudant.password", credentials_1['password'])
    .getOrCreate()
)

In [3]:
# Read the 'shake_classification' database through the Bahir Cloudant data
# source and register it for Spark SQL as the temp view "df".
df = spark.read.format("org.apache.bahir.cloudant").load("shake_classification")
df.createOrReplaceTempView("df")

In [4]:
!pip install systemml

Collecting systemml
  Downloading https://files.pythonhosted.org/packages/b1/94/62104cb8c526b462cd501c7319926fb81ac9a5668574a0b3407658a506ab/systemml-1.2.0.tar.gz (9.7MB)
    100% |████████████████████████████████| 9.7MB 4.7MB/s eta 0:00:01   8% |██▊                             | 819kB 38.8MB/s eta 0:00:01
Building wheels for collected packages: systemml
  Running setup.py bdist_wheel for systemml ... done
  Stored in directory: /gpfs/fs01/user/s54f-be0868e612925a-a5a98664e301/.cache/pip/wheels/cf/07/79/b3ed6f12afe06b2ab55d60dcfd62e66240f5d8c6088a518177
Successfully built systemml
notebook 5.0.0 requires nbconvert, which is not installed.
ipywidgets 6.0.0 requires widgetsnbextension~=2.0.0, which is not installed.
tensorflow 1.3.0 requires tensorflow-tensorboard<0.2.0,>=0.1.0, which is not installed.
Installing collected packages: systemml
Successfully installed systemml-1.2.0


In [5]:
from systemml import dml, MLContext

# A single MLContext, bound to the SparkSession above, executes every DML
# program built with `dml(...)` later in the notebook.
ml = MLContext(spark)

In [6]:
# SystemML DML program: naive DFT of the input matrix `signal`
# (bound by name via .input('signal', ...) in dft_systemml).
#   n, k = 0..N-1 (column vectors), M = n %*% t(k) * 2*PI/N   -> N x N, dense
#   Xa   = cos(M) %*% signal   (real part)
#   Xb   = sin(M) %*% signal   (imaginary part with the OPPOSITE sign of the
#                               usual exp(-2*pi*i*n*k/N) DFT convention)
#   DFT  = cbind(Xa, Xb)       -> one column pair per input column; N x 2 when
#                                 `signal` is a single column, as with the
#                                 SELECT X/Y/Z queries used below
# NOTE(review): M is materialised in full, so memory grows as N^2 in the
# number of samples per class/axis.
dml_script = '''
PI = 3.141592654
N = nrow(signal)

n = seq(0, N-1, 1)
k = seq(0, N-1, 1)

M = (n %*% t(k))*(2*PI/N)

Xa = cos(M) %*% signal
Xb = sin(M) %*% signal

DFT = cbind(Xa, Xb)
'''

In [7]:
from pyspark.sql.functions import monotonically_increasing_id

def dft_systemml(signal, name):
    """Run the DFT DML script on one signal and return the result as a DataFrame.

    Parameters
    ----------
    signal : pyspark.sql.DataFrame
        Single-column DataFrame with the samples to transform
        (e.g. the result of ``SELECT X from df where class = 0``).
    name : str
        Prefix for the output columns: ``<name>a`` and ``<name>b``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``<name>a``, ``<name>b`` and a BIGINT ``id`` holding the DFT
        matrix row index, so results for different signals can be joined
        bin-by-bin on ``id``.
    """
    prog = dml(dml_script).input('signal', signal).output('DFT')

    # Execute inside the SystemML engine (on top of Spark) and fetch the matrix.
    dft_matrix = ml.execute(prog).get('DFT')

    # SystemML's Matrix.toDF() emits an "__INDEX" column with each row's position
    # in the matrix (Spark DataFrames are otherwise unordered), followed by C1, C2.
    # Use it as the join key: monotonically_increasing_id() values depend on the
    # partitioning, so ids generated for separate DFT results need not line up.
    return dft_matrix.toDF().selectExpr(
        'C1 AS %sa' % name,
        'C2 AS %sb' % name,
        'CAST(`__INDEX` AS BIGINT) AS id',
    )

In [8]:

# One single-column DataFrame per axis (X, Y, Z) and class label (0, 1); each is
# later passed to dft_systemml as its `signal`.
# NOTE(review): no ORDER BY, so sample order relies on Spark returning rows in
# the same order for every query — confirm.
def _axis_for_class(axis, label):
    return spark.sql("SELECT %s from df where class = %d" % (axis, label))

x0, y0, z0 = (_axis_for_class(axis, 0) for axis in ("X", "Y", "Z"))
x1, y1, z1 = (_axis_for_class(axis, 1) for axis in ("X", "Y", "Z"))

In [9]:
from pyspark.sql.functions import lit


def _dft_features(x_signal, y_signal, z_signal, label):
    """Inner-join the x/y/z DFT frames on `id` and tag every row with `label`."""
    features = dft_systemml(x_signal, 'x')
    for axis_signal, prefix in ((y_signal, 'y'), (z_signal, 'z')):
        features = features.join(dft_systemml(axis_signal, prefix), on=['id'], how='inner')
    return features.withColumn('class', lit(label))


df_class_0 = _dft_features(x0, y0, z0, 0)
df_class_1 = _dft_features(x1, y1, z1, 1)

# Stack both classes into the training frame.
df_dft = df_class_0.union(df_class_1)

df_dft.show()

[Stage 3:=====>                                                    (1 + 7) / 10]
SystemML Statistics:
Total execution time:		3.931 sec.
Number of executed Spark inst:	0.

                                                                                

[Stage 7:>                                                         (0 + 7) / 10]
[Stage 7:=====>                                                    (1 + 8) / 10]
SystemML Statistics:
Total execution time:		5.816 sec.
Number of executed Spark inst:	0.

                                                                                

[Stage 11:>                                                        (0 + 7) / 10]
[Stage 11:=====>                                                   (1 + 9) / 10]
SystemML Statistics:
Total execution time:		5.758 sec.
Number of executed Spark inst:	0.

                                                                                

[Stage 15:>                                                        (0 + 9) / 10

In [10]:
from pyspark.ml.feature import VectorAssembler

# The six DFT columns produced by dft_systemml: <axis>a, <axis>b for x, y, z.
FEATURE_COLUMNS = [axis + part for axis in "xyz" for part in "ab"]
vectorAssembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol="features")

In [11]:
from pyspark.ml.classification import RandomForestClassifier

# Random-forest training is randomized (bootstrap samples, feature subsets);
# an explicit seed makes Restart & Run All reproduce the same model.
RF_SEED = 42
classifier = RandomForestClassifier(labelCol="class", featuresCol="features",
                                    numTrees=10, seed=RF_SEED)

In [12]:
from pyspark.ml import Pipeline

# Two stages: assemble the feature vector, then the random-forest classifier.
pipeline = Pipeline().setStages([vectorAssembler, classifier])

In [13]:
# Fit the whole pipeline on every row of df_dft (no held-out split is made).
model = pipeline.fit(dataset=df_dft)

In [14]:

# Score the same rows the model was fitted on, so metrics computed from
# `prediction` reflect training fit, not performance on unseen data.
prediction = model.transform(dataset=df_dft)

In [15]:
# Preview with Spark's defaults spelled out: first 20 rows, long cells truncated.
prediction.show(n=20, truncate=True)

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+-------------+-----------+----------+
|         id|                  xa|                  xb|                  ya|                  yb|                  za|                  zb|class|            features|rawPrediction|probability|prediction|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+-------------+-----------+----------+
|         26| 0.22952572619747336|-0.16313414201392878| 0.22952572619747336|-0.16313414201392878| 0.22952572619747336|-0.16313414201392878|    0|[0.22952572619747...|   [10.0,0.0]|  [1.0,0.0]|       0.0|
|         29| 0.08358446358658746|-0.01409982255320...| 0.08358446358658746|-0.01409982255320...| 0.08358446358658746|-0.01409982255320...|    0|[0.08358446358658...|   [10.0,0.0]|  [1

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the "prediction" column against the "class" label.
# NOTE: `prediction` was computed on the training rows themselves, so this is
# training accuracy, not an estimate of accuracy on new data.
binEval = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="class",
)

binEval.evaluate(prediction)

1.0

In [17]:
!rm -Rf a2_m4.json

In [18]:
# One partition -> a single part-*.json file inside the a2_m4.json directory.
# mode('overwrite') keeps this cell re-runnable even when the `!rm` cell
# above was not executed first (the default mode fails if the path exists).
prediction = prediction.repartition(1)
prediction.write.mode('overwrite').json('a2_m4.json')

In [19]:
!rm -f rklib.py
!wget https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py

--2019-01-12 04:12:43--  https://raw.githubusercontent.com/romeokienzler/developerWorks/master/coursera/ai/rklib.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2289 (2.2K) [text/plain]
Saving to: ‘rklib.py’


2019-01-12 04:12:43 (16.1 MB/s) - ‘rklib.py’ saved [2289/2289]



In [20]:
!zip -r a2_m4.json.zip a2_m4.json

  adding: a2_m4.json/ (stored 0%)
  adding: a2_m4.json/_SUCCESS (stored 0%)
  adding: a2_m4.json/.part-00000-6643914c-6224-4f36-af78-4a4fa3f43895.json.crc (stored 0%)
  adding: a2_m4.json/._SUCCESS.crc (stored 0%)
  adding: a2_m4.json/part-00000-6643914c-6224-4f36-af78-4a4fa3f43895.json (deflated 89%)


In [21]:
!base64 a2_m4.json.zip > a2_m4.json.zip.base64

In [22]:
import os
from getpass import getpass

from rklib import submit

# Grader identifiers for this assignment part.
key = "-fBiYHYDEeiR4QqiFhAvkA"
part = "IjtJk"

# Credentials come from the environment (or an interactive prompt) rather than
# being hard-coded, so sharing the notebook leaks neither the submission token
# nor the account email. If a token was ever committed here, regenerate it.
email = os.environ.get("COURSERA_EMAIL") or input("Coursera email: ")
secret = os.environ.get("COURSERA_SECRET") or getpass("Coursera submission token: ")

with open('a2_m4.json.zip.base64', 'r') as myfile:
    data = myfile.read()
submit(email, secret, key, part, [part], data)

Submission successful, please check on the coursera grader page for the status
-------------------------
{"elements":[{"itemId":"B8wXV","id":"f_F-qCtuEei_fRLwaVDk3g~B8wXV~ybp0qxZSEemB4Q7X5cdx1A","courseId":"f_F-qCtuEei_fRLwaVDk3g"}],"paging":{},"linked":{}}
-------------------------
