In [10]:
import os
import sys

# Root of the local Spark distribution. Written as a raw string so that the
# backslashes of the Windows path are never interpreted as escape sequences.
spark_path = r'C:\spark\spark-2.4.5-bin-hadoop2.7'

# Both variables point at the same directory.
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

# Make the Spark launcher directory and the bundled Python sources importable.
# The paths are built with os.path.join instead of string concatenation: the
# previous literal '\bin' contained the backspace escape '\b', so that entry
# never named the real 'bin' directory.
spark_subpaths = (
    ('bin',),
    ('python',),
    ('python', 'pyspark'),
    ('python', 'lib'),
    ('python', 'lib', 'pyspark.zip'),
    ('python', 'lib', 'py4j-0.10.7-src.zip'),
)
for subpath_parts in spark_subpaths:
    sys.path.append(os.path.join(spark_path, *subpath_parts))

In [11]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Start (or reuse) a Spark session that runs on the 'local' master.
# getOrCreate() keeps this cell safe to re-run: constructing a second
# SparkContext directly raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
# The context that backs the session; later cells use it as `sc`.
sc = spark.sparkContext


In [12]:
# Location of the semicolon-separated red-wine CSV file.
# Raw string: the Windows path contains backslashes ('\D', '\w') that only
# worked before because they are not recognised escape sequences; relying on
# that is deprecated.
wine_csv_path = r'F:\Data\wine-quality-master\winequality\winequality-red.csv'

# The first row holds the column names; column types are inferred from the data.
wine_df = spark.read.csv(wine_csv_path, header=True, inferSchema=True, sep=';')
wine_df.take(1)

[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]

In [13]:
# Summary statistics (count, mean, stddev, min, max) for two columns.
summary_columns = ["alcohol", "quality"]
wine_df.select(*summary_columns).describe().show()

+-------+------------------+------------------+
|summary|           alcohol|           quality|
+-------+------------------+------------------+
|  count|              1599|              1599|
|   mean|10.422983114446502|5.6360225140712945|
| stddev|1.0656675818473935|0.8075694397347051|
|    min|               8.4|                 3|
|    max|              14.9|                 8|
+-------+------------------+------------------+



In [14]:
import six
# Print the correlation between every non-string column and the target
# column 'quality'.
# Column types are read from the DataFrame schema (`dtypes`) instead of
# fetching the first row of each column: this avoids one extra Spark job per
# column and does not misclassify a string column whose first value is null.
for column_name, column_type in wine_df.dtypes:
    if column_type != 'string':
        print( "Correlation to Target for ", column_name, wine_df.stat.corr('quality', column_name))

Correlation to Target for  fixed acidity 0.12405164911322263
Correlation to Target for  volatile acidity -0.3905577802640061
Correlation to Target for  citric acid 0.22637251431804048
Correlation to Target for  residual sugar 0.013731637340065798
Correlation to Target for  chlorides -0.12890655993005293
Correlation to Target for  free sulfur dioxide -0.05065605724427597
Correlation to Target for  total sulfur dioxide -0.18510028892653774
Correlation to Target for  density -0.17491922778336474
Correlation to Target for  pH -0.0577313912053826
Correlation to Target for  sulphates 0.25139707906925995
Correlation to Target for  alcohol 0.4761663240011364
Correlation to Target for  quality 1.0


In [15]:
# Remove the six columns whose correlation with 'quality' is smallest in
# absolute value according to the correlation listing printed earlier.
weakly_correlated_columns = (
    "residual sugar",
    "free sulfur dioxide",
    "pH",
    "density",
    "chlorides",
    "fixed acidity",
)
wine_df = wine_df.drop(*weakly_correlated_columns)

In [16]:
from pyspark.ml.feature import VectorAssembler

# Pack the remaining predictor columns into a single vector column named
# 'features'; 'quality' is left out because it is the label.
feature_columns = [
    'volatile acidity',
    'citric acid',
    'total sulfur dioxide',
    'sulphates',
    'alcohol',
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
vwine_df = vectorAssembler.transform(wine_df)
vwine_df.take(3)

[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4])),
 Row(volatile acidity=0.88, citric acid=0.0, total sulfur dioxide=67.0, sulphates=0.68, alcohol=9.8, quality=5, features=DenseVector([0.88, 0.0, 67.0, 0.68, 9.8])),
 Row(volatile acidity=0.76, citric acid=0.04, total sulfur dioxide=54.0, sulphates=0.65, alcohol=9.8, quality=5, features=DenseVector([0.76, 0.04, 54.0, 0.65, 9.8]))]

# Linear Regression

In [17]:
# 70 % / 30 % train/test split.
# A fixed seed makes the split reproducible; without it every run samples
# different rows, so the fitted coefficients and R2 change on each run.
splits = vwine_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [18]:
from pyspark.ml.regression import LinearRegression

# Fit a linear regression of 'quality' on the assembled feature vector,
# limited to 10 iterations, and report the fitted parameters.
lr = LinearRegression(
    featuresCol='features',
    labelCol='quality',
    maxIter=10,
)
lr_model = lr.fit(train_df)

fitted_parameters = (
    ("Coefficients: ", lr_model.coefficients),
    ("Intercept: ", lr_model.intercept),
)
for parameter_label, parameter_value in fitted_parameters:
    print(parameter_label + str(parameter_value))

Coefficients: [-1.192327680218766,-0.1406506561013906,-0.0023366479805107766,0.6712864466317191,0.29313869740937387]
Intercept: 2.910302795585135


In [19]:
# Score the held-out rows and show each prediction next to the true label.
predictions = lr_model.transform(test_df)
comparison_columns = ["prediction", "quality", "features"]
predictions.select(*comparison_columns).show()

+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
| 6.653389269734186|      7|[0.12,0.45,21.0,0...|
| 6.494977065005012|      6|[0.18,0.34,58.0,0...|
| 5.983445948188404|      6|[0.18,0.48,15.0,0...|
| 6.182057772049744|      6|[0.18,0.51,23.0,0...|
| 6.102516306198623|      6|[0.21,0.4,165.0,0...|
| 5.802704773859206|      7|[0.21,0.44,24.0,0...|
| 6.402419969272411|      6|[0.21,0.49,32.0,0...|
| 6.031437110315464|      5|[0.21,0.52,23.0,0...|
| 6.040904409828122|      6|[0.22,0.3,20.0,0....|
| 5.909176988307944|      6|[0.23,0.57,8.0,0....|
| 5.726800545549747|      6|[0.24,0.39,10.0,0...|
| 5.643793566410453|      6|[0.24,0.4,25.0,0....|
|5.9479263728914855|      5|[0.24,0.49,28.0,0...|
| 6.674847734571488|      7|[0.25,0.39,10.0,0...|
|  6.11382030178604|      6|[0.25,0.46,42.0,0...|
| 6.109334628243798|      6|[0.26,0.26,77.0,0...|
| 5.950712340576575|      6|[0.26,0.3,38.0,0....|


In [20]:
#Find R2 for Linear Regression
from pyspark.ml.evaluation import RegressionEvaluator
# R2 of the predictions against the 'quality' label.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="quality",
    metricName="r2",
)
evaluator.evaluate(predictions)

0.40604656203478895

In [21]:
# Shut down the SparkContext that was started at the top of the notebook.
sc.stop()