# Computer Lab: Quantile Regression

Olivier Fercoq

# Preparation
Please read the file TPquantile_regression_beginning.pdf for instructions on how to launch this notebook

In [1]:
%pylab

Using matplotlib backend: MacOSX
Populating the interactive namespace from numpy and matplotlib


In [2]:
# import the data from txt file, the dist_data is of type spark rdd

In [3]:
filename = './data/ethylene_methane.txt'

In [4]:
dist_data = sc.textFile(filename)
dist_data.take(4)

['Time (seconds), Methane conc (ppm), Ethylene conc (ppm), sensor readings (16 channels) ',
 '0.00    0.00    0.00    -41.98  2067.64 -37.13  2.28    8.63    -26.62  -8.46   -0.33   3437.73 2728.14 4054.03 4007.89 4478.27 5056.98 3639.09 3128.49',
 '0.01    0.00    0.00    -46.50  2067.88 -28.56  13.69   -12.35  -25.81  -5.04   -5.04   3432.44 2734.47 4038.62 4019.40 4496.72 5051.81 3636.97 3115.03',
 '0.02    0.00    0.00    -36.16  2055.81 -10.89  8.63    -2.93   -30.34  -9.27   -2.12   3438.61 2719.97 4030.92 4025.48 4489.54 5057.35 3641.81 3105.24']

In [5]:
type(dist_data)

pyspark.rdd.RDD

In [6]:
# filter to remove the first header line

In [7]:
dist_data = dist_data.filter(lambda line: line[0:4] != 'Time')  # remove the first line
dist_data.first()

'0.00    0.00    0.00    -41.98  2067.64 -37.13  2.28    8.63    -26.62  -8.46   -0.33   3437.73 2728.14 4054.03 4007.89 4478.27 5056.98 3639.09 3128.49'

In [8]:
# we removed the first line, take the first data row and turn it to float numbers by map.

In [9]:
s = dist_data.first()
#print(s)
a = np.array(list(map(float, s.split())))
a

array([  0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
        -4.19800000e+01,   2.06764000e+03,  -3.71300000e+01,
         2.28000000e+00,   8.63000000e+00,  -2.66200000e+01,
        -8.46000000e+00,  -3.30000000e-01,   3.43773000e+03,
         2.72814000e+03,   4.05403000e+03,   4.00789000e+03,
         4.47827000e+03,   5.05698000e+03,   3.63909000e+03,
         3.12849000e+03])

In [10]:
# map handles the function for each row of rdd, lambda function turned each line to float array

In [11]:
dist_data = dist_data.map(lambda line: np.array(list(map(float, line.split()))))

In [12]:
dist_data.first()

array([  0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
        -4.19800000e+01,   2.06764000e+03,  -3.71300000e+01,
         2.28000000e+00,   8.63000000e+00,  -2.66200000e+01,
        -8.46000000e+00,  -3.30000000e-01,   3.43773000e+03,
         2.72814000e+03,   4.05403000e+03,   4.00789000e+03,
         4.47827000e+03,   5.05698000e+03,   3.63909000e+03,
         3.12849000e+03])

In [13]:
# We take about half of the data for the training set, row[0] is the index ?

In [14]:
train_data_full = dist_data.filter(lambda row: row[0] < 42082.60 / 2.)

In [16]:
train_data_full.first()

array([  0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
        -4.19800000e+01,   2.06764000e+03,  -3.71300000e+01,
         2.28000000e+00,   8.63000000e+00,  -2.66200000e+01,
        -8.46000000e+00,  -3.30000000e-01,   3.43773000e+03,
         2.72814000e+03,   4.05403000e+03,   4.00789000e+03,
         4.47827000e+03,   5.05698000e+03,   3.63909000e+03,
         3.12849000e+03])

In [17]:
# In order to make development smoother, we first consider 1/100th of the training data. 
# This can be changed when the algorithm is operational.

In [18]:
train_data_full.cache()  # Otherwise, all previous filter operations are re-run each time an action is taken.

