<a href="https://colab.research.google.com/github/lovepreetmultani/PySpark/blob/main/Pyspark_wine_quality.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [104]:
#!pip install pyspark

In [105]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
from pyspark.ml.regression import LinearRegression

In [106]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [107]:
df_pyspark=spark.read.option('header','true').csv('drive/My Drive/Datasets/winequality-red.csv',inferSchema=True)

In [108]:
df_pyspark.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [109]:
df_pyspark.dtypes

[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('residual sugar', 'double'),
 ('chlorides', 'double'),
 ('free sulfur dioxide', 'double'),
 ('total sulfur dioxide', 'double'),
 ('density', 'double'),
 ('pH', 'double'),
 ('sulphates', 'double'),
 ('alcohol', 'double'),
 ('quality', 'int')]

In [110]:
#VectorAssembler - it combines multiple columns of a DataFrame into a single vector column
assembler = VectorAssembler(inputCols=['fixed acidity','volatile acidity','citric acid','residual sugar','chlorides','free sulfur dioxide','total sulfur dioxide',
                                       'density','pH','alcohol','sulphates'], outputCol="Independent_features")
output=assembler.transform(df_pyspark)

In [111]:
output.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|Independent_features|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|              0.092|               1

In [112]:
finalized_data=output.select("Independent_features","quality")

In [113]:
finalized_data.show()

+--------------------+-------+
|Independent_features|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
|[7.4,0.66,0.0,1.8...|      5|
|[7.9,0.6,0.06,1.6...|      5|
|[7.3,0.65,0.0,1.2...|      7|
|[7.8,0.58,0.02,2....|      7|
|[7.5,0.5,0.36,6.1...|      5|
|[6.7,0.58,0.08,1....|      5|
|[7.5,0.5,0.36,6.1...|      5|
|[5.6,0.615,0.0,1....|      5|
|[7.8,0.61,0.29,1....|      5|
|[8.9,0.62,0.18,3....|      5|
|[8.9,0.62,0.19,3....|      5|
|[8.5,0.28,0.56,1....|      7|
|[8.1,0.56,0.28,1....|      5|
|[7.4,0.59,0.08,4....|      4|
|[7.9,0.32,0.51,1....|      6|
+--------------------+-------+
only showing top 20 rows



In [114]:
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent_features', labelCol='quality')
regressor=regressor.fit(train_data)

In [115]:
lrModel_predictions=regressor.transform(test_data)

In [116]:
### Coefficients
regressor.coefficients

DenseVector([0.0321, -1.0163, -0.2137, 0.0054, -2.579, 0.0048, -0.0031, -36.2406, -0.4373, 0.2442, 1.0525])

In [117]:
### Intercepts
regressor.intercept

40.56166416478835

In [118]:
### Prediction
pred_results=regressor.evaluate(test_data)

In [119]:
## Final comparison
pred_results.predictions.show()

+--------------------+-------+------------------+
|Independent_features|quality|        prediction|
+--------------------+-------+------------------+
|[5.0,0.42,0.24,2....|      8|  6.66524135056752|
|[5.1,0.51,0.18,2....|      7| 6.418917609919177|
|[5.1,0.585,0.0,1....|      7| 6.432560825556919|
|[5.2,0.34,0.0,1.8...|      6| 6.900324472372162|
|[5.2,0.34,0.0,1.8...|      6| 6.900324472372162|
|[5.3,0.47,0.11,2....|      7|6.6630059520648715|
|[5.3,0.57,0.01,1....|      7| 6.333320604611991|
|[5.3,0.715,0.19,1...|      5| 5.120107999506878|
|[5.4,0.42,0.27,2....|      7| 5.936927740201327|
|[5.4,0.74,0.09,1....|      6| 5.531652434184537|
|[5.6,0.31,0.37,1....|      5| 5.251550596501865|
|[5.6,0.54,0.04,1....|      5| 5.786976839685693|
|[5.6,0.54,0.04,1....|      5| 5.786976839685693|
|[5.6,0.605,0.05,2...|      5| 6.153588500081931|
|[5.8,0.61,0.11,1....|      6| 5.692926486730528|
|[5.9,0.46,0.0,1.9...|      5| 5.821480433069674|
|[6.0,0.5,0.0,1.4,...|      5| 5.380456150553449|


In [120]:
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.3720327074043577, 0.5099746556934734, 0.43026583103947746)