In [5]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark2 = SparkSession.builder.appName('ml').getOrCreate()
#Create a Spark Session
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("ml") \
    .config("spark.executor.memory", "0.1g") \
    .config("spark.cores.max","2") \
    .config("spark.sql.warehouse.dir", "/home/sushant/Projects/Spark_Project/temp")\
    .getOrCreate()
    
#Get the Spark Context from Spark Session    
SpContext = SpSession.sparkContext

In [11]:
"""--------------------------------------------------------------------------
Load Data
-------------------------------------------------------------------------"""

#Load the CSV file into a RDD
irisData = SpContext.textFile("iris.csv")
irisData.cache()
irisData.count()

#Remove the first line (contains headers)
dataLines = irisData.filter(lambda x: "Sepal" not in x)
dataLines.count()

"""--------------------------------------------------------------------------
Cleanup Data
-------------------------------------------------------------------------"""

from pyspark.sql import Row
#Create a Data Frame from the data
parts = dataLines.map(lambda l: l.split(","))
irisMap = parts.map(lambda p: Row(SEPAL_LENGTH=float(p[0]),\
                                SEPAL_WIDTH=float(p[1]), \
                                PETAL_LENGTH=float(p[2]), \
                                PETAL_WIDTH=float(p[3]), \
                                SPECIES=p[4] ))
                                
# Infer the schema, and register the DataFrame as a table.
irisDf = SpSession.createDataFrame(irisMap)
irisDf.cache()
#Add a numeric indexer for the label/target column
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="SPECIES", outputCol="IND_SPECIES")
si_model = stringIndexer.fit(irisDf)
irisNormDf = si_model.transform(irisDf)

irisNormDf.select("SPECIES","IND_SPECIES").distinct().collect()
irisNormDf.cache()


DataFrame[PETAL_LENGTH: double, PETAL_WIDTH: double, SEPAL_LENGTH: double, SEPAL_WIDTH: double, SPECIES: string, IND_SPECIES: double]

In [13]:
"""--------------------------------------------------------------------------
Perform Data Analytics
-------------------------------------------------------------------------"""

#See standard parameters
irisNormDf.describe().show()

#Find correlation between predictors and target
for i in irisNormDf.columns:
    if not( isinstance(irisNormDf.select(i).take(1)[0][0], str)) :
        print( "Correlation to Species for ", i)#, \
                    #irisNormDf.stat.corr('IND_SPECIES',i))

"""--------------------------------------------------------------------------
Prepare data for ML
-------------------------------------------------------------------------"""

#Transform to a Data Frame for input to Machine Learing
#Drop columns that are not required (low correlation)

from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row) :
    lp = ( row["SPECIES"], row["IND_SPECIES"], \
                Vectors.dense([row["SEPAL_LENGTH"],\
                        row["SEPAL_WIDTH"], \
                        row["PETAL_LENGTH"], \
                        row["PETAL_WIDTH"]]))
    return lp
    
irisLp = irisNormDf.rdd.map(transformToLabeledPoint)
irisLpDf = SpSession.createDataFrame(irisLp,["species","label", "features"])
irisLpDf.select("species","label","features").show(10)
irisLpDf.cache()


+-------+------------------+------------------+------------------+-------------------+---------+------------------+
|summary|      PETAL_LENGTH|       PETAL_WIDTH|      SEPAL_LENGTH|        SEPAL_WIDTH|  SPECIES|       IND_SPECIES|
+-------+------------------+------------------+------------------+-------------------+---------+------------------+
|  count|               150|               150|               150|                150|      150|               150|
|   mean|3.7580000000000027| 1.199333333333334| 5.843333333333335|  3.057333333333334|     null|               1.0|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778637|0.43586628493669793|     null|0.8192319205190403|
|    min|               1.0|               0.1|               4.3|                2.0|   setosa|               0.0|
|    max|               6.9|               2.5|               7.9|                4.4|virginica|               2.0|
+-------+------------------+------------------+------------------+------

DataFrame[species: string, label: double, features: vector]

In [14]:
"""--------------------------------------------------------------------------
Perform Machine Learning
-------------------------------------------------------------------------"""
#Split into training and testing data
(trainingData, testData) = irisLpDf.randomSplit([0.9, 0.1])
trainingData.count()
testData.count()
testData.collect()

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Create the model
dtClassifer = DecisionTreeClassifier(maxDepth=2, labelCol="label",\
                featuresCol="features")
dtModel = dtClassifer.fit(trainingData)

dtModel.numNodes
dtModel.depth

#Predict on the test data
predictions = dtModel.transform(testData)
predictions.select("prediction","species","label").collect()

#Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="label",metricName="accuracy")
evaluator.evaluate(predictions)


#Draw a confusion matrix
predictions.groupBy("label","prediction").count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|    2|
|  2.0|       2.0|    6|
|  0.0|       0.0|    3|
+-----+----------+-----+

