In [1]:
import os

import numpy as np
from pyspark import SparkContext, SparkConf

# Python interpreter path handed to PySpark; it is relative and points inside
# the "environment" alias given to the archive in spark.yarn.dist.archives.
os.environ['PYSPARK_PYTHON'] = "environment/bin/python"

# NOTE(review): spark.cores.max is documented for standalone/Mesos deployments;
# confirm it has the intended effect with the 'yarn' master used here.
conf = (
    SparkConf()
    .setMaster('yarn')
    .setAppName("Lab6")
    .setAll([
        ("spark.executor.memory", "4g"),
        ("spark.driver.memory", "4g"),
        ("spark.cores.max", "2"),
        ("spark.yarn.dist.archives", "spark.tar.gz#environment"),
    ])
)
sc = SparkContext(conf=conf)

# Prepare data for modeling
+ read the file into an RDD
+ remove the header
+ split each row and convert all fields to numeric values
+ scale the data to get a better-conditioned loss surface

In [2]:
# Read the dataset from HDFS as an RDD of text lines, asking for at least 8 partitions.
rawRDD = sc.textFile("hdfs://yarnmaster:9000/data", minPartitions=8)

Read data + remove header

In [3]:
%%time
# remove header and show the first 05 rows
tagsheader = rawRDD.first()
header = sc.parallelize([tagsheader])
nonHeaderRDD = rawRDD.subtract(header)
print(nonHeaderRDD.take(5))

['"83",50000,3650,3,1,2,"yes","no","no","no","no",0,"no"', '"97",53900,8250,3,1,1,"yes","no","no","no","no",2,"no"', '"98",59900,8250,3,1,1,"yes","no","yes","no","no",3,"no"', '"103",125000,4320,3,1,2,"yes","no","yes","yes","no",2,"no"', '"104",132000,3500,4,2,2,"yes","no","no","yes","no",2,"no"']
CPU times: user 22 ms, sys: 2.05 ms, total: 24.1 ms
Wall time: 4.16 s


#### Split data + convert to numeric
+ There are no missing values in the dataset -> no rows to remove
+ The price column contains one value written as 1e+05 -> it needs to be converted by hand
+ The non-numeric columns only take the values "yes" or "no" -> convert them to 1/0 directly, no need to use sklearn

In [4]:
def splitFunc(x):
    """Parses one CSV data line into a list of numbers.

    The first field (the quoted row id) is dropped.  Of the remaining twelve
    fields, positions 0-4 and 10 are numeric and positions 5-9 and 11 are
    quoted "yes"/"no" flags.

    Args:
        x (str): One comma-separated data line (not the header).

    Returns:
        list: Twelve numbers.  Numeric fields are `int` (values written in
            scientific notation such as "1e+05" are accepted; a non-integral
            value is kept as `float`).  Flag fields are 1 for "yes" and 0 for
            anything else.

    Raises:
        ValueError: If a numeric field cannot be parsed as a number.
        IndexError: If the line has fewer fields than expected.
    """
    res = x.split(',')[1:]
    numeric_indexes = [0, 1, 2, 3, 4, 10]
    non_numeric_indexes = [5, 6, 7, 8, 9, 11]

    for i in numeric_indexes:
        try:
            res[i] = int(res[i])
        except ValueError:
            # int() rejects scientific notation ("1e+05"); go through float
            # and only keep a float when the value is not a whole number.
            value = float(res[i])
            res[i] = int(value) if value.is_integer() else value
    for i in non_numeric_indexes:
        # Flags arrive wrapped in double quotes: '"yes"' / '"no"'.
        res[i] = 1 if res[i].strip('"') == 'yes' else 0
    return res

# Parse every data line into a list of numbers.
splittedRDD = nonHeaderRDD.map(splitFunc)
# NOTE(review): collect() pulls the whole parsed dataset to the driver, and
# `data` is not referenced again in the visible part of the notebook --
# confirm it is still needed.
data = splittedRDD.collect()
# Preview the first 5 parsed rows.
splittedRDD.take(5)

[[50000, 3650, 3, 1, 2, 1, 0, 0, 0, 0, 0, 0],
 [53900, 8250, 3, 1, 1, 1, 0, 0, 0, 0, 2, 0],
 [59900, 8250, 3, 1, 1, 1, 0, 1, 0, 0, 3, 0],
 [125000, 4320, 3, 1, 2, 1, 0, 1, 1, 0, 2, 0],
 [132000, 3500, 4, 2, 2, 1, 0, 0, 1, 0, 2, 0]]

