In [2]:
import pyspark as spark
from pyspark.sql import SparkSession
from pyspark import SparkContext 

from pyspark.sql import functions as f

import matplotlib.pyplot as plt

In [3]:
# Attach to the already-running Spark session if there is one, otherwise start one.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Where the red-wine quality CSV lives and what format it is in.
file_location = "/FileStore/tables/winequality_red-42ff5.csv"
file_type = "csv"

# Reader options (only meaningful for CSV): header row present, comma-separated
# values, and let Spark infer numeric column types from the data.
csv_options = {
    "inferSchema": "true",
    "header": "true",
    "sep": ",",
}
df = spark.read.format(file_type).options(**csv_options).load(file_location)

display(df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5
7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [5]:
# Show each column name with the type Spark inferred for it.
df.printSchema()

In [6]:
# Total number of wines (rows); echoed as the cell output.
row_count = df.count()
row_count

In [7]:
# describe() over all twelve columns is too wide to read, so print the
# count/mean/stddev/min/max table in two halves of six columns each.
describe_groups = (
    ('fixed acidity', 'volatile acidity', 'citric acid',
     'residual sugar', 'chlorides', 'free sulfur dioxide'),
    ('total sulfur dioxide', 'density', 'pH',
     'sulphates', 'alcohol', 'quality'),
)
for column_group in describe_groups:
    df.describe(*column_group).show()


In [8]:
# Lower and upper quartiles for every column (describe() does not report these).
quartile_stats = ['25%', '75%']
df.summary(*quartile_stats).show()

In [9]:

# Sample skewness of every column, computed in a single pass over the data.
skew_columns = [
    'fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
    'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density',
    'pH', 'sulphates', 'alcohol', 'quality',
]
df_stats = df.select(*[f.skewness(f.col(name)) for name in skew_columns]).collect()

print(df_stats)

In [10]:
from pyspark.mllib.stat import Statistics
import pandas as pd

# Pearson correlation between every pair of columns (quality included),
# using the RDD-based MLlib API on each row's values as a plain tuple.
features = df.rdd.map(lambda row: tuple(row))
corr_map = Statistics.corr(features, method="pearson")
# Label both axes with the column names so the matrix is readable.
corr_df = pd.DataFrame(corr_map, index=df.columns, columns=df.columns)

print(corr_df.to_string())

In [11]:
# Quick data-quality checks. Each result is printed explicitly because a
# notebook cell only echoes its last expression.
print(df.head())  # the first row
df.describe().show()  # count/mean/stddev/min/max for all columns (wide, hard to read)
print("Rows without nulls:", df.dropna().count())  # dropna() returns a new DataFrame; df is unchanged
df.na.fill(-1).show(5)  # preview with nulls replaced by -1; df itself is not modified

# Do not stop the Spark session here: every later cell still uses `df`
# (backed by `spark`), so stopping it breaks "Restart & Run All".

In [12]:
# Mean fixed acidity for each quality score, sorted by score.
display(df.groupBy("quality").agg(f.avg("fixed acidity")).orderBy("quality"))

quality,avg(fixed acidity)
3,8.36
4,7.779245283018868
5,8.167254038179149
6,8.347178683385575
7,8.872361809045225
8,8.566666666666665


In [13]:
# Mean volatile acidity for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("volatile acidity"))
      .orderBy("quality")
)

quality,avg(volatile acidity)
3,0.8845000000000001
4,0.6939622641509429
5,0.5770411160058732
6,0.4974843260188096
7,0.4039195979899498
8,0.4233333333333334


In [14]:
# Mean citric acid for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("citric acid"))
      .orderBy("quality")
)

quality,avg(citric acid)
3,0.1709999999999999
4,0.1741509433962264
5,0.2436857562408219
6,0.2738244514106587
7,0.3751758793969849
8,0.3911111111111111


In [15]:
# Mean residual sugar for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("residual sugar"))
      .orderBy("quality")
)

quality,avg(residual sugar)
3,2.635
4,2.69433962264151
5,2.528854625550658
6,2.477194357366772
7,2.7206030150753797
8,2.577777777777777


In [16]:
# Mean chlorides for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("chlorides"))
      .orderBy("quality")
)

quality,avg(chlorides)
3,0.1225
4,0.0906792452830188
5,0.0927356828193832
6,0.0849561128526645
7,0.0765879396984924
8,0.0684444444444444


In [17]:
# Mean total sulfur dioxide for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("total sulfur dioxide"))
      .orderBy("quality")
)

quality,avg(total sulfur dioxide)
3,24.9
4,36.24528301886792
5,56.51395007342144
6,40.86990595611285
7,35.02010050251256
8,33.44444444444444


In [18]:
# Mean density for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("density"))
      .orderBy("quality")
)

quality,avg(density)
3,0.997464
4,0.9965424528301888
5,0.9971036270190888
6,0.9966150626959256
7,0.9961042713567828
8,0.9952122222222224


In [19]:
# Mean pH for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("pH"))
      .orderBy("quality")
)

quality,avg(pH)
3,3.398
4,3.381509433962264
5,3.304948604992654
6,3.318072100313484
7,3.290753768844219
8,3.2672222222222214


In [20]:
# Mean sulphates for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("sulphates"))
      .orderBy("quality")
)

quality,avg(sulphates)
3,0.5700000000000001
4,0.5964150943396227
5,0.6209691629955947
6,0.6753291536050158
7,0.7412562814070353
8,0.7677777777777778


In [21]:
# Mean alcohol for each quality score, sorted by score.
display(
    df.groupBy("quality")
      .agg(f.avg("alcohol"))
      .orderBy("quality")
)

