Create a spark session and load the Housing Data set

In [0]:
from pyspark.sql import SparkSession


In [0]:
# File location and type
file_location = "/FileStore/tables/red_or_white_wine-6.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
 
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
 

Data pre-processing

In [0]:
# Import the required libraries
 
from pyspark.sql.functions import datediff,date_format,to_date,to_timestamp

In [0]:
import pyspark.sql.functions as f

In [0]:
df=df.withColumn('type',df.type.cast('integer'))

In [0]:
data = df.select(['fixed acidity',
'volatile acidity',
'citric acid',
'residual sugar',
'chlorides',
'free sulfur dioxide',
'total sulfur dioxide',
'density',
'pH',
'sulphates',
'alcohol',
'quality',
'type'
])

In [0]:
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- type: integer (nullable = true)



In [0]:
df=df.dropna()

In [0]:
print((df.count(),len(df.columns)))

(0, 13)


In [0]:
# Create a 70-30 train test split
 
train_data,test_data=data.randomSplit([0.7,0.3])

In [0]:
print((train_data.count(),len(train_data.columns)))

(4522, 13)


In [0]:
print((test_data.count(),len(test_data.columns)))

(1975, 13)


In [0]:
# Import the required libraries
 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer ,OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

Building the Baseline Linear Regression model

In [0]:
type_indexer = StringIndexer(inputCol='type',outputCol='type_index',handleInvalid='keep')

In [0]:
assembler= VectorAssembler(
                inputCols=['fixed acidity',
'volatile acidity',
'citric acid',
'residual sugar',
'chlorides',
'free sulfur dioxide',
'total sulfur dioxide',
'density',
'pH',
'sulphates',
'alcohol',
'type_index'
],
outputCol="features")

In [0]:
pipe = Pipeline(stages= [type_indexer,assembler])


In [0]:
fitted_pipe=pipe.fit(train_data)

In [0]:
train_data=fitted_pipe.transform(train_data)

In [0]:
#Create an object for the Linear Regression model
 
lr_model = LinearRegression(labelCol='quality')

In [0]:
# Fit the model on the train data
 
fit_model = lr_model.fit(train_data.select(['features','quality']))

In [0]:
# Transform the test data using the model to predict the duration
 
test_data=fitted_pipe.transform(test_data)

In [0]:
# Store the results in a dataframe
 
results = fit_model.transform(test_data)

In [0]:
results.select(['quality','prediction']).show()

+-------+------------------+
|quality|        prediction|
+-------+------------------+
|      6| 6.669103853012459|
|      7|  6.25753590966783|
|      7| 7.014398087572744|
|      5| 6.359744807923747|
|      6|6.5779451397449975|
|      7| 6.658887538918037|
|      6|5.7511157350986295|
|      6| 6.016863238391828|
|      6| 5.651383738514923|
|      4| 6.111362134112056|
|      5| 5.799682679020492|
|      5| 5.405706814685004|
|      6|6.5138638942652705|
|      7| 6.621671144827694|
|      8| 6.623148256566871|
|      6| 5.710553469385481|
|      6| 5.736984229694741|
|      6| 5.963770118039925|
|      4| 5.054275389173043|
|      6| 5.827769574460426|
+-------+------------------+
only showing top 20 rows



Evaluating the model

In [0]:
test_results = fit_model.evaluate(test_data)

In [0]:
test_results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
| -0.6691038530124587|
|  0.7424640903321702|
|-0.01439808757274...|
| -1.3597448079237466|
| -0.5779451397449975|
|  0.3411124610819627|
|  0.2488842649013705|
|-0.01686323839182...|
| 0.34861626148507696|
| -2.1113621341120563|
| -0.7996826790204921|
|-0.40570681468500425|
| -0.5138638942652705|
|  0.3783288551723061|
|  1.3768517434331287|
|  0.2894465306145193|
|  0.2630157703052589|
| 0.03622988196007526|
| -1.0542753891730428|
| 0.17223042553957413|
+--------------------+
only showing top 20 rows



In [0]:
test_results.rootMeanSquaredError

Out[52]: 0.728893078628563