# Getting Started with Spark: California Housing Prices

This notebook consists mainly of the example code from <a href="">this</a> DataCamp tutorial, complemented by some personal notes and a few extra tweaks to the evaluation-metrics code at the end. It is a simple exercise to help me get used to Spark for my own Machine Learning projects.

## Spark Load and Setup

First of all, we use the "findspark" package to locate our Spark installation and make PySpark importable from Python.

In [106]:
# Import findspark
import findspark

# Initialize and provide path
findspark.init("/usr/local/spark")

Now, let's configure Spark by building our own SparkSession.

In [107]:
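from pyspark.sql import SparkSession

# Build a new SparkSession (or reuse an existing one) running locally with a single
# worker thread; "local[*]" would use all available cores. In local mode the executor
# lives inside the driver JVM, so spark.executor.memory has little effect here.
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
sc = spark.sparkContext  # underlying SparkContext, used below for the RDD API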

## Data Load and Setup

Now that Spark is all ready to go, we must load and set up our data. This exercise uses the classic California Housing Prices dataset.

In [108]:
rdd = sc.textFile("/home/raven/code/cetax/spark/CaliforniaHousing/cal_housing.data")
header = sc.textFile("/home/raven/code/cetax/spark/CaliforniaHousing/cal_housing.domain")

First, let's take a look at the attributes listed in the header (".domain") file, using the "collect()" method, which returns every element of the RDD to the driver program.

In [109]:
header.collect()

[u'longitude: continuous.',
 u'latitude: continuous.',
 u'housingMedianAge: continuous. ',
 u'totalRooms: continuous. ',
 u'totalBedrooms: continuous. ',
 u'population: continuous. ',
 u'households: continuous. ',
 u'medianIncome: continuous. ',
 u'medianHouseValue: continuous. ']

Now let's look at the data RDD itself. "collect()" should not be called on a large RDD, because it pulls every element into the driver and can exhaust its memory. "take(n)" is a safer choice, since it returns only the first n elements.

In [110]:
rdd.take(2)

[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

Each line was read as a single comma-separated string, so we must split it into its individual values before we can work with the data.

In [111]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)

[[u'-122.230000',
  u'37.880000',
  u'41.000000',
  u'880.000000',
  u'129.000000',
  u'322.000000',
  u'126.000000',
  u'8.325200',
  u'452600.000000'],
 [u'-122.220000',
  u'37.860000',
  u'21.000000',
  u'7099.000000',
  u'1106.000000',
  u'2401.000000',
  u'1138.000000',
  u'8.301400',
  u'358500.000000']]
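To see what the `map` step does to a single record, the same logic can be run in plain Python on the first line shown earlier. This sketch also converts each field to `float`, which the notebook instead does later with a cast; `parse_line` is a hypothetical helper name, not part of Spark.

```python
# Plain-Python version of the per-line parsing that rdd.map() applies.
# Converting to float here is an extra step: the notebook keeps the fields
# as strings at this point and casts them later.

SAMPLE_LINE = ("-122.230000,37.880000,41.000000,880.000000,129.000000,"
               "322.000000,126.000000,8.325200,452600.000000")

def parse_line(line):
    """Split one comma-separated record and convert each field to float."""
    return [float(field) for field in line.split(",")]

record = parse_line(SAMPLE_LINE)
print(len(record))   # 9 fields, one per attribute in cal_housing.domain
print(record[-1])    # medianHouseValue is the last field: 452600.0
```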

Perfect! As seen above, the values are now properly separated. RDDs are well suited to low-level transformations of unstructured data, while DataFrames are better for work that calls for higher-level, SQL-like query expressions. DataFrame queries also go through Spark's Catalyst optimizer, which usually gives better performance. We will therefore convert our data from an RDD to a DataFrame.

In [112]:
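from pyspark.sql import Row

# Turn each list of strings into a Row with named fields (the values are still strings here).
# In Spark versions before 3.0, Row(**kwargs) sorts field names alphabetically,
# which is why the columns below appear in alphabetical order.
df = rdd.map(lambda line: Row(longitude=line[0],
                              latitude=line[1],
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5],
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()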

Let's now take a look at our newly created DataFrame:

In [113]:
df.show()
df.printSchema()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

The DataFrame seems to be in order, but because every field came from splitting a string, the schema shows all columns as strings, so we must cast them to a numeric type.

In [114]:
from pyspark.sql.types import FloatType

def convertColumn(df, names, newType):
    """Cast each column in `names` of the given DataFrame to `newType`."""
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

# all of the columns in this dataset
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

df = convertColumn(df, columns, FloatType())  # cast them all to float
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)
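`FloatType` is Spark's 4-byte single-precision type, which is why a latitude of 37.88 later prints as 37.880001068115234. The plain-Python sketch below reproduces that rounding with `struct` as an illustration (`to_float32` is a hypothetical helper); if the extra precision mattered, `DoubleType` would be the 8-byte alternative.

```python
import struct

def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", x))[0]

lat32 = to_float32(37.88)
print(lat32)               # slightly off from 37.88 after the round trip
print(to_float32(129.0))   # small whole numbers are exact in float32: 129.0
```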



Done! Now that the data has been loaded and set up, let's start playing around with our dataset. 

## Exploring the Data

To get a feel for the SQL-like queries that Spark DataFrames support, we'll start with some simple examples:

In [115]:
df.select('population','totalBedRooms').show(10) # show the first 10 rows of the selected columns

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [116]:
# count the rows for each distinct housingMedianAge value, sorted from highest to lowest age
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows
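Conceptually, `groupBy(...).count().sort(..., ascending=False)` counts the rows for each distinct value and orders the groups by that value, highest first. A plain-Python analogue with `collections.Counter`, run on a small made-up list of ages rather than the real column:

```python
from collections import Counter

# Hypothetical sample values, NOT taken from the dataset
ages = [52.0, 41.0, 52.0, 21.0, 52.0, 41.0]

counts = Counter(ages)  # distinct value -> number of rows
# Sort by the grouping key (the age), highest first, like sort(..., ascending=False)
rows = sorted(counts.items(), key=lambda kv: kv[0], reverse=True)
for age, n in rows:
    print(age, n)
```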



In [117]:
df.describe().show() # returns basic dataset stats

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

As seen above, the "describe()" method gives a quick statistical summary of the dataset. It shows that our features span very different ranges (compare each column's min and max), so we should standardize them before training. The target variable (medianHouseValue) also takes very large values, so it is probably best to rescale it as well, keeping the numbers in a more convenient range for the model.
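For reference, "describe()" reports count, mean, sample standard deviation (n - 1 denominator), min and max for each column. The same summary can be computed in plain Python on a handful of values, here the `households` values of the first five rows shown earlier (an illustration, not the full column):

```python
import statistics

# households values of the first five rows shown earlier in df.show()
households = [126.0, 1138.0, 177.0, 219.0, 259.0]

summary = {
    "count": len(households),
    "mean": statistics.mean(households),
    "stddev": statistics.stdev(households),  # sample std (n - 1 denominator)
    "min": min(households),
    "max": max(households),
}
for name, value in summary.items():
    print(name, value)
```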

In [118]:
from pyspark.sql.functions import col  # import only what we need; a wildcard import shadows built-ins such as sum and max
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.select("medianHouseValue").show(10)

+----------------+
|medianHouseValue|
+----------------+
|           4.526|
|           3.585|
|           3.521|
|           3.413|
|           3.422|
|           2.697|
|           2.992|
|           2.414|
|           2.267|
|           2.611|
+----------------+
only showing top 10 rows



As seen above, all of our target values were divided by 100,000: the target is now expressed in units of 100,000 dollars, which keeps the numbers small and manageable.
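Because the target is now measured in units of 100,000 dollars, anything the model predicts (and error metrics such as RMSE) must be multiplied back by 100,000 to read as dollars. A minimal sketch of that bookkeeping, with hypothetical helper names `scale_target` and `unscale_target`:

```python
SCALE = 100_000  # the divisor applied to medianHouseValue above

def scale_target(value):
    """Express a house value in units of 100,000 dollars."""
    return value / SCALE

def unscale_target(value):
    """Convert a scaled value (e.g. a prediction or an RMSE) back to dollars."""
    return value * SCALE

print(scale_target(452600.0))   # first row's medianHouseValue: 4.526
print(unscale_target(4.526))    # back to (approximately) 452600 dollars
```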

Before standardizing the features, let's do a little bit of feature engineering by adding three new variables to our dataset: Rooms per Household, Population per Household and Bedrooms per Room.

In [119]:
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
        .withColumn("populationPerHousehold", col("population")/col("households")) \
        .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)
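Each new column is a row-wise ratio of two existing columns. Recomputing the three ratios by hand for the first row is a quick sanity check against the `Row` printed above:

```python
# First row's raw values, as shown in df.first() above
row = {"households": 126.0, "population": 322.0,
       "totalBedRooms": 129.0, "totalRooms": 880.0}

rooms_per_household = row["totalRooms"] / row["households"]
population_per_household = row["population"] / row["households"]
bedrooms_per_room = row["totalBedRooms"] / row["totalRooms"]

print(rooms_per_household, population_per_household, bedrooms_per_room)
```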

Now that all of our variables have been created and added to our DataFrame, we must choose which features to use in our model. Following the DataCamp tutorial, we leave out longitude, latitude, housingMedianAge and totalRooms. Note that the target variable is selected first: the next step relies on its position.

In [120]:
df = df.select("medianHouseValue",  
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

Let's separate the target from the features so we can perform the standardization:

In [121]:
from pyspark.ml.linalg import DenseVector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:]))) # first column is the target, the rest become a feature vector
df = spark.createDataFrame(input_data, ["target", "features"]) # name the two columns "target" and "features"
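The lambda `x: (x[0], DenseVector(x[1:]))` relies purely on position: it takes the first field as the target and packs everything after it into the feature vector, which is why the target had to be selected first. The same slicing on a plain tuple, using the first row's values as rounded in the `scaled_df` output (`split_target` is a hypothetical helper):

```python
# Column order selected above:
# (medianHouseValue, totalBedRooms, population, households, medianIncome,
#  roomsPerHousehold, populationPerHousehold, bedroomsPerRoom)
record = (4.526, 129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466)

def split_target(row):
    """Return (target, features): the first field and the remaining fields."""
    return row[0], list(row[1:])

target, features = split_target(record)
print(target)         # 4.526
print(len(features))  # 7
```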

Finally, we standardize the features. 

In [122]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(target=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(target=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

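There! Each feature has been divided by its standard deviation, so all features are now on a comparable scale. Note that, by default, Spark's StandardScaler does not subtract the mean (withMean=False), so the scaled values are not centered around zero. Finally, let's get to the fun part: machine learning.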
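With its defaults (`withStd=True`, `withMean=False`), StandardScaler divides each feature by its corrected sample standard deviation without centering it. For example, the first row's households value, 126.0, divided by the households standard deviation reported by "describe()" (about 382.33) gives the 0.3296 seen in `features_scaled`. A plain-Python sketch of that behaviour on a small illustrative column (`scale_by_std` is a hypothetical helper; `with_mean=True` mimics Spark's `withMean=True`):

```python
import statistics

def scale_by_std(column, with_mean=False):
    """Mimic StandardScaler's defaults: divide by the sample std (n - 1).

    with_mean=True also subtracts the mean, like withMean=True in Spark.
    """
    std = statistics.stdev(column)
    mean = statistics.mean(column) if with_mean else 0.0
    return [(x - mean) / std for x in column]

column = [126.0, 1138.0, 177.0, 219.0, 259.0]  # illustrative values only
scaled = scale_by_std(column)
centered = scale_by_std(column, with_mean=True)

print([round(v, 4) for v in scaled])   # all positive: no centering
print(statistics.stdev(scaled))        # approximately 1.0
```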

## Building a Machine Learning Model

As in most Machine Learning projects, the first thing we must do is split our data into two sets: one for training and one for testing.

In [123]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234) # roughly 80% train, 20% test; the seed makes the split reproducible
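`randomSplit` does not guarantee an exact 80/20 division: conceptually, each row is assigned to one split at random in proportion to the (normalized) weights, so the sizes are only approximately 80% and 20%, and the fixed seed makes the assignment repeatable for the same data and partitioning. The sketch below is a simplified plain-Python illustration of that idea, not Spark's actual implementation (`random_split` is a hypothetical helper):

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability
    proportional to its weight. Split sizes are random, not exact."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:  # cumulative upper bounds, e.g. [0.8, 1.0]
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # the last split also catches any floating-point shortfall in acc
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=1234)
print(len(train), len(test))  # close to 800 / 200, but not exactly
```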

Now, we set up a linear regression model and train it using the training set.

In [124]:
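from pyspark.ml.regression import LinearRegression
# featuresCol defaults to "features" (the unscaled vectors), so point it at the scaled column
lr = LinearRegression(featuresCol="features_scaled", labelCol="target", maxIter=10,
                      regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)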
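For reference, the elastic-net objective that Spark's LinearRegression minimizes can be written as below (a sketch of the formulation in the Spark MLlib documentation), with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`. With `regParam=0.3` and `elasticNetParam=0.8`, the penalty mixes the L1 (lasso) and L2 (ridge) terms with weights 0.8 and 0.2; $\alpha = 0$ would give pure ridge regression and $\alpha = 1$ pure lasso. The intercept $b$ is not penalized.

```latex
\min_{w,\,b}\;\frac{1}{2n}\sum_{i=1}^{n}\bigl(w^{\top}x_i + b - y_i\bigr)^2
\;+\;\lambda\Bigl(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\Bigr)
```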

Now that the model has been trained, let's evaluate it on both the training data and the held-out test data:

In [125]:
from pyspark.mllib.evaluation import RegressionMetrics

def evaluate(model, data):
    # Add a "prediction" column, then keep (prediction, observation) pairs,
    # the input format RegressionMetrics expects
    predicted = model.transform(data)
    predictionAndLabel = predicted.select("prediction", "target") \
        .rdd.map(lambda r: (float(r[0]), float(r[1])))
    metrics = RegressionMetrics(predictionAndLabel)
    print("RMSE = %s" % metrics.rootMeanSquaredError)
    print("R-squared = %s" % metrics.r2)

evaluate(linearModel, train_data)  # training set
evaluate(linearModel, test_data)   # held-out test set

RMSE = 0.876533568446
R-squared = 0.422822277559
RMSE = 0.881817830311
R-squared = 0.416546960942
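`RegressionMetrics` computes both numbers from the (prediction, observation) pairs: RMSE is the square root of the mean squared error, and R-squared is 1 - SS_res/SS_tot, the fraction of the target's variance explained by the model. A plain-Python sketch of the two formulas on a few made-up pairs (`rmse` and `r2` are hypothetical helpers):

```python
def rmse(pairs):
    """Root mean squared error over (prediction, observation) pairs."""
    return (sum((p - y) ** 2 for p, y in pairs) / len(pairs)) ** 0.5

def r2(pairs):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y for _, y in pairs) / len(pairs)
    ss_res = sum((y - p) ** 2 for p, y in pairs)
    ss_tot = sum((y - mean_y) ** 2 for _, y in pairs)
    return 1.0 - ss_res / ss_tot

# Hypothetical (prediction, observation) pairs in units of 100,000 dollars
pairs = [(2.0, 2.5), (3.0, 2.5), (4.0, 4.5), (1.5, 1.0)]
print(rmse(pairs))  # 0.5: every prediction is off by exactly 0.5
print(r2(pairs))
```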


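The training scores (RMSE ≈ 0.877, R-squared ≈ 0.423) and test scores (RMSE ≈ 0.882, R-squared ≈ 0.417) are very close, so the model does not seem to overfit. However, it explains only about 42% of the variance in house values, and since the target is in units of 100,000 dollars, an RMSE of about 0.88 corresponds to roughly 88,000 dollars. Clearly, the model is not performing that well, but it serves as a good learning example of basic Machine Learning in Spark. Let's now finish the exercise: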

In [126]:
spark.stop()