### Scale data to the range 0 - 1
#### I want to treat every feature equally -> normalize all features to 0-1
#### I want to keep the price as is (not scaled) so that errors are penalized heavily
+ max size = 10000 (estimate)
+ max # of bedrooms, bathrooms, stories = 3 (estimate)

=> Divide each column by its (estimated) max value to scale the data

In [5]:
def scaledFunc(x):
    """Scales the size and room-count columns of one parsed row.

    Column 1 is divided by 10000 and columns 2, 3 and 4 by 3; every other
    column (including the price in column 0) is passed through unchanged.
    The divisors are the estimated maxima given in the notebook notes, so a
    scaled value can exceed 1 when a row is above the estimate.

    Args:
        x (list): One parsed row as produced by `splitFunc`.

    Returns:
        list: A new list with the scaled values; `x` itself is not modified.
    """
    # Column index -> divisor.
    divisors = {1: 10000, 2: 3, 3: 3, 4: 3}
    return [value / divisors[i] if i in divisors else value
            for i, value in enumerate(x)]

# Apply the per-column scaling to every parsed row, then preview the first one.
scaledRDD = splittedRDD.map(scaledFunc)
scaledRDD.take(1)

[[50000,
  0.365,
  1.0,
  0.3333333333333333,
  0.6666666666666666,
  1,
  0,
  0,
  0,
  0,
  0,
  0]]

In [6]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

#### Convert data to labeledPoint

In [7]:
def parsePoint(x):
    """Wraps one row in a `LabeledPoint`.

    Args:
        x (list): A row whose first element is the label and whose remaining
            elements are the features.

    Returns:
        LabeledPoint: Label `x[0]` with the features `x[1:]` as a numpy array.
    """
    label, features = x[0], x[1:]
    return LabeledPoint(label, np.array(features))

In [8]:
# Convert every scaled row to a LabeledPoint, then preview the first one.
labeledRDD = scaledRDD.map(parsePoint)
labeledRDD.take(1)

[LabeledPoint(50000.0, [0.365,1.0,0.3333333333333333,0.6666666666666666,1.0,0.0,0.0,0.0,0.0,0.0,0.0])]

## Train test split 80-20

In [9]:
# Relative sizes of the two splits passed to randomSplit: 80% train, 20% test.
weights = [.8, .2]
# Fixed seed passed to randomSplit.
seed = 42

In [10]:
# Randomly split the labeled points into a train RDD and a test RDD.
trainRDD, testRDD = labeledRDD.randomSplit(weights, seed)

#### Cache train rdd for later use (because we need to use it frequently)

In [11]:
# Mark trainRDD to be kept in memory; it is traversed on every training iteration.
trainRDD.cache()

PythonRDD[16] at RDD at PythonRDD.scala:53

In [12]:
# Number of points in each split.
nTrain = trainRDD.count()
nTest = testRDD.count()

In [13]:
# Show the train and test set sizes.
print(nTrain, nTest)

446 100


## TRAIN THE MODEL

#### Functions to calculate the squared error and the RMSE

In [14]:
def squaredError(label, prediction):
    """Returns the squared error of a single prediction.

    Args:
        label (float): The true value of the observation.
        prediction (float): The value predicted for the observation.

    Returns:
        float: `(label - prediction)` raised to the power 2.
    """
    residual = label - prediction
    return pow(residual, 2)

