In [15]:
import pyspark
import numpy as np
import pandas as pd

In [4]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.master('local').appName('myspark').config('spark.ui.port', '4050').getOrCreate()

In [8]:


df = spark.read.csv('/housing.csv', header = True , inferSchema = True)

In [9]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [10]:
from pyspark.sql.functions import monotonically_increasing_id

In [12]:
df = df.withColumn('id', monotonically_increasing_id())

df = df[['id'] + df.columns[:-1]]
df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4| 

In [13]:
df.count()

20640

In [18]:
# aggrigate function :
df.select('total_rooms').agg({'total_rooms': 'mean'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



In [19]:
# mean value of each column

from pyspark.sql.functions import mean

In [20]:
df.select(*[mean(c) for c in df.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                null|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

In [21]:
df.columns

['id',
 'longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity']

In [23]:
#groupby ocean_proximity
df.groupby('ocean_proximity').agg({col : 'avg' for col in df.columns[3:-1] }).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

In [27]:
# udf (User Defined Function)

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def squared(value):
  return value*value

squared_udf = udf(squared, FloatType())
df.withColumn('total_squared_rooms', squared_udf('total_rooms')).show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_squared_rooms|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
|  3|  -122.25|   37.85|    

# Begining ML libs


# Prediction of median_house_value using features 'longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income'

In [48]:
train, test= df.randomSplit([0.7,0.3])

train , test

(DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string],
 DataFrame[id: bigint, longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string])

In [49]:
# getting numerical value columns

numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value') #because we are going to predict this column values
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity') # this column contains categorial values or string values

numerical_features_lst


['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [50]:
from pyspark.ml.feature import Imputer

In [54]:
imputer = Imputer(inputCols = numerical_features_lst,
                  outputCols= numerical_features_lst)


imputer = imputer.fit(train)
train = imputer.transform(train)
test= imputer.transform(test)
train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  4|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|          342200.0|       NEAR BAY|
|  5| 

In [56]:
# so how pyspark works in machine learning is , it doesn't like it being in multiple columns
# what we want is take all these columns in a single columns as a vector

In [57]:
from pyspark.ml.feature import VectorAssembler

In [58]:
# merging numerical feature list into a vector form

numerical_vector_assembler = VectorAssembler(inputCols= numerical_features_lst,
                                             outputCol= 'numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|    [-122.25,37.85,52...

In [59]:
train.select('numerical_feature_vector').take(5)

[Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574])),
 Row(numerical_feature_vector=DenseVector([-122.25, 37.85, 52.0, 1274.0, 235.0, 558.0, 219.0, 5.6431])),
 Row(numerical_feature_vector=DenseVector([-122.25, 37.85, 52.0, 1627.0, 280.0, 565.0, 259.0, 3.8462])),
 Row(numerical_feature_vector=DenseVector([-122.25, 37.85, 52.0, 919.0, 213.0, 413.0, 193.0, 4.0368]))]

In [60]:
# applying some preprocessing techniques ,
# basically StandardScaler () which ensures that the features have the same scale, also the transfered features will have zero mean and unit variance


In [61]:
from pyspark.ml.feature import StandardScaler

In [63]:
scaler = StandardScaler(inputCol = 'numerical_feature_vector',
                         outputCol = 'scaled_numerical_feature_vector',
                         withStd = True, withMean= True)
scaler = scaler.fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3231687546323...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|           [-1.3331533209390...|
|  3|

In [65]:
train.select('scaled_numerical_feature_vector').take(5)

[Row(scaled_numerical_feature_vector=DenseVector([-1.3232, 1.038, -0.6039, 2.0032, 1.3376, 0.8374, 1.647, 2.324])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3332, 1.0333, 1.8497, -0.5292, -0.8222, -0.8047, -0.8356, 1.7762])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3381, 1.0333, 1.8497, -0.616, -0.7161, -0.7513, -0.7271, 0.9292])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3381, 1.0333, 1.8497, -0.4572, -0.61, -0.7452, -0.6238, -0.0137])),
 Row(scaled_numerical_feature_vector=DenseVector([-1.3381, 1.0333, 1.8497, -0.7756, -0.768, -0.8763, -0.7943, 0.0863]))]

In [66]:
# Now dealing with categorical values (ocean_proximity)

In [67]:
from pyspark.ml.feature import StringIndexer

In [69]:
indexer = StringIndexer(inputCol = 'ocean_proximity',
                        outputCol = 'ocean_category_index')

indexer = indexer.fit(train)

train = indexer.transform(train)
test = indexer.transform(test)

train.show(10)

IllegalArgumentException: ignored

In [72]:
# ignore the eroor above , its nothing just I ran it twice

**Grabing all the stuffs from the distribution to a single node**

*machine learning algorithm particularly do not like it though*

In [73]:
set(train.select('ocean_category_index').collect())

{Row(ocean_category_index=0.0),
 Row(ocean_category_index=1.0),
 Row(ocean_category_index=2.0),
 Row(ocean_category_index=3.0),
 Row(ocean_category_index=4.0)}

In [74]:
# performing one hot encoding

In [75]:
from pyspark.ml.feature import OneHotEncoder

In [76]:
one_hot_encoder = OneHotEncoder(inputCol = 'ocean_category_index',
                                outputCol = 'ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3231687546323...|                 3.0|         (4,[3],[1.0])|
|  2|  -122.24|   37.85|    

In [78]:
# Combining One hot vector which is our categorical feature with numerical feature vector

In [79]:
assembler = VectorAssembler(inputCols = ['scaled_numerical_feature_vector',
                                         'ocean_category_one_hot'],
                            outputCol = 'final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)


In [80]:
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3231687546323...|          

In [81]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.3232, 1.038, -0.6039, 2.0032, 1.3376, 0.8374, 1.647, 2.324, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.3332, 1.0333, 1.8497, -0.5292, -0.8222, -0.8047, -0.8356, 1.7762, 0.0, 0.0, 0.0, 1.0]))]

In [102]:
# -------------------------------------------Linear Regression ------------------------------------

In [83]:
from pyspark.ml.regression import LinearRegression

In [85]:
lr = LinearRegression(featuresCol = 'final_feature_vector',
                      labelCol = 'median_house_value')
lr

LinearRegression_ad4b2b651a55

In [87]:
# fitting the training data

In [86]:
lr = lr.fit(train)
lr

LinearRegressionModel: uid=LinearRegression_ad4b2b651a55, numFeatures=12

In [88]:
# prediction

In [90]:
pred_train_df = lr.transform(train).withColumnRenamed('prediction',
                                                   'Predicted_median_house_value')
pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|Predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          35850

In [95]:
# Above , we have our predicted value !!

In [91]:
pred_test_df = lr.transform(test).withColumnRenamed('prediction',
                                                   'Predicted_median_house_value')
pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|Predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

In [96]:
# evaluationg the model

In [97]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]

#converting ino rdd
predictions_and_actuals_rdd = predictions_and_actuals.rdd


predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=408668.1939272259, median_house_value=452600.0),
 Row(predicted_median_house_value=230479.83710502833, median_house_value=213500.0)]

In [99]:
# creating tuples of prediction vs actual
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)
predictions_and_actuals_rdd.take(3)


[(408668.1939272259, 452600.0),
 (230479.83710502833, 213500.0),
 (185089.80587450537, 140000.0)]

In [100]:
# evaluatins :

from pyspark.mllib.evaluation import RegressionMetrics

In [101]:
metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''

Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )
print(s)





Mean Squared Error:      4773067562.954082
Root Mean Squared Error: 69087.39076672443
Mean Absolute Error:     50194.145956662
R**2:                    0.6410952029039387 

