In [1]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType,StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from pyspark.ml.linalg import VectorUDT,Vectors
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import PCA as PCAml

def toFloat(x):
    """Convert one raw field to a float.

    The placeholder '?' is mapped to 5.0; any other value is passed to
    float() unchanged, so an unparsable value raises ValueError.
    """
    return 5.0 if x == '?' else float(x)

def doLine(l):
    """Parse one comma-separated record into a (features, label) tuple.

    The label is 0 when field 10 is the string '2' and 1 otherwise.
    Fields 1 through 9 are converted with toFloat and packed into a dense
    vector.
    """
    fields = l.split(",")
    # Resolve the label first so a too-short line fails on the index lookup.
    label = 0 if fields[10] == '2' else 1
    features = Vectors.dense(list(map(toFloat, fields[1:10])))
    return (features, label)

# Location of the raw comma-separated records.
path = '/FileStore/tables/breast_cancer_wisconsin-2f6e5.data'
raw_data = sc.textFile(path)

# Two nullable columns: a feature vector and an integer label.
schema = (StructType()
          .add("features", VectorUDT(), True)
          .add("label", IntegerType(), True))

# Each text line is turned into a (features, label) row by doLine.
data = SQLContext(sc).createDataFrame(raw_data.map(doLine), schema)
#data.show()

# Peek at the first ten raw text lines.
raw_data.take(10)
#1) The provided code reads the data from the file "breast-cancer-wisconsin.data" and then transforms it into a DataFrame with a specific schema of 2 fields.
#2) data has 2 fields, "features" and "label"; label 0 means benign (class code 2 in the file)
#and label 1 means malignant (class code 4).
#3) The schema of the data is (features: vector, label: integer).
#4) Tumors are encoded as benign (0) and malignant (1).

# Generally, data for an ML algorithm should look like [label, array of features] for each item in the data set.

In [2]:
#1.b) What does the data look like?
#data.show()

In [3]:
#1.c) What does the schema look like?
#data.printSchema()

In [4]:
#1.d) What is the number of benign and malignant tumors?
#data.groupBy("label").count().show()

In [5]:
# 2) Split the data into a training set and a test set (weights 0.9 / 0.1).
# A fixed seed is passed to randomSplit.
trainingData, testData = data.randomSplit(weights=[0.9, 0.1], seed=2018)

In [6]:
# 3) Build the decision tree model

# Estimator configuration: which columns hold the features and the label.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Fitting the estimator on the training set builds the actual tree.
bc_model = dt.fit(dataset=trainingData)

In [7]:
# 4) Testing the model
# Score the held-out test set once; both evaluators below reuse this result.
predictions = bc_model.transform(testData)
predictions.show()

# The area under the ROC curve is computed from the model's continuous scores
# (the "rawPrediction" column). Feeding the hard 0/1 "prediction" column
# instead collapses the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName='areaUnderROC')
result = evaluator.evaluate(predictions)
print("the area under the ROC is : ",result)

# Accuracy is computed from the hard class predictions.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print ("the accuracy for the normql decison Tree model is = %g" % (accuracy))


In [8]:
# 5) Improving the model with cross validation and train/validation split
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 6]) \
    .build()

# Cross validation sits between defining the estimator and training the final
# model: it is used to choose the hyper-parameters.
# With numFolds=10 the training data is split into 10 (training, validation)
# pairs and every candidate maxDepth is fitted on each pair, i.e.
# 3 (depths) * 10 (folds) = 30 fits. The metric is averaged over the folds for
# each candidate, and the best candidate is then refitted on the whole
# training set. NB: only the training data is used in this step.
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10,
                          seed=2018)  # fixed seed so the fold assignment is repeatable

# Fitting the cross validator runs the parameter search and returns a model
# that wraps the best decision tree found.
bc_model = crossval.fit(trainingData)

predictions = bc_model.transform(testData)
# Evaluate on the continuous "rawPrediction" scores, the same column the
# cross validator's own evaluator uses by default; the hard "prediction"
# column would reduce the ROC curve to a single point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName='areaUnderROC')
result = evaluator.evaluate(predictions)
print("the area under the ROC with the cross validation model is : ",result)



