# Parkinson's Disease Detector with Apache Cassandra and PySpark Machine Learning

### Jupyter notebook inspired by the template at https://github.com/datastaxdevs/workshop-machine-learning/blob/master/jupyter/Random%20Forest.ipynb

In [36]:
!pip3 install matplotlib --quiet
!pip3 install ipykernel --quiet

In [2]:
!pip install cassandra-driver --quiet
!pip install pyspark==3.4.1 --quiet

In [3]:
!python3 -m ipykernel install --user --name=vs-l-pd-detector

Installed kernelspec vs-l-pd-detector in /Users/mariannelynemanaog/Library/Jupyter/kernels/vs-l-pd-detector


In [4]:
!PYDEVD_DISABLE_FILE_VALIDATION=1

In [55]:
import os
import random
import re
import warnings

import matplotlib.pyplot as plt
import pandas as pd
import cassandra
import pyspark

from IPython.display import display, Markdown
from random import randint, randrange

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
warnings.filterwarnings('ignore')
%matplotlib inline

#### Helper function to have nicer formatting of Spark DataFrames

## Creating Tables and Loading Tables

In [13]:
# Install the latest version of Cassandra (4.1.3) from https://www.apache.org/dyn/closer.lua/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz

In [None]:
# Install GPG to verify the hash of the downloaded tarball
!arch -arm64 brew install gnupg gnupg2

# Link GPG
!brew link gnupg

!gpg --print-md SHA256 apache-cassandra-4.1.3-bin.tar.gz

In [6]:
# Compare the signature with the SHA256 file from the Downloads site
!curl -L https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz.sha256

da014999723f4e1e2c15775dac6aaa9ff69a48f6df6465740fcd52ca9d19ea88


In [None]:
# Unpack the tarball
!tar xzvf apache-cassandra-4.1.3-bin.tar.gz

In [12]:
os.chdir('apache-cassandra-4.1.3')

In [13]:
!pwd

/Users/mariannelynemanaog/PycharmProjects/vs-ml-pd-detector/notebooks/apache-cassandra-4.1.3


In [14]:
!bin/cassandra

In [53]:
# Verify cassandra installation by checking its version number
!cassandra -v

4.1.3


In [None]:
# Start the cassandra server on the terminal
# !cassandra -f

### Connect to Cassandra

In [17]:
# Get the IP address by running 'cqlsh' on the terminal
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

### Create Demo Keyspace 

In [18]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS accelerate 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x7fbf380bd250>

### Set keyspace 

In [19]:
session.set_keyspace('accelerate')

### Create table called `speech_data`. Our PRIMARY will be a unique key (subjectId) we generate for each row.  This will have two datasets "train" and "test"

In [20]:
query = "CREATE TABLE IF NOT EXISTS speech_data \
                                   (subject_id text, jitter_percent float, jitter_abs float, rap float, ppq float, \
                                   apq_3 float, apq_5 float, apq_11 float, status int, \
                                   PRIMARY KEY (subject_id))"
session.execute(query)

<cassandra.cluster.ResultSet at 0x7fbf4a9a9c10>

### Load the train and test datasets from csv files

#### Insert all speech data into the DSE table `speech_data`

In [21]:
fileName = '/Users/mariannelynemanaog/PycharmProjects/vs-ml-pd-detector/src/data/train_and_test_sets/train_data.csv'
input_file = open(fileName, 'r')
i = 1
for line_number, line in enumerate(input_file):
    if line_number == 0:
        continue  # Skip the first line, as it has the header with the column names
    subject_id = i
    row = line.replace('\n', "").split(',')
    
    query = "INSERT INTO speech_data (subject_id, jitter_percent, jitter_abs, rap, ppq, \
                               apq_3, apq_5, apq_11, status)"
    query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
    session.execute(query, (str(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), float(row[5]), float(row[6]), float(row[7]), int(row[8])))
    i = i + 1

fileName = '/Users/mariannelynemanaog/PycharmProjects/vs-ml-pd-detector/src/data/train_and_test_sets/test_data.csv'
input_file = open(fileName, 'r')

