# MLlib for classification problems using PySpark

In [2]:
!pip install pyspark



In [3]:
#   We'll start by loading the required libraries for this tutorial.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler 
import pandas as pd

In [4]:
# Build the SparkSession used by the rest of the notebook
# (getOrCreate reuses an already-running session if there is one).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ML using pyspark")
    .getOrCreate()
)

In [5]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# SQLContext takes a SparkContext as its first argument; passing the session
# explicitly as the second argument avoids the deprecated code path.
# The name is kept because a later cell still reads through `sqlContext`.
sqlContext = SQLContext(spark.sparkContext, spark)

# Read the seed dataset with the built-in CSV reader: first row is the header,
# column types are inferred. The path is relative to the kernel's working
# directory, so the file must sit next to the notebook.
df = spark.read.csv('Seed_Data.csv', header=True, inferSchema=True)

# Peek at the first five rows.
df.take(5)



AnalysisException: Path does not exist: file:/c:/Users/sunil/DBS Repo/DBS/DBSEdu/Machine Learning and Pattern Recognisation/CA/Seed_Data.csv

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column.
# Converted to pandas and transposed so each dataset column becomes one row.
# The earlier bare `df.describe()` was dropped: its result was never displayed.
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
A,210,14.847523809523816,2.9096994306873647,10.59,21.18
P,210,14.559285714285718,1.3059587265640225,12.41,17.25
C,210,0.8709985714285714,0.023629416583846364,0.8081,0.9183
LK,210,5.628533333333335,0.44306347772645016,4.899,6.675
WK,210,3.258604761904762,0.37771444490658673,2.63,4.033
A_Coef,210,3.7002009523809516,1.503557130821779,0.7651,8.456
LKG,210,5.408071428571429,0.4914804991024053,4.519,6.55
target,210,1.0,0.818447591071135,0,2


In [None]:
# Data preparation: pack the seven measurement columns into one vector column.

from pyspark.ml.feature import VectorAssembler

inputs = ['A', 'P', 'C', 'LK', 'WK', 'A_Coef', 'LKG']
vectorAssembler = VectorAssembler(outputCol='features', inputCols=inputs)

# Keep only the assembled feature vector and the label column.
v_df = vectorAssembler.transform(df).select(['features', 'target'])
v_df.show(3)

