In [1]:
#Load the CSV file into a RDD
irisData = sc.textFile("data/iris.csv")
irisData.persist()

data/iris.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [2]:
#Remove the first line (contains headers)
dataLines = irisData.filter(lambda x: "Sepal" not in x)
dataLines.count()

150

In [3]:
import math
from pyspark.mllib.linalg import Vectors

In [4]:
def transformToNumeric( inputStr) :
    attList=inputStr.split(",")
    
    #Set default to setosa
    irisValue=1.0
    if attList[4] == "versicolor":
        irisValue=2.0
    if attList[4] == "virginica":
        irisValue=3.0
       
    #Filter out columns not wanted at this stage
    values= Vectors.dense([ irisValue, \
                     float(attList[0]),  \
                     float(attList[1]),  \
                     float(attList[2]),  \
                     float(attList[3])  \
                     ])
    return values

In [5]:
#Change to a Vector
irisVectors = dataLines.map(transformToNumeric)
irisVectors.collect()

[DenseVector([1.0, 5.1, 3.5, 1.4, 0.2]),
 DenseVector([1.0, 4.9, 3.0, 1.4, 0.2]),
 DenseVector([1.0, 4.7, 3.2, 1.3, 0.2]),
 DenseVector([1.0, 4.6, 3.1, 1.5, 0.2]),
 DenseVector([1.0, 5.0, 3.6, 1.4, 0.2]),
 DenseVector([1.0, 5.4, 3.9, 1.7, 0.4]),
 DenseVector([1.0, 4.6, 3.4, 1.4, 0.3]),
 DenseVector([1.0, 5.0, 3.4, 1.5, 0.2]),
 DenseVector([1.0, 4.4, 2.9, 1.4, 0.2]),
 DenseVector([1.0, 4.9, 3.1, 1.5, 0.1]),
 DenseVector([1.0, 5.4, 3.7, 1.5, 0.2]),
 DenseVector([1.0, 4.8, 3.4, 1.6, 0.2]),
 DenseVector([1.0, 4.8, 3.0, 1.4, 0.1]),
 DenseVector([1.0, 4.3, 3.0, 1.1, 0.1]),
 DenseVector([1.0, 5.8, 4.0, 1.2, 0.2]),
 DenseVector([1.0, 5.7, 4.4, 1.5, 0.4]),
 DenseVector([1.0, 5.4, 3.9, 1.3, 0.4]),
 DenseVector([1.0, 5.1, 3.5, 1.4, 0.3]),
 DenseVector([1.0, 5.7, 3.8, 1.7, 0.3]),
 DenseVector([1.0, 5.1, 3.8, 1.5, 0.3]),
 DenseVector([1.0, 5.4, 3.4, 1.7, 0.2]),
 DenseVector([1.0, 5.1, 3.7, 1.5, 0.4]),
 DenseVector([1.0, 4.6, 3.6, 1.0, 0.2]),
 DenseVector([1.0, 5.1, 3.3, 1.7, 0.5]),
 DenseVector([1.

In [6]:
#Perform statistical Analysis
from pyspark.mllib.stat import Statistics
irisStats=Statistics.colStats(irisVectors)
irisStats.mean()

array([ 2.        ,  5.84333333,  3.05733333,  3.758     ,  1.19933333])

In [7]:
irisStats.variance()

array([ 0.67114094,  0.68569351,  0.18997942,  3.11627785,  0.58100626])

In [8]:
irisStats.min()

array([ 1. ,  4.3,  2. ,  1. ,  0.1])

In [9]:
irisStats.max()

array([ 3. ,  7.9,  4.4,  6.9,  2.5])

In [10]:
Statistics.corr(irisVectors)

array([[ 1.        ,  0.78256123, -0.42665756,  0.9490347 ,  0.95654733],
       [ 0.78256123,  1.        , -0.11756978,  0.87175378,  0.81794113],
       [-0.42665756, -0.11756978,  1.        , -0.4284401 , -0.36612593],
       [ 0.9490347 ,  0.87175378, -0.4284401 ,  1.        ,  0.96286543],
       [ 0.95654733,  0.81794113, -0.36612593,  0.96286543,  1.        ]])

In [11]:
#Transform to a Data Frame for input to Machine Learing
#Drop columns that are not required (low correlation)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [12]:
def transformToLabeledPoint(inStr) :
    attList=inStr.split(",")
    lp = ( attList[4], Vectors.dense([attList[0],attList[2],attList[3]]))
    return lp

In [13]:
irisLp = dataLines.map(transformToLabeledPoint)
irisDF = sqlContext.createDataFrame(irisLp,["label", "features"])
irisDF.select("label","features").show(10)

+------+-------------+
| label|     features|
+------+-------------+
|setosa|[5.1,1.4,0.2]|
|setosa|[4.9,1.4,0.2]|
|setosa|[4.7,1.3,0.2]|
|setosa|[4.6,1.5,0.2]|
|setosa|[5.0,1.4,0.2]|
|setosa|[5.4,1.7,0.4]|
|setosa|[4.6,1.4,0.3]|
|setosa|[5.0,1.5,0.2]|
|setosa|[4.4,1.4,0.2]|
|setosa|[4.9,1.5,0.1]|
+------+-------------+
only showing top 10 rows



In [14]:
#Find correlations
numFeatures = irisDF.take(1)[0].features.size
labelRDD = irisDF.map(lambda lp: lp.label)
for i in range(numFeatures):
    featureRDD = irisDF.map(lambda lp: lp.features[i])
    corr = Statistics.corr(labelRDD, featureRDD, 'pearson')
    print('%d\t%g' % (i, corr))

Py4JJavaError: An error occurred while calling o90.corr.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 14, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
ValueError: could not convert string to float: setosa

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.numCols(RowMatrix.scala:61)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:328)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelationMatrix(PearsonCorrelation.scala:49)
	at org.apache.spark.mllib.stat.correlation.Correlation$class.computeCorrelationWithMatrixImpl(Correlation.scala:46)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelationWithMatrixImpl(PearsonCorrelation.scala:34)
	at org.apache.spark.mllib.stat.correlation.PearsonCorrelation$.computeCorrelation(PearsonCorrelation.scala:40)
	at org.apache.spark.mllib.stat.correlation.Correlations$.corr(Correlation.scala:60)
	at org.apache.spark.mllib.stat.Statistics$.corr(Statistics.scala:112)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.corr(PythonMLLibAPI.scala:820)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/isaac/spark/spark-1.6.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
ValueError: could not convert string to float: setosa

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
#Indexing needed as pre-req for Decision Trees
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(irisDF)
td = si_model.transform(irisDF)
td.collect()

In [None]:
#Split into training and testing data
(trainingData, testData) = td.randomSplit([0.9, 0.1])
trainingData.count()
testData.count()
testData.collect()

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
#Create the model
dtClassifer = DecisionTreeClassifier(maxDepth=2, labelCol="indexed")
dtModel = dtClassifer.fit(trainingData)

dtModel.numNodes

In [None]:
dtModel.depth

In [None]:
#Predict on the test data
predictions = dtModel.transform(trainingData)
predictions.select("prediction","indexed","label","features").collect()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="indexed",metricName="precision")
evaluator.evaluate(predictions)  

In [None]:
#Draw a confusion matrix
labelList=predictions.select("indexed","label").distinct().toPandas()
predictions.groupBy("indexed","prediction").count().show()