## Machine learning example with Spark DataFrame and ML (not MLlib)
This is a modification of an example I found online [here](http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/). The original used the MLlib library for machine learning. I wanted an example that relied primarily on the DataFrame object rather than on RDDs (supposedly better performance), and on the ML library instead of MLlib. It is my understanding that ML is the library recommended for DataFrames: since Spark 2.0, the RDD-based MLlib API (`pyspark.mllib`) has been in maintenance mode, and the DataFrame-based API (`pyspark.ml`) is the primary one.

This notebook has two sections: importing and munging the data, and fitting the model.

In [1]:
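# Packages to use
# Note: the pyspark shell and notebook provide a SparkSession named `spark`.
# In batch mode (e.g. spark-submit) you need to create it yourself, for example:
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName('sacramento_lr').getOrCreate()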

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

### Importing and preparing the data
I did little productive work here beyond importing the data. I did try some basic column operations, but eventually concluded that you should do as much of that type of work as possible outside of Spark. Spark has much of the functionality you will find in an RDBMS, but I found it difficult to use and inconsistent in its results. My recommendation: do as much as you can in SQL before you bring the data into Spark for machine learning.
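As a minimal sketch of that advice, here is the kind of derived-column work you could push into SQL, using Python's built-in `sqlite3` as a stand-in for whatever RDBMS holds the data. The table name `sales`, the derived `price_per_sqft` column, and the `sq__ft > 0` guard are hypothetical choices for illustration; the rows are the first five shown further down.

```python
import sqlite3

# In-memory database standing in for the source RDBMS (hypothetical table "sales").
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales (street TEXT, price INTEGER, baths INTEGER, beds INTEGER, sq__ft INTEGER)"
)
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?, ?, ?)",
    [
        ("3526 HIGH ST", 59222, 1, 2, 836),
        ("51 OMAHA CT", 68212, 1, 3, 1167),
        ("2796 BRANCH ST", 68880, 1, 2, 796),
        ("2805 JANETTE WAY", 69307, 1, 2, 852),
        ("6001 MCMAHON DR", 81900, 1, 2, 797),
    ],
)

# Derive the new column in SQL, before Spark ever sees the data.
# The WHERE clause guards against division by zero.
rows = conn.execute(
    """
    SELECT street, price, baths, beds, sq__ft,
           ROUND(CAST(price AS REAL) / sq__ft, 2) AS price_per_sqft
    FROM sales
    WHERE sq__ft > 0
    ORDER BY price
    """
).fetchall()

for row in rows:
    print(row)
```

The query result would then be exported and read into Spark with `spark.read.csv`, as in the next cell, so Spark only has to handle the modeling.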

In [2]:
df = spark.read.csv('/Users/tim/pyspark/data/Sacramentorealestatetransactions.csv', header = True, inferSchema = True)

In [3]:
df.printSchema()

root
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: integer (nullable = true)
 |-- sq__ft: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [4]:
# The number of rows
df.count()

985

In [5]:
# select specific fields
df_model = df.select('street', 'sale_date', 'price', 'baths', 'beds', 'sq__ft')
df_model.show(5)

+----------------+--------------------+-----+-----+----+------+
|          street|           sale_date|price|baths|beds|sq__ft|
+----------------+--------------------+-----+-----+----+------+
|    3526 HIGH ST|Wed May 21 00:00:...|59222|    1|   2|   836|
|     51 OMAHA CT|Wed May 21 00:00:...|68212|    1|   3|  1167|
|  2796 BRANCH ST|Wed May 21 00:00:...|68880|    1|   2|   796|
|2805 JANETTE WAY|Wed May 21 00:00:...|69307|    1|   2|   852|
| 6001 MCMAHON DR|Wed May 21 00:00:...|81900|    1|   2|   797|
+----------------+--------------------+-----+-----+----+------+
only showing top 5 rows



In [6]:
type(df_model)

pyspark.sql.dataframe.DataFrame

In [7]:
# The less pretty output
# take(5) returns the first 5 Row objects without collecting the whole DataFrame to the driver
df_model.take(5)

[Row(street=u'3526 HIGH ST', sale_date=u'Wed May 21 00:00:00 EDT 2008', price=59222, baths=1, beds=2, sq__ft=836),
 Row(street=u'51 OMAHA CT', sale_date=u'Wed May 21 00:00:00 EDT 2008', price=68212, baths=1, beds=3, sq__ft=1167),
 Row(street=u'2796 BRANCH ST', sale_date=u'Wed May 21 00:00:00 EDT 2008', price=68880, baths=1, beds=2, sq__ft=796),
 Row(street=u'2805 JANETTE WAY', sale_date=u'Wed May 21 00:00:00 EDT 2008', price=69307, baths=1, beds=2, sq__ft=852),
 Row(street=u'6001 MCMAHON DR', sale_date=u'Wed May 21 00:00:00 EDT 2008', price=81900, baths=1, beds=2, sq__ft=797)]

The next few cells show some of my attempts to manipulate columns in Spark. This is what eventually convinced me to avoid doing this type of work in Spark: it is so much easier in SQL.

In [8]:
# Just for kicks, I'd like to add a derived variable. This is a map operation to create a new column.
# Older (Spark 1.x) examples call map() directly on a DataFrame, but PySpark removed
# DataFrame.map in 2.0, so this fails in 2.1.0; so annoying

# foo = df_model.select('price').map(lambda x: x * 1000).show()


In [9]:
# the above did not work - let's try again
# this is kind of stupid: convert to an RDD to perform a map,
# but then you have to convert back to a DataFrame.   STOOPID!!

# foo = df_model.select('price').rdd.map(lambda x: x[0] * 1000)
# foo.collect()[0:4]

In [10]:
# note: foo is an RDD (a PipelinedRDD), not a list
# type(foo)

In [11]:
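# is there a native DataFrame method that will do it?
# this is the way to do it: withColumn with a column expression.
# Plain arithmetic on a column works as-is; for anything more involved,
# import the built-in functions: from pyspark.sql import functions as F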

# df_model = df_model.withColumn('price_new', (df_model.price * 1000))
# df_model.show(4)

### Linear regression with ML
My primary goal here is simply to get the table into the format the ML library expects. I did not do all the things you are supposed to do, such as setting up training and validation splits, nor did I run the full set of regression diagnostics. I'll probably save that for another day.
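For reference, the split skipped here is conceptually simple: each row is assigned independently to training or validation with fixed probabilities. The plain-Python sketch below illustrates the idea with a seeded random number generator. In PySpark the corresponding DataFrame call would be something like `randomSplit([0.8, 0.2], seed=42)` on the assembled DataFrame, which likewise yields only approximately 80/20 sizes. The helper name `random_split`, the 0.8 fraction, and the seed are arbitrary choices for illustration.

```python
import random


def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train or validation (hypothetical helper)."""
    rng = random.Random(seed)  # seeded so the split is reproducible
    train, validation = [], []
    for row in rows:
        # One uniform draw per row: a Bernoulli trial with p = train_fraction.
        (train if rng.random() < train_fraction else validation).append(row)
    return train, validation


rows = list(range(985))  # stand-in for the 985 rows counted above
train, validation = random_split(rows)
print(len(train), len(validation))
```

Because each row is decided on its own, the split sizes vary slightly around 80/20; fixing the seed makes the result repeatable.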

For someone who is used to fitting models in R or even regular Python, this was an amazingly frustrating experience.  I hope this example spares you some of that frustration.

Note: the cell right below this shows the MLlib way to start building the input dataset. Most of the examples I found online used this approach. I'm leaving it here for reference, but it is not what I used.

In [12]:
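# function to label.  why is this not a built-in function???
# building this from all the MLlib examples
# this is the RDD way: it needs df.rdd and does not work directly on Spark DataFrames
# note: in df_model, row[-1] is sq__ft, not price, so pick the label by name

# from pyspark.mllib.regression import LabeledPoint
# def labelData(data):
#     return data.rdd.map(lambda row: LabeledPoint(row['price'], [row['baths'], row['beds'], row['sq__ft']]))

# df_design = labelData(df_model)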

In [13]:
df_model.columns

['street', 'sale_date', 'price', 'baths', 'beds', 'sq__ft']

In [14]:
# Get the predictors for the regression model
# (dropping a column that does not exist, such as 'price_new', is a no-op)
df_x = df_model.drop('price', 'price_new', 'street', 'sale_date')
df_x.show(5)

+-----+----+------+
|baths|beds|sq__ft|
+-----+----+------+
|    1|   2|   836|
|    1|   3|  1167|
|    1|   2|   796|
|    1|   2|   852|
|    1|   2|   797|
+-----+----+------+
only showing top 5 rows
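With these three predictors, the model fitted below is an ordinary linear regression of price on baths, beds, and square footage (written $\text{sqft}$ here for the `sq__ft` column), with an intercept, since Spark's `LinearRegression` fits one by default (`fitIntercept=True`):

$$\text{price}_i = \beta_0 + \beta_1\,\text{baths}_i + \beta_2\,\text{beds}_i + \beta_3\,\text{sqft}_i + \varepsilon_i$$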



In [15]:
# Rename the price column to 'label': LinearRegression looks for
# columns named 'label' and 'features' by default
df_model = df_model.withColumnRenamed('price', 'label')

# Create the feature vector: a vector comprising the values of the three predictors
pred_col = df_x.columns
assembler = VectorAssembler(inputCols = pred_col, outputCol = "features")
df_model_fit = assembler.transform(df_model)

df_model_fit.show(5)

+----------------+--------------------+-----+-----+----+------+----------------+
|          street|           sale_date|label|baths|beds|sq__ft|        features|
+----------------+--------------------+-----+-----+----+------+----------------+
|    3526 HIGH ST|Wed May 21 00:00:...|59222|    1|   2|   836| [1.0,2.0,836.0]|
|     51 OMAHA CT|Wed May 21 00:00:...|68212|    1|   3|  1167|[1.0,3.0,1167.0]|
|  2796 BRANCH ST|Wed May 21 00:00:...|68880|    1|   2|   796| [1.0,2.0,796.0]|
|2805 JANETTE WAY|Wed May 21 00:00:...|69307|    1|   2|   852| [1.0,2.0,852.0]|
| 6001 MCMAHON DR|Wed May 21 00:00:...|81900|    1|   2|   797| [1.0,2.0,797.0]|
+----------------+--------------------+-----+-----+----+------+----------------+
only showing top 5 rows
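Conceptually, `VectorAssembler` does something simple row by row: it reads the input columns in the order given by `inputCols` and packs their values, cast to floats, into one vector, keeping the original columns. The plain-Python sketch below mimics that for the first rows shown above and reproduces their `features` values. The function name `assemble` is hypothetical, and the real transformer also accepts vector-valued input columns and may emit sparse vectors, which this sketch ignores.

```python
def assemble(rows, input_cols, output_col="features"):
    """Return copies of the rows with input_cols packed into a list of floats."""
    assembled = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, as transform() does
        new_row[output_col] = [float(row[c]) for c in input_cols]
        assembled.append(new_row)
    return assembled


rows = [
    {"label": 59222, "baths": 1, "beds": 2, "sq__ft": 836},
    {"label": 68212, "baths": 1, "beds": 3, "sq__ft": 1167},
    {"label": 68880, "baths": 1, "beds": 2, "sq__ft": 796},
]
for r in assemble(rows, ["baths", "beds", "sq__ft"]):
    print(r["label"], r["features"])
# 59222 [1.0, 2.0, 836.0]
# 68212 [1.0, 3.0, 1167.0]
# 68880 [1.0, 2.0, 796.0]
```

The order of `inputCols` matters: it fixes the position of each predictor in the vector, and therefore the order of the coefficients and p-values reported later.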



In [16]:
type(df_model_fit)

pyspark.sql.dataframe.DataFrame

In [17]:
# The original columns are still there
df_model_fit.columns

['street', 'sale_date', 'label', 'baths', 'beds', 'sq__ft', 'features']

In [18]:
# Create the linear regression object (defaults: labelCol='label', featuresCol='features')
# and call its fit method. The result is a fitted model object
lr = LinearRegression()
model = lr.fit(df_model_fit)
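To demystify what `fit` computes here: with the defaults (`regParam=0.0`, `fitIntercept=True`), the coefficients are the ordinary least-squares solution of the normal equations $(X^\top X)\beta = X^\top y$, where $X$ is the feature matrix with a leading column of ones for the intercept. The plain-Python sketch below solves those equations by Gaussian elimination on a tiny synthetic dataset. The helper names and data are made up for illustration, and Spark's own solver adds numerical details (such as feature standardization) not shown here.

```python
def solve(a, b):
    """Solve a @ x = b for square a by Gaussian elimination with partial pivoting."""
    n = len(a)
    m = [list(row) + [rhs] for row, rhs in zip(a, b)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back-substitution
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x


def ols(features, labels):
    """Return [intercept, coef_1, ..., coef_p] from the normal equations."""
    X = [[1.0] + [float(v) for v in row] for row in features]  # prepend intercept column
    p = len(X[0])
    xtx = [[sum(row[i] * row[j] for row in X) for j in range(p)] for i in range(p)]
    xty = [sum(row[i] * y for row, y in zip(X, labels)) for i in range(p)]
    return solve(xtx, xty)


# Synthetic data generated exactly from y = 10 + 2*x1 + 3*x2 (no noise).
features = [[1, 2], [2, 1], [3, 5], [4, 3], [5, 8]]
labels = [10 + 2 * x1 + 3 * x2 for x1, x2 in features]
print([round(b, 6) for b in ols(features, labels)])  # [10.0, 2.0, 3.0]
```

Note that this sketch returns the intercept first, whereas the fitted Spark model reports it separately as `model.intercept`, with the remaining values in `model.coefficients`.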

In [19]:
# p-values: one per coefficient in feature order (baths, beds, sq__ft), intercept last
model.summary.pValues

[5.009326287108706e-13, 0.17614914553420657, 0.0035325570725432964, 0.0]
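The raw list is easier to read with names attached. Per the Spark documentation, `pValues` is only available with the "normal" solver, and when `fitIntercept=True` its last element belongs to the intercept. In the notebook you could write `dict(zip(pred_col + ['intercept'], model.summary.pValues))`; the self-contained sketch below does the same with the values printed above copied in by hand.

```python
# Values copied from the model.summary.pValues output above.
p_values = [5.009326287108706e-13, 0.17614914553420657, 0.0035325570725432964, 0.0]

# Same order as VectorAssembler's inputCols (pred_col), with the intercept appended last.
names = ["baths", "beds", "sq__ft", "intercept"]

labeled = dict(zip(names, p_values))
for name, p in labeled.items():
    print(f"{name:>10}: p = {p:.3g}")
```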

In [20]:
# Predicted values appended to the original table
model.summary.predictions.show(20)

+--------------------+--------------------+--------+-----+----+------+----------------+------------------+
|              street|           sale_date|   label|baths|beds|sq__ft|        features|        prediction|
+--------------------+--------------------+--------+-----+----+------+----------------+------------------+
|        3526 HIGH ST|Wed May 21 00:00:...| 59222.0|    1|   2|   836| [1.0,2.0,836.0]|184127.19166461288|
|         51 OMAHA CT|Wed May 21 00:00:...| 68212.0|    1|   3|  1167|[1.0,3.0,1167.0]|182508.27125869133|
|      2796 BRANCH ST|Wed May 21 00:00:...| 68880.0|    1|   2|   796| [1.0,2.0,796.0]|183354.65708525322|
|    2805 JANETTE WAY|Wed May 21 00:00:...| 69307.0|    1|   2|   852| [1.0,2.0,852.0]|184436.20549635676|
|     6001 MCMAHON DR|Wed May 21 00:00:...| 81900.0|    1|   2|   797| [1.0,2.0,797.0]|183373.97044973721|
|  5828 PEPPERMILL CT|Wed May 21 00:00:...| 89921.0|    1|   3|  1122|[1.0,3.0,1122.0]| 181639.1698569117|
| 6048 OGDEN NASH WAY|Wed May 21 00:0