freq = 100
#train_data = train_data_full.filter(lambda row: int(row[0]*100) % freq == 0)
train_data = train_data_full.filter(lambda row: int(row[0]) % freq == 0)
train_data.cache()  # Otherwise, all previous filter operations are re-run each time an action is taken.


PythonRDD[9] at RDD at PythonRDD.scala:48

In [19]:
train_data.first()

array([  0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
        -4.19800000e+01,   2.06764000e+03,  -3.71300000e+01,
         2.28000000e+00,   8.63000000e+00,  -2.66200000e+01,
        -8.46000000e+00,  -3.30000000e-01,   3.43773000e+03,
         2.72814000e+03,   4.05403000e+03,   4.00789000e+03,
         4.47827000e+03,   5.05698000e+03,   3.63909000e+03,
         3.12849000e+03])

In [20]:
# for standardize both train data and train_data_full, yet collect() only the train_data

In [21]:
def standardize(train_data):
    print('before centering', train_data.first())
    row_sums = train_data.reduce(lambda a, b: a + b)
    print('nb of columns', row_sums.shape)
    n_train = train_data.count()
    print('nb of observations', n_train)

    row_means = row_sums / n_train
    print('row_means', row_means)

    train_data = train_data.map(lambda r: r - row_means)
    print('after centering', train_data.first())

    row_stds = np.sqrt(train_data.map(lambda r: r ** 2).reduce(lambda a, b: a + b) / n_train)
    print('row_stds', row_stds)
    train_data = train_data.map(lambda r: r / row_stds)
    print('after standardizing', train_data.first())

    row_sums_should_be_0 = train_data.reduce(lambda a, b: a + b)
    print('this should be 0', row_sums_should_be_0)
    row_stds_should_be_1 = np.sqrt(train_data.map(lambda r: r ** 2).reduce(lambda a, b: a + b) / n_train)
    print('this should be 1', row_stds_should_be_1)

    return train_data

train_data = standardize(train_data)
train_data_NotDistributed = np.array(train_data.collect())  # collect() should be used only on small RDDs

train_data_full = standardize(train_data_full)

before centering [  0.00000000e+00   0.00000000e+00   0.00000000e+00  -4.19800000e+01
   2.06764000e+03  -3.71300000e+01   2.28000000e+00   8.63000000e+00
  -2.66200000e+01  -8.46000000e+00  -3.30000000e-01   3.43773000e+03
   2.72814000e+03   4.05403000e+03   4.00789000e+03   4.47827000e+03
   5.05698000e+03   3.63909000e+03   3.12849000e+03]
nb of columns (19,)
nb of observations 21102
row_means [  1.04996516e+04   6.44759895e+01   3.86201924e+00   2.43582046e+03
   1.76862098e+03   2.77203506e+03   3.05812133e+03   1.93607030e+03
   2.47621240e+03   2.68626355e+03   2.98321356e+03   3.46175147e+03
   2.76061554e+03   2.29518589e+03   2.04530061e+03   1.76770531e+03
   1.90159642e+03   2.29297087e+03   1.85923681e+03]
