In [1]:
#Load Data

In [1]:
raw_data = sc.textFile('/tmp/millionsong.txt')

In [2]:
raw_data.first()

u'2001.0,0.884123733793,0.610454259079,0.600498416968,0.474669212493,0.247232680947,0.357306088914,0.344136412234,0.339641227335,0.600858840135,0.425704689024,0.60491501652,0.419193351817'

In [36]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
import matplotlib.pyplot as plt

df = raw_data.map(lambda x: x.split(",")).map(lambda x: LabeledPoint(x[0],x[1:])).toDF(['features','label'])



In [4]:
df.selectExpr('MAX(label)','MIN(label)').show()

+----------+----------+
|MAX(label)|MIN(label)|
+----------+----------+
|    2011.0|    1922.0|
+----------+----------+



In [5]:
from pyspark.sql.functions import col

parsed_df = df.select(col('label')-1922, 'features')\
              .withColumnRenamed("(label - 1922)",'label')
parsed_df.head()

Row(label=79.0, features=DenseVector([0.8841, 0.6105, 0.6005, 0.4747, 0.2472, 0.3573, 0.3441, 0.3396, 0.6009, 0.4257, 0.6049, 0.4192]))

In [6]:
train_df, val_df, test_df = parsed_df.randomSplit([0.8,0.1,0.1])

train_df.cache() 
val_df.cache() 
test_df.cache()

DataFrame[label: double, features: vector]

In [7]:
avg = test_df.agg({"label":"mean"}).map(lambda x: x[0]).collect()[0]

In [8]:
#baseline model - use average to predict

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lit

evaluator = RegressionEvaluator(predictionCol="prediction")

baseline_pred_label_df = train_df.select('label').withColumn('prediction',lit(avg))

print "Baseline Test RMSE is equal to %s" %(evaluator.evaluate(baseline_pred_label_df)) 

Baseline Test RMSE is equal to 21.5431516404


In [9]:
#Gradient Descent by hand

In [10]:
from pyspark.mllib.linalg import DenseVector

evaluator = RegressionEvaluator(predictionCol="prediction")
def calc_RMSE(dataset):
    """Calculates the root mean squared error for an dataset of (prediction, label) tuples.

    Args:
        dataset (DataFrame of (float, float)): A `DataFrame` consisting of (prediction, label) tuples.

    Returns:
        float: The square root of the mean of the squared errors.
    """
#    evaluator = RegressionEvaluator(predictionCol="prediction")
    return evaluator.evaluate(dataset)

def gradient_summand(weights, lp):
    """Calculates the gradient summand for a given weight and `LabeledPoint`."""
    summand = DenseVector((DenseVector.dot(lp.features,weights) - lp.label)*lp.features)
    return summand

def get_labeled_prediction(weights, observation):
    """Calculates predictions given a tuple of (labeledpoint,features) 
       and returns a (prediction, label) tuple."""
    
    prediction = float(DenseVector.dot(DenseVector(weights),observation.features))
    label = float(observation.label)
    
    return prediction,label

In [11]:
d = len(train_df.first().features)
w = np.zeros(d)
train_df.map(lambda x: get_labeled_prediction(w,x)).first()

(0.0, 79.0)

In [12]:
train_df.map(lambda x: gradient_summand(w,x)).first()

DenseVector([-71.8097, -49.933, -44.0369, -39.3628, -21.8353, -24.712, -35.4339, -35.4453, -51.3335, -38.6996, -46.7607, -35.5502])

In [13]:
def linreg_gradient_descent(train_data, num_iters):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    Returns a tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The length of the training data
    n = train_data.count()
    # The number of features in the training data
    d = len(train_data.first().features)
    w = np.zeros(d)
    alpha = 1.0
    # We will compute and store the training error after each iteration
    error_train = np.zeros(num_iters)
    for i in range(num_iters):
        preds_and_labels_train = train_data.map(lambda x: get_labeled_prediction(w,x))
        preds_and_labels_train_df = preds_and_labels_train.toDF(["prediction", "label"])
        error_train[i] = calc_RMSE(preds_and_labels_train_df)

        # Calculate the `gradient`.  Make use of the `gradient_summand` function you wrote in (3a).
        # Note that `gradient` should be a `DenseVector` of length `d`.
        gradient = train_data.map(lambda x: gradient_summand(w,x)).sum()

        # Update the weights
        alpha_i = alpha / (n * np.sqrt(i+1))
        w = w - alpha_i * gradient
        
    return w, error_train
    

linreg_gradient_descent(train_df, 100)

