In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name. 
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'


import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
#spark = SparkSession.builder.appName('operations').getOrCreate()
from pyspark import SparkContext,SparkConf
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext

def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[14], values[1:14])


conf = SparkConf().setAppName('Linear Support Vector Machines').setMaster('local[2]')
sc = SparkContext(conf=conf)
spark=SparkSession(sc)
# load and parse the data


df = spark.read.csv('heart.csv',inferSchema=True,header=True)

df = df.filter("thalach >=80")
df = df.filter("trestbps <=180")
df = df.filter("oldpeak<=5")
df = df.filter("chol <=400")
df = df.drop_duplicates()

df.toPandas().to_csv("heart-cleaned.csv",header=True)

data=sc.textFile('heart-cleaned.csv')

header = data.first()

data = data.filter(lambda line:line != header)

parseData = data.map(parsePoint)

(trainData,testData) = parseData.randomSplit([7,3])
trainData.persist()
testData.persist()

print (testData.collect())

# build the model



modelDF=SVMWithSGD.train(trainData)
# evaluating the model on training data
labelsAndPreds = trainData.map(lambda p : (p.label, modelDF.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda seq: seq[0] != seq[1]).count()/float(trainData.count())

print('SVM DF training error :' + str(trainErr))
print('SVM DF training acc :' + str(1-trainErr))




model = SVMWithSGD.train(trainData, iterations=10000, step=6, regParam=0.02, miniBatchFraction=0.1, regType='l1' )
# evaluating the model on training data
labelsAndPreds = trainData.map(lambda p : (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda seq: seq[0] != seq[1]).count()/float(trainData.count())

print('SVM training error :' + str(trainErr))
print('SVM training acc :' + str(1-trainErr))

model.save(sc, "SVMmodel")
samemodel = SVMModel.load(sc, "SVMmodel")

labelsAndPreds = testData.map(lambda p : (p.label, samemodel.predict(p.features)))
testErr = labelsAndPreds.filter(lambda seq: seq[0] != seq[1]).count()/float(testData.count())
print('SVM test error :' + str(testErr))
print('SVM test acc :' + str(1-testErr))


[LabeledPoint(0.0, [56.0,1.0,0.0,130.0,283.0,1.0,0.0,103.0,1.0,1.6,0.0,0.0,3.0]), LabeledPoint(1.0, [52.0,1.0,0.0,108.0,233.0,1.0,1.0,147.0,0.0,0.1,2.0,3.0,3.0]), LabeledPoint(0.0, [43.0,1.0,0.0,120.0,177.0,0.0,0.0,120.0,1.0,2.5,1.0,0.0,3.0]), LabeledPoint(0.0, [51.0,0.0,0.0,130.0,305.0,0.0,1.0,142.0,1.0,1.2,1.0,0.0,3.0]), LabeledPoint(1.0, [41.0,0.0,1.0,105.0,198.0,0.0,1.0,168.0,0.0,0.0,2.0,1.0,2.0]), LabeledPoint(0.0, [64.0,1.0,0.0,120.0,246.0,0.0,0.0,96.0,1.0,2.2,0.0,1.0,2.0]), LabeledPoint(0.0, [64.0,1.0,0.0,145.0,212.0,0.0,0.0,132.0,0.0,2.0,1.0,2.0,1.0]), LabeledPoint(1.0, [58.0,0.0,0.0,100.0,248.0,0.0,0.0,122.0,0.0,1.0,1.0,0.0,2.0]), LabeledPoint(0.0, [53.0,1.0,0.0,140.0,203.0,1.0,0.0,155.0,1.0,3.1,0.0,0.0,3.0]), LabeledPoint(0.0, [46.0,1.0,0.0,120.0,249.0,0.0,0.0,144.0,0.0,0.8,2.0,0.0,3.0]), LabeledPoint(0.0, [65.0,1.0,0.0,110.0,248.0,0.0,0.0,158.0,0.0,0.6,2.0,2.0,1.0]), LabeledPoint(1.0, [44.0,1.0,1.0,130.0,219.0,0.0,0.0,188.0,0.0,0.0,2.0,0.0,2.0]), LabeledPoint(0.0, [61.0,1.0,

In [2]:
def ModelAccuracy(model):
    ## Model acc
  
    predict = model.predict(parseData.map(lambda p:p.features))
    predict = predict.map(lambda p: float(p))
    ## Actual values
    predict_real = predict.zip(parseData.map(lambda p: p.label))
    matched = predict_real.filter(lambda p:p[0]==p[1])
    accuracy =  float(matched.count()) / float(predict_real.count())
    return accuracy

    acc = ModelAccuracy(model)
    print("accuracy="+str(acc))

from time import time

def trainEvaluateModel(parseData, iterations, step, miniBatchFraction, regParam, regType):
        startTime = time()
        model = SVMWithSGD.train(parseData, iterations=iterations, step=step, 
                                            miniBatchFraction=miniBatchFraction,regParam=regParam, regType=regType)
                ## accuracy
        accuracy = ModelAccuracy(model)
        duration = time() - startTime   # Time
        print("Parameters: "+"iterations="+str(iterations) + 
         ",  step="+str(step)+",  miniBatchFraction="+str(miniBatchFraction)+
          ", regParam"+str(regParam)+", regType=" + str(regType) +"\n"+
         "===>Time="+str(duration)+'%'+",  accuracy="+str(accuracy))
        return accuracy,duration,iterations,step,miniBatchFraction,regParam,regType,model

def gridSearch(parseData, iterationsList,stepList, miniBatchFractionList, regParamList, regTypeList):
    metrics = [trainEvaluateModel(parseData,iterations, step, miniBatchFraction, regParam, regType)
          for iterations in iterationsList
          for step in stepList
          for miniBatchFraction in miniBatchFractionList
          for regParam in regParamList
          for regType in regTypeList]
    # Return the best parameters
    sorted_metics = sorted(metrics, key=lambda k:k[0], reverse=True)
    best_parameters = sorted_metics[0]
    print("Best parameters："+"iterations="+str(best_parameters[2]) + 
         ",  step="+str( best_parameters[3])+",  miniBatchFraction="+str( best_parameters[4])+
          ", regParam"+str( best_parameters[5])+", regType=" + str( best_parameters[6]) +"\n"+ "accuracy="+str( best_parameters[0]))
    return  best_parameters

## Parameters list
iterationsList = [100,1000,10000] 
stepList = [1, 5, 10]
miniBatchFractionList = [0.1, 1]
regParamList = [0.01, 0.1]
regTypeList = ["l2","l1"]

## Return the best
best_parameters = gridSearch(parseData, iterationsList, stepList, miniBatchFractionList, regParamList, regTypeList)



sc.stop()

Parameters: iterations=100,  step=1,  miniBatchFraction=0.1, regParam0.01, regType=l2
===>Time=3.1372101306915283%,  accuracy=0.6313993174061433
Parameters: iterations=100,  step=1,  miniBatchFraction=0.1, regParam0.01, regType=l1
===>Time=2.5070972442626953%,  accuracy=0.6382252559726962
Parameters: iterations=100,  step=1,  miniBatchFraction=0.1, regParam0.1, regType=l2
===>Time=2.391275644302368%,  accuracy=0.552901023890785
Parameters: iterations=100,  step=1,  miniBatchFraction=0.1, regParam0.1, regType=l1
===>Time=2.547290802001953%,  accuracy=0.6313993174061433
Parameters: iterations=100,  step=1,  miniBatchFraction=1, regParam0.01, regType=l2
===>Time=2.620105743408203%,  accuracy=0.6143344709897611
Parameters: iterations=100,  step=1,  miniBatchFraction=1, regParam0.01, regType=l1
===>Time=2.3294103145599365%,  accuracy=0.5972696245733788
Parameters: iterations=100,  step=1,  miniBatchFraction=1, regParam0.1, regType=l2
===>Time=2.3632376194000244%,  accuracy=0.552901023890785

Parameters: iterations=10000,  step=5,  miniBatchFraction=0.1, regParam0.01, regType=l1
===>Time=65.98759269714355%,  accuracy=0.7440273037542662
Parameters: iterations=10000,  step=5,  miniBatchFraction=0.1, regParam0.1, regType=l2
===>Time=100.25843262672424%,  accuracy=0.552901023890785
Parameters: iterations=10000,  step=5,  miniBatchFraction=0.1, regParam0.1, regType=l1
===>Time=4.084408283233643%,  accuracy=0.7064846416382252
Parameters: iterations=10000,  step=5,  miniBatchFraction=1, regParam0.01, regType=l2
===>Time=100.64946842193604%,  accuracy=0.590443686006826
Parameters: iterations=10000,  step=5,  miniBatchFraction=1, regParam0.01, regType=l1
===>Time=100.3714108467102%,  accuracy=0.6279863481228669
Parameters: iterations=10000,  step=5,  miniBatchFraction=1, regParam0.1, regType=l2
===>Time=101.2881293296814%,  accuracy=0.5597269624573379
Parameters: iterations=10000,  step=5,  miniBatchFraction=1, regParam0.1, regType=l1
===>Time=100.70924377441406%,  accuracy=0.600682