In [24]:
import pyspark

In [25]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.rdd import reduce

In [26]:
spark = SparkSession.builder.appName('WineQualityPredictionApp').getOrCreate()

In [27]:
train_dataset= spark.read.format("com.databricks.spark.csv").csv(
    'TrainingDataset.csv', header=True, sep=";")
train_dataset.printSchema()

root
 |-- """""fixed acidity"""": string (nullable = true)
 |-- """"volatile acidity"""": string (nullable = true)
 |-- """"citric acid"""": string (nullable = true)
 |-- """"residual sugar"""": string (nullable = true)
 |-- """"chlorides"""": string (nullable = true)
 |-- """"free sulfur dioxide"""": string (nullable = true)
 |-- """"total sulfur dioxide"""": string (nullable = true)
 |-- """"density"""": string (nullable = true)
 |-- """"pH"""": string (nullable = true)
 |-- """"sulphates"""": string (nullable = true)
 |-- """"alcohol"""": string (nullable = true)
 |-- """"quality""""": string (nullable = true)



In [28]:
train_dataset.show()

+----------------------+------------------------+-------------------+----------------------+-----------------+---------------------------+----------------------------+---------------+----------+-----------------+---------------+----------------+
|"""""fixed acidity""""|""""volatile acidity""""|""""citric acid""""|""""residual sugar""""|""""chlorides""""|""""free sulfur dioxide""""|""""total sulfur dioxide""""|""""density""""|""""pH""""|""""sulphates""""|""""alcohol""""|""""quality"""""|
+----------------------+------------------------+-------------------+----------------------+-----------------+---------------------------+----------------------------+---------------+----------+-----------------+---------------+----------------+
|                   8.9|                    0.22|               0.48|                   1.8|            0.077|                         29|                          60|         0.9968|      3.39|             0.53|            9.4|               6|
|               

In [29]:
validation_dataset= spark.read.format("com.databricks.spark.csv").csv(
    'ValidationDataset.csv', header=True, sep=";")
validation_dataset.printSchema()

root
 |-- """fixed acidity"""": string (nullable = true)
 |-- """"volatile acidity"""": string (nullable = true)
 |-- """"citric acid"""": string (nullable = true)
 |-- """"residual sugar"""": string (nullable = true)
 |-- """"chlorides"""": string (nullable = true)
 |-- """"free sulfur dioxide"""": string (nullable = true)
 |-- """"total sulfur dioxide"""": string (nullable = true)
 |-- """"density"""": string (nullable = true)
 |-- """"pH"""": string (nullable = true)
 |-- """"sulphates"""": string (nullable = true)
 |-- """"alcohol"""": string (nullable = true)
 |-- """"quality""""": string (nullable = true)



In [30]:
#To clean out CSV headers if quotes are present
old_train_dataset_column_name = train_dataset.schema.names
old_validation_dataset_column_name = validation_dataset.schema.names
clean_train_dataset_column_name = []
clean_validation_dataset_column_name = []

for name in old_train_dataset_column_name:
    clean_train_dataset_column_name.append(name.replace('"',''))
clean_train_dataset_column_name

for name in old_validation_dataset_column_name:
    clean_validation_dataset_column_name.append(name.replace('"',''))

clean_validation_dataset_column_name

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [31]:
train_dataset = reduce(lambda train_dataset, idx: train_dataset.withColumnRenamed(old_train_dataset_column_name[idx], clean_train_dataset_column_name[idx]), range(len(clean_train_dataset_column_name)), train_dataset)
validation_dataset = reduce(lambda validation_dataset, idx: validation_dataset.withColumnRenamed(old_validation_dataset_column_name[idx], clean_validation_dataset_column_name[idx]), range(len(clean_validation_dataset_column_name)), validation_dataset)

In [32]:
train_dataset=train_dataset.distinct()
validation_dataset=validation_dataset.distinct()

In [33]:
total_columns = train_dataset.columns
tot_columns=validation_dataset.columns

In [34]:
from pyspark.sql.functions import col
def preprocess(dataset):
    return dataset.select(*(col(c).cast("double").alias(c) for c in dataset.columns))
train_dataset = preprocess(train_dataset)
validation_dataset = preprocess(validation_dataset)

In [35]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
stages = []
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

In [36]:
for column_name in total_columns[:-1]:
    stages = []
    vectorAssembler = VectorAssembler(inputCols=[column_name],outputCol=column_name+'_vect')
    stages.append(vectorAssembler)
    stages.append(MinMaxScaler(inputCol=column_name+'_vect', outputCol=column_name+'_scaled'))
    pipeline = Pipeline(stages=stages)
    train_dataset = pipeline.fit(train_dataset).transform(train_dataset).withColumn(
        column_name+"_scaled", unlist(column_name+"_scaled")).drop(column_name+"_vect").drop(column_name)