(array([ 25.54267199,  24.28569575,  -3.73016841,   8.63990605,
          6.56753476,  -8.49258512,  18.21909824,   2.12252844,
          9.91043433,   4.86732809,  11.50202736,   1.9461365 ]),
 array([  58.07764142,  105.32248399,  111.01517104,   76.87970407,
          39.39049974,   22.76882326,   20.40322172,   20.23965109,
          20.16885342,   20.10417476,   20.04391336,   19.98743327,
          19.9342358 ,   19.88392114,   19.83616298,   19.79069097,
          19.74727821,   19.70573211,   19.66588755,   19.62760176,
          19.59075028,   19.55522389,   19.52092612,   19.48777128,
          19.45568284,   19.42459216,   19.39443735,   19.36516245,
          19.33671663,   19.30905356,   19.28213089,   19.25590979,
          19.23035456,   19.20543228,   19.18111252,   19.15736711,
          19.13416988,   19.11149651,   19.08932431,   19.06763209,
          19.04640006,   19.02560965,   19.00524345,   18.98528508,
          18.96571915,   18.94653113,   18.92770733,   18.

####  Train the model
#### Now let's train a linear regression model on all of our training data and evaluate its accuracy on the validation set.
#### Note that the test set will not be used here. If we evaluated the model on the test set, we would bias our final results.

In [15]:
num_iters = 50
weights_LR0, error_train_LR0 = linreg_gradient_descent(train_df,num_iters)

preds_and_labels = (val_df
                      .map(lambda x: get_labeled_prediction(weights_LR0,x)))
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])
rmse_val_LR0 = calc_RMSE(preds_and_labels_df)

print 'Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(avg,
                                                                       rmse_val_LR0)

Validation RMSE:
	Baseline = 53.403
	LR0 = 18.403


# MLlib implemenatation

In [16]:
from pyspark.ml.regression import LinearRegression
# Values to use when training the linear regression model

num_iters = 500  # iterations
reg = 1e-1  # regParam
alpha = .2  # elasticNetParam
use_intercept = True  # intercept

# TODO: Replace <FILL IN> with appropriate code
lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, 
                           elasticNetParam=0.1, fitIntercept=True)
first_model = lin_reg.fit(train_df)

# coeffsLR1 stores the model coefficients; interceptLR1 stores the model intercept
coeffs_LR1 = first_model.coefficients
intercept_LR1 = first_model.intercept
print coeffs_LR1, intercept_LR1

[22.7826076628,27.2671667664,-66.4716850827,53.9043829814,-8.77708333401,-49.3084390785,32.6676055812,-22.0419154042,3.06020516062,-4.62732222397,-11.8928196002,-13.7315891729] 65.6379221505


In [17]:
pred = first_model.transform(train_df)
pred.select('label','prediction').toPandas().head(10)



Unnamed: 0,label,prediction
0,79,66.80251
1,79,69.722341
2,79,64.049553
3,79,67.092058
4,79,64.94205
5,79,66.935711
6,85,70.47138
7,80,60.532097
8,82,49.532286
9,81,64.972118


In [18]:
from pyspark.sql import Row
validation = first_model.transform(val_df).select('prediction','label')
rmse_val_LR1 = evaluator.evaluate(validation)

In [19]:
print avg, rmse_val_LR0, rmse_val_LR1

53.4030075188 18.4027353417 15.5050063501


# Grid Search

In [39]:
best_RMSE = rmse_val_LR1
best_reg_param = reg
best_model = first_model

num_iters = 500  # iterations
alpha = .2  # elasticNetParam
use_intercept = True  # intercept

for reg in [1e-10,1e-5,.99]:
    lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
    model = lin_reg.fit(train_df)
    val_pred_df = model.transform(val_df)

    rmse_val_grid = evaluator.evaluate(val_pred_df)

    if rmse_val_grid < best_RMSE:
        best_RMSE = rmse_val_grid
        best_reg_param = reg
        best_model = model

rmse_val_LR_grid = best_RMSE

print ('Validation RMSE:\n\t\tBaseline = {0:.3f}\n\t\tLR0 = {1:.3f}\n\t\tLR1 = {2:.3f}\n' +
       '\t\tLRGrid = {3:.3f}').format(avg, rmse_val_LR0, rmse_val_LR1, rmse_val_LR_grid)

Validation RMSE:
		Baseline = 53.403
		LR0 = 18.403
		LR1 = 15.505
		LRGrid = 15.505


In [76]:
from itertools import product

def two_way_interactions(lp):
    """Creates a new `LabeledPoint` that includes two-way interactions.

    Note:
        For features [x, y] the two-way interactions would be [x^2, x*y, y*x, y^2] and these
        would be appended to the original [x, y] feature list.

    Args:
        lp (LabeledPoint): The label and features for this observation.

    Returns:
        LabeledPoint: The new `LabeledPoint` should have the same label as `lp`.  Its features
            should include the features from `lp` followed by the two-way interaction features.
    """
    i_j = list(product(range(len(lp.features)),range(len(lp.features))))
    
    two_way = LabeledPoint(lp.label, np.hstack((lp.features,[lp.features[i] * lp.features[j] for (i,j) in i_j])))
    
    return two_way

In [86]:
two_way_train_df = train_df.map(lambda x: LabeledPoint(x[0],x[1]))\
                           .map(lambda x: two_way_interactions(x)).toDF(['features','label'])
