In [1]:
import os
import sys

import glob

# Locate the Spark installation. An already-exported SPARK_HOME wins; the
# hard-coded path is only a fallback for the original author's machine.
spark_path = os.environ.get(
    'SPARK_HOME', '/Users/pradmishra/Downloads/spark-2.1.0-bin-hadoop2.7')
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

# Directories / archives that must be importable for `import pyspark` to work.
spark_sys_paths = [
    spark_path + suffix
    for suffix in ('/bin', '/python', '/python/pyspark', '/python/lib',
                   '/python/lib/pyspark.zip')
]

# The bundled py4j archive name changes with the Spark release, so discover it
# on disk; fall back to the Spark 2.1.0 name when nothing is found.
spark_sys_paths += (
    sorted(glob.glob(spark_path + '/python/lib/py4j-*-src.zip'))
    or [spark_path + '/python/lib/py4j-0.10.4-src.zip']
)

# Append each entry only once so re-running this cell does not keep growing
# sys.path with duplicates.
for spark_sys_path in spark_sys_paths:
    if spark_sys_path not in sys.path:
        sys.path.append(spark_sys_path)

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Build (or reuse) a local Spark session. getOrCreate() makes this cell safe to
# re-run: constructing SparkContext('local') a second time raises because only
# one SparkContext may be active per process.
spark = SparkSession.builder.master('local').getOrCreate()
# Expose the underlying context under the name the rest of the notebook uses.
sc = spark.sparkContext


In [5]:
# Read the semicolon-separated wine-quality CSV: the first line supplies the
# column names and Spark infers each column's type from the data.
wine_csv_options = dict(header=True, inferSchema=True, sep=';')
wine_df = spark.read.csv('/Users/pradmishra/Documents/winequality-red.csv', **wine_csv_options)
# Peek at the first row to confirm the file parsed as expected.
wine_df.take(1)

[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]

In [6]:
# Summary statistics (count, mean, stddev, min, max) for the strongest
# predictor and the target column.
alcohol_quality_df = wine_df.select("alcohol", "quality")
alcohol_quality_df.describe().show()

+-------+------------------+------------------+
|summary|           alcohol|           quality|
+-------+------------------+------------------+
|  count|              1599|              1599|
|   mean|10.422983114446502|5.6360225140712945|
| stddev|1.0656675818473935|0.8075694397347051|
|    min|               8.4|                 3|
|    max|              14.9|                 8|
+-------+------------------+------------------+



In [8]:
import six
# Print the correlation between every non-string column and the target column
# 'quality'. Column types are read from the DataFrame schema (wine_df.dtypes)
# rather than by fetching the first row of each column: that avoids one Spark
# job per column and does not misclassify a string column whose first value
# happens to be null.
for column_name, column_type in wine_df.dtypes:
    if column_type != 'string':
        print( "Correlation to Target for ", column_name, wine_df.stat.corr('quality', column_name))

('Correlation to Target for ', 'fixed acidity', 0.12405164911322263)
('Correlation to Target for ', 'volatile acidity', -0.3905577802640061)
('Correlation to Target for ', 'citric acid', 0.22637251431804048)
('Correlation to Target for ', 'residual sugar', 0.013731637340065798)
('Correlation to Target for ', 'chlorides', -0.12890655993005293)
('Correlation to Target for ', 'free sulfur dioxide', -0.05065605724427597)
('Correlation to Target for ', 'total sulfur dioxide', -0.18510028892653774)
('Correlation to Target for ', 'density', -0.17491922778336474)
('Correlation to Target for ', 'pH', -0.0577313912053826)
('Correlation to Target for ', 'sulphates', 0.25139707906925995)
('Correlation to Target for ', 'alcohol', 0.4761663240011364)
('Correlation to Target for ', 'quality', 1.0)


In [9]:
# Discard the columns that are not used as model features below.
for unused_column in ("residual sugar", "free sulfur dioxide",
                      "pH", "density",
                      "chlorides", "fixed acidity"):
    wine_df = wine_df.drop(unused_column)

In [10]:
from pyspark.ml.feature import VectorAssembler

# Columns that are packed into the single 'features' vector column consumed
# by the regression model.
feature_columns = [
    'volatile acidity',
    'citric acid',
    'total sulfur dioxide',
    'sulphates',
    'alcohol',
]
vectorAssembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
vwine_df = vectorAssembler.transform(wine_df)
# Show a few assembled rows as a sanity check.
vwine_df.take(3)

[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4])),
 Row(volatile acidity=0.88, citric acid=0.0, total sulfur dioxide=67.0, sulphates=0.68, alcohol=9.8, quality=5, features=DenseVector([0.88, 0.0, 67.0, 0.68, 9.8])),
 Row(volatile acidity=0.76, citric acid=0.04, total sulfur dioxide=54.0, sulphates=0.65, alcohol=9.8, quality=5, features=DenseVector([0.76, 0.04, 54.0, 0.65, 9.8]))]

# Linear Regression

In [11]:
# 70/30 train/test split. A fixed seed keeps the split (and therefore the
# fitted coefficients and the R2 reported below) reproducible across
# Restart & Run All.
splits = vwine_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [12]:
from pyspark.ml.regression import LinearRegression

# Fit an ordinary linear regression of 'quality' on the assembled feature
# vector, capped at 10 solver iterations.
lr = LinearRegression(maxIter=10, labelCol='quality', featuresCol='features')
lr_model = lr.fit(train_df)

# Report the learned parameters.
print("Coefficients: %s" % (lr_model.coefficients,))
print("Intercept: %s" % (lr_model.intercept,))

Coefficients: [-1.23193967273,-0.00623134264415,-0.00201689017455,0.69964838906,0.284847164855]
Intercept: 2.94453679405


In [13]:
# Score the held-out rows and display predictions next to the true labels.
predictions = lr_model.transform(test_df)
shown_columns = ["prediction", "quality", "features"]
predictions.select(*shown_columns).show()

+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
|  6.67295927293042|      7|[0.12,0.45,21.0,0...|
| 6.592076034536409|      6|[0.16,0.44,31.0,0...|
| 6.325598700656357|      6|[0.16,0.64,52.0,0...|
| 6.426045312002804|      6|[0.18,0.37,109.0,...|
| 6.454609438896978|      5|[0.18,0.4,67.0,0....|
| 6.228836496622495|      6|[0.18,0.51,23.0,0...|
| 6.320390021914953|      6|[0.19,0.42,30.0,0...|
| 6.230954559801308|      7|[0.21,0.37,10.0,0...|
|6.0673756257525095|      5|[0.21,0.52,23.0,0...|
| 6.158161515419437|      6|[0.22,0.24,28.0,0...|
| 6.158161515419437|      6|[0.22,0.24,28.0,0...|
| 6.481272713052332|      6|[0.23,0.37,60.0,0...|
| 6.126148505571308|      5|[0.23,0.4,67.0,0....|
| 6.505716630256307|      6|[0.23,0.42,17.0,0...|
|  6.03470657845862|      6|[0.24,0.33,73.0,0...|
| 6.364884614894949|      6|[0.24,0.34,22.0,0...|
| 6.412862713794567|      7|[0.24,0.42,22.0,1...|


In [14]:
#Find R2 for Linear Regression
from pyspark.ml.evaluation import RegressionEvaluator
# Score the held-out predictions with the coefficient of determination (R2).
evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="quality",
    predictionCol="prediction",
)
evaluator.evaluate(predictions)

0.34785214379554497

In [None]:
# Shut down the SparkContext created at the top of the notebook; nothing that
# uses `sc` or `spark` can run after this cell until the context is recreated.
sc.stop()