In [341]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd
import csv
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
from time import *
from pyspark.mllib.linalg import Vectors

In [342]:
spark = SparkSession.builder.appName("CS643").getOrCreate()

In [343]:
trainDF = spark.read.format("csv").load("ValidationDataset.csv" , inferSchema='true',header = True ,sep =";")

In [344]:
trainDF.printSchema()
print("Rows: %s" % trainDF.count())

root
 |-- """fixed acidity"""": double (nullable = true)
 |-- """"volatile acidity"""": double (nullable = true)
 |-- """"citric acid"""": double (nullable = true)
 |-- """"residual sugar"""": double (nullable = true)
 |-- """"chlorides"""": double (nullable = true)
 |-- """"free sulfur dioxide"""": integer (nullable = true)
 |-- """"total sulfur dioxide"""": integer (nullable = true)
 |-- """"density"""": double (nullable = true)
 |-- """"pH"""": double (nullable = true)
 |-- """"sulphates"""": double (nullable = true)
 |-- """"alcohol"""": double (nullable = true)
 |-- """"quality""""": integer (nullable = true)

Rows: 160


In [345]:
trainDF.limit(5).toPandas()

Unnamed: 0,"""""""fixed acidity""""""""","""""""""volatile acidity""""""""","""""""""citric acid""""""""","""""""""residual sugar""""""""","""""""""chlorides""""""""","""""""""free sulfur dioxide""""""""","""""""""total sulfur dioxide""""""""","""""""""density""""""""","""""""""pH""""""""","""""""""sulphates""""""""","""""""""alcohol""""""""","""""""""quality"""""""""""
0,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5


# Building the model

In [346]:
getCols = [c for c in trainDF.columns if c != '""""quality"""""']
assemeble = VectorAssembler(inputCols=getCols, outputCol="features")
wineData = assemeble.transform(trainDF)
wineData.limit(5).toPandas()

Unnamed: 0,"""""""fixed acidity""""""""","""""""""volatile acidity""""""""","""""""""citric acid""""""""","""""""""residual sugar""""""""","""""""""chlorides""""""""","""""""""free sulfur dioxide""""""""","""""""""total sulfur dioxide""""""""","""""""""density""""""""","""""""""pH""""""""","""""""""sulphates""""""""","""""""""alcohol""""""""","""""""""quality""""""""""",features
0,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978..."
1,7.8,0.88,0.0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5,"[7.8, 0.88, 0.0, 2.6, 0.098, 25.0, 67.0, 0.996..."
2,7.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5,"[7.8, 0.76, 0.04, 2.3, 0.092, 15.0, 54.0, 0.99..."
3,11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6,"[11.2, 0.28, 0.56, 1.9, 0.075, 17.0, 60.0, 0.9..."
4,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978..."


In [347]:
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.3, featuresCol="features", labelCol='""""quality"""""')
lrModel = lr.fit(wineData)
for x in zip(getCols, lrModel.coefficients):
    print(x)

('"""fixed acidity""""', 0.0)
('""""volatile acidity""""', -0.814880195750423)
('""""citric acid""""', 9.889853024318596e-05)
('""""residual sugar""""', 0.0)
('""""chlorides""""', -0.24994146990870497)
('""""free sulfur dioxide""""', 0.0)
('""""total sulfur dioxide""""', -0.0014268768943000551)
('""""density""""', 0.0)
('""""pH""""', 0.0)
('""""sulphates""""', 0.0)
('""""alcohol""""', 0.22998287568348508)


In [348]:
predict = lrModel.transform(wineData)
predict.limit(5).toPandas()

Unnamed: 0,"""""""fixed acidity""""""""","""""""""volatile acidity""""""""","""""""""citric acid""""""""","""""""""residual sugar""""""""","""""""""chlorides""""""""","""""""""free sulfur dioxide""""""""","""""""""total sulfur dioxide""""""""","""""""""density""""""""","""""""""pH""""""""","""""""""sulphates""""""""","""""""""alcohol""""""""","""""""""quality""""""""""",features,prediction
0,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978...",5.393699
1,7.8,0.88,0.0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5,"[7.8, 0.88, 0.0, 2.6, 0.098, 25.0, 67.0, 0.996...",5.286428
2,7.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5,"[7.8, 0.76, 0.04, 2.3, 0.092, 15.0, 54.0, 0.99...",5.404267
3,11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6,"[11.2, 0.28, 0.56, 1.9, 0.075, 17.0, 60.0, 0.9...",5.791148
4,7.4,0.7,0.0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5,"[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978...",5.393699