+--------------------+------+
|            features|target|
+--------------------+------+
|[15.26,14.84,0.87...|     0|
|[14.88,14.57,0.88...|     0|
|[14.29,14.09,0.90...|     0|
+--------------------+------+
only showing top 3 rows



In [None]:
# Split the dataset into 70% training / 30% test.
# A fixed seed keeps the split, and every metric computed from it,
# reproducible across kernel restarts.
splits = v_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits
 

# apply logistic regression in pyspark

Train the model using the training set.

In [None]:
from pyspark.ml.classification import LogisticRegression

# Configure a logistic-regression estimator (at most 10 iterations)
# and fit it on the training split.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='target',
    maxIter=10,
)
lrModel = lr.fit(train_df)

evaluate the model

In [None]:
 trainingSummary = lrModel.summary
accuracy = trainingSummary.accuracy
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(accuracy,precision,recall)

0.9215686274509803 0.9215686274509804 0.9215686274509804


# Example 2: Redo the classification task using a Decision Tree classifier.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree on the 'features' vector column, predicting 'target';
# all other hyper-parameters are left at their defaults.
dtc = DecisionTreeClassifier(labelCol='target', featuresCol='features')
dtcModel = dtc.fit(train_df)

In [None]:
# Make predictions: run the fitted decision tree over the test split.
# The bare `predictions` expression below is the cell's displayed output.
predictions = dtcModel.transform(test_df)
predictions
 

DataFrame[features: vector, target: int, rawPrediction: vector, probability: vector, prediction: double]

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare the 'prediction' column against the 'target' label using accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="target",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {:g} ".format(accuracy))

Accuracy = 0.929825 


# Example 3: Redo the classification task using a Random Forest classifier.

# Example 4:
Use the winequality-red dataset to predict quality using logistic regression and a decision tree classifier in PySpark. Compute the accuracy of each model.

In [None]:
 
# Load the red-wine quality dataset with the built-in CSV reader
# (header row present, column types inferred) instead of the legacy
# 'com.databricks.spark.csv' format name.
# NOTE: this rebinds `df`, replacing the seed dataset loaded earlier.
df = spark.read.csv('winequality-red.csv', header=True, inferSchema=True)

# Peek at the first five rows.
df.take(5)

[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5),
 Row(fixed acidity=7.8, volatile acidity=0.88, citric acid=0.0, residual sugar=2.6, chlorides=0.098, free sulfur dioxide=25.0, total sulfur dioxide=67.0, density=0.9968, pH=3.2, sulphates=0.68, alcohol=9.8, quality=5),
 Row(fixed acidity=7.8, volatile acidity=0.76, citric acid=0.04, residual sugar=2.3, chlorides=0.092, free sulfur dioxide=15.0, total sulfur dioxide=54.0, density=0.997, pH=3.26, sulphates=0.65, alcohol=9.8, quality=5),
 Row(fixed acidity=11.2, volatile acidity=0.28, citric acid=0.56, residual sugar=1.9, chlorides=0.075, free sulfur dioxide=17.0, total sulfur dioxide=60.0, density=0.998, pH=3.16, sulphates=0.58, alcohol=9.8, quality=6),
 Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=1

#apply assembler

In [None]:
from pyspark.ml.feature import VectorAssembler

# Only these three acidity-related columns are assembled into the feature vector.
inputs = ['fixed acidity', 'volatile acidity', 'citric acid']
vectorAssembler = VectorAssembler(outputCol='features', inputCols=inputs)

# Reduce the frame to the feature vector plus the 'quality' label.
v_df = vectorAssembler.transform(df).select(['features', 'quality'])
v_df.show(3)

+---------------+-------+
|       features|quality|
+---------------+-------+
|  [7.4,0.7,0.0]|      5|
| [7.8,0.88,0.0]|      5|
|[7.8,0.76,0.04]|      5|
+---------------+-------+
only showing top 3 rows



In [None]:
# Split the wine dataset into 70% training / 30% test.
# A fixed seed keeps the split, and the accuracies reported below,
# reproducible across kernel restarts.
splits = v_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

#apply logistic reg

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fit logistic regression (at most 10 iterations) on the training split,
# predicting the 'quality' label.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'quality', maxIter=10)
lrModel = lr.fit(train_df)

# Make predictions on the test split.
pred_lr = lrModel.transform(test_df)

# Accuracy of the 'prediction' column against the 'quality' label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="quality", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred_lr)
print("Accuracy = %g " % (accuracy))
# The pasted output line `Accuracy = 0.492341` was removed from the cell:
# it ran as an assignment and created a misleading hard-coded variable.

Accuracy = 0.492341 


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Fit a decision tree on the training split, predicting the 'quality' label.
dtc = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'quality')
dtcModel = dtc.fit(train_df)

# Make predictions on the test split.
pred_dt = dtcModel.transform(test_df)

# Accuracy of the 'prediction' column against the 'quality' label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="quality", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred_dt)
print("Accuracy = %g " % (accuracy))
# The pasted output line `Accuracy = 0.461707` was removed from the cell:
# it ran as an assignment and created a misleading hard-coded variable.

Accuracy = 0.461707 


In [None]:
 trainingSummary = lrModel.summary
accuracy = trainingSummary.accuracy
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(accuracy,precision,recall)

0.4682971014492754 0.4041527984554447 0.4682971014492754


#apply dt

In [None]:
# Train a decision tree on the training split with 'quality' as the label.
dtc = DecisionTreeClassifier(labelCol='quality', featuresCol='features')
dtcModel = dtc.fit(train_df)

# Score the test split.
predictions = dtcModel.transform(test_df)

# Accuracy of the 'prediction' column against the 'quality' label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="quality",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {:g} ".format(accuracy))




Accuracy = 0.492929 
