In [1]:
# Import necessary libraries
from pyspark.sql import SQLContext, Row
from pyspark import SparkContext
# Bind the active SparkContext to `sc` explicitly: a later cell calls
# sc.textFile(...), and without this binding the notebook only runs on
# platforms that pre-inject `sc` into the kernel namespace.
# getOrCreate() returns the already-running context when one exists.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)



from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

from pyspark.mllib.classification import LogisticRegressionWithLBFGS,LogisticRegressionModel
from pyspark import SparkConf
from numpy import array

from pyspark.sql.types import *

In [2]:
# Dataset: Pima Indians Diabetes, from
# http://archive.ics.uci.edu/ml/datasets/Pima+Indians+Diabetes
# Read the copy stored on the cluster; each element of `data` is one text line.
data = sc.textFile(
    name="/FileStore/tables/l3mne6tv1474396224239/pima.txt"
)

In [3]:
# Split every comma-separated line and convert each field to a float,
# producing one list of floats per input line.
# NOTE: `data` is rebound to the parsed RDD, so this cell expects `data` to
# still hold the raw text lines when it runs.
data = data.map(lambda line: list(map(float, line.split(','))))

In [4]:
# The first eight values of each parsed row are the feature columns.
features = data.map(lambda row: row[0:8])
# Parenthesised form is valid both as a Python 2 print statement and as a
# Python 3 print() call, matching the other print calls in this notebook.
print(features.take(2))

In [5]:
# Fit a StandardScaler on the feature rows, then apply the fitted scaler to
# those same rows.
from pyspark.mllib.feature import StandardScaler
# Defaults written out: scale to unit standard deviation, no mean-centering.
standardizer = StandardScaler(withMean=False, withStd=True)
model = standardizer.fit(features)
features_transform = model.transform(features)

In [6]:
# The target is the last value of each parsed row.
lab = data.map(lambda row: row[-1])
# Pair every label with its *standardized* feature vector. Zipping against
# the raw `features` RDD left the StandardScaler output (`features_transform`)
# computed but never used.
transformedData = lab.zip(features_transform)

In [7]:
# Convert each (label, features) pair into an MLlib LabeledPoint.
# The feature sequence is passed through as-is; wrapping it in an extra list
# would hand LabeledPoint a nested 1 x n structure instead of a flat vector.
# NOTE: `transformedData` is rebound here, so this cell expects it to still
# hold the zipped (label, features) pairs when it runs.
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], row[1]))


In [8]:
# Randomly split the labeled points using weights 0.7 (training) and 0.3 (test).
# A fixed seed makes the split repeatable, so re-running the notebook
# evaluates the models on the same rows.
(trainingData, testData) = transformedData.randomSplit([0.7, 0.3], seed=42)

In [9]:
# First model: a single decision-tree classifier fitted on the training split.
modelTree = DecisionTree.trainClassifier(
    trainingData,
    2,    # numClasses
    {},   # categoricalFeaturesInfo: empty, i.e. all features are continuous
    impurity='gini', maxDepth=5, maxBins=32)

In [10]:
# Score the feature vectors of the training split with the fitted tree.
predictions = modelTree.predict(
    trainingData.map(lambda point: point.features)
)

In [11]:
# Pair each training label with the tree's prediction for the same row.
labelsAndPredictions = (
    trainingData
    .map(lambda point: point.label)
    .zip(predictions)
)

In [12]:
# Training error = share of training rows whose prediction differs from the label.
# The (label, prediction) pair is indexed instead of being unpacked in the
# lambda signature: tuple parameters are Python 2-only syntax and a
# SyntaxError on Python 3.
trainErr = (
    labelsAndPredictions.filter(lambda pair: pair[0] != pair[1]).count()
    / float(trainingData.count())
)

In [13]:
# Report the decision tree's training error.
print('Train Error = %s' % (trainErr,))

In [14]:
#Compare against Ensemble model
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

In [15]:
# Second model: a small random forest (3 trees) fitted on the same training split.
# A fixed seed makes the forest repeatable between runs; without it the
# reported error changes on every execution.
# NOTE: this rebinds `model`, which earlier held the fitted StandardScaler.
model = RandomForest.trainClassifier(
    trainingData,
    numClasses=2,
    categoricalFeaturesInfo={},  # empty: all features are continuous
    numTrees=3,
    featureSubsetStrategy="auto",
    impurity='gini',
    maxDepth=5,
    maxBins=32,
    seed=42)

In [16]:
# Score the feature vectors of the training split with the random forest.
predictions = model.predict(
    trainingData.map(lambda point: point.features)
)

In [17]:
# Pair each training label with the forest's prediction for the same row.
labelsAndPredictions = (
    trainingData
    .map(lambda point: point.label)
    .zip(predictions)
)

In [18]:
# Training error of the random forest: share of training rows whose
# prediction differs from the label. The pair is indexed rather than unpacked
# in the lambda signature, because tuple parameters are Python 2-only syntax.
trainErr = (
    labelsAndPredictions.filter(lambda pair: pair[0] != pair[1]).count()
    / float(trainingData.count())
)

In [19]:
# Report the random forest's training error.
print('Train Error = %s' % (trainErr,))

In [20]:
# Print a heading followed by the string form of the fitted decision tree.
# NOTE(review): this prints only the model's string representation; if the
# full tree structure is wanted, modelTree.toDebugString() may be the intended
# call - confirm.
print('\n'.join(['Optimal classification tree model:', str(modelTree)]))