#### Data Description

- This tutorial uses the California Housing data set. It first appeared in the 1997 paper *Sparse Spatial Autoregressions* by R. Kelley Pace and Ronald Barry, published in the journal Statistics and Probability Letters. The authors built the data set from the 1990 California census data.

- The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area. 

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

* **Longitude** is the angular distance of a block group east or west of the prime meridian;
* **Latitude** is the angular distance of a block group north or south of the earth’s equator;
* **Housing median age** is the median age of the houses in a block group;
* **Total rooms** is the total number of rooms in the houses per block group;
* **Total bedrooms** is the total number of bedrooms in the houses per block group;
* **Population** is the number of inhabitants of a block group;
* **Households** refers to units of houses and their occupants per block group;
* **Median income** is the median income of the people that belong to a block group;
* **Median house value** is the dependent variable and refers to the median house value per block group.

**Median house value** will therefore be assigned the role of the **target variable** in your ML models.

#### Data Loading

In [3]:
# import necessary libs
import numpy  as np
import pandas as pd

# general spark modules
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql.functions import pandas_udf, PandasUDFType #https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

# spark ml modules 
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer

In [4]:
# just execute it once
ACCESS_KEY = ""
SECRET_KEY = ""
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = ""
MOUNT_NAME = ""

# Mount the bucket; this only needs to succeed once, so an error on re-execution is ignored
try:
  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)
except Exception:
  pass

In [5]:
# Load in the data
rdd = sc.textFile("/mnt/%s/cal_housing.data" % MOUNT_NAME)

# Load in the header
header = sc.textFile("/mnt/%s/cal_housing.domain" % MOUNT_NAME)

# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first record
rdd.first()

In [6]:
# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

In [7]:
# Show the first 5 rows
df.show(5)

In [8]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names:
    df = df.withColumn(name, df[name].cast(newType))
  return df

# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [9]:
df.printSchema()
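
The cast is needed because splitting a text line on commas yields strings, so every column of the initial DataFrame is a string column. The following plain-Python sketch shows the same idea without Spark. The helper `parse_line` is hypothetical, and the sample line only imitates the comma-separated layout of `cal_housing.data`; its values are assumed for illustration.

```python
# Column order as used in the Row(...) mapping of the notebook.
COLUMNS = [
    "longitude", "latitude", "housingMedianAge", "totalRooms",
    "totalBedRooms", "population", "households", "medianIncome",
    "medianHouseValue",
]

def parse_line(line):
    """Split one comma-separated line and cast every field to float."""
    fields = line.split(",")
    if len(fields) != len(COLUMNS):
        raise ValueError("expected %d fields, got %d" % (len(COLUMNS), len(fields)))
    return {name: float(value) for name, value in zip(COLUMNS, fields)}

# Illustrative line in the same layout as the data file (values assumed).
sample = "-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0"
record = parse_line(sample)
print(record["medianIncome"], type(record["medianIncome"]).__name__)
```

Before the cast, a field such as `"126.0"` is text and cannot be used in arithmetic or by the ML estimators.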

In [10]:
# Let's calculate some basic statistics about the data
df.select('households', 'housingMedianAge', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms').describe().show()

#### Data Preprocessing

In [12]:
# Log-transform the target variable, log(1 + x), to improve the stability of the algorithms
@pandas_udf('double', PandasUDFType.SCALAR)
def log1p(v):
  return np.log1p(v)

df = df.withColumn('medianHouseValueLog', log1p(df.medianHouseValue))
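
Because the models are trained on `medianHouseValueLog`, their predictions and error metrics are on the log scale. `log1p(v)` computes `log(1 + v)`, and `expm1` is its inverse. A small plain-Python sketch of the round trip, using an illustrative house value:

```python
import math

value = 452600.0                  # illustrative median house value in dollars
log_value = math.log1p(value)     # log(1 + value), the transformed target
restored = math.expm1(log_value)  # exp(log_value) - 1, back on the dollar scale

print(round(log_value, 4))
print(math.isclose(restored, value, rel_tol=1e-9))
```

Applying `expm1` to a model's predictions in the same way would bring them back to dollars.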

In [13]:
display(df)

In [15]:
# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
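
The three derived columns are plain ratios of existing columns. A minimal sketch of the same arithmetic on a single record follows; `add_ratio_features` is a hypothetical helper and the input values are assumed for illustration.

```python
def add_ratio_features(row):
    """Return a copy of `row` with the three per-household and per-room ratios."""
    out = dict(row)
    out["roomsPerHousehold"] = row["totalRooms"] / row["households"]
    out["populationPerHousehold"] = row["population"] / row["households"]
    out["bedroomsPerRoom"] = row["totalBedRooms"] / row["totalRooms"]
    return out

# Illustrative block group (values assumed for the example).
row = {"totalRooms": 880.0, "totalBedRooms": 129.0,
       "population": 322.0, "households": 126.0}
features = add_ratio_features(row)
for name in ("roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom"):
    print(name, round(features[name], 3))
```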

In [16]:
# Re-order and select columns
df_for_ml = df.select("medianHouseValueLog", 
                      "totalBedRooms", 
                      "totalRooms",
                      "population", 
                      "households", 
                      "medianIncome", 
                      "roomsPerHousehold", 
                      "populationPerHousehold", 
                      "bedroomsPerRoom")

In [17]:
# Define the `input_data` as (label, feature vector) pairs
input_data = df_for_ml.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df_for_ml` with the new DataFrame
df_for_ml = spark.createDataFrame(input_data, ["label", "features"])

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df_for_ml)

# Transform the data in `df_for_ml` with the scaler
scaled_df = scaler.transform(df_for_ml)

# Keep the label and the scaled features, renamed back to `features`
scaled_df = scaled_df.select('label', col('features_scaled').alias("features"))
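
With its default settings (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. The sketch below reproduces that behaviour for one feature in plain Python. `scale_to_unit_std` is a hypothetical helper and the input values are assumed.

```python
import statistics

def scale_to_unit_std(values):
    """Divide by the sample standard deviation; the mean is not subtracted."""
    std = statistics.stdev(values)  # uses n - 1 in the denominator
    return [v / std for v in values]

incomes = [8.3, 7.3, 5.6, 3.8, 4.0]  # illustrative values
scaled = scale_to_unit_std(incomes)
print(round(statistics.stdev(scaled), 6))
print(statistics.mean(scaled) > 0.0)
```

The scaled feature has unit standard deviation but keeps a non-zero mean, which is why the scaled vectors in `scaled_df` are not centred around zero.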

In [18]:
scaled_df.rdd.take(10)

#### Building Machine Learning Models with Spark ML

* Linear Regression : https://en.wikipedia.org/wiki/Linear_regression, https://www.youtube.com/watch?v=zPG4NjIkCjc
* Decision trees    : http://www.r2d3.us/visual-intro-to-machine-learning-part-1/  (nice visualization for decision tree)
* Random forest     : https://en.wikipedia.org/wiki/Random_forest
* GB Decision Trees : https://en.wikipedia.org/wiki/Gradient_boosting
* Clustering        : https://spark.apache.org/docs/2.3.0/ml-clustering.html, https://www.datascience.com/blog/k-means-clustering
* Cross-validation  : https://en.wikipedia.org/wiki/Cross-validation_(statistics)
* Collaborative filtering: https://spark.apache.org/docs/2.3.0/mllib-collaborative-filtering.html, https://bugra.github.io/work/notes/2014-04-19/alternating-least-squares-method-for-collaborative-filtering/
* Metrics:
  * RMSE: https://en.wikipedia.org/wiki/Root-mean-square_deviation
  * R2:   https://en.wikipedia.org/wiki/Coefficient_of_determination

In [20]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([0.8,  0.2],  seed=1234)
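
`randomSplit` assigns each row to a split with a random draw, so the 80/20 proportions are approximate rather than exact, and the seed makes the draw repeatable for the same input. The sketch below illustrates the idea in plain Python. It is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train, test = random_split(rows, 0.8, seed=1234)
print(len(train), len(test))
```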

##### Linear Regression

In [22]:
# Initialize the linear regression estimator
lr = LinearRegression(labelCol="label", maxIter=10000, regParam=0.2, elasticNetParam=0.5)

# Fit the model to the training data
linearModel = lr.fit(train_data)

# Summarize the model over the training set and print out some metrics
trainingSummary = linearModel.summary
print("RMSE train: %f" % trainingSummary.rootMeanSquaredError)
print("r2   train: %f" % trainingSummary.r2)

# Generate predictions
predicted = linearModel.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicted)
r2   = evaluator_r2.evaluate(predicted)

print("\nRMSE test: %f" % rmse)
print("r2   test: %f" % r2)
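
Both metrics reported by `RegressionEvaluator` can be checked by hand. The sketch below computes RMSE and R2 from their textbook definitions in plain Python. The function names and the sample values are assumptions made for illustration.

```python
import math

def rmse_score(labels, predictions):
    """Root of the mean squared difference between labels and predictions."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

def r2_score(labels, predictions):
    """1 minus the ratio of residual to total sum of squares."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

labels = [12.0, 12.5, 13.0, 13.5]       # illustrative log-scale targets
predictions = [12.1, 12.4, 13.2, 13.3]  # illustrative model outputs
print(round(rmse_score(labels, predictions), 4))
print(round(r2_score(labels, predictions), 4))
```

Since the label is the log-transformed house value, an RMSE computed this way is an error in log units, not in dollars.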

##### Linear Regression with Cross-Validation

In [24]:
# Initialize the linear regression estimator
lr = LinearRegression(labelCol="label")

# Define the hyperparameter grid to search
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam,        [0.2, 0.3, 0.5,  0.7])\
.addGrid(lr.elasticNetParam, [0.2, 0.5,  0.7, 0.8])\
.addGrid(lr.maxIter,         [100, 1000, 5000, 10000])\
.build()

# cross-validation settings
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3,
                          seed=2018
                         )  

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_data)

# Summarize the model over the training set and print out some metrics
trainingSummary = cvModel.bestModel.summary
print("RMSE train: %f" % trainingSummary.rootMeanSquaredError)
print("r2   train: %f" % trainingSummary.r2)

# Generate predictions
predicted = cvModel.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicted)
r2   = evaluator_r2.evaluate(predicted)

print("\nRMSE test: %f" % rmse)
print("r2   test: %f" % r2)
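
The cost of this search grows quickly: every combination of the three parameter lists is one candidate, each candidate is fitted once per fold, and `CrossValidator` then refits the best candidate on the full training set. A short sketch of that count in plain Python (the variable names are illustrative):

```python
from itertools import product

reg_params = [0.2, 0.3, 0.5, 0.7]
elastic_net_params = [0.2, 0.5, 0.7, 0.8]
max_iters = [100, 1000, 5000, 10000]
num_folds = 3

# Every combination of the three lists is one candidate parameter map.
grid = list(product(reg_params, elastic_net_params, max_iters))

# Each candidate is fitted once per fold; the best one is then refitted
# on the full training set.
cv_fits = len(grid) * num_folds
total_fits = cv_fits + 1
print(len(grid), cv_fits, total_fits)
```

Trimming one of the lists, for example the `maxIter` values, reduces the number of fits proportionally.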

#### Decision-Tree with Cross-Validation

In [26]:
# Automatically identify categorical features, and index them.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(scaled_df)

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

paramGrid = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [3,  10, 25])\
    .addGrid(dt.maxBins,  [10, 20, 32])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)  

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_data)

# Generate predictions
predicted = cvModel.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicted)
r2   = evaluator_r2.evaluate(predicted)

print("\nRMSE test: %f" % rmse)
print("r2   test: %f" % r2)
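
With `numFolds=3`, the training data is partitioned into three folds, and each fold serves once as the validation set while the other two are used for fitting. The sketch below shows this partitioning on row indices. It uses a deterministic round-robin assignment for readability, whereas Spark assigns rows to folds randomly, and `k_fold_indices` is a hypothetical helper.

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (train_indices, validation_indices) for each fold."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    for i, validation in enumerate(folds):
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

splits = list(k_fold_indices(n_rows=9, num_folds=3))
for train, validation in splits:
    print(sorted(train), validation)
```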

In [27]:
tree = cvModel.bestModel.stages[1]

In [28]:
tree.featureImportances

#### Random forest

In [30]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(scaled_df)

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(train_data)

# Generate predictions
predicted = model.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicted)
r2   = evaluator_r2.evaluate(predicted)

print("\nRMSE test: %f" % rmse)
print("r2   test: %f" % r2)

In [31]:
# An additional output of tree models is feature importances (can be used for feature selection)
rf_model = model.stages[1]
print(rf_model.featureImportances)
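
The importance vector is indexed by position, so it is easier to read once paired with the feature names in the order they were selected into `df_for_ml`. The sketch below does this in plain Python. `rank_importances` is a hypothetical helper and the importance values are made up for illustration; real values would come from the fitted model, for instance via `rf_model.featureImportances.toArray()`.

```python
# Feature order matches the columns selected after the label in `df_for_ml`.
FEATURE_NAMES = [
    "totalBedRooms", "totalRooms", "population", "households",
    "medianIncome", "roomsPerHousehold", "populationPerHousehold",
    "bedroomsPerRoom",
]

def rank_importances(names, importances):
    """Pair feature names with importances, highest first."""
    if len(names) != len(importances):
        raise ValueError("names and importances must have the same length")
    return sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)

# Made-up importances for illustration only.
example = [0.02, 0.03, 0.04, 0.01, 0.55, 0.10, 0.15, 0.10]
ranked = rank_importances(FEATURE_NAMES, example)
for name, score in ranked:
    print("%-24s %.2f" % (name, score))
```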

#### Gradient-boosted tree regression

In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Automatically identify categorical features, and index them.
# maxCategories is left at its default (20), so features with more than 20 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(scaled_df)

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(train_data)

# Generate predictions
predicted = model.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator_rmse.evaluate(predicted)
r2   = evaluator_r2.evaluate(predicted)

print("\nRMSE test: %f" % rmse)
print("r2   test: %f" % r2)

#### Clustering

In [35]:
# https://rsandstroem.github.io/sparkkmeans.html
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(scaled_df)

# Make predictions
predictions = model.transform(scaled_df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
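
The silhouette score compares, for each point, the mean distance to its own cluster with the mean distance to the nearest other cluster. Values near 1 indicate well-separated clusters. The sketch below is a small plain-Python version using squared Euclidean distance. It is quadratic in the number of points and assumes at least two clusters, so it is meant for intuition only and is not how Spark computes the metric.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette(points, labels):
    """Mean silhouette coefficient using squared Euclidean distance."""
    scores = []
    for i, (p, own) in enumerate(zip(points, labels)):
        # Sum and count of distances from p to every other point, per cluster.
        sums, counts = {}, {}
        for j, (q, lab) in enumerate(zip(points, labels)):
            if i == j:
                continue
            sums[lab] = sums.get(lab, 0.0) + sq_dist(p, q)
            counts[lab] = counts.get(lab, 0) + 1
        if own not in counts:
            scores.append(0.0)  # singleton cluster: score defined as 0
            continue
        a = sums[own] / counts[own]
        b = min(sums[lab] / counts[lab] for lab in sums if lab != own)
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)

points = [(0, 0), (0, 1), (10, 10), (10, 11)]  # illustrative data
good = silhouette(points, [0, 0, 1, 1])  # neighbours share a cluster
bad = silhouette(points, [0, 1, 0, 1])   # neighbours are split apart
print(round(good, 3), round(bad, 3))
```

Running the same comparison for several values of `k` is a common way to choose the number of clusters.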

In [36]:
display(model, scaled_df)

#### Collaborative filtering

In [39]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Load the ratings file; each line has the form userId::movieId::rating::timestamp
lines = spark.read.text("/mnt/%s/sample_movielens_ratings.txt" % MOUNT_NAME).rdd
parts = lines.map(lambda row: row.value.split("::"))

ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), 
                                     movieId=int(p[1]),
                                     rating=float(p[2]),
                                     timestamp=int(p[3])))

ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10,
          regParam=0.01,
          userCol="userId",
          itemCol="movieId",
          ratingCol="rating",
          coldStartStrategy="drop")

model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [40]:
# Let's take a look at some user recommendations
userSubsetRecs.show()
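
The `coldStartStrategy="drop"` setting matters because users or movies that appear only in the test split have no learned factors, so ALS predicts NaN for them, and a single NaN turns the whole RMSE into NaN. The sketch below shows the effect in plain Python with illustrative (rating, prediction) pairs; `rmse_score` is a hypothetical helper.

```python
import math

def rmse_score(pairs):
    """RMSE over (rating, prediction) pairs."""
    squared = [(rating - pred) ** 2 for rating, pred in pairs]
    return math.sqrt(sum(squared) / len(squared))

# Illustrative pairs; the NaN stands for a user or movie that was
# absent from the training split.
pairs = [(4.0, 3.5), (2.0, 2.5), (5.0, float("nan"))]

with_nan = rmse_score(pairs)
dropped = rmse_score([(r, p) for r, p in pairs if not math.isnan(p)])
print(math.isnan(with_nan), dropped)
```

Dropping the rows without a prediction keeps the metric defined, at the price of evaluating only on users and movies seen during training.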