Wine Quality dataset (white-wine subset, `winequality-white.csv`) from the UCI Machine Learning Repository: https://archive.ics.uci.edu/ml/datasets/Wine+Quality

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Reuse the SparkContext that the pyspark launcher may already have created
# (and exposed as `sc`); otherwise create one. Without this the cell only works
# when `sc` was injected into the kernel beforehand.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

WINE_CSV_PATH = '../data/winequality-white.csv'

# The file is semicolon-delimited with a header row; column types are inferred
# from the data rather than declared.
# NOTE(review): 'com.databricks.spark.csv' is the spark-csv package name --
# confirm it is available for the Spark version in use.
df = (
    sqlContext.read.format('com.databricks.spark.csv')
    .options(header='true', inferschema='true', delimiter=';')
    .load(WINE_CSV_PATH)
)

In [4]:
# Show the column names and the types Spark inferred from the CSV.
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [5]:
# Number of rows (individual wines) loaded from the CSV.
df.count()

4898

In [8]:
# Expose `df` to SQL queries under the table name "wine".
# NOTE(review): registerTempTable is deprecated since Spark 2.0 in favour of
# createOrReplaceTempView; kept because this notebook uses the SQLContext API.
df.registerTempTable('wine')

# Mean alcohol per quality score. GROUP BY alone gives no ordering guarantee,
# so ORDER BY makes the collected rows deterministic across runs.
sqlContext.sql(
    "SELECT quality, avg(alcohol) AS avg_alcohol FROM wine "
    "GROUP BY quality ORDER BY quality"
).collect()

[Row(quality=3, avg_alcohol=10.345),
 Row(quality=4, avg_alcohol=10.152453987730064),
 Row(quality=5, avg_alcohol=9.808840082361012),
 Row(quality=6, avg_alcohol=10.575371549893838),
 Row(quality=7, avg_alcohol=11.367935606060609),
 Row(quality=8, avg_alcohol=11.635999999999997),
 Row(quality=9, avg_alcohol=12.18)]

In [28]:
# Regression target: `label` is a DOUBLE copy of the integer `quality` column.
# The frame is cached because it is reused by the assembler spot-check and
# by the train/test split.
data = sqlContext.sql("SELECT *, CAST(quality AS DOUBLE) AS label FROM wine").cache()
data

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int, label: double]

In [29]:
# NOTE(review): `Vectors` is not referenced in the visible cells (and comes from
# the RDD-based mllib package, not pyspark.ml); consider removing it.
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Predictor columns, packed in this order into a single vector column "features".
feature_cols = ["sulphates", "density", "alcohol"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Spot-check one assembled row next to its label.
output = assembler.transform(data)
first_row = output.select("features", "label").first()
print(first_row)

Row(features=DenseVector([0.45, 1.001, 8.8]), label=6.0)


In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fixed seed so the 70/30 split, and therefore the fitted coefficients and the
# test RMSE, are reproducible when the notebook is re-run.
SPLIT_SEED = 42
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Regularized linear regression on the assembled "features" vector.
# regParam sets the penalty strength; elasticNetParam=0.1 mixes the penalty
# as 10% L1 and 90% L2.
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.1, featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, lr])

# Train model. This also runs the VectorAssembler stage on trainingData.
model = pipeline.fit(trainingData)

# Make predictions on the held-out split (features are assembled by the pipeline).
predictions = model.transform(testData)

In [38]:
# Pipeline stage 0 is the VectorAssembler; stage 1 is the fitted linear model.
lrModel = model.stages[1]

# Coefficients follow the assembler's input order: sulphates, density, alcohol.
coefficients, intercept = lrModel.coefficients, lrModel.intercept
print(coefficients)
print(intercept)

[0.147558953905,-15.0475774569,0.198215589004]
18.6759974748


In [37]:
# Root mean squared error of the test-set predictions, in units of `label`
# (the quality score).
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="label",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse
print(message)

Root Mean Squared Error (RMSE) on test data = 0.810256