In [349]:
evaluateData = RegressionEvaluator(labelCol='""""quality"""""', predictionCol="prediction", metricName="rmse")
rmse = evaluateData.evaluate(predict)
print("RMSE = %g" %rmse)

RMSE = 0.681953


### now need to compare to another model to see how well it actualy is

In [350]:
#this gets the avg wine quality
avgQ = trainDF.groupBy().avg('""""quality"""""').first()[0]
print(avgQ)
#need to create a baseline to est the model
baseline = wineData.select(col('""""quality"""""'), lit(avgQ).alias('prediction'))
baselineRmse = evaluateData.evaluate(baseline)
print("RMSE of baseline = %g" %baselineRmse)

5.73125
RMSE of baseline = 0.827057


In [351]:
(trainDF, testDF) = trainDF.randomSplit([0.7, 0.3])
pipe = Pipeline(stages=[assemeble, lr])
lrPipeModel = pipe.fit(trainDF)
trainPredDF = lrPipeModel.transform(trainDF)
testPredDF = lrPipeModel.transform(testDF)
print("RMSE on training DF = %g" % evaluateData.evaluate(trainPredDF))
print("RMSE on test DF = %g" % evaluateData.evaluate(testPredDF))

RMSE on training DF = 0.683055
RMSE on test DF = 0.666021


In [352]:
search = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.3, 0.9]) \
    .addGrid(lr.elasticNetParam, [0.4, 0.6, 0.8]).build()
crossValidate = CrossValidator(estimator=pipe, estimatorParamMaps= search, evaluator=evaluateData, numFolds=3)
model = crossValidate.fit(trainDF)
cvTestPred = model.transform(testDF)
print("RMSE on test data cross validate = %g" % evaluateData.evaluate(cvTestPred))
print(model.avgMetrics)

RMSE on test data cross validate = 0.674542
[0.7111600851588912, 0.7111600851588912, 0.7111600851588912, 0.7051034952421694, 0.725221797181878, 0.7504106245048159, 0.8139591179772736, 0.8370786326843926, 0.8370786326843926]


## Random Forest

In [353]:
forest = RandomForestRegressor(featuresCol="features", labelCol='""""quality"""""', numTrees=100, maxBins=128, maxDepth=20)
rfPipe = Pipeline(stages=[assemeble, forest])
rfPipeModel = rfPipe.fit(trainDF)
rfTrainPred = rfPipeModel.transform(trainDF)
rfTestPred = rfPipeModel.transform(testDF)

print("RF RMSE on training data = %g" % evaluateData.evaluate(rfTrainPred))
print("RF RMSE on test data = %g" % evaluateData.evaluate(rfTestPred))

RF RMSE on training data = 0.242234
RF RMSE on test data = 0.589819


## Gettting the F1 score

In [354]:
transformData = trainDF.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[0:-1])))
split = [0.01, 1.0-.01]
trainData, testData = transformData.randomSplit(split, 1)

In [355]:
start = time()
model = RandomForest.trainClassifier(trainData, numClasses=10, \
                                    categoricalFeaturesInfo={}, \
                                    numTrees=3, featureSubsetStrategy="auto", impurity="gini", \
                                    maxDepth=16, maxBins=227612, seed=1)
end = time()
timePassed = end - start
print(timePassed)
predictions = model.predict(testData.map(lambda x: x.features))
labelsPredictions = testData.map(lambda x: x.label).zip(predictions)
accuracy = labelsPredictions.filter(lambda x: x[0] == x[1]).count()/float(testData.count())
print(accuracy*100)

1.0837018489837646
36.84210526315789


In [356]:
metrics = MulticlassMetrics(labelsPredictions)
precision = metrics.precision
recall = metrics.recall
f1Score = metrics.weightedFMeasure()
print("This is using Random Forest")
print("The F1 Score is: %g" %f1Score)



This is using Random Forest
The F1 Score is: 0.538462
