# Wine Quality Classification - Spark R Jupyter Notebook

## Set up environment

In [1]:
# Start Spark session

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master="local[*]", sparkConfig=list(spark.driver.memory="2g"))


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union

Spark package found in SPARK_HOME: /share/apps/compute/spark/spark-2.4.0-bin-hadoop2.6


Launching java with spark-submit command /share/apps/compute/spark/spark-2.4.0-bin-hadoop2.6/bin/spark-submit   --driver-memory "2g" sparkr-shell /tmp/Rtmp5RJo0K/backend_port17c375ac4150 


Java ref type org.apache.spark.sql.SparkSession id 1 

In [2]:
# Print software versions

R.Version()$version.string
Sys.getenv("SPARK_HOME")
sparkR.version()

## Read in data

In [3]:
# Read data into a Spark dataframe
# Data adapted from: https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv

sdf <- read.df("../winequality-white.csv", "csv", header="true", inferSchema="true")

In [4]:
# Cache dataframe

cache(sdf)

SparkDataFrame[id:int, fixed_acidity:double, volatile_acidity:double, citric_acid:double, residual_sugar:double, chlorides:double, free_sulfur_dioxide:double, total_sulfur_dioxide:double, density:double, pH:double, sulphates:double, alcohol:double, quality:string]

In [5]:
# Examine schema

schema(sdf)

StructType
|-name = "id", type = "IntegerType", nullable = TRUE
|-name = "fixed_acidity", type = "DoubleType", nullable = TRUE
|-name = "volatile_acidity", type = "DoubleType", nullable = TRUE
|-name = "citric_acid", type = "DoubleType", nullable = TRUE
|-name = "residual_sugar", type = "DoubleType", nullable = TRUE
|-name = "chlorides", type = "DoubleType", nullable = TRUE
|-name = "free_sulfur_dioxide", type = "DoubleType", nullable = TRUE
|-name = "total_sulfur_dioxide", type = "DoubleType", nullable = TRUE
|-name = "density", type = "DoubleType", nullable = TRUE
|-name = "pH", type = "DoubleType", nullable = TRUE
|-name = "sulphates", type = "DoubleType", nullable = TRUE
|-name = "alcohol", type = "DoubleType", nullable = TRUE
|-name = "quality", type = "StringType", nullable = TRUE

## Prepare data

In [6]:
# Split into train & test sets

seed <- 12345
train_df <- sample(sdf, withReplacement=FALSE, fraction=0.7, seed=seed)
test_df <- except (sdf, train_df)
dim(train_df)
dim(test_df)

## Train random forest model

In [7]:
model <- spark.randomForest(train_df, quality ~ ., type="classification", numTrees=30, seed=seed)
head(summary(model))

## Evaluate model

In [8]:
# Apply model to test data

predictions_sdf <- predict(model, test_df)
class(predictions_sdf)

In [9]:
# Convert results from Spark DataFrame to R data.frame

predictions_df <- as.data.frame(predictions_sdf)
class(predictions_df)
head(predictions_df)

id,fixed_acidity,volatile_acidity,citric_acid,residual_sugar,chlorides,free_sulfur_dioxide,total_sulfur_dioxide,density,pH,sulphates,alcohol,quality,rawPrediction,probability,prediction
<int>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<dbl>,<chr>,<list>,<list>,<chr>
877,6.9,0.36,0.34,4.2,0.018,57,119,0.9898,3.28,0.36,12.7,good,<environment: 0x55f22862b990>,<environment: 0x55f224988c70>,good
1057,7.5,0.21,0.34,1.2,0.06,26,111,0.9931,3.51,0.47,10.7,average,<environment: 0x55f228630928>,<environment: 0x55f224de6e00>,average
2429,7.3,0.22,0.26,1.5,0.04,32,172,0.99194,3.27,0.48,11.2,average,<environment: 0x55f228638d18>,<environment: 0x55f224dfc5a8>,average
4027,7.6,0.19,0.37,13.1,0.033,52,151,0.99726,3.18,0.79,10.4,average,<environment: 0x55f22863f1b0>,<environment: 0x55f224df7808>,average
4493,6.0,0.19,0.37,9.7,0.032,17,50,0.9932,3.08,0.66,12.0,average,<environment: 0x55f228644ae8>,<environment: 0x55f2248e16b8>,average
4535,6.4,0.24,0.27,1.5,0.04,35,105,0.98914,3.13,0.3,12.4,average,<environment: 0x55f22864c458>,<environment: 0x55f2248b4c30>,average


In [10]:
# Calculate accuracy

accuracy <- mean(predictions_df$quality == predictions_df$prediction)
sprintf ("Accuracy on Test Data:  %f", accuracy)

In [11]:
# Confusion matrix

table(predictions_df$quality, predictions_df$prediction)

         
          average bad good
  average     444 164   73
  bad         167 337    8
  good        165  11  123

## Save model

In [12]:
# Save model (NOTE:  Existing model will be overwritten)

write.ml(model, "wine-model-sparkR", overwrite=TRUE)

## Stop cluster

In [13]:
# Stop Spark cluster

sparkR.stop()