# Train/validation split: a single split of the training data (trainRatio=0.9
# goes to training, the remainder to validation) is used to compare the
# candidate parameters, instead of several folds.
tvs = TrainValidationSplit(estimator=dt,
                           estimatorParamMaps=paramGrid,
                           evaluator=BinaryClassificationEvaluator(),
                           trainRatio=0.9,
                           seed=2018)  # fixed seed so the split is repeatable
bc_model = tvs.fit(trainingData)
predictions = bc_model.transform(testData)
# Evaluate on the continuous "rawPrediction" scores; the hard "prediction"
# column would reduce the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName='areaUnderROC')
result = evaluator.evaluate(predictions)
print("the area under the ROC with the Train validation split model is : ",result)

In [9]:
#6) LOGISTIC REGRESSION

# Build the model
def parsePoint(line):
  """Parse one comma-separated record into a LabeledPoint.

  The label is 0 when field 10 is the string '2' and 1 otherwise; fields 1
  through 9 are converted with toFloat and used as the features.
  """
  fields = line.split(",")
  # Resolve the label first so a too-short line fails on the index lookup.
  label = 0 if fields[10] == '2' else 1
  return LabeledPoint(label, list(map(toFloat, fields[1:10])))



# Parse every line of the file at `path` and fit a logistic regression on it.
parsedData = sc.textFile(path).map(parsePoint)
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluation on the training data itself: pair each true label with the
# model's prediction, then take the fraction of pairs that disagree.
labelsAndPreds = parsedData.map(
    lambda point: (point.label, model.predict(point.features)))
trainErr = (
    labelsAndPreds.filter(lambda pair: pair[0] != pair[1]).count()
    / float(parsedData.count())
)
print("The accuracy using the Logistic regression algorithm  = " + str(1-trainErr))

In [10]:
#6.2) DOWNLOAD THE IRIS DATA SET AND BUILD A CLASSIFIER
path = '/FileStore/tables/iris.data'
raw_data = sc.textFile(path)
#raw_data.map(lambda e : e).count()

def doLineIris(l):
    """Parse one comma-separated iris record into a (features, label) tuple.

    Fields 0 through 3 are converted with toFloat and packed into a dense
    vector. Field 4 is the class name, encoded as:
        'Iris-setosa'     -> 1
        'Iris-versicolor' -> 2
        anything else     -> 0

    The class name is compared case-insensitively and with surrounding
    whitespace removed, so both 'Iris-Setosa' and 'Iris-setosa' match.
    NOTE(review): the UCI distribution of iris.data is believed to use the
    lowercase spelling; confirm against the uploaded file.
    """
    item = l.split(",")
    species = item[4].strip().lower()
    if species == 'iris-setosa':
        label = 1
    elif species == 'iris-versicolor':
        label = 2
    else:
        label = 0
    return (Vectors.dense([toFloat(e) for e in item[0:4]]), label)

schema = StructType([StructField("features", VectorUDT(), True),
                     StructField("label",IntegerType(),True)])

# Rebuild `data` from the iris file. Without this, `data` would still be the
# DataFrame created in an earlier cell and everything below would silently
# run on the wrong data set.
# Blank lines are skipped because they have no field 4 and would raise
# IndexError in doLineIris (the UCI file is commonly distributed with a
# trailing empty line; verify against the uploaded file).
data = SQLContext(sc).createDataFrame(
    raw_data.filter(lambda line: line.strip()).map(doLineIris), schema)
#data.show(50)

trainingData, testData = data.randomSplit([0.9, 0.1], seed = 2018)

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 6]) \
    .build()
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=10,
                          seed=2018)  # fixed seed so the fold assignment is repeatable

# Fitting the cross validator searches the maxDepth candidates on the training
# data and returns a model that wraps the best decision tree found.
bc_model = crossval.fit(trainingData)

predictions = bc_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
#print(raw_data.take(150))
print ("the accuracy of the decison Tree model on the IRIS data set is = %g" % (accuracy))


In [11]:
#6.3) VISUALIZATION TASK(IRIS DATA SET)

# Fit a 2-component PCA on the "features" column of `data`; the projected
# vectors are written to a new "pca" column of `transformed`.
pca = PCAml(inputCol="features", outputCol="pca", k=2)
model = pca.fit(dataset=data)
transformed = model.transform(dataset=data)
