## Least Squares Regression
Spark exposes two interfaces to data:
- An RDD interface which represents a collection of rows which can be any python object.
- A dataframe interface which is similar to Pandas (but with less functionality) and built on top of the RDD interface

In [2]:
# load this data using the dataframe interface to Spark
diamonds = (
  sqlContext.read.format('csv')
    .options(header='true', inferSchema='true')
    .load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv')
    .cache()
  )

In [3]:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


In [4]:
diamonds.count()

In [5]:
diamonds.columns

In [6]:
# examine a subset of the columns and only expensive diamonds
# use the Dataframe API to subselect the data.
data = (
  diamonds
    .where(diamonds['price'] > 1000)
    .select(['cut', 'color', 'carat', 'clarity', 'price'])
)

In [7]:
display(data.sample(False, 0.01).select(['carat', 'price']))

carat,price
0.72,2776
0.77,2798
0.73,2805
0.72,2819
0.73,2841
0.9,2857
0.79,2876
0.74,2906
0.73,2916
0.71,2921


In [8]:
# access the underlying RDD for any dataframe
rdd = data.rdd
rdd

In [9]:
# try a few functions on the rdd. 
# This RDD is composed of Row objects that behave like python dictionaries and even have a convenient toDict() function that returns a native Python dictionary. 
# keys in the row correspond to the columns in the dataframe.
rdd.count()

In [10]:
# take an RDD and transform each row into a tuple consisting of the String corresponding to the color of the diamond in that row and the integer 1. 
# The reduceByKey function assums that the input RDD consists of tuples of (key, value) pairs. It takes a function that takes two values (or intermediate value) and combines them to produce a new value. 
# Notice that nothing happens when we run this cell:
counts_rdd = rdd.map(lambda row: (row['color'], 1)).reduceByKey(lambda a, b: a + b)
counts_rdd

In [11]:
# compute an actual value we instead must invoke an action like collect() or count().
counts_rdd.collect()

In [12]:
# specify the set of features and which features are categorical.
features = ['cut', 'color', 'carat', 'clarity']
categorical = set(['cut', 'color', 'clarity'])
label = 'price'

In [13]:
def xty_map(row):
  row = row.asDict()
  for i in features:
    # If the features is not categorical (e.g., carat) then:
    #    the features name is just the name of the feature and 
    #    the value is just the value
    # otherwise the feature is categorical and we implement a one-hot-encoding
    #    the feature name is the name + "_" + value (e.g., cut_Ideal)
    #    the feature value is 1.0
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    yield (ki, vi * row[label])

In [14]:
def xtx_map(row):
  row = row.asDict()
  # Iterating over features = ['cut', 'carat', ...]
  # we are computing the outer product over each feature value:
  for i in features:
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    for j in features:
      (kj, vj) = (j, row[j]) if j not in categorical else (j+"_"+row[j], 1.0)
      yield ((ki,kj), vi * vj)

In [15]:
row = data.take(1)[0]
row

In [16]:
[a for a in xtx_map(row)]

In [17]:
[a for a in xty_map(row)]

In [18]:
# use the flatMap function which is nearly identical to the map function except that it may output multiple values which are all concatenated to the output.
# The following lines will trigger computation across the data center by shipping our xtx_map and xty_map functions to remote workers wher ethe data is stored.
xtx_data = ( data.rdd
  .flatMap(xtx_map)
  .reduceByKey(lambda a, b: a + b)
  .collect()
)

xty_data = ( data.rdd
  .flatMap(xty_map)
  .reduceByKey(lambda a, b: a + b)
  .collect()
)

In [19]:
# compute the index in the feature vector for each of the string features.
index = dict(zip([r[0] for r in xty_data], range(len(xty_data))))
p = len(index)
print("Dimensions", p)
index

In [20]:
# Using the index which translates each feature name to a particular location in an x vector we then fill out the entries of the local XTY and XTX matrices. 
# Notice that this is done entirely in this python notebook and not by calling Spark. For very high dimensional problems this could be an issue.
import numpy as np
XTY = np.zeros((p,1))
for (k,v) in xty_data:
  XTY[index[k]] = v

XTX = np.zeros((p,p))
for ((k1,k2),v) in xtx_data:
  XTX[index[k1], index[k2]] = v

In [21]:
#  compute the optimal theta
theta = np.linalg.solve(XTX, XTY)

In [22]:
# define a new user defined function to run on the PySpark dataframe and render predictions
def predict(row):
  row = row.asDict()
  pred = 0.0
  for i in features:
    (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
    pred += vi * theta[index[ki],0]
  return float(pred)

In [23]:
# add the predictions directly into our dataframe. 
# The following block of code creates a user defined function (UDF) and then uses the UDF to add an additional column to the Dataframe.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType

# A UDF must have a defined return type
predict_udf = udf(predict, FloatType())

# the Struct of features states which columns in the dataframe are given to the UDF
data = data.withColumn("pred", predict_udf(struct(features)))

In [24]:
from pyspark.sql.functions import mean
data = data.withColumn("resid", data['pred'] - data['price'])
np.sqrt(data.select(mean(data['resid'] * data['resid'])).collect()[0][0])

In [25]:
display(data.sample(False, 0.1).select('resid'))

resid
488.91895
198.46631
587.1675
877.9092
-229.83374
553.85767
1022.9768
-51.443848
468.16455
1045.188


In [26]:
display(data.sample(False, 0.01))

cut,color,carat,clarity,price,pred,resid
Premium,F,0.71,SI1,2809,2638.3445,-170.65552
Very Good,H,0.77,SI1,2830,2280.7979,-549.20215
Ideal,D,0.72,SI1,2883,3333.9124,450.91235
Ideal,E,0.81,SI2,2938,2962.8545,24.854492
Ideal,F,0.7,VS1,2940,3960.0754,1020.07544
Ideal,D,0.75,SI1,2954,3628.0828,674.08276
Ideal,F,0.72,VS2,2956,3772.0295,816.02954
Very Good,F,0.7,VS1,2959,3762.4482,803.44824
Ideal,G,0.64,IF,2960,4468.2856,1508.2856
Ideal,I,0.71,VVS2,3007,3053.9968,46.996826


In [27]:
from pyspark.sql.functions import lit
avg_price = data.select(mean(data['price'])).collect()[0][0]
data = data.withColumn("mean_resid", data['pred'] - lit(avg_price))
np.sqrt(data.select(mean(data['mean_resid'] * data['mean_resid'])).collect()[0][0])