### Wine Quality Prediction: a machine-learning project using classification and regression techniques.

# White-wine data reading and cleaning

In [1]:
#Import the white-wine file which is in CSV format 
from pyspark.sql import SparkSession
# Obtain the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()

# Read the semicolon-delimited CSV: the first row is the header and Spark infers the column types.
dataFrame = (
    spark.read
    .option("delimiter", ";")
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(r"C:\Users\varsh\OneDrive\Desktop\MSDA\SEM2\MACHINE_LEARNING\Project\datasets\winequality-white.csv")
)
dataFrame

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

In [2]:
# Normalise the headers: replace the spaces in column names with underscores so
# every column name is a single token (e.g. 'fixed acidity' -> 'fixed_acidity').
# A single toDF() call renames all columns at once; names that contain no space
# are passed through unchanged, so re-running this cell is harmless.
dataFrame = dataFrame.toDF(*[name.replace(' ', '_') for name in dataFrame.columns])
dataFrame.show(3)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

In [3]:
# Print the schema to inspect each column's data type; any string-typed
# (categorical) column would show up here and need encoding before modelling.
dataFrame.printSchema()

root
 |-- fixed_acidity: double (nullable = true)
 |-- volatile_acidity: double (nullable = true)
 |-- citric_acid: double (nullable = true)
 |-- residual_sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free_sulfur_dioxide: double (nullable = true)
 |-- total_sulfur_dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [4]:
#Checking and removing of duplicates:
# Every count() triggers a Spark job, so compute each figure once and reuse it
# for the difference instead of recounting.
total_rows = dataFrame.count()
distinct_rows = dataFrame.distinct().count()
print("The number of non-distinct data is :", total_rows)
print("The number of distinct data is :", distinct_rows)
print("Therefore, the number of duplicates are:", total_rows - distinct_rows)

# Keep one copy of each fully identical row, then recount as a sanity check.
dataFrame = dataFrame.dropDuplicates()
print("After dropping the duplicates, the count of the dictinct data is:", dataFrame.count())

The number of non-distinct data is : 4898
The number of distinct data is : 3961
Therefore, the number of duplicates are: 937
After dropping the duplicates, the count of the dictinct data is: 3961


In [5]:
# Missing-value check: map every column name to the number of rows where that
# column is null (one Spark count per column).
nulls = dict(
    (name, dataFrame.where(dataFrame[name].isNull()).count())
    for name in dataFrame.columns
)
nulls

{'fixed_acidity': 0,
 'volatile_acidity': 0,
 'citric_acid': 0,
 'residual_sugar': 0,
 'chlorides': 0,
 'free_sulfur_dioxide': 0,
 'total_sulfur_dioxide': 0,
 'density': 0,
 'pH': 0,
 'sulphates': 0,
 'alcohol': 0,
 'quality': 0}

In [6]:
#Define a helper function that converts the integer values of the 'quality' column into a binary label: 1 when quality is greater than 6, otherwise 0.
import pyspark.sql.functions as func
def pred_col(feat1, threshold=6):
    """Build a Spark column expression that binarises a numeric column.

    Args:
        feat1: Name of the column to test.
        threshold: Cut-off value; rows with a value strictly greater than
            this get 1. Defaults to 6, the cut-off used for 'quality'.

    Returns:
        A Column expression that evaluates to 1 where ``feat1 > threshold``
        and to 0 in every other case (the ``otherwise`` branch).
    """
    return func.when(func.col(feat1) > threshold, 1).otherwise(0)

# Derive the binary `label` column from `quality`.
quality_transformed = dataFrame.withColumn('label', pred_col('quality'))

# Keep the eleven input columns plus the new label; the raw `quality` column is left out.
df = quality_transformed.select(
    'fixed_acidity',
    'volatile_acidity',
    'citric_acid',
    'residual_sugar',
    'chlorides',
    'free_sulfur_dioxide',
    'total_sulfur_dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
    'label',
)
df.columns

['fixed_acidity',
 'volatile_acidity',
 'citric_acid',
 'residual_sugar',
 'chlorides',
 'free_sulfur_dioxide',
 'total_sulfur_dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'label']

In [7]:
#Feature engineering: convert the features into feature vectors.
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
# Pack the eleven numeric input columns into a single vector column.
vectorAssembler = VectorAssembler(
    inputCols=[
        'fixed_acidity',
        'volatile_acidity',
        'citric_acid',
        'residual_sugar',
        'chlorides',
        'free_sulfur_dioxide',
        'total_sulfur_dioxide',
        'density',
        'pH',
        'sulphates',
        'alcohol',
    ],
    outputCol='feature_vector',
)

# Assemble from `quality_transformed` rather than from `df` itself: this cell
# then never consumes its own output, so it can be re-run on its own without
# failing on columns that a previous run already dropped.
df = vectorAssembler.transform(quality_transformed).select('feature_vector', 'label')

df.show()

+--------------------+-----+
|      feature_vector|label|
+--------------------+-----+
|[6.7,0.23,0.26,1....|    0|
|[7.4,0.2,0.36,1.2...|    0|
|[6.7,0.17,0.5,2.1...|    0|
|[6.4,0.24,0.32,14...|    0|
|[7.2,0.55,0.09,1....|    0|
|[6.6,0.24,0.29,2....|    0|
|[6.5,0.26,0.28,12...|    0|
|[7.4,0.24,0.31,8....|    0|
|[7.2,0.37,0.15,2....|    1|
|[7.5,0.23,0.32,9....|    0|
|[9.0,0.31,0.48,6....|    0|
|[6.2,0.27,0.49,1....|    0|
|[7.6,0.26,0.58,7....|    0|
|[10.0,0.23,0.27,1...|    0|
|[6.8,0.31,0.09,1....|    0|
|[7.7,0.24,0.31,1....|    0|
|[6.3,0.18,0.22,1....|    0|
|[7.6,0.1,0.33,1.0...|    0|
|[6.6,0.45,0.43,7....|    0|
|[7.1,0.27,0.28,1....|    0|
+--------------------+-----+
only showing top 20 rows



In [8]:
#Standardize the feature-vector values.
from pyspark.ml.feature import StandardScaler  
# Scale `feature_vector` into a new `features` column using StandardScaler's default settings.
# NOTE(review): the scaler is fitted on the full dataset, before the train/test split — confirm this is intended.
Scalerizer = StandardScaler(inputCol="feature_vector", outputCol="features")

# Fit, transform, and keep only the scaled features together with the label.
df = Scalerizer.fit(df).transform(df).select('features', 'label')

In [9]:
#split the data into training and test data.
# Randomly split the rows into roughly 70% training and 30% test data.
# The fixed seed makes the split repeatable instead of changing on every run.
train, test = df.randomSplit([0.7, 0.3], seed=42)
train.count(), test.count()

(2736, 1225)

In [10]:
pwd


'C:\\Users\\varsh\\OneDrive\\Desktop\\MSDA\\SEM2\\MACHINE_LEARNING\\Project\\github\\Wine_Quality_Prediction'