In [1]:
import shutil
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
data = spark.read.format("csv").option("header", "true").option("delimiter",";").load("winequality-white.csv")

In [3]:
data.count()

4898

In [4]:
for i in data.columns:
    data=data.withColumn(i,data[i].cast(DoubleType()))
data.printSchema()
data.columns

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: double (nullable = true)



['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [5]:
for i in data.columns:
    print(data.agg({i: "max"}).collect()[0])

Row(max(fixed acidity)=14.2)
Row(max(volatile acidity)=1.1)
Row(max(citric acid)=1.66)
Row(max(residual sugar)=65.8)
Row(max(chlorides)=0.346)
Row(max(free sulfur dioxide)=289.0)
Row(max(total sulfur dioxide)=440.0)
Row(max(density)=1.03898)
Row(max(pH)=3.82)
Row(max(sulphates)=1.08)
Row(max(alcohol)=14.2)
Row(max(quality)=9.0)


In [6]:
vectorAssembler = VectorAssembler(inputCols = ['fixed acidity','volatile acidity','citric acid','residual sugar','chlorides','free sulfur dioxide','total sulfur dioxide','density','pH','sulphates','alcohol'], outputCol = 'features')
vdata = vectorAssembler.transform(data)
vdata = vdata.select(['features', 'quality'])
vdata.show(3)

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[7.0,0.27,0.36,20...|    6.0|
|[6.3,0.3,0.34,1.6...|    6.0|
|[8.1,0.28,0.4,6.9...|    6.0|
+--------------------+-------+
only showing top 3 rows



In [7]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
vdata = scaler.fit(vdata).transform(vdata)
vdata.show()

+--------------------+-------+--------------------+
|            features|quality|     scaled_features|
+--------------------+-------+--------------------+
|[7.0,0.27,0.36,20...|    6.0|[8.29513396799212...|
|[6.3,0.3,0.34,1.6...|    6.0|[7.46562057119291...|
|[8.1,0.28,0.4,6.9...|    6.0|[9.59865502010517...|
|[7.2,0.23,0.32,8....|    6.0|[8.53213779564904...|
|[7.2,0.23,0.32,8....|    6.0|[8.53213779564904...|
|[8.1,0.28,0.4,6.9...|    6.0|[9.59865502010517...|
|[6.2,0.32,0.16,7....|    6.0|[7.34711865736445...|
|[7.0,0.27,0.36,20...|    6.0|[8.29513396799212...|
|[6.3,0.3,0.34,1.6...|    6.0|[7.46562057119291...|
|[8.1,0.22,0.43,1....|    6.0|[9.59865502010517...|
|[8.1,0.27,0.41,1....|    5.0|[9.59865502010517...|
|[8.6,0.23,0.4,4.2...|    5.0|[10.1911645892474...|
|[7.9,0.18,0.37,1....|    5.0|[9.36165119244825...|
|[6.6,0.16,0.4,1.5...|    7.0|[7.82112631267828...|
|[8.3,0.42,0.62,19...|    5.0|[9.83565884776209...|
|[6.6,0.17,0.38,1....|    7.0|[7.82112631267828...|
|[6.3,0.48,0

In [8]:
splits = vdata.randomSplit([0.80, 0.20])

In [9]:
splits[0].count()

3912

In [10]:
splits[1].count()

986

In [11]:
training = splits[0]

In [12]:
testing = splits[1]

In [13]:
#LinearRegression

In [14]:
lr = LinearRegression(featuresCol = 'scaled_features', labelCol='quality', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [15]:
lrModel = lr.fit(training)

In [16]:
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.840458
r2: 0.102342


In [17]:
lrPredictions = lrModel.transform(testing)
lrPredictions.select("prediction","quality","features").show(5)

+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
| 6.112844959122063|    8.0|[3.9,0.225,0.4,4....|
| 6.029190948969397|    7.0|[4.2,0.17,0.36,1....|
| 5.610920898206071|    3.0|[4.2,0.215,0.23,5...|
| 5.610920898206071|    5.0|[4.5,0.19,0.21,0....|
|5.9559936900858155|    6.0|[4.7,0.145,0.29,1...|
+------------------+-------+--------------------+
only showing top 5 rows



In [18]:
test_result = lrModel.evaluate(testing)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.82596


In [19]:
try:
    shutil.rmtree("lrModel")
except FileNotFoundError as e:
    pass
lrModel.save("lrModel")

In [20]:
loadedModel = lrModel.load("lrModel");

In [21]:
test_result = loadedModel.evaluate(testing)
test_result.rootMeanSquaredError

0.8259595588313764

In [22]:
lrPredictions = loadedModel.transform(testing)
lrPredictions.select("prediction","quality").show(5)

+------------------+-------+
|        prediction|quality|
+------------------+-------+
| 6.112844959122063|    8.0|
| 6.029190948969397|    7.0|
| 5.610920898206071|    3.0|
| 5.610920898206071|    5.0|
|5.9559936900858155|    6.0|
+------------------+-------+
only showing top 5 rows



In [23]:
try:
    shutil.rmtree("lrPredictions")
except FileNotFoundError as e:
    pass
lrPredictions.select("prediction","quality").write.csv("lrPredictions")

In [24]:
#RandomForestRegressor:

In [25]:
rf = RandomForestRegressor(labelCol='quality', featuresCol='scaled_features', numTrees=2, maxDepth=1, seed=42)
rfModel = rf.fit(training)

In [26]:
rfPredictions = rfModel.transform(testing)
rfPredictions.select("prediction","quality").show(5)

+-----------------+-------+
|       prediction|quality|
+-----------------+-------+
|5.961103834527265|    8.0|
|5.961103834527265|    7.0|
|5.961103834527265|    3.0|
|5.961103834527265|    5.0|
|5.961103834527265|    6.0|
+-----------------+-------+
only showing top 5 rows



In [27]:
valuesAndPreds = rfPredictions.select("prediction","quality").rdd
metrics = RegressionMetrics(valuesAndPreds)
metrics.meanSquaredError



0.7489070257902107

In [28]:
try:
    shutil.rmtree("rfModel")
except FileNotFoundError as e:
    pass
rfModel.save("rfModel")

In [29]:
loadedModel = rfModel.load("rfModel");

In [30]:
rfPredictions = loadedModel.transform(testing)
rfPredictions.select("prediction","quality").show(5)

+-----------------+-------+
|       prediction|quality|
+-----------------+-------+
|5.961103834527265|    8.0|
|5.961103834527265|    7.0|
|5.961103834527265|    3.0|
|5.961103834527265|    5.0|
|5.961103834527265|    6.0|
+-----------------+-------+
only showing top 5 rows



In [31]:
valuesAndPreds = rfPredictions.select("prediction","quality").rdd
metrics = RegressionMetrics(valuesAndPreds)
metrics.meanSquaredError

0.7489070257902107

In [32]:
try:
    shutil.rmtree("rfPredictions")
except FileNotFoundError as e:
    pass
rfPredictions.select("prediction","quality").write.csv("rfPredictions")

In [33]:
#RandomForestClassifier:

In [34]:
vdata.show()

+--------------------+-------+--------------------+
|            features|quality|     scaled_features|
+--------------------+-------+--------------------+
|[7.0,0.27,0.36,20...|    6.0|[8.29513396799212...|
|[6.3,0.3,0.34,1.6...|    6.0|[7.46562057119291...|
|[8.1,0.28,0.4,6.9...|    6.0|[9.59865502010517...|
|[7.2,0.23,0.32,8....|    6.0|[8.53213779564904...|
|[7.2,0.23,0.32,8....|    6.0|[8.53213779564904...|
|[8.1,0.28,0.4,6.9...|    6.0|[9.59865502010517...|
|[6.2,0.32,0.16,7....|    6.0|[7.34711865736445...|
|[7.0,0.27,0.36,20...|    6.0|[8.29513396799212...|
|[6.3,0.3,0.34,1.6...|    6.0|[7.46562057119291...|
|[8.1,0.22,0.43,1....|    6.0|[9.59865502010517...|
|[8.1,0.27,0.41,1....|    5.0|[9.59865502010517...|
|[8.6,0.23,0.4,4.2...|    5.0|[10.1911645892474...|
|[7.9,0.18,0.37,1....|    5.0|[9.36165119244825...|
|[6.6,0.16,0.4,1.5...|    7.0|[7.82112631267828...|
|[8.3,0.42,0.62,19...|    5.0|[9.83565884776209...|
|[6.6,0.17,0.38,1....|    7.0|[7.82112631267828...|
|[6.3,0.48,0

In [35]:
rfc = RandomForestClassifier(labelCol='quality', featuresCol='features', numTrees=9, maxDepth=20, seed=55)
rfcModel = rfc.fit(training)

In [36]:
rfcPredictions = rfcModel.transform(testing)
rfcPredictions.select("prediction","quality").show(5)

+----------+-------+
|prediction|quality|
+----------+-------+
|       7.0|    8.0|
|       7.0|    7.0|
|       5.0|    3.0|
|       6.0|    5.0|
|       6.0|    6.0|
+----------+-------+
only showing top 5 rows



In [37]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName='f1')
f1 = evaluator.evaluate(rfcPredictions)
f1

0.6529545234724055

In [38]:
try:
    shutil.rmtree("rfcModel")
except FileNotFoundError as e:
    pass
rfcModel.save("rfcModel")

In [39]:
loadedModel = rfcModel.load("rfcModel");

In [40]:
rfcPredictions = loadedModel.transform(testing)
rfcPredictions.select("prediction","quality").show(5)

+----------+-------+
|prediction|quality|
+----------+-------+
|       7.0|    8.0|
|       7.0|    7.0|
|       5.0|    3.0|
|       6.0|    5.0|
|       6.0|    6.0|
+----------+-------+
only showing top 5 rows



In [41]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName='f1')
f1 = evaluator.evaluate(rfcPredictions)
f1

0.6529545234724055

In [42]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName='weightedPrecision')
precision = evaluator.evaluate(rfcPredictions)
precision

0.6657443982019989

In [43]:
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName='weightedRecall')
recall = evaluator.evaluate(rfcPredictions)
recall

0.6612576064908722

In [44]:
try:
    shutil.rmtree("rfcPredictions")
except FileNotFoundError as e:
    pass
rfPredictions.select("prediction","quality").write.csv("rfcPredictions")