
# Spark MLlib

Linear regression is one of the oldest and most widely used machine learning approaches. It assumes a linear relationship between a dependent variable and one or more independent variables. For example, a modeler might want to predict rainfall from humidity. Linear regression finds the line that best fits the scattered data points; this best-fitting line is known as the regression line.
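To make the idea of a best-fitting line concrete, here is a minimal plain-Python sketch of ordinary least squares for a single feature. The function name `fit_line` and the humidity and rainfall numbers are invented for illustration; they do not come from any dataset used in this notebook.

```python
# Ordinary least squares for one feature: y is modelled as slope * x + intercept.
# The data points below are hypothetical and exist only to illustrate the idea.

def fit_line(xs, ys):
    """Return (slope, intercept) of the least-squares regression line."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # slope = covariance(x, y) / variance(x)
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    slope = cov_xy / var_x
    # the regression line always passes through the point (mean_x, mean_y)
    intercept = mean_y - slope * mean_x
    return slope, intercept

humidity = [30.0, 45.0, 60.0, 75.0, 90.0]  # hypothetical humidity readings (%)
rainfall = [1.2, 2.3, 4.1, 5.4, 7.0]       # hypothetical rainfall amounts (mm)

slope, intercept = fit_line(humidity, rainfall)
print("rainfall ~ %.3f * humidity + %.3f" % (slope, intercept))
```

With several features, as in the housing data below, the same principle applies, but the model learns one coefficient per feature plus an intercept.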

The goal of this exercise is to predict housing prices from the given features. We will use the Boston Housing dataset, with MEDV as the output variable and all the other variables as inputs.

In [13]:
!wget https://gist.githubusercontent.com/ghaiyur-musubi/eb32744bd0c24b62efe1ca84d50938fc/raw/b17119a2755876c4bdf88e10f015d5be84c5d003/BostonHousing.csv

--2021-08-24 17:13:15--  https://gist.githubusercontent.com/ghaiyur-musubi/eb32744bd0c24b62efe1ca84d50938fc/raw/b17119a2755876c4bdf88e10f015d5be84c5d003/BostonHousing.csv
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv.2’


2021-08-24 17:13:16 (12.1 MB/s) - ‘BostonHousing.csv.2’ saved [35735/35735]



In [14]:
!pip install pyspark



In [15]:
from pyspark.sql import SparkSession
# build (or reuse) a local Spark session for this notebook
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [16]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

df = spark.read.csv("BostonHousing.csv", inferSchema = True, header = True)
df.show(20)

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

In [17]:
df.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [18]:
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|              crim|                zn|             indus|              chas|                nox|                rm|               age|              dis|              rad|               tax|           ptratio|                 b|             lstat|              medv|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|               506|               506|               506|               506|                506|               506|               506|              506|              

## Analyzing the data
Now that we have loaded the dataset, we can start analyzing it. For our linear regression model we need two classes from PySpark: VectorAssembler and LinearRegression. VectorAssembler is a transformer that combines multiple numeric columns into a single vector column. If any of our columns contained string values, we could have used StringIndexer to convert them into numeric indices. Luckily, the BostonHousing dataset contains only numeric columns (doubles and integers, as the schema above shows), so we don't need StringIndexer for now.
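Since StringIndexer is not needed for this dataset, the following plain-Python sketch only illustrates the kind of mapping it produces. It assumes PySpark's default ordering, in which the most frequent label receives index 0.0. The helper `fit_string_index` and the `river_side` column are hypothetical; this is not Spark code.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent label first."""
    counts = Counter(values)
    # sort by descending frequency, then alphabetically to break ties
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical categorical column; BostonHousing has no string columns.
river_side = ["no", "no", "yes", "no", "unknown", "yes"]

mapping = fit_string_index(river_side)
indexed = [mapping[value] for value in river_side]
print(mapping)
print(indexed)
```

The indexed column is numeric, so it could then be passed to VectorAssembler alongside the other features.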



The next step is to combine the features from the separate columns into a single vector column. We name this new column 'Attributes' through the outputCol parameter.


In [19]:
# combine all the feature columns into one vector column
feature_cols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
                'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat']
assembler = VectorAssembler(inputCols = feature_cols, outputCol = "Attributes")

output = assembler.transform(df)

# keep only the feature vector (input) and the target (output)
finalized_data = output.select("Attributes", "medv")

finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



Here, 'Attributes' holds the input features from all the columns and 'medv' is the target column. Next, we split the data into training and testing sets (80% and 20% in this case).
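A random split of this kind decides row by row which set a record goes to, so the resulting sizes are only approximately 80% and 20%, and they change from run to run unless a seed is fixed. The plain-Python sketch below illustrates that behaviour. It is not Spark's actual sampling algorithm, and the helper `random_split` and the seed value 42 are arbitrary choices for illustration. In PySpark, `randomSplit` accepts an optional `seed` argument for the same purpose.

```python
import random

def random_split(rows, train_weight=0.8, seed=42):
    """Assign each row to the train or test list with an independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(506))  # stand-in for the 506 rows of the dataset
train_rows, test_rows = random_split(rows)
print(len(train_rows), len(test_rows))
```

Because the split is random, the predictions and metrics shown in this notebook will differ slightly on every run unless a seed is supplied.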

In [20]:
# split the data into training (80%) and testing (20%) sets
train_data, test_data = finalized_data.randomSplit([.8,.2])

regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')

# fit the model on the training set
regressor = regressor.fit(train_data)

# evaluate the fitted model on the test set
pred = regressor.evaluate(test_data)

# show the predictions for the test set
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.01096,55.0,2.2...|22.0| 27.48688139596633|
|[0.01439,60.0,2.9...|29.1|31.628212136761007|
|[0.01501,90.0,1.2...|50.0| 45.93363674116101|
|[0.01538,90.0,3.7...|44.0|37.398435622287224|
|[0.02055,85.0,0.7...|24.7| 25.44992282340779|
|[0.0315,95.0,1.47...|34.9|30.367294499312635|
|[0.03427,0.0,5.19...|19.5|20.268570052446712|
|[0.03445,82.5,2.0...|24.1|29.459723839483704|
|[0.03466,35.0,6.0...|19.4|23.637147193230806|
|[0.03871,52.5,5.3...|23.2| 27.13580957592068|
|[0.03961,0.0,5.19...|21.1|20.849268799603927|
|[0.04203,28.0,15....|22.9|28.641847217669763|
|[0.04666,80.0,1.5...|30.3| 32.74254163744575|
|[0.05023,35.0,6.0...|17.1|20.248219345526834|
|[0.05302,0.0,3.41...|28.7|30.478950989955578|
|[0.05644,40.0,6.4...|32.4|37.324427626521796|
|[0.05735,0.0,4.49...|26.6| 27.49772395467309|
|[0.06129,20.0,3.3...|46.0|  41.4241344312391|
|[0.06211,40.

In [21]:
# we can also print the coefficients and intercept of the regression model

coeff = regressor.coefficients

# intercept: the predicted value when every feature is zero

intr = regressor.intercept


print("the coefficient of the model is %a" %coeff)
print("the intercept of the model is %f" %intr )

the coefficient of the model is DenseVector([-0.1159, 0.0574, 0.0512, 1.6454, -22.2679, 3.4674, 0.0017, -1.5458, 0.3235, -0.0126, -0.9546, 0.0075, -0.5104])
the intercept of the model is 41.499323
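To see how these numbers turn into a prediction, the plain-Python sketch below applies the printed coefficients and intercept to the first row of the dataset shown earlier. The printed coefficients are rounded to four decimals, so the result is only an approximation of what the fitted Spark model would return. The helper `predict` is a hypothetical name.

```python
# Coefficients and intercept as printed above (rounded in the printed output).
coefficients = [-0.1159, 0.0574, 0.0512, 1.6454, -22.2679, 3.4674, 0.0017,
                -1.5458, 0.3235, -0.0126, -0.9546, 0.0075, -0.5104]
intercept = 41.499323

# First row of the dataset, in the same column order used for 'Attributes'.
first_row = [0.00632, 18.0, 2.31, 0, 0.538, 6.575, 65.2,
             4.09, 1, 296, 15.3, 396.9, 4.98]
actual_medv = 24.0

def predict(features, weights, bias):
    """Linear model: dot product of weights and features, plus the intercept."""
    return sum(w * x for w, x in zip(weights, features)) + bias

predicted_medv = predict(first_row, coefficients, intercept)
print("predicted medv: %.2f, actual medv: %.1f" % (predicted_medv, actual_medv))
```

Each coefficient is the change in the predicted 'medv' for a one-unit increase in that feature, holding the other features fixed.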


## Basic statistical analysis

Once we are done with the basic linear regression, we can go a bit further and evaluate our model statistically using the RegressionEvaluator class from PySpark.

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

# named 'evaluator' to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol = "medv", predictionCol = "prediction", metricName = "rmse")

# root mean squared error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" %rmse)


# mean squared error
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: 'mse'})
print("MSE: %.3f" %mse)

# mean absolute error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: 'mae'})
print("MAE: %.3f" %mae)


# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("R2: %.3f" %r2)

RMSE: 4.684
MSE: 21.942
MAE: 3.556
R2: 0.757
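For reference, the four metrics can be written out in a few lines of plain Python. The sketch below uses only the first five rows of the prediction table shown earlier, so its numbers will not match the full test-set figures above. The helper `regression_metrics` is a hypothetical name, and the formulas are the standard textbook definitions.

```python
import math

def regression_metrics(labels, predictions):
    """Compute RMSE, MSE, MAE and R2 from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    mse = ss_res / n
    rmse = math.sqrt(mse)
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return {"rmse": rmse, "mse": mse, "mae": mae, "r2": r2}

# First five rows of the prediction output shown earlier (medv, prediction).
labels = [22.0, 29.1, 50.0, 44.0, 24.7]
predictions = [27.48688139596633, 31.628212136761007, 45.93363674116101,
               37.398435622287224, 25.44992282340779]

metrics = regression_metrics(labels, predictions)
for name, value in metrics.items():
    print("%s: %.3f" % (name.upper(), value))

# Consistency check on the reported figures: RMSE is the square root of MSE.
print(round(math.sqrt(21.942), 3))
```

An R2 of 0.757 means the model explains roughly three quarters of the variance of 'medv' in the test set, while the MAE of 3.556 is in the same units as 'medv' itself.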