def calcRMSE(labelsAndPreds):
    """Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.

    The sum of squared errors and the number of tuples are accumulated
    together by a single `aggregate` action, so the RDD is evaluated once
    instead of once for `sum()` and once more for `count()`.

    Args:
        labelsAndPreds (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.

    Returns:
        float: The square root of the mean of the squared errors.

    Raises:
        ZeroDivisionError: If `labelsAndPreds` is empty.
    """
    # Accumulator is (sum of squared errors, number of tuples).
    total, count = labelsAndPreds.aggregate(
        (0, 0),
        lambda acc, pair: (acc[0] + squaredError(pair[0], pair[1]), acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    return np.sqrt(total / count)

#### Function to do the inference

In [15]:
def getLabeledPrediction(weights, observation):
    """Pairs an observation's label with the model's prediction for it.

    The label is passed through untouched so that it can be compared with
    the prediction when the error is computed.

    Args:
        weights (np.ndarray): One weight per feature.
        observation (LabeledPoint): Provides `label` (the true value) and
            `features` (the inputs for this data point).

    Returns:
        tuple: `(label, prediction)`, where the prediction is the dot product
            of `weights` and the observation's features.
    """
    prediction = weights.dot(observation.features)
    return observation.label, prediction

#### Function to calculate the gradient

In [16]:
def gradientSummand(weights, lp):
    """Computes one observation's contribution to the gradient.

    Args:
        weights (DenseVector or np.ndarray): The current model weights.
        lp (LabeledPoint): A single observation, providing `features` and `label`.

    Returns:
        The prediction error `weights.dot(lp.features) - lp.label` multiplied
        by `lp.features`.
    """
    residual = weights.dot(lp.features) - lp.label
    return residual * lp.features

#### Train loop function
+ I experimented with different alpha values and found that 0.5 works well

In [17]:
def linregGradientDescent(trainData, numIters, alpha=0.5):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    The model has no intercept term: a prediction is the dot product of the
    weights and the features.  Every weight starts at 1000 and is updated
    `numIters` times with the mean gradient over `trainData`.

    Args:
        trainData (RDD of LabeledPoint): The labeled data for use in training the model.
        numIters (int): The number of iterations of gradient descent to perform.
        alpha (float): The step size applied to the mean gradient at each
            iteration.  Defaults to 0.5.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm, measured before that
            iteration's weight update.

    Raises:
        ValueError: If `trainData` is empty.
    """
    # Size and dimensionality come from the data that was passed in, not from
    # notebook-level variables, so the function works for any RDD of LabeledPoint.
    n = trainData.count()
    if n == 0:
        raise ValueError("trainData must contain at least one LabeledPoint")
    d = len(trainData.first().features)
    # Every weight starts at 1000.
    w = np.ones(d) * 1000
    # We will compute and store the training error after each iteration
    errorTrain = np.zeros(numIters)
    for i in range(numIters):
        # RMSE of the current weights on the training data.
        labelsAndPredsTrain = trainData.map(lambda x: getLabeledPrediction(w, x))
        errorTrain[i] = calcRMSE(labelsAndPredsTrain)

        # Mean of the per-observation gradient summands (length `d`).
        gradient = trainData.map(lambda x: gradientSummand(w, x)).sum() / n
        # Update the weights
        w -= alpha * gradient

    return w, errorTrain

In [18]:
%%time
# Train for 30 iterations on the training split with the default alpha.
numIters = 30
weightsLR0, errorTrainLR0 = linregGradientDescent(trainRDD, numIters)

CPU times: user 527 ms, sys: 104 ms, total: 631 ms
Wall time: 53.3 s


In [19]:
# Training RMSE recorded at each iteration.
print(errorTrainLR0)

[69142.04074196 60838.25518856 53717.53315813 47615.58336835
 42397.14081003 37948.61936465 34172.88944735 30985.45905976
 28311.64343748 26084.49998288 24243.40797551 22733.2043569
 21503.76940994 20509.92033466 19711.44943075 19073.15696539
 18564.77512193 18160.7399029  17839.82068516 17584.64929084
 17381.20068253 17218.27203952 17086.99418223 16980.39540761
 16893.02622696 16820.64538632 16759.9626746  16708.43162931
 16664.08451207 16625.4021711 ]


In [20]:
# Final weights, one per feature.
print(weightsLR0)

[18604.06268639 15767.27159834 19503.45243351 18240.86214189
 10368.46507724  5955.71681927  5266.15796703  8882.02828151
 14653.91068647  6081.02775781  9253.26756527]


## Evaluate test set with RMSE

In [21]:
# Baseline: test RMSE with every weight at 1000 (the untrained starting point).
# The array is built once here rather than inside the lambda, where it was
# re-created for every test record; its length follows the trained weights.
initialWeights = np.ones(len(weightsLR0)) * 1000
labelsAndPredsInit = testRDD.map(lambda x: getLabeledPrediction(initialWeights, x))
print("Initial error: ", calcRMSE(labelsAndPredsInit))
# Test RMSE with the trained weights.
labelsAndPredsTest = testRDD.map(lambda x: getLabeledPrediction(weightsLR0, x))
print("Final error: ", calcRMSE(labelsAndPredsTest))

Initial error:  61475.99608307891
Final error:  13901.918535607321


In [22]:
# Preview the first 5 (label, prediction) pairs on the test set.
labelsAndPredsTest.take(5)

[(43000.0, 51773.98575540007),
 (50000.0, 58611.08828318928),
 (74500.0, 69376.76050906078),
 (87250.0, 68242.25089627532),
 (46200.0, 62091.25623232394)]