quality,avg(alcohol)
3,9.955000000000002
4,10.26509433962264
5,9.899706314243751
6,10.629519331243465
7,11.465912897822443
8,12.094444444444443


In [22]:
# Numeric conversion helper (wrapped as a Spark UDF further down).
def string_to_float(x):
    """Return ``x`` converted with the built-in ``float``.

    Raises whatever ``float`` raises for unconvertible input
    (ValueError for bad strings, TypeError for e.g. None).
    """
    converted = float(x)
    return converted

# Categorize the data: map a numeric quality score to a coarse label.
def catelogy(r):
    """Map a wine quality score to a "bad"/"good" label.

    Args:
        r: numeric quality score, or None for a missing value.

    Returns:
        "bad"  when 0 <= r <= 6.5,
        "good" when 6.5 < r <= 10,
        "n/a"  for a missing (None) or out-of-range score.
    """
    # Spark passes SQL NULLs to Python UDFs as None; comparing None with a
    # number raises TypeError in Python 3 and would fail the whole Spark job.
    if r is None:
        return "n/a"
    if 0 <= r <= 6.5:
        return "bad"
    if 6.5 < r <= 10:
        return "good"
    return "n/a"


In [23]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: catelogy(x), StringType())

# Replace the numeric quality score with its "bad"/"good" label.
# Guarded so re-running this cell is a no-op: on a second run the column is
# already a string, and catelogy() would raise TypeError comparing str to int.
if dict(df.dtypes)["quality"] != "string":
    df = df.withColumn("quality", quality_udf("quality"))
df.show(5)

In [24]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    
def transData(data):
    """Reshape ``data`` into the two-column (features, label) form spark.ml expects.

    Every column except the last is packed into a dense ``features`` vector;
    the last column becomes ``label``.
    """
    def to_feature_label(row):
        # All leading columns are features; the final column is the label.
        return [Vectors.dense(row[:-1]), row[-1]]

    return data.rdd.map(to_feature_label).toDF(['features', 'label'])

In [25]:
# Build the (features, label) DataFrame used by every model below and preview it.
transformed = transData(df)
transformed.show(n=5)

In [26]:
# Encode the string labels as numeric indices (StringIndexer orders labels by
# frequency by default, most frequent first) and preview the result.
label_indexer_stage = StringIndexer(inputCol='label', outputCol='indexedLabel')
labelIndexer = label_indexer_stage.fit(transformed)
labelIndexer.transform(transformed).show(5)

In [27]:
# Any feature with at most 4 distinct values is treated as categorical and
# re-encoded as category indices; the rest remain continuous.
feature_indexer_stage = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
)
featureIndexer = feature_indexer_stage.fit(transformed)
featureIndexer.transform(transformed).show(5)

In [28]:
# Principal component analysis of the wine features.
from pyspark.ml.feature import PCA, StandardScaler

# PCA is scale-sensitive: on raw values the components would be dominated by
# large-magnitude columns (total sulfur dioxide is in the tens, density is ~1).
# Standardize every feature to zero mean / unit variance before projecting.
scaler_model = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                              withMean=True, withStd=True).fit(transformed)
scaled = scaler_model.transform(transformed)

# Project onto the top 6 principal components. Dedicated names are used so
# this cell does not overwrite the notebook-wide `model` variable.
pca = PCA(k=6, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca_model = pca.fit(scaled)

result = pca_model.transform(scaled).select("pcaFeatures")
result.show(truncate=False)
# Proportion of total variance explained by each of the 6 components.
print(pca_model.explainedVariance)


In [29]:
# 80/20 train/test split. A fixed seed makes the split, and therefore every
# model score computed on it, reproducible across "Restart & Run All".
SPLIT_SEED = 42
(trainingData, testData) = transformed.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
trainingData.show(5)
testData.show(5)

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier

# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree, labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features", "label", "predictedLabel").show(5)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score predictions against the indexed true labels; test error = 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# The fitted decision tree is the second-to-last stage (just before the label
# converter). Named treeModel: this is a single tree, not a random forest.
treeModel = model.stages[-2]
print(treeModel)  # summary only

In [31]:
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model. Bootstrap sampling and per-node feature subsets
# are random, so a fixed seed is needed for reproducible results.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, seed=42)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features", "label", "predictedLabel").show(5)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score predictions against the indexed true labels; test error = 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# The fitted forest is the second-to-last stage (just before the label converter).
rfModel = model.stages[-2]
print(rfModel)  # summary only

In [32]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes classifier on the indexed features (Spark's default model type).
# NOTE(review): Spark's default (multinomial) NaiveBayes rejects negative
# feature values; the wine features appear non-negative, but confirm.
nb = NaiveBayes(featuresCol='indexedFeatures', labelCol='indexedLabel')

# Map numeric predictions back to the original string labels.
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Indexers -> Naive Bayes -> label converter; fitting also applies the indexers.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, nb, labelConverter])
model = pipeline.fit(trainingData)

# Score the held-out split and preview a few predictions.
predictions = model.transform(testData)
predictions.select("features", "label", "predictedLabel").show(5)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy against the indexed labels; the test error is its complement.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))


In [33]:

from pyspark.ml.classification import LinearSVC

# Linear support-vector classifier. Spark's LinearSVC supports binary labels
# only, which fits the "bad"/"good" target as long as no "n/a" rows occur.
svm = LinearSVC(featuresCol='indexedFeatures', labelCol='indexedLabel')

# Map numeric predictions back to the original string labels.
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Indexers -> SVM -> label converter; fitting also applies the indexers.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, svm, labelConverter])
model = pipeline.fit(trainingData)

# Score the held-out split and preview a few predictions.
predictions = model.transform(testData)
predictions.select("features", "label", "predictedLabel").show(5)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy against the indexed labels; the test error is its complement.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))
