In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# Importing the Boston Housing Price dataset.
#
# `load_boston` was deprecated in scikit-learn 1.0 and removed in 1.2, so on a
# current install the plain import raises ImportError. When that happens we
# define a drop-in `load_boston()` that rebuilds the same arrays from the
# original StatLib file, following the recipe in scikit-learn's removal notice.
from types import SimpleNamespace

# Where the original dataset file is published (plain-text, whitespace separated).
BOSTON_URL = "http://lib.stat.cmu.edu/datasets/boston"

# Feature names in the column order used throughout this notebook.
BOSTON_FEATURE_NAMES = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
    'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT',
]


def parse_boston_raw(raw_df):
    """Convert the raw StatLib table into a `load_boston`-style object.

    In the raw file every record is wrapped over two physical lines: the
    even rows hold the first 11 feature values, and the odd rows hold the
    last 2 feature values followed by the target.

    Parameters
    ----------
    raw_df : pandas.DataFrame
        The file as parsed by `pd.read_csv(..., header=None)`; must contain an
        even number of rows and at least 11 columns.

    Returns
    -------
    types.SimpleNamespace
        Object with `data` (n_samples x 13 array), `target` (n_samples array)
        and `feature_names` (array of 13 strings) attributes.
    """
    values = raw_df.values
    # Stitch each pair of physical lines back into one 13-feature record.
    data = np.hstack([values[::2, :], values[1::2, :2]])
    # The third value on every second line is the target (house price).
    target = values[1::2, 2]
    return SimpleNamespace(
        data=data,
        target=target,
        feature_names=np.array(BOSTON_FEATURE_NAMES),
    )


try:
    from sklearn.datasets import load_boston
except ImportError:
    def load_boston():
        """Fallback loader used when scikit-learn no longer ships `load_boston`.

        Downloads the original file from `BOSTON_URL` (requires network access)
        and returns an object exposing `data`, `target` and `feature_names`.
        """
        # The first 22 lines of the file are a free-text description.
        raw_df = pd.read_csv(BOSTON_URL, sep=r"\s+", skiprows=22, header=None)
        return parse_boston_raw(raw_df)

In [3]:
# Load the dataset object; its .data, .feature_names and .target are used below
boston = load_boston()

In [4]:
# Assemble the pandas DataFrame: one column per feature name,
# then attach the target values as an extra 'Price' column.
boston_df = pd.DataFrame(
    data=boston.data,
    columns=boston.feature_names,
).assign(Price=boston.target)

# Preview the first five rows
boston_df.head()

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,Price
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98,24.0
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14,21.6
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03,34.7
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94,33.4
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33,36.2


In [5]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName('BostonHousing Prediction')
    .getOrCreate()
)

In [6]:
# Enable Arrow-based columnar data transfers.
# NOTE(review): presumably meant to speed up the pandas -> Spark conversion done by
# createDataFrame in the next cell — confirm against the PySpark Arrow documentation.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [7]:
# Convert the pandas DataFrame (feature columns plus 'Price') into a Spark DataFrame
spark_df = spark.createDataFrame(boston_df)

In [8]:
# Print each column's name, type and nullability for the Spark DataFrame
spark_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- Price: double (nullable = true)



In [9]:
# List the column names of the Spark DataFrame
spark_df.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'Price']

In [10]:
# Display the leading rows of the Spark DataFrame as a text table
spark_df.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|Price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0|   18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0|   15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0|   15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5

In [11]:
# Show per-column summary statistics (the rendered table starts with the row count)
spark_df.describe().show()

+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+
|summary|              CRIM|                ZN|             INDUS|               CHAS|                NOX|                RM|               AGE|               DIS|              RAD|              TAX|          PTRATIO|                B|            LSTAT|             Price|
+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+
|  count|               506|               506|               506|                506|                506|               506|               506|               506|              506|

In [12]:
from pyspark.ml.feature import VectorAssembler

# Pack the thirteen predictor columns into a single vector column named 'features'
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
        'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT',
    ],
)

In [13]:
# Run the assembler over the Spark DataFrame; the result keeps the original
# columns and gains the assembled 'features' vector column
df_out = assembler.transform(spark_df)

# Peek at the first five rows
df_out.show(n=5)

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+--------------------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|  TAX|PTRATIO|     B|LSTAT|Price|            features|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+--------------------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575|65.2|  4.09|1.0|296.0|   15.3| 396.9| 4.98| 24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0|   17.8| 396.9| 9.14| 21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185|61.1|4.9671|2.0|242.0|   17.8|392.83| 4.03| 34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998|45.8|6.0622|3.0|222.0|   18.7|394.63| 2.94| 33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147|54.2|6.0622|3.0|222.0|   18.7| 396.9| 5.33| 36.2|[0.06905,0.0,2.18...|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+------+-----+-----+--------------------+
only showing top 5 rows



In [14]:
# Keep only what the model needs: the assembled feature vector and the target.
# The target column keeps its 'Price' name here rather than being aliased to 'label'.
clean_df = df_out.select('features', 'Price')

# Show the first five rows
clean_df.show(n=5)

+--------------------+-----+
|            features|Price|
+--------------------+-----+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
|[0.03237,0.0,2.18...| 33.4|
|[0.06905,0.0,2.18...| 36.2|
+--------------------+-----+
only showing top 5 rows



In [15]:
# Display the DataFrame's repr, which lists its column names and types
clean_df

DataFrame[features: vector, Price: double]

In [16]:
# Randomly partition the rows into train and test sets using weights 0.7 / 0.3;
# a fixed seed is passed to the random sampling
train, test = clean_df.randomSplit(weights=[0.7, 0.3], seed=42)

In [17]:
from pyspark.ml.regression import LinearRegression

In [18]:
# Configure (but do not yet train) a linear regression estimator that reads
# predictors from 'features' and the target from 'Price'
lr_model = LinearRegression(
    labelCol='Price',
    featuresCol='features',
)

In [19]:
# Fit the linear regression on the training split and keep the fitted model
fit_model = lr_model.fit(train)

In [20]:
# Evaluate the fitted model on the held-out test split
test_results = fit_model.evaluate(test)

In [21]:
# R^2 (coefficient of determination) of the fitted model on the test split
test_results.r2

0.6144518369550813