In [37]:
train_dataset.show(5)

[Stage 268:>                                                        (0 + 1) / 1]

+-------+--------------------+-----------------------+------------------+---------------------+----------------+--------------------------+---------------------------+--------------+---------+----------------+--------------+
|quality|fixed acidity_scaled|volatile acidity_scaled|citric acid_scaled|residual sugar_scaled|chlorides_scaled|free sulfur dioxide_scaled|total sulfur dioxide_scaled|density_scaled|pH_scaled|sulphates_scaled|alcohol_scaled|
+-------+--------------------+-----------------------+------------------+---------------------+----------------+--------------------------+---------------------------+--------------+---------+----------------+--------------+
|    5.0|               0.409|                  0.226|              0.35|                0.103|            0.08|                      0.31|                      0.254|         0.634|    0.504|           0.174|         0.182|
|    5.0|               0.364|                  0.414|              0.24|                0.393|     

                                                                                

In [38]:
for column_name in total_columns[:-1]:
    stages = []
    vectorAssembler = VectorAssembler(inputCols=[column_name],outputCol=column_name+'_vect')
    stages.append(vectorAssembler)
    stages.append(MinMaxScaler(inputCol=column_name+'_vect', outputCol=column_name+'_scaled'))
    pipeline = Pipeline(stages=stages)
    validation_dataset = pipeline.fit(validation_dataset).transform(validation_dataset).withColumn(
        column_name+"_scaled", unlist(column_name+"_scaled")).drop(column_name+"_vect").drop(column_name)

In [39]:
validation_dataset.show(5)

[Stage 337:>                                                        (0 + 1) / 1]

+-------+--------------------+-----------------------+------------------+---------------------+----------------+--------------------------+---------------------------+--------------+---------+----------------+--------------+
|quality|fixed acidity_scaled|volatile acidity_scaled|citric acid_scaled|residual sugar_scaled|chlorides_scaled|free sulfur dioxide_scaled|total sulfur dioxide_scaled|density_scaled|pH_scaled|sulphates_scaled|alcohol_scaled|
+-------+--------------------+-----------------------+------------------+---------------------+----------------+--------------------------+---------------------------+--------------+---------+----------------+--------------+
|    5.0|               0.422|                  0.072|             0.547|                0.056|           0.109|                     0.098|                      0.043|         0.521|    0.424|           0.179|          0.26|
|    6.0|               0.284|                  0.301|             0.347|                0.049|     

                                                                                

In [40]:
vectorAssembler = VectorAssembler(
    inputCols=[column_name+"_scaled" for column_name in total_columns[:-1]],
    outputCol='features')

In [41]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

In [42]:
labelIndexer = StringIndexer(inputCol=total_columns[-1], outputCol="indexedLabel").fit(train_dataset)


rf = RandomForestClassifier(labelCol='indexedLabel', featuresCol="features", numTrees=200)

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)


pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, rf, labelConverter])

In [43]:
model = pipeline.fit(train_dataset)

23/12/06 19:44:13 WARN DAGScheduler: Broadcasting large task binary with size 1408.1 KiB


In [44]:
model.write().overwrite().save("models/rf")

In [45]:
predictions = model.transform(validation_dataset)

# Select example rows to display.
predictions.select("predictedLabel", total_columns[-1], "features").show(5)

+--------------+-------+--------------------+
|predictedLabel|quality|            features|
+--------------+-------+--------------------+
|           6.0|    5.0|[0.422,0.072,0.54...|
|           5.0|    6.0|[0.284,0.301,0.34...|
|           6.0|    5.0|[0.991,0.524,0.65...|
|           5.0|    5.0|[0.229,0.337,0.32...|
|           5.0|    5.0|[0.661,0.205,0.65...|
+--------------+-------+--------------------+
only showing top 5 rows



23/12/06 19:44:14 WARN DAGScheduler: Broadcasting large task binary with size 1680.6 KiB


In [46]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print("Accuracy is: ", accuracy)
print("F1 Score: ", f1_score)

23/12/06 19:44:14 WARN DAGScheduler: Broadcasting large task binary with size 1703.8 KiB


Accuracy is:  0.5661764705882353
F1 Score:  0.52860319906795


23/12/06 19:44:14 WARN DAGScheduler: Broadcasting large task binary with size 1703.8 KiB
