# Regression using Spark ML Lib

## Reference for Spark:
https://spark.apache.org/docs/latest/ml-guide.html





In [1]:
# !pip install pyspark

In [None]:
# !pip install -q findspark

In [None]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [49]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [3]:
spark

In [4]:
df = spark.read.format("csv").load("/content/drive/MyDrive/DATA/housing.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [5]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
df.columns[:-1]

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [7]:
# Adding an index

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id()) # Adds a column called id which increases by 1 in every row as the right most column
#df.show(3)
df = df[['id'] + df.columns[:-1]]

df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [8]:
df.count()

20640

In [9]:
df.select('total_rooms').agg({'total_rooms': 'median'}).show()

+-------------------+
|median(total_rooms)|
+-------------------+
|             2127.0|
+-------------------+



In [10]:
df.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [11]:
from pyspark.sql.functions import mean

df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                NULL|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [12]:
df.groupby('ocean_proximity').agg({col:'avg' for col in df.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

In [13]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
    return value * value

squared_udf = udf(squared, FloatType())

df.withColumn('total_rooms_squared', squared_udf('total_rooms')).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

In [14]:
df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [15]:
# Creating a train test split

train, test = df.randomSplit([0.7, 0.3])
train, test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [17]:
# Creating a numerical column list to separate out the categorical column as the processing of categorical columns is different from numerical.

numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')

numerical_features_lst

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [18]:
# Imputing missing values in the numerical columns.

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = numerical_features_lst,
                  outputCols = numerical_features_lst)

imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

In [20]:
# Now spark does not understand rows and columns and hence we will pack the contents of each column into a vector and add it as a new column.

from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols = numerical_features_lst,
                                             outputCol = 'numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------

In [21]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 126.0, 8.3252])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]))]

In [22]:
# Standardizing the numerical columns using standard scaler

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol = 'numerical_feature_vector',
                        outputCol = 'scaled_numerical_feature_vector',
                        withStd = True, withMean = True)

scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)



In [23]:
train.select('scaled_numerical_feature_vector').take(3)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3252, 1.0558, 0.9773, -0.8093, -0.9785, -0.9779, -0.9807, 2.3435])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3302, 1.0417, 1.853, -0.5387, -0.8326, -0.8234, -0.8468, 1.7807])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3352, 1.0417, 1.853, -0.6277, -0.7249, -0.7683, -0.7366, 0.93]))]

In [25]:
# Now working on categorical variables

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'ocean_proximity',
                        outputCol = 'ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3252311793352...|                 3.0|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          3521

In [26]:
train.select('ocean_category_index').take(3)

[Row(ocean_category_index=3.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=3.0)]

In [30]:
set(train.select('ocean_category_index').collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [31]:
# One hot encoding the ocean_category_index

from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = OneHotEncoder(inputCol = 'ocean_category_index',
                                outputCol = 'ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)
train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)



In [32]:
set(train.select('ocean_category_one_hot').collect())

{Row(ocean_category_one_hot=SparseVector(4, {0: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {1: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {2: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {3: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {}))}

In [33]:
# Now we will combine the categorical and numerical columns into a single vector as a dataset for the regression model.

assembler = VectorAssembler(inputCols = ['scaled_numerical_feature_vector',
                                         'ocean_category_one_hot'],
                            outputCol = 'final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

In [34]:
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|    [-122.23,37.88,41...|           [-1.3252311793352...|          

In [35]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.3252, 1.0558, 0.9773, -0.8093, -0.9785, -0.9779, -0.9807, 2.3435, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3302, 1.0417, 1.853, -0.5387, -0.8326, -0.8234, -0.8468, 1.7807, 0.0, 0.0, 0.0, 1.0]))]

In [36]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'final_feature_vector',
                      labelCol = 'median_house_value')

lr

LinearRegression_e7aedd27df27

In [37]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_e7aedd27df27, numFeatures=12

In [39]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                     'predicted_median_house_value')

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [40]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction',
                                                    'predicted_median_house_value')

pred_train_df.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [42]:
# Creating a pandas dataframe from spark df to do any visualizations

pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(5)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,"[-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0,...","[-1.3202403800819447, 1.0463692385454966, -0.6...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3202403800819447, 1.0463692385454966, -0.6...",421754.498983
1,4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,"[-122.25, 37.85, 52.0, 1627.0, 280.0, 565.0, 2...","[-1.3352127778417888, 1.0416773668902513, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3352127778417888, 1.0416773668902513, 1.85...",253384.884804
2,8,-122.26,37.84,42.0,2555.0,665.0,1206.0,595.0,2.0804,226700.0,NEAR BAY,"[-122.26, 37.84, 42.0, 2555.0, 665.0, 1206.0, ...","[-1.3402035770950727, 1.0369854952350057, 1.05...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3402035770950727, 1.0369854952350057, 1.05...",198998.354432
3,12,-122.26,37.85,52.0,2491.0,474.0,1098.0,468.0,3.075,213500.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2491.0, 474.0, 1098.0, ...","[-1.3402035770950727, 1.0416773668902513, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3402035770950727, 1.0416773668902513, 1.85...",228564.217657
4,14,-122.26,37.85,52.0,2643.0,626.0,1212.0,620.0,1.9167,159200.0,NEAR BAY,"[-122.26, 37.85, 52.0, 2643.0, 626.0, 1212.0, ...","[-1.3402035770950727, 1.0416773668902513, 1.85...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.3402035770950727, 1.0416773668902513, 1.85...",200714.970978


In [43]:
# To obtain the performance metrics using spark rdd we need a column with tuples of predicted values and actual values

predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)



[Row(predicted_median_house_value=421754.49898345233, median_house_value=358500.0),
 Row(predicted_median_house_value=253384.88480382168, median_house_value=342200.0)]

In [46]:
# predictions_and_actuals_rdd.show(3)

'''---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-44-05ab3d677d55> in <cell line: 1>()
----> 1 predictions_and_actuals_rdd.show(3)

AttributeError: 'RDD' object has no attribute 'show'''

"---------------------------------------------------------------------------\nAttributeError                            Traceback (most recent call last)\n<ipython-input-44-05ab3d677d55> in <cell line: 1>()\n----> 1 predictions_and_actuals_rdd.show(3)\n\nAttributeError: 'RDD' object has no attribute 'show"

In [47]:
# Creating tuple of actual value and predicted value for each datapoint/row
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)

predictions_and_actuals_rdd.take(2)

[(421754.49898345233, 358500.0), (253384.88480382168, 342200.0)]

In [48]:
# Obtain regression metrics

from pyspark.mllib.evaluation import RegressionMetrics

metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )

print(s)




Mean Squared Error:      4806670540.636229
Root Mean Squared Error: 69330.15606960819
Mean Absolute Error:     49812.25967238698
R**2:                    0.646457269228458