two_way_val_df = val_df.map(lambda x: LabeledPoint(x[0],x[1]))\
                           .map(lambda x: two_way_interactions(x)).toDF(['features','label'])
two_way_test_df = test_df.map(lambda x: LabeledPoint(x[0],x[1]))\
                           .map(lambda x: two_way_interactions(x)).toDF(['features','label'])

In [87]:
two_way_train_df.head()

Row(features=DenseVector([0.909, 0.6321, 0.5574, 0.4983, 0.2764, 0.3128, 0.4485, 0.4487, 0.6498, 0.4899, 0.5919, 0.45, 0.8263, 0.5745, 0.5067, 0.4529, 0.2512, 0.2843, 0.4077, 0.4078, 0.5906, 0.4453, 0.538, 0.409, 0.5745, 0.3995, 0.3523, 0.3149, 0.1747, 0.1977, 0.2835, 0.2836, 0.4107, 0.3096, 0.3741, 0.2844, 0.5067, 0.3523, 0.3107, 0.2777, 0.1541, 0.1744, 0.25, 0.2501, 0.3622, 0.2731, 0.3299, 0.2508, 0.4529, 0.3149, 0.2777, 0.2483, 0.1377, 0.1559, 0.2235, 0.2236, 0.3238, 0.2441, 0.2949, 0.2242, 0.2512, 0.1747, 0.1541, 0.1377, 0.0764, 0.0865, 0.124, 0.124, 0.1796, 0.1354, 0.1636, 0.1244, 0.2843, 0.1977, 0.1744, 0.1559, 0.0865, 0.0979, 0.1403, 0.1403, 0.2033, 0.1532, 0.1852, 0.1408, 0.4077, 0.2835, 0.25, 0.2235, 0.124, 0.1403, 0.2012, 0.2012, 0.2915, 0.2197, 0.2655, 0.2018, 0.4078, 0.2836, 0.2501, 0.2236, 0.124, 0.1403, 0.2012, 0.2013, 0.2915, 0.2198, 0.2656, 0.2019, 0.5906, 0.4107, 0.3622, 0.3238, 0.1796, 0.2033, 0.2915, 0.2915, 0.4222, 0.3183, 0.3846, 0.2924, 0.4453, 0.3096, 0.2731, 0.2

In [89]:
num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True

lin_reg = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha, fitIntercept=use_intercept)
model_interact = lin_reg.fit(two_way_train_df)
preds_and_labels_interact_df = model_interact.transform(two_way_val_df)
rmse_val_interact = evaluator.evaluate(preds_and_labels_interact_df)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = ' +
       '{3:.3f}\n\tLRInteract = {4:.3f}').format(avg, rmse_val_LR0, rmse_val_LR1,
                                                 rmse_val_LR_grid, rmse_val_interact)

Validation RMSE:
	Baseline = 53.403
	LR0 = 18.403
	LR1 = 15.505
	LRGrid = 15.505
	LRInteract = 14.882


In [92]:
preds_and_labels_test_df = model_interact.transform(two_way_test_df)
rmse_test_interact = evaluator.evaluate(preds_and_labels_test_df)

print ('Test RMSE:\n\tBaseline = {0:.3f}\n\tLRInteract = {1:.3f}'
       .format(avg, rmse_test_interact))

Test RMSE:
	Baseline = 53.403
	LRInteract = 14.487


In [94]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import PolynomialExpansion

num_iters = 500
reg = 1e-10
alpha = .2
use_intercept = True

polynomial_expansion = PolynomialExpansion(degree=2)
linear_regression = LinearRegression(maxIter=num_iters, regParam=reg, elasticNetParam=alpha,
                                     fitIntercept=use_intercept, featuresCol='polyFeatures')

pipeline = Pipeline(stages=[polynomial_expansion,linear_regression])
pipeline_model = pipeline.fit(train_df)

predictions_df = pipeline_model.transform(test_df)

evaluator = RegressionEvaluator()
rmse_test_pipeline = evaluator.evaluate(predictions_df, {evaluator.metricName: "rmse"})
print('RMSE for test data set using pipelines: {0:.3f}'.format(rmse_test_pipeline))

Py4JJavaError: An error occurred while calling o8828.transform.
: java.util.NoSuchElementException: Failed to find a default value for inputCol
	at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:647)
	at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:647)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.ml.param.Params$class.getOrDefault(params.scala:646)
	at org.apache.spark.ml.PipelineStage.getOrDefault(Pipeline.scala:43)
	at org.apache.spark.ml.param.Params$class.$(params.scala:651)
	at org.apache.spark.ml.PipelineStage.$(Pipeline.scala:43)
	at org.apache.spark.ml.UnaryTransformer.transformSchema(Transformer.scala:106)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:68)
	at org.apache.spark.ml.UnaryTransformer.transform(Transformer.scala:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