for line_number, line in enumerate(input_file):
    if line_number == 0:
        continue  # Skip the first line, as it has the header with the column names
    subject_id = i
    row = line.replace('\n', "").split(',')
        
    query = "INSERT INTO speech_data (subject_id, jitter_percent, jitter_abs, rap, ppq, \
                               apq_3, apq_5, apq_11, status)"
    query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
    session.execute(query, (str(row[0]), float(row[1]), float(row[2]), float(row[3]), float(row[4]), float(row[5]), float(row[6]), float(row[7]), int(row[8])))
    i = i + 1
    

## Machine Learning with Apache Cassandra and Apache Spark

#### Create a spark session that is connected to the database. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [8]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()

In [9]:
spark

In [23]:
rows = session.execute('select * from speech_data;')
df = pd.DataFrame(list(rows))

In [24]:
df.head()

Unnamed: 0,subject_id,apq_11,apq_3,apq_5,jitter_abs,jitter_percent,ppq,rap,status
0,phon_R01_S10_3,0.01033,0.00777,0.00898,9e-06,0.0021,0.00137,0.00109,0
1,phon_R01_S32_2,0.00903,0.00476,0.00588,2e-05,0.0027,0.00135,0.00116,1
2,CONT-11,0.039913,0.030384,0.035978,4.3e-05,0.53133,0.00332,0.002693,0
3,0.000157842,0.819181,18.808001,19.973,0.779,0.583,13.002,1.75,1
4,9.8239e-05,0.887069,11.811,12.712,0.768,0.742,11.455,2.226,1


In [26]:
print("Table Speech Data Row Count: ")
print(len(df))

Table Speech Data Row Count: 
1713


In [33]:
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(df) 
sparkDF.printSchema()
sparkDF.show()

root
 |-- subject_id: string (nullable = true)
 |-- apq_11: double (nullable = true)
 |-- apq_3: double (nullable = true)
 |-- apq_5: double (nullable = true)
 |-- jitter_abs: double (nullable = true)
 |-- jitter_percent: double (nullable = true)
 |-- ppq: double (nullable = true)
 |-- rap: double (nullable = true)
 |-- status: long (nullable = true)

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|    subject_id|              apq_11|               apq_3|               apq_5|          jitter_abs|      jitter_percent|                 ppq|                 rap|status|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|phon_R01_S10_3|0.010329999960958958|0.007770000025629997|0.008980000391602516|9.000000318337698E-6|0.002099999925121665|0.0013699999

#### Create Vector with all elements of the speech data 

In [35]:
assembler = VectorAssembler(
    inputCols=['jitter_percent', 'jitter_abs', 'rap', 'ppq', 'apq_3', 'apq_5', 'apq_11'],
    outputCol='features')

trainingData = assembler.transform(sparkDF)

labelIndexer = StringIndexer(inputCol="status", outputCol="label", handleInvalid='keep')
trainingData1 = labelIndexer.fit(trainingData).transform(trainingData)

showDF(trainingData1)
print(trainingData1.count())

