<a href="https://colab.research.google.com/github/tnflynt/Data_Analyst_Portfolio/blob/main/Boston_house_price_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Boston house price prediction

### The objective of this exercise is to predict the median value of houses in the suburbs of Boston

**Step 1**: Install Spark

In [None]:
!sudo apt update
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
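# Older Spark releases such as 3.0.1 are served from archive.apache.org rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz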
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

**Step 2**: Mount Google Drive to access the dataset, which is stored there

In [None]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Step 3**: List the contents of Google Drive

In [None]:
# After executing the cell above, Drive
# files will be present in "/content/drive/My Drive".
!ls "/content/drive/My Drive"

 airline.csv	        cal_housing.data		      diabetes.csv
 bank.csv	        cal_housing.domain		      heart.csv
 boston_data.csv       'Colab Notebooks'
 boston_test_data.csv   Copy_of_Introduction_to_Spark.ipynb


**Step 4**: Import `SparkSession`, which provides a single point of entry to the underlying Spark functionality, and create a session

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create spark_session
spark_session = SparkSession.builder.getOrCreate()

**Step 5**: Read in the data files

In [None]:
# Load in the training data
df_train = spark_session.read.option("header", "true").csv("/content/drive/My Drive/boston_data.csv")

In [None]:
# Load in the test data
df_test = spark_session.read.option("header", "true").csv("/content/drive/My Drive/boston_test_data.csv")

**Step 6**: Display the contents of the DataFrame

In [None]:
df_train.show()
df_test.show()

+-------+----+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis| rad|  tax|ptratio| black|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+----+
|0.15876| 0.0|10.81| 0.0|0.413|5.961| 17.5|5.2873| 4.0|305.0|   19.2|376.94| 9.88|21.7|
|0.10328|25.0| 5.13| 0.0|0.453|5.927| 47.2| 6.932| 8.0|284.0|   19.7| 396.9| 9.22|19.6|
| 0.3494| 0.0|  9.9| 0.0|0.544|5.972| 76.7|3.1025| 4.0|304.0|   18.4|396.24| 9.97|20.3|
|2.73397| 0.0|19.58| 0.0|0.871|5.597| 94.9|1.5257| 5.0|403.0|   14.7|351.85|21.45|15.4|
|0.04337|21.0| 5.64| 0.0|0.439|6.115| 63.0|6.8147| 4.0|243.0|   16.8|393.97| 9.43|20.5|
| 0.0837|45.0| 3.44| 0.0|0.437|7.185| 38.9|4.5667| 5.0|398.0|   15.2| 396.9| 5.39|34.9|
|0.19073|22.0| 5.86| 0.0|0.431|6.718| 17.5|7.8265| 7.0|330.0|   19.1|393.74| 6.56|26.2|
|0.26938| 0.0|  9.9| 0.0|0.544|6.266| 82.8|3.2628| 4.0|304.0|   18.4|393.39|  7.9|21.6|
|10.0623| 0.0| 18.1| 0.0|0.584|6

**Step 7**: Display the data types

In [None]:
df_train.dtypes

[('crim', 'string'),
 ('zn', 'string'),
 ('indus', 'string'),
 ('chas', 'string'),
 ('nox', 'string'),
 ('rm', 'string'),
 ('age', 'string'),
 ('dis', 'string'),
 ('rad', 'string'),
 ('tax', 'string'),
 ('ptratio', 'string'),
 ('black', 'string'),
 ('lstat', 'string'),
 ('medv', 'string')]

In [None]:
df_test.dtypes

[('crim', 'string'),
 ('zn', 'string'),
 ('indus', 'string'),
 ('chas', 'string'),
 ('nox', 'string'),
 ('rm', 'string'),
 ('age', 'string'),
 ('dis', 'string'),
 ('rad', 'string'),
 ('tax', 'string'),
 ('ptratio', 'string'),
 ('black', 'string'),
 ('lstat', 'string')]
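
Every column is reported as `string` because the CSV files were read without a schema, so each field is kept as text. The plain-Python sketch below is only an analogy using the standard `csv` module, not Spark itself. It reuses the header and first data row displayed in Step 6, cut down to four columns; the raw text is reconstructed from the displayed table, so the actual file may format the values differently. It shows that the values arrive as text and need an explicit conversion before any arithmetic.

```python
import csv
import io

# Header and first row of the training data as displayed in Step 6,
# cut down to four columns to keep the example short (reconstructed, assumed layout).
raw = "crim,zn,indus,medv\n0.15876,0.0,10.81,21.7\n"

rows = list(csv.DictReader(io.StringIO(raw)))
first = rows[0]

# Every field is text at this point, just like the 'string' dtypes above.
print({name: type(value).__name__ for name, value in first.items()})

# Arithmetic only makes sense after an explicit conversion.
converted = {name: float(value) for name, value in first.items()}
print(converted["medv"] + 1.0)
```

The cast to `FloatType()` in the following steps plays the same role as `float(value)` here.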

In [None]:
# Import `FloatType` from `sql.types` (used for the casts below)
from pyspark.sql.types import FloatType


**Step 8**: Function that converts the data types of the DataFrame columns

In [None]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [None]:
# Assign all column names, except for "chas" (categorical variable), to `columns_train` and `columns_test`
columns_train = ['crim','zn','indus','nox','rm','age','dis','rad','tax','ptratio','black','lstat','medv']
columns_test = ['crim','zn','indus','nox','rm','age','dis','rad','tax','ptratio','black','lstat']

**Step 9**: Convert the data types of the columns listed above to a float type

In [None]:
from pyspark.sql.types import FloatType
# Convert the 'df_train' and 'df_test' columns to `FloatType()`
df_train = convertColumn(df_train, columns_train, FloatType())
df_test = convertColumn(df_test, columns_test, FloatType())

**Step 10**: Confirm that the data types have been converted to float

In [None]:
df_train.dtypes

[('crim', 'float'),
 ('zn', 'float'),
 ('indus', 'float'),
 ('chas', 'string'),
 ('nox', 'float'),
 ('rm', 'float'),
 ('age', 'float'),
 ('dis', 'float'),
 ('rad', 'float'),
 ('tax', 'float'),
 ('ptratio', 'float'),
 ('black', 'float'),
 ('lstat', 'float'),
 ('medv', 'float')]

In [None]:
df_test.dtypes

[('crim', 'float'),
 ('zn', 'float'),
 ('indus', 'float'),
 ('chas', 'string'),
 ('nox', 'float'),
 ('rm', 'float'),
 ('age', 'float'),
 ('dis', 'float'),
 ('rad', 'float'),
 ('tax', 'float'),
 ('ptratio', 'float'),
 ('black', 'float'),
 ('lstat', 'float')]

In [None]:
# Print the schema of 'df_train'
df_train.printSchema()

root
 |-- crim: float (nullable = true)
 |-- zn: float (nullable = true)
 |-- indus: float (nullable = true)
 |-- chas: string (nullable = true)
 |-- nox: float (nullable = true)
 |-- rm: float (nullable = true)
 |-- age: float (nullable = true)
 |-- dis: float (nullable = true)
 |-- rad: float (nullable = true)
 |-- tax: float (nullable = true)
 |-- ptratio: float (nullable = true)
 |-- black: float (nullable = true)
 |-- lstat: float (nullable = true)
 |-- medv: float (nullable = true)



In [None]:
df_train.describe().show()

+-------+----------------+------------------+-----------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|            crim|                zn|            indus|               chas|                nox|                rm|               age|               dis|              rad|               tax|           ptratio|             black|             lstat|              medv|
+-------+----------------+------------------+-----------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|             404|               404|              404|                404|                404|               404|               404|               404|              404|

**Step 11**: Processing the data

In [None]:
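# Import only what is needed from `sql.functions`
# (a wildcard import would shadow Python built-ins such as `sum` and `max`)
from pyspark.sql.functions import lit

# Create a placeholder "medv_fake" column for the test data so that it has the same
# layout as the training data (label first, then features) for Dense Vector creation.
df_test = df_test.withColumn("medv_fake", lit(0.0))

# Show the first 2 rows of `df_test`
df_test.take(2)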

[Row(crim=0.07885999977588654, zn=80.0, indus=4.949999809265137, chas='0.0', nox=0.41100001335144043, rm=7.1479997634887695, age=27.700000762939453, dis=5.116700172424316, rad=4.0, tax=245.0, ptratio=19.200000762939453, black=396.8999938964844, lstat=3.559999942779541, medv_fake=0.0),
 Row(crim=0.08872999995946884, zn=21.0, indus=5.639999866485596, chas='0.0', nox=0.4390000104904175, rm=5.9629998207092285, age=45.70000076293945, dis=6.814700126647949, rad=4.0, tax=243.0, ptratio=16.799999237060547, black=395.55999755859375, lstat=13.449999809265137, medv_fake=0.0)]
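
The long decimals in the rows above (for example `ptratio=19.200000762939453`) are a consequence of casting to `FloatType()`, which is a 32-bit float: the value is stored in single precision and then displayed as a 64-bit Python float. The sketch below reproduces the effect with the standard `struct` module. `to_float32` is a hypothetical helper name used only for illustration.

```python
import struct

def to_float32(value):
    """Round a Python float to the nearest 32-bit float, returned as a Python float."""
    return struct.unpack("f", struct.pack("f", value))[0]

# 19.2 cannot be represented exactly in 32 bits, so extra digits appear.
print(to_float32(19.2))  # 19.200000762939453, as in the output above

# 4.0 is exactly representable, so it is unchanged.
print(to_float32(4.0))   # 4.0
```

Casting to `DoubleType()` instead would be one way to avoid these display artifacts, at the cost of more memory per value.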

In [None]:
# Re-order and select columns
df_train = df_train.select('medv','crim','zn','indus','nox','rm','age','dis','rad','tax','ptratio','black','lstat')
df_test = df_test.select('medv_fake','crim','zn','indus','nox','rm','age','dis','rad','tax','ptratio','black','lstat')

**Step 12**: Specifying the label and the features. The label in this case is the dependent variable, i.e. **medv**

In [None]:
# Import `DenseVector`
# A Dense Vector is used to store arrays of values for use in PySpark.
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data_train = df_train.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
input_data_test = df_test.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace 'df_train' and 'df_test' with the new DataFrames
df_train = spark_session.createDataFrame(input_data_train,["label", "features"])
df_test = spark_session.createDataFrame(input_data_test,["label_fake", "features"])

label = df_train.rdd.map(lambda x: x.label)
features = df_train.rdd.map(lambda x: x.features)
df_train.show()
df_test.show()

+------------------+--------------------+
|             label|            features|
+------------------+--------------------+
|21.700000762939453|[0.15875999629497...|
|19.600000381469727|[0.10328000038862...|
|20.299999237060547|[0.34940001368522...|
|15.399999618530273|[2.73396992683410...|
|              20.5|[0.04337000101804...|
|34.900001525878906|[0.08370000123977...|
|26.200000762939453|[0.19073000550270...|
|21.600000381469727|[0.26938000321388...|
|14.100000381469727|[10.0622997283935...|
|              17.0|[1.41384994983673...|
|10.399999618530273|[25.9405994415283...|
|23.299999237060547|[0.09251999855041...|
|              21.0|[1.00244998931884...|
|22.200000762939453|[0.11027000099420...|
| 8.699999809265137|[15.1772003173828...|
|23.700000762939453|[5.70817995071411...|
|22.200000762939453|[0.10289999842643...|
|              12.0|[15.0234003067016...|
|              21.5|[1.65659999847412...|
|23.299999237060547|[1.42501997947692...|
+------------------+--------------

**Step 13**: Scaling the features using `StandardScaler`. With the default settings used here (`withStd=True`, `withMean=False`), each feature is divided by its standard deviation so that it has unit variance, but the mean is not subtracted. Centering the features as well would require passing `withMean=True`.

In [None]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df_train.select('features'))

# Transform the data in 'df_train' and 'df_test' with the scaler
scaled_df_train = scaler.transform(df_train)
scaled_df_test = scaler.transform(df_test)

# Inspect the result
scaled_df_train.take(2)

[Row(label=21.700000762939453, features=DenseVector([0.1588, 0.0, 10.81, 0.413, 5.961, 17.5, 5.2873, 4.0, 305.0, 19.2, 376.94, 9.88]), features_scaled=DenseVector([0.0178, 0.0, 1.5862, 3.5203, 8.8203, 0.6235, 2.5059, 0.4528, 1.7829, 8.929, 3.9892, 1.4267])),
 Row(label=19.600000381469727, features=DenseVector([0.1033, 25.0, 5.13, 0.453, 5.927, 47.2, 6.932, 8.0, 284.0, 19.7, 396.9, 9.22]), features_scaled=DenseVector([0.0115, 1.1336, 0.7528, 3.8612, 8.77, 1.6817, 3.2854, 0.9055, 1.6601, 9.1615, 4.2005, 1.3314]))]
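
The scaled values above are all non-negative, which is what dividing by the standard deviation without centering produces. As a minimal sketch of that arithmetic in plain Python, the example below scales one made-up column both ways. `scale_column` is a hypothetical helper, and the sketch assumes Spark's documented behavior of using the corrected sample standard deviation.

```python
import statistics

def scale_column(values, with_mean=False, with_std=True):
    """Scale one feature column the way a standard scaler would."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1 denominator)
    centered = [v - mean for v in values] if with_mean else list(values)
    return [v / std for v in centered] if with_std else centered

column = [2.0, 4.0, 6.0]  # made-up values: mean 4.0, sample standard deviation 2.0

print(scale_column(column))                  # [1.0, 2.0, 3.0]
print(scale_column(column, with_mean=True))  # [-1.0, 0.0, 1.0]
```

Only the second call yields a column centered on zero, which corresponds to `withMean=True` in `StandardScaler`.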

In [None]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(featuresCol = 'features_scaled', labelCol="label", maxIter=100, regParam=0.3, elasticNetParam=0.8)
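
In Spark's `LinearRegression`, `regParam` sets the overall regularization strength and `elasticNetParam` sets the mix between the L1 (lasso) and L2 (ridge) penalties, so 0.8 here weights the penalty mostly toward L1. As a rough sketch of the penalty term as described in the Spark documentation, regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2), the plain-Python example below evaluates it for made-up coefficients. `elastic_net_penalty` is a hypothetical helper, not part of PySpark.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the least-squares loss for a given coefficient vector."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

weights = [1.0, -2.0]  # made-up coefficients

# Same settings as the `lr` object above: regParam=0.3, elasticNetParam=0.8
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))

# elasticNetParam=0.0 is pure ridge (L2); elasticNetParam=1.0 is pure lasso (L1)
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0))
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0))
```

A larger L1 share tends to push some coefficients to exactly zero, which acts as a form of feature selection.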


In [None]:
scaled_df_train.show()

+------------------+--------------------+--------------------+
|             label|            features|     features_scaled|
+------------------+--------------------+--------------------+
|21.700000762939453|[0.15875999629497...|[0.01775060126223...|
|19.600000381469727|[0.10328000038862...|[0.01154750660144...|
|20.299999237060547|[0.34940001368522...|[0.03906563661302...|
|15.399999618530273|[2.73396992683410...|[0.30567908268275...|
|              20.5|[0.04337000101804...|[0.00484910312912...|
|34.900001525878906|[0.08370000123977...|[0.00935831054628...|
|26.200000762939453|[0.19073000550270...|[0.02132509672103...|
|21.600000381469727|[0.26938000321388...|[0.03011877763075...|
|14.100000381469727|[10.0622997283935...|[1.12504330075642...|
|              17.0|[1.41384994983673...|[0.15807941099689...|
|10.399999618530273|[25.9405994415283...|[2.90036059420350...|
|23.299999237060547|[0.09251999855041...|[0.01034445478317...|
|              21.0|[1.00244998931884...|[0.11208169854

In [None]:
scaled_df_test.show()

+----------+--------------------+--------------------+
|label_fake|            features|     features_scaled|
+----------+--------------------+--------------------+
|       0.0|[0.07885999977588...|[0.00881716077241...|
|       0.0|[0.08872999995946...|[0.00992070348976...|
|       0.0|[1.38798999786376...|[0.15518806741635...|
|       0.0|[0.30346998572349...|[0.03393030257839...|
|       0.0|[0.22926999628543...|[0.02563416717328...|
|       0.0|[0.06465999782085...|[0.00722949020987...|
|       0.0|[0.13117000460624...|[0.01466582579785...|
|       0.0|[1.23247003555297...|[0.13779972713089...|
|       0.0|[0.43571001291275...|[0.04871576521585...|
|       0.0|[0.08186999708414...|[0.00915370186127...|
|       0.0|[0.52013999223709...|[0.05815569298443...|
|       0.0|[24.8017005920410...|[2.77302285278845...|
|       0.0|[1.25179004669189...|[0.13995985450625...|
|       0.0|[0.54010999202728...|[0.06038849414187...|
|       0.0|[11.9511003494262...|[1.33622588749265...|
|       0.

In [None]:
# Fit the data to the model
linearModel = lr.fit(scaled_df_train)

**Step 14**: Make the predictions

In [None]:
# Make predictions on test data
predicted_test = linearModel.transform(scaled_df_test)

In [None]:
predicted_test.select('features', 'prediction').show(20)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[0.07885999977588...| 30.94251988528655|
|[0.08872999995946...| 21.55932152726045|
|[1.38798999786376...|10.537025559884444|
|[0.30346998572349...|24.831114673068654|
|[0.22926999628543...| 18.77069896676209|
|[0.06465999782085...| 28.59375693988012|
|[0.13117000460624...|20.403357752291278|
|[1.23247003555297...|16.981480439722297|
|[0.43571001291275...|13.706084775092394|
|[0.08186999708414...| 35.43455078363393|
|[0.52013999223709...|  40.1643794170824|
|[24.8017005920410...|11.982263099779866|
|[1.25179004669189...|13.284864416938635|
|[0.54010999202728...| 33.20809195953153|
|[11.9511003494262...| 17.82101771173071|
|[12.8023004531860...| 11.48820141407007|
|[0.04293999820947...|24.672908655334496|
|[0.17170999944210...| 18.43592511996215|
|[0.03960999846458...|21.829903478439814|
|[11.1080999374389...| 3.736057245186565|
+--------------------+------------

In [None]:
# Retrieve the predictions and the "known" labels.
# Since the test data comes without "known" labels, show the predictions and "known" labels of the training data instead.
predicted_train = linearModel.transform(scaled_df_train)
predictions = predicted_train.select("prediction").rdd.map(lambda x: x[0])
labels = predicted_train.select("label").rdd.map(lambda x: x[0])

In [None]:
# Combine the predictions and the label
predictionAndLabel = predictions.zip(labels).collect()

**Step 15**: Output the predictions and the associated labels

In [None]:
# Print out first 15 instances of `predictionAndLabel`
predictionAndLabel[:15]

[(22.188195479993183, 21.700000762939453),
 (20.971198514672466, 19.600000381469727),
 (23.29960416057812, 20.299999237060547),
 (17.027312202324573, 15.399999618530273),
 (24.254350726806027, 20.5),
 (33.394339929220095, 34.900001525878906),
 (25.839337014625237, 26.200000762939453),
 (25.532812578386995, 21.600000381469727),
 (17.937967030928146, 14.100000381469727),
 (22.325655239361993, 17.0),
 (6.66441033550244, 10.399999618530273),
 (27.897723148694926, 23.299999237060547),
 (22.48868735560656, 21.0),
 (24.383802795837347, 22.200000762939453),
 (9.835835943899497, 8.699999809265137)]

**Step 16**: Evaluating the model

**RMSE**: The root mean squared error is the square root of the average squared difference between the values predicted by the model and the actual values. The smaller the RMSE, the closer the predicted and actual values are.

In [None]:
linearModel.summary.rootMeanSquaredError

4.605529599147375
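
To make the RMSE definition concrete, here is a minimal plain-Python sketch. It uses only the first three (prediction, label) pairs from Step 15, rounded to three decimals, so its result is not expected to match the value above, which Spark computes over the whole training set. `rmse` is a hypothetical helper name.

```python
import math

def rmse(predictions, labels):
    """Root mean squared error between two equal-length sequences."""
    squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# First three (prediction, label) pairs from Step 15, rounded to three decimals.
predictions = [22.188, 20.971, 23.300]
labels = [21.7, 19.6, 20.3]

print(rmse(predictions, labels))
```

Because the errors are squared before averaging, a single large miss (the third pair here) dominates the result.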

**R-squared**, also known as the coefficient of determination, measures the proportion of the variability in "medv" that is explained by the linear regression model. The higher the R-squared, the better the model fits the underlying data.

In [None]:
linearModel.summary.r2

0.7277148668145534
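
R-squared can be written as 1 - SS_res / SS_tot, where SS_res is the sum of squared prediction errors and SS_tot is the sum of squared deviations of the labels from their mean. The plain-Python sketch below evaluates it on made-up values; `r_squared` is a hypothetical helper name.

```python
def r_squared(predictions, labels):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in zip(predictions, labels))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

labels = [1.0, 2.0, 3.0]  # made-up values

print(r_squared([1.0, 2.0, 3.0], labels))  # 1.0: perfect predictions
print(r_squared([2.0, 2.0, 2.0], labels))  # 0.0: no better than predicting the mean
```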

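About 72.8% of the variability in "medv" is explained by the linear regression model. Since the RMSE and the R-squared are both reasonably good, the model appears to fit the median value of houses in the suburbs of Boston fairly well. Note that both metrics come from `linearModel.summary`, which is computed on the training data, so they describe goodness of fit rather than performance on unseen data.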