In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls




0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://archive.ubunt

In [None]:
import findspark
# Make the pyspark package under SPARK_HOME importable before importing it below.
findspark.init()
# Return value is not used here; kept so the call still runs as in the original cell.
findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext

# builder.getOrCreate() reuses the active session (and its SparkContext) when
# one exists, so re-running this cell does not construct a second SparkSession.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext


In [None]:
# Read the wine-quality CSV (comma-separated, with a header row) and let Spark infer column types.
# NOTE(review): the path assumes Google Drive is mounted at /content/drive; no mount call is visible in this notebook — confirm.
cases = (
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/content/drive/MyDrive/Colab Notebooks/Datasets/winequalityN_Sorted.csv")
)

In [None]:
# Discard every row that has a null in at least one column.
df = cases.na.drop(how="any")

In [None]:
# Add a boolean column that flags wines whose quality score is greater than 5.
import pyspark.sql.functions as F

quality_above_5 = F.when(F.col("quality") > 5, True).otherwise(False)
Df = df.withColumn('quality>5', quality_above_5)
Df.select("quality", "quality>5").show()

+-------+---------+
|quality|quality>5|
+-------+---------+
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      6|     true|
|      5|    false|
|      5|    false|
|      5|    false|
|      7|     true|
|      5|    false|
|      7|     true|
|      6|     true|
|      6|     true|
|      5|    false|
|      8|     true|
+-------+---------+
only showing top 20 rows



In [None]:
# Number of rows left after the null-containing rows were dropped.
row_count = df.groupBy().count()
row_count.show()

+-----+
|count|
+-----+
| 4870|
+-----+



In [None]:
# Summary statistics (count, mean, stddev, min, max) for the most interesting variables.
summary_cols = ["density", "pH", "sulphates", "alcohol", "quality"]
df.describe(*summary_cols).show()


+-------+--------------------+-------------------+-------------------+------------------+------------------+
|summary|             density|                 pH|          sulphates|           alcohol|           quality|
+-------+--------------------+-------------------+-------------------+------------------+------------------+
|  count|                4870|               4870|               4870|              4870|              4870|
|   mean|  0.9940257823408565| 3.1881540041067744|0.48970020533881065|10.516772073917489| 5.878028747433265|
| stddev|0.002993059100085...|0.15090206237498682| 0.1141960587930611|1.2312505075298654|0.8856990114907607|
|    min|             0.98711|               2.72|               0.22|               8.0|                 3|
|    max|             1.03898|               3.82|               1.08|              14.2|                 9|
+-------+--------------------+-------------------+-------------------+------------------+------------------+



In [None]:
# Report the lowest and the greatest sulphates values in the data set.
# (Only the extreme values are shown, not the wines they belong to.)
for stat in ("min", "max"):
    df.agg({"sulphates": stat}).show()

+--------------+
|min(sulphates)|
+--------------+
|          0.22|
+--------------+

+--------------+
|max(sulphates)|
+--------------+
|          1.08|
+--------------+



In [None]:
# Compute the mean of free sulfur dioxide and total sulfur dioxide.
sulfur_cols = ('free sulfur dioxide', 'total sulfur dioxide')
df.groupBy().avg(*sulfur_cols).show()

+------------------------+-------------------------+
|avg(free sulfur dioxide)|avg(total sulfur dioxide)|
+------------------------+-------------------------+
|       35.31714579055441|       138.34014373716633|
+------------------------+-------------------------+



In [None]:
# Prepare the inputs for predicting quality: pack the predictor columns
# into a single vector column named "Features".
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

feature_cols = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]
featureassembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
output = featureassembler.transform(df)

In [None]:
# Keep only the feature vector and the label, then split roughly 70/30 into train and test sets.
finalized_data = output.select("Features", "quality")
# A fixed seed keeps the split, and every metric computed from it, repeatable
# across runs; without it each execution trains and evaluates on different rows.
train_data, test_data = finalized_data.randomSplit([0.70, 0.30], seed=2018)

In [None]:
# Configure the estimator, then fit it on the training split.
# `regressor` ends up bound to the fitted model, as before.
lr_estimator = LinearRegression(featuresCol='Features', labelCol='quality')
regressor = lr_estimator.fit(train_data)

In [None]:
# Evaluate the fitted model on the held-out split and preview actual vs. predicted quality.
pred_results = regressor.evaluate(test_data)
lr_predictions = pred_results.predictions
lr_predictions.show()

+--------------------+-------+------------------+
|            Features|quality|        prediction|
+--------------------+-------+------------------+
|[4.2,0.17,0.36,1....|      7| 7.120212835147328|
|[4.4,0.32,0.39,4....|      8| 6.709437992665613|
|[4.8,0.17,0.28,2....|      7|  6.35936607243147|
|[4.8,0.34,0.0,6.5...|      6|  5.65541951281719|
|[4.9,0.235,0.27,1...|      6| 5.727242189063048|
|[4.9,0.335,0.14,1...|      5| 5.675637478930469|
|[4.9,0.345,0.34,1...|      5| 5.418609886166649|
|[4.9,0.345,0.34,1...|      5| 5.418609886166649|
|[4.9,0.47,0.17,1....|      6| 5.862301636327885|
|[5.0,0.17,0.56,1....|      7| 6.223420832738071|
|[5.0,0.2,0.4,1.9,...|      6|    6.570583441661|
|[5.0,0.235,0.27,1...|      6|5.7394849489000705|
|[5.0,0.24,0.34,1....|      7| 6.829827666888178|
|[5.0,0.27,0.32,4....|      7| 6.792915095035426|
|[5.0,0.27,0.32,4....|      7| 6.792915095035426|
|[5.0,0.27,0.4,1.2...|      6| 5.604243807211418|
|[5.0,0.29,0.54,5....|      8|6.7544525415451915|


In [None]:
# Random forest classifier for predicting quality: start with a seeded 70/30 train/test split.
train, test = output.randomSplit(weights=[0.7, 0.3], seed=2018)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# The quality column is used as the class label for the forest.
rf = RandomForestClassifier(
    labelCol='quality',
    featuresCol='Features',
)
rfModel = rf.fit(train)

# Score the held-out split, then list the resulting columns and their types.
predictions = rfModel.transform(test)
predictions.dtypes

[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('residual sugar', 'double'),
 ('chlorides', 'double'),
 ('free sulfur dioxide', 'double'),
 ('total sulfur dioxide', 'double'),
 ('density', 'double'),
 ('pH', 'double'),
 ('sulphates', 'double'),
 ('alcohol', 'double'),
 ('quality', 'int'),
 ('Features', 'vector'),
 ('rawPrediction', 'vector'),
 ('probability', 'vector'),
 ('prediction', 'double')]

In [None]:
# Preview the first 10 scored rows from the random forest.
predictions.show(n=10)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+--------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            Features|       rawPrediction|         probability|prediction|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+--------------------+--------------------+----------+
|          4.2|           0.215|       0.23|           5.1|    0.041|               64.0|               157.0|0.99688|3.42|     0.44|    8.0|      3|[4.2,0.215,0.23,5...|[0.0,0.0,0.0,0.01...|[0.0,0.0,0.0,9.01...|       6.0|
|          4.4|            0.32|       0.39|           4.3|     0.03|               31.0|               