[Row(subject_id='phon_R01_S10_3', apq_11=0.010329999960958958, apq_3=0.007770000025629997, apq_5=0.008980000391602516, jitter_abs=9.000000318337698e-06, jitter_percent=0.002099999925121665, ppq=0.0013699999544769526, rap=0.0010900000343099236, status=0, features=DenseVector([0.0021, 0.0, 0.0011, 0.0014, 0.0078, 0.009, 0.0103]), label=1.0),
 Row(subject_id='phon_R01_S32_2', apq_11=0.009030000306665897, apq_3=0.0047599999234080315, apq_5=0.005880000069737434, jitter_abs=1.9999999494757503e-05, jitter_percent=0.0027000000700354576, ppq=0.0013500000350177288, rap=0.0011599999852478504, status=1, features=DenseVector([0.0027, 0.0, 0.0012, 0.0014, 0.0048, 0.0059, 0.009]), label=0.0),
 Row(subject_id='CONT-11', apq_11=0.03991299867630005, apq_3=0.03038400039076805, apq_5=0.03597800061106682, jitter_abs=4.3263000407023355e-05, jitter_percent=0.5313299894332886, ppq=0.003319700015708804, rap=0.002693000016734004, status=0, features=DenseVector([0.5313, 0.0, 0.0027, 0.0033, 0.0304, 0.036, 0.0399

1713


In [44]:
# Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pd.set_option('display.max_colwidth', 50)
    else:
        pd.set_option('display.max_colwidth', -1)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pd.reset_option('display.max_rows')

### We will be training a model with Random Forest, and because of this we need to split up our dataset in to a training and test set. Will split 80/20. 

## TODO: To split data based on the train and test sets already determined.

In [45]:
# Split the data into train and test
splits = trainingData1.randomSplit([0.8, 0.2], 1234)
train = splits[0]
test = splits[1]

print ("Train Dataframe Row Count: ")
print (train.count())
print ("Test Datafram Row Count: ")
print (test.count())

Train Dataframe Row Count: 
1376
Test Datafram Row Count: 
337


In [38]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

model = rf.fit(train)

predictions = model.transform(test)
#predictions.show()
print (predictions.count())
showDF(predictions)

337


[Row(subject_id='0.000100405', apq_11=0.9120830297470093, apq_3=11.600000381469727, apq_5=10.295999526977539, jitter_abs=0.5130000114440918, jitter_percent=0.37700000405311584, ppq=9.812000274658203, rap=1.1319999694824219, status=0, features=DenseVector([0.377, 0.513, 1.132, 9.812, 11.6, 10.296, 0.9121]), label=1.0, rawPrediction=DenseVector([4.9465, 5.0535, 0.0]), probability=DenseVector([0.4946, 0.5054, 0.0]), prediction=1.0),
 Row(subject_id='0.000100506', apq_11=0.89239102602005, apq_3=4.0970001220703125, apq_5=4.559999942779541, jitter_abs=0.8949999809265137, jitter_percent=1.027999997138977, ppq=4.96999979019165, rap=3.0840001106262207, status=0, features=DenseVector([1.028, 0.895, 3.084, 4.97, 4.097, 4.56, 0.8924]), label=1.0, rawPrediction=DenseVector([5.0439, 4.9561, 0.0]), probability=DenseVector([0.5044, 0.4956, 0.0]), prediction=0.0),
 Row(subject_id='0.000103653', apq_11=0.7939450144767761, apq_3=2.811000108718872, apq_5=12.038000106811523, jitter_abs=0.2669999897480011, 

In [46]:
showDF(predictions.select("status", "label", "prediction", "probability"))

Unnamed: 0,status,label,prediction,probability
0,0,1.0,1.0,"[0.49464777884629807, 0.5053522211537019, 0.0]"
1,0,1.0,0.0,"[0.5043893602082486, 0.49561063979175135, 0.0]"
2,0,1.0,1.0,"[0.4195383364911304, 0.5804616635088695, 0.0]"
3,1,0.0,0.0,"[0.5004198185259635, 0.4995801814740365, 0.0]"
4,0,1.0,1.0,"[0.48103877113003507, 0.5189612288699649, 0.0]"


### We can now use the MulticlassClassificationEvaluator to evalute the accuracy of our predictions. 

In [64]:
# compute key evaluation metrics on the test set, i.e., accuracy, 
# weightedPrecision, weightedRecall, weightedTruePositiveRate, 
# weightedFalsePositiveRate, weightedFMeasure

precision_vals = 3

accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)
print("Test set accuracy = " + str(round(accuracy, precision_vals)))

weighted_precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedPrecision")
weighted_precision = weighted_precision_evaluator.evaluate(predictions)
print("Test set weighted precision = " + str(round(weighted_precision, precision_vals)))

weighted_recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedRecall")
weighted_recall = weighted_recall_evaluator.evaluate(predictions)
print("Test set weighted recall = " + str(round(weighted_recall, precision_vals)))

weighted_tpr_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedTruePositiveRate")
weighted_tpr = weighted_tpr_evaluator.evaluate(predictions)
print("Test set weighted true positive rate = " + str(round(weighted_tpr, precision_vals)))

weighted_fpr_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedFalsePositiveRate")
weighted_fpr = weighted_tpr_evaluator.evaluate(predictions)
print("Test set weighted false positive rate = " + str(round(weighted_fpr, precision_vals)))

weighted_f_measure_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="weightedFMeasure")
weighted_f_measure = weighted_f_measure_evaluator.evaluate(predictions)
print("Test set weighted F-measure = " + str(round(weighted_f_measure, precision_vals)))

Test set accuracy = 0.709
Test set weighted precision = 0.706
Test set weighted recall = 0.709
Test set weighted true positive rate = 0.709
Test set weighted false positive rate = 0.709
Test set weighted F-measure = 0.707


In [None]:
session.execute("""drop table speech_data""")