after centering [ -1.04996516e+04  -6.44759895e+01  -3.86201924e+00  -2.47780046e+03
   2.99019024e+02  -2.80916506e+03  -3.05584133e+03  -1.92744030e+03
  -2.50283240e+03  -2.69472355e+03  -2.98354356e+03  -2.40214743e+01
  -3.24755360e+01   1.75884411e+03   1.96258939

In [22]:
train_data_NotDistributed.shape, train_data

((21102, 19), PythonRDD[19] at collect at <ipython-input-21-4759d30af17f>:27)

# Distributed least squares

### 3.2

In [23]:
X = train_data_NotDistributed[:, 3:]
y = train_data_NotDistributed[:, 1:3]
X.shape, y.shape

((21102, 16), (21102, 2))

In [24]:
# get closed-form solution
W = np.linalg.solve(a=X.T.dot(X), b=X.T.dot(y))

In [25]:
W.shape, W

((16, 2), array([[  0.20112332,   0.40003616],
        [  0.07544213,   0.07243659],
        [  6.80878733,  -1.50964233],
        [ -3.69864834,   6.17855383],
        [-10.68830581,  -1.12443226],
        [  3.74523502,  -1.53292001],
        [  0.60163843,  -6.83221707],
        [  2.867276  ,   3.19708374],
        [ -1.31779565,   0.45799037],
        [  1.07483576,  -0.87620455],
        [  1.64402582,  -1.17964847],
        [ -7.05906705,  -7.37769584],
        [  2.30394624,   1.89215007],
        [  5.1616917 ,   0.78613241],
        [  2.57448691,   5.48816121],
        [ -3.3299369 ,   2.8900424 ]]))

### 3.3

### on the reduced set

In [26]:
parallel_train_X = sc.parallelize(train_data_NotDistributed) # or use directly the train_data

In [27]:
XX = parallel_train_X.map(lambda a: np.outer(a[3:], a[3:])).reduce(lambda a,b: a+b)
yy = parallel_train_X.map(lambda a: np.outer(a[3:], a[1:3])).reduce(lambda a,b: a+b)

In [28]:
XX.shape, yy.shape

((16, 16), (16, 2))

In [29]:
W = np.linalg.inv(XX).dot(yy)

In [30]:
W

array([[  0.20112332,   0.40003616],
       [  0.07544213,   0.07243659],
       [  6.80878733,  -1.50964233],
       [ -3.69864834,   6.17855383],
       [-10.68830581,  -1.12443226],
       [  3.74523502,  -1.53292001],
       [  0.60163843,  -6.83221707],
       [  2.867276  ,   3.19708374],
       [ -1.31779565,   0.45799037],
       [  1.07483576,  -0.87620455],
       [  1.64402582,  -1.17964847],
       [ -7.05906705,  -7.37769584],
       [  2.30394624,   1.89215007],
       [  5.1616917 ,   0.78613241],
       [  2.57448691,   5.48816121],
       [ -3.3299369 ,   2.8900424 ]])

In [33]:
def solve_linear_parallel(rrd_train):
    XX = rrd_train.map(lambda a: np.outer(a[3:], a[3:])).reduce(lambda a,b: a+b)
    yy = rrd_train.map(lambda a: np.outer(a[3:], a[1:3])).reduce(lambda a,b: a+b)
    W = np.linalg.inv(XX).dot(yy)
    return W

solve_linear_parallel(train_data)

array([[  0.20112332,   0.40003616],
       [  0.07544213,   0.07243659],
       [  6.80878733,  -1.50964233],
       [ -3.69864834,   6.17855383],
       [-10.68830581,  -1.12443226],
       [  3.74523502,  -1.53292001],
       [  0.60163843,  -6.83221707],
       [  2.867276  ,   3.19708374],
       [ -1.31779565,   0.45799037],
       [  1.07483576,  -0.87620455],
       [  1.64402582,  -1.17964847],
       [ -7.05906705,  -7.37769584],
       [  2.30394624,   1.89215007],
       [  5.1616917 ,   0.78613241],
       [  2.57448691,   5.48816121],
       [ -3.3299369 ,   2.8900424 ]])

### on the full set

In [34]:
solve_linear_parallel(train_data_full)

array([[ 0.09682988,  0.0485206 ],
       [ 0.01240669, -0.01502376],
       [-0.14024083,  5.88192701],
       [ 0.34187629, -3.28065969],
       [-6.83667423,  2.31979054],
       [ 4.59287714, -1.15516419],
       [ 0.99581238, -5.54778859],
       [ 1.39054027,  1.35446226],
       [-0.867363  ,  0.33945828],
       [ 0.69831324, -0.46888552],
       [ 1.81161887, -2.64645593],
       [-3.18783061, -1.90203955],
       [ 0.22809633, -0.40600687],
       [ 2.78471187, -0.491782  ],
       [ 0.08956648,  4.89059757],
       [-1.11281948,  1.72824999]])