## California Housing data analysis

This document explores the California Housing data set and builds a predictive model, following the steps listed at https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

The data set has been downloaded, unzipped and saved in the current working directory. The unzipped folder contains two files:

* cal_housing.data
* cal_housing.domain

*cal_housing.data* contains the actual data, and *cal_housing.domain* contains the column names and their data types.

## Importing all the required libraries

In [1]:
from pyspark import SparkContext, SparkConf

## Creating Spark context

In [2]:
conf = SparkConf()
sc = SparkContext(conf=conf)

## Reading & exploring the data

Note that the two files, cal_housing.data and cal_housing.domain, must be present in the current working directory (the directory from which this IPython notebook is run).

In [3]:
rdd = sc.textFile("cal_housing.data")
header = sc.textFile("cal_housing.domain")

The collect() method brings all the elements of an RDD back to the driver. Use it with care: on a large data set, the driver may not have enough memory to hold the materialized RDD.

Let us call collect() on the header RDD, which contains only the column information.

In [4]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']
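The column names used in the next steps are typed by hand. As a minimal sketch in plain Python (no Spark), they can also be read off the *cal_housing.domain* lines shown above by keeping the text before the colon. `domain_lines` is a copy of the output above, and `parse_domain` is a hypothetical helper, not part of the original workflow.

```python
# Lines of cal_housing.domain, copied from the header.collect() output above
domain_lines = [
    'longitude: continuous.',
    'latitude: continuous.',
    'housingMedianAge: continuous. ',
    'totalRooms: continuous. ',
    'totalBedrooms: continuous. ',
    'population: continuous. ',
    'households: continuous. ',
    'medianIncome: continuous. ',
    'medianHouseValue: continuous. ',
]


def parse_domain(lines):
    """Hypothetical helper: turn 'name: type.' lines into (name, type) pairs."""
    pairs = []
    for line in lines:
        name, _, col_type = line.partition(":")
        # Drop surrounding whitespace and the trailing period of the type
        pairs.append((name.strip(), col_type.strip().rstrip(".")))
    return pairs


domain_columns = [name for name, _ in parse_domain(domain_lines)]
print(domain_columns)
```

The domain file spells the fifth column `totalBedrooms`, whereas the DataFrame built below uses `totalBedRooms`.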

Let us display the first 2 rows of rdd (the data from the cal_housing.data file).

In [5]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

Let us split each line into separate fields using "," as the delimiter, and display the first 2 rows again.

In [6]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Let us convert the RDD to a Spark DataFrame. A DataFrame organizes the data into named columns and offers a higher-level API (for example select(), groupBy() and describe()) that makes data exploration easier.

In [7]:
# SparkSession is the entry point for DataFrames (SQLContext or HiveContext in Spark 1.x).
# Creating it also attaches the toDF() method to RDDs.
from pyspark.sql import SparkSession, Row

spark = SparkSession(sc)

# Map each list of string fields to a Row, then convert the RDD to a DataFrame
df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

Let us display some rows of the DataFrame.

In [8]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

The values are now arranged in named columns. Let us check the data types of the columns in the DataFrame.

In [9]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



All the columns were read as strings, but they should be numeric. Let us cast them to float.

In [10]:
# Import the Spark SQL data type used for the conversion
from pyspark.sql.types import FloatType

# Custom function that casts the given DataFrame columns to a new data type
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

Let us make sure that the data types are correct.

In [11]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



We can see that all the columns were set to float type.
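FloatType is Spark's 32-bit (single-precision) type, which is why the latitude 37.88 later shows up as 37.880001068115234. A plain-Python sketch that rounds values to 32-bit precision with the standard `struct` module reproduces the digits seen in later outputs; `to_float32` is a hypothetical helper for illustration. If the extra digits matter, casting to `DoubleType()` instead would keep 64-bit precision (an alternative, not what this notebook does).

```python
import struct


def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", x))[0]


# Values from the first row of cal_housing.data
for value in (37.88, -122.23, 8.3252):
    print(value, "->", to_float32(value))
```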

We can use the _df.select()_ method to get the desired columns from the DataFrame. Let us display 10 rows of the _population_ and _totalBedRooms_ columns.

In [12]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



We can also run more complex queries, grouping by one or more columns and ordering the results by one or more columns in ascending or descending order.

The command below counts the rows for each median house age (_housingMedianAge_) and orders the result by _housingMedianAge_ in descending order.

In [13]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows
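To make the semantics of the chained call concrete, here is a plain-Python equivalent on a handful of _housingMedianAge_ values taken from the first rows of the df.show() output (an illustration only; Spark computes the same result in a distributed way). Note that the result is ordered by the key, not by the count; ordering by frequency would mean sorting on the _count_ column instead.

```python
from collections import Counter

# housingMedianAge of the first five rows shown by df.show() above
ages = [41.0, 21.0, 52.0, 52.0, 52.0]

# groupBy("housingMedianAge").count(): number of rows per distinct value
counts = Counter(ages)

# sort("housingMedianAge", ascending=False): order the groups by key, largest first
result = sorted(counts.items(), key=lambda kv: kv[0], reverse=True)
print(result)  # [(52.0, 3), (41.0, 1), (21.0, 1)]
```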



We can use _describe()_ to get summary statistics (count, mean, standard deviation, minimum and maximum) for all the columns of the DataFrame.

In [14]:
df.describe().show()

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554| 2.0035317429328914|115395.61
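The statistics that _describe()_ reports can be reproduced in plain Python on a small sample. This sketch uses the ten _population_ values displayed earlier; Spark's _stddev_ is the sample standard deviation (divisor n - 1), which corresponds to `statistics.stdev` rather than `statistics.pstdev`.

```python
import statistics

# population values of the first ten rows, from the df.select(...).show(10) output above
population = [322.0, 2401.0, 496.0, 558.0, 565.0, 413.0, 1094.0, 1157.0, 1206.0, 1551.0]

summary = {
    "count": len(population),
    "mean": statistics.mean(population),
    # Sample standard deviation (n - 1 in the denominator)
    "stddev": statistics.stdev(population),
    "min": min(population),
    "max": max(population),
}
for name, value in summary.items():
    print(f"{name:>6}: {value}")
```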

The columns span very different ranges, so the features will need to be standardized. First, let us express the target variable _medianHouseValue_ in units of \$100,000.

In [15]:
# Import `col` from `sql.functions`
from pyspark.sql.functions import col

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

_medianHouseValue_ is now expressed in units of \$100K.

## Feature Engineering

Let us add the following new columns:

* Rooms per household: the average number of rooms per household in each block group;

* Population per household: the average number of people living in a household in each block group;

* Bedrooms per room: the fraction of rooms that are bedrooms in each block group.

In [17]:
# Import `col` from `sql.functions` (repeated so that this cell can run on its own)
from pyspark.sql.functions import col

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

For the first row, there are about 6.98 rooms per household, each household has about 2.56 people on average, and the share of bedrooms is fairly low: about 0.147 bedrooms per room.
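These ratios are simple per-row divisions, so the values quoted for the first row can be checked in plain Python from the raw counts shown in the df.first() output.

```python
# Raw counts of the first row (from the df.first() output above)
total_rooms, total_bedrooms, population, households = 880.0, 129.0, 322.0, 126.0

rooms_per_household = total_rooms / households        # about 6.98
population_per_household = population / households   # about 2.56
bedrooms_per_room = total_bedrooms / total_rooms      # about 0.147

print(round(rooms_per_household, 2),
      round(population_per_household, 2),
      round(bedrooms_per_room, 3))
```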

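Let us keep only the columns we will use for modeling, placing the target variable (_medianHouseValue_) first. Note that this drops the _housingMedianAge_, _latitude_, _longitude_ and _totalRooms_ columns.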

In [18]:
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

Let us separate the target variable (_medianHouseValue_) from the other variables. For each row, the first value becomes the _label_, and the remaining values are packed into a DenseVector in a column called _features_.

In [19]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
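The lambda relies on the column order chosen in the select() above: x[0] is _medianHouseValue_ and x[1:] are the seven features. A plain-Python sketch of the same slicing on the first row (values copied from the earlier outputs) shows the shape of each (label, features) pair.

```python
# First row after the select() above: the target first, then the seven features
row = (4.526, 129.0, 322.0, 126.0, 8.325200080871582,
       6.984126984126984, 2.5555555555555554, 0.14659090909090908)

# Same positional slicing as x[0] and x[1:] in the lambda
label, features = row[0], list(row[1:])
print(label, len(features))
```

Because the split is positional, reordering the select() would silently change which column becomes the label. `VectorAssembler` in `pyspark.ml.feature` is a common alternative that builds the features column from column names instead.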

Let us display the first 2 rows of the transformed DataFrame.

In [21]:
df.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
+-----+--------------------+
only showing top 2 rows



The data is now split into a _label_ column (the dependent, or target, variable) and a _features_ column (the independent variables), so the two can be handled separately. Let us scale the features using a standard scaler.

In [22]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

A new column, _features_scaled_, has been added; its DenseVector holds the scaled values of the _features_ vector.
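By default, `StandardScaler` in `pyspark.ml.feature` uses `withStd=True` and `withMean=False`: it divides each feature by its sample standard deviation but does not subtract the mean. That is why the scaled values above stay positive and proportional to the originals (2.6255 / 0.3062 ≈ 1106 / 129 ≈ 8.57). A plain-Python sketch of the same transformation on one column, assuming those defaults (`scale_by_std` is a hypothetical helper; with only five values its standard deviation differs from the one Spark computes over the whole data set):

```python
import statistics


def scale_by_std(column):
    """Divide each value by the column's sample standard deviation, without centering."""
    std = statistics.stdev(column)
    return [x / std for x in column]


# totalBedRooms of the first five rows
values = [129.0, 1106.0, 190.0, 235.0, 280.0]
scaled = scale_by_std(values)
print([round(v, 4) for v in scaled])
```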

## Building a Machine Learning model

Now that the input features are scaled, we can build a machine learning model to predict the target variable (_medianHouseValue_). First, we split the data 80:20 into training and test sets.

In [23]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
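`randomSplit` does not cut the data at an exact 80/20 boundary. Roughly speaking, each row is assigned to one of the splits at random, with probabilities given by the normalized weights, so the resulting sizes are only approximately 80% and 20%; the seed makes the split repeatable for the same data and partitioning. The plain-Python sketch below illustrates the idea with Python's `random` module, so its seed does not reproduce Spark's split; `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights=(0.8, 0.2), seed=1234):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


# 20640 rows, as reported by describe() above
train_rows, test_rows = random_split(range(20640))
print(len(train_rows), len(test_rows))
```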

**Fitting a Linear Regressor**

Let us fit a linear regression model on the training data

In [39]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

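# Initialize `lr`. featuresCol is spelled out: it is the default "features" (unscaled) column;
# LinearRegression standardizes the features internally by default (standardization=True)
lr = LinearRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)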

# Fit the data to the model
linearModel = lr.fit(train_data)

We can use the fitted model to make predictions on the test data: the _transform()_ method adds a _prediction_ column.

In [40]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

The following cell prints the coefficients and the intercept of the linear model.

In [41]:
# Coefficients for the model
print(linearModel.coefficients)

# Intercept for the model
print(linearModel.intercept)

[0.0,0.0,0.0,0.276239709215,0.0,0.0,0.0]
0.9903995774620005
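Most coefficients are exactly zero. This is consistent with the elastic net penalty set by `regParam` and `elasticNetParam`. As described in the Spark ML documentation for linear regression (restated here as a sketch; the intercept $b$ is not penalized, and by default Spark applies the penalty to the coefficients of the standardized features), the model minimizes

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 \;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

with $\lambda = $ `regParam` $= 0.3$ and $\alpha = $ `elasticNetParam` $= 0.8$, so the penalty becomes

$$
0.3\left(0.8\,\lVert w\rVert_1 + 0.1\,\lVert w\rVert_2^2\right) = 0.24\,\lVert w\rVert_1 + 0.03\,\lVert w\rVert_2^2 .
$$

The L1 term dominates and pushes the coefficients of weaker features to zero; only the fourth feature, _medianIncome_, keeps a nonzero coefficient.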


Using the _summary_ attribute, we can get the _RMSE (Root Mean Squared Error)_ and $R^2$ of the model on the training data.

In [30]:
# Get the RMSE
print(linearModel.summary.rootMeanSquaredError)

# Get the R2
print(linearModel.summary.r2)

0.8692118678997669
0.4240895287218379


The training $R^2$ of about 0.42 is modest: the model explains less than half of the variance of the target. Let us now compute the RMSE on the test data, using a small helper function.

In [44]:
import math

def calculate_rmse(predictionAndLabel):
    """Root mean squared error over a list of (prediction, label) pairs."""
    squared_errors = [(p - a) ** 2 for (p, a) in predictionAndLabel]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

In [45]:
#Get the test RMSE
calculate_rmse(predictionAndLabel)

0.919952799767
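The $R^2$ reported earlier comes from `linearModel.summary` and is computed on the training data, while the RMSE above is computed on the test set. For a like-for-like comparison, $R^2$ can also be computed from the same test pairs. A minimal plain-Python sketch, where `r_squared` is a hypothetical helper and the toy pairs are made up for illustration:

```python
def r_squared(prediction_and_label):
    """Coefficient of determination R^2 = 1 - SS_res / SS_tot over (prediction, label) pairs."""
    labels = [a for _, a in prediction_and_label]
    mean_label = sum(labels) / len(labels)
    ss_res = sum((a - p) ** 2 for p, a in prediction_and_label)
    ss_tot = sum((a - mean_label) ** 2 for a in labels)
    return 1.0 - ss_res / ss_tot


# Toy pairs: perfect predictions give 1.0, always predicting the mean gives 0.0
print(r_squared([(1.0, 1.0), (2.0, 2.0), (3.0, 3.0)]))
print(r_squared([(2.0, 1.0), (2.0, 2.0), (2.0, 3.0)]))
```

Calling `r_squared(predictionAndLabel)` on the list collected above would give the test-set $R^2$; its value is not reported in this notebook.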


**Fitting a Random Forest Regressor**

Let us check whether the test RMSE improves (decreases) with a random forest regressor.

In [49]:
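from pyspark.ml.regression import RandomForestRegressor

# Tree-based models are insensitive to this kind of per-feature scaling;
# features_scaled is used here, although "features" would serve as well
rf = RandomForestRegressor(featuresCol="features_scaled")
rf_model = rf.fit(train_data)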

In [50]:
# Generate predictions
predicted = rf_model.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.2943833502895121, 0.14999),
 (1.3758379828577711, 0.14999),
 (2.182857573628307, 0.14999),
 (1.4519958256357335, 0.344),
 (1.2533288567243193, 0.398)]

In [51]:
#Get the test RMSE of RF Regressor
calculate_rmse(predictionAndLabel)

0.752201467145


Compared to the linear regressor (test RMSE ≈ 0.92, i.e. about \$92K), the random forest regressor achieves a lower test RMSE (≈ 0.75, about \$75K).

This concludes the model development for the California Housing data set. Let us stop the Spark session.

In [52]:
spark.stop()