# Joint project AMD + SM2L

https://docs.google.com/document/d/1oqoIyRUI_digfIokf53fox0I1eWiTFzQaae-PxMre0Y/edit

The task is to implement from scratch a learning algorithm for **regression** with **square loss** (e.g., **ridge regression**). The label to be predicted must be selected among the following 5 attributes, removing the remaining 4 from the dataset:
- PERNP (Person's earnings)
- PINCP (Person's income)
- WAGP (Wages or salary income past 12 months)
- HINCP (Household income)
- FINCP (Family income)

This code is run inside the Docker container provided in the course.

## Dataset

The project is based on the analysis of the «2013 American Community Survey» dataset published on Kaggle and released under the public domain license (CC0).

https://www.kaggle.com/census/2013-american-community-survey

The American Community Survey is an ongoing survey from the US Census Bureau. In this survey, approximately 3.5 million households per year are asked detailed questions about who they are and how they live. Many topics are covered, including ancestry, education, work, transportation, internet use, and residency.

There are two types of survey data provided, housing and population:
- For the housing data, each row is a housing unit, and the characteristics are properties like rented vs. owned, age of home, etc.
- For the population data, each row is a person and the characteristics are properties like age, gender, whether they work, method/length of commute, etc.

Each data set is divided in two pieces, "a" and "b":
- "a" contains states 1 to 25;
- "b" contains states 26 to 50.

Both data sets have weights associated with them. Weights are included to account for the fact that individuals are not sampled with equal probably (people who have a greater chance of being sampled have a lower weight to reflect this):
- Weight variable for the housing data: WGTP
- Weight variable for the population data: PWGTP

### Setup Kaggle API

In [5]:
# install kaggle API for providing the dataset
!pip install --user kaggle

Processing /home/jovyan/.cache/pip/wheels/aa/e7/e7/eb3c3d514c33294d77ddd5a856bdd58dc9c1fabbed59a02a2b/kaggle-1.5.6-py3-none-any.whl
Installing collected packages: kaggle
Successfully installed kaggle-1.5.6


In [2]:
# add kaggle to PATH environment variable
import os
os.environ["PATH"] += os.pathsep + "/home/jovyan/.local/bin"

In [14]:
!mkdir ~/.kaggle
!echo '{"username":"teresatanzi","key":"a64ec0d865925975d3318adf576216b7"}' >> ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json

In [18]:
!export KAGGLE_USERNAME=teresatanzi
!export KAGGLE_KEY=a64ec0d865925975d3318adf576216b7

In [4]:
# uncomment to uninstall kaggle
#!python -c "print('y')" | pip uninstall kaggle

Found existing installation: kaggle 1.5.6
Uninstalling kaggle-1.5.6:
  Would remove:
    /home/jovyan/.local/bin/kaggle
    /home/jovyan/.local/lib/python3.7/site-packages/kaggle-1.5.6.dist-info/*
    /home/jovyan/.local/lib/python3.7/site-packages/kaggle/*
Proceed (y/n)?   Successfully uninstalled kaggle-1.5.6


In [66]:
!pyspark --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_252
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


### Dataset download

The dataset should not be added to the repository, but downloaded during code execution, for instance via the kaggle API 

https://github.com/Kaggle/kaggle-api

In [8]:
!mkdir ./data
!kaggle datasets download census/2013-american-community-survey -p ./data

Downloading 2013-american-community-survey.zip to ./data
100%|███████████████████████████████████████▉| 916M/916M [02:56<00:00, 5.74MB/s]
100%|████████████████████████████████████████| 916M/916M [02:56<00:00, 5.45MB/s]


In [9]:
import zipfile

with zipfile.ZipFile("./data/2013-american-community-survey.zip","r") as zip_ref:
    zip_ref.extractall("./data/2013-american-community-survey")

### Reading the data

The task is to implement from scratch a learning algorithm for regression with square loss (e.g., ridge regression). The label to be predicted must be selected among the following 5 attributes, removing the remaining 4 from the dataset:

- pusa dataset:
    - PERNP (Person's earnings)
    - PINCP (Person's income)
    - WAGP (Wages or salary income past 12 months)
- husa dataset:
    - HINCP (Household income)
    - FINCP (Family income)

In [1]:
import pyspark

sc = pyspark.SparkContext('local[*]')

In [2]:
import os.path

baseDir = os.path.join('./data/2013-american-community-survey')
#inputPathA = os.path.join('ss13pusa.csv')
#inputPathB = os.path.join('ss13pusb.csv')
inputPathA = os.path.join('ss13husa.csv')
inputPathB = os.path.join('ss13husb.csv')
fileNameA = os.path.join(baseDir, inputPathA)
fileNameB = os.path.join(baseDir, inputPathB)

#### Dataframe

In [3]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [4]:
dfA = sqlContext.read.csv(fileNameA, header=True)
dfB = sqlContext.read.csv(fileNameB, header=True)

df = dfB.union(dfA)

In [5]:
n = df.count()

In [6]:
headerList = df.columns
print(len(headerList))

231


In [7]:
df.printSchema()

root
 |-- RT: string (nullable = true)
 |-- SERIALNO: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- PUMA: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- ADJHSG: string (nullable = true)
 |-- ADJINC: string (nullable = true)
 |-- WGTP: string (nullable = true)
 |-- NP: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- ACCESS: string (nullable = true)
 |-- ACR: string (nullable = true)
 |-- AGS: string (nullable = true)
 |-- BATH: string (nullable = true)
 |-- BDSP: string (nullable = true)
 |-- BLD: string (nullable = true)
 |-- BROADBND: string (nullable = true)
 |-- BUS: string (nullable = true)
 |-- COMPOTHX: string (nullable = true)
 |-- CONP: string (nullable = true)
 |-- DIALUP: string (nullable = true)
 |-- DSL: string (nullable = true)
 |-- ELEP: string (nullable = true)
 |-- FIBEROP: string (nullable = true)
 |-- FS: string (nullable = true)
 |-- FULP: string (nullable = true)
 |-- GA

In [44]:
df.show(n = 1)

+---+--------+--------+-----+------+---+-------+-------+-----+---+----+------+----+----+----+----+---+--------+----+--------+----+------+----+----+-------+---+----+----+--------+---+----+------+----+-----+----+----+----+----+--------+----+----+----+-----+----+------+---------+----+----+----+---+---+----+----+----+---+----+---+----+------+------+------+-----+-----+------+-----+-----+---+---+---------+-----+-----+------+------+---+----+-----+---+---+----+---+---+---+-----+-------+---+---+---+---+---+-------+-----+----+----+----+----+----+----+-------+--------+--------+-----+-----+------+-----+-----+----------+-----+----------+-----+--------+-----+-----+---------+----+-----+-----+----------+-----+-----+-----+--------+----+-------+------+-----+------+------+----+----------+-----+------+-----+------+-----+------+--------+-----------+------+----+------+------+------+-----+-----+-----+------+------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+---

## Preprocessing

We should trasform data in objects belonging to the class LabeledPoint, but first we have to solve some issues:

- we have to deal with missing values: we can substitute those with mean or median of the corresponding column; <br>
https://towardsdatascience.com/how-to-handle-missing-data-8646b18db0d4
- we have to deal with categorical values: it should be fine to just discard the feature, because only the first feature is categorical and it has a constant value for all the entry;
- we have to deal with missing values in the labels: we could discard data points with no label, because they don't really help in the training process.

In [7]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col
import numpy as np
from pyspark.ml.feature import Imputer

def preprocess (df, na_threshold, label, imputer_strategy):
    """Preprocess a PySpark DataFrame, dealing with null values.

    Args:
        df (PySpark DataFrame): DataFrame read by the csv file.
        na_threshold (float between 0 and 1): threshold thad establish if a feature should be dropped or not
            based on its percentage of null values.
        label (string): feature that corresponds to the chosen label to be predicted.
        imputer_strategy (string): it can be `mean` or `median`, based on the imputation strategy we choose.

    Returns:
        PySpark DataFrame: Restult of the processing of the original DataFrame.
    """
    
    # remove RT: it is categorical and it has a constant value for all the data points in the dataset
    df = df.drop('RT')
    
    # remove all the features that correspond to possible labels, but are not the chosen one
    possible_labels = ['PERNP', 'PINCP', 'WAGP'] if inputPathA == 'ss13pusa.csv' else ['HINCP', 'FINCP']        
    possible_labels.remove(label)
    
    for i in possible_labels:
        df = df.drop(i)
    
    # cast features into double
    for i in df.columns:
        df = df.withColumn(i, df[i].cast(DoubleType()))
        
    # compute the dataframe containing, for each of the features, the percentange of null values
    null_df = df.select([(count(when(col(c).isNull(), c))/n).alias(c) for c in df.columns])
    
    # remove form the original dataframe all the attributes that have more null values than a given threshold
    scheme = df.columns
    null_distr = null_df.collect()[0].asDict().values()
    
    for i in np.where(np.array(list(null_distr)) > na_threshold)[0]:
        df = df.drop(scheme[i])
        
    print('We reduced the number of attributes from {} to {}.'.format(len(headerList), len(df.columns)))
    
    # remove all the data points with null value for the label
    df = df.filter(df[label].isNotNull())
    
    # replace all the missing values with the mean value of the corresponding feature
    imputer = Imputer()
    imputer.setInputCols(df.columns)
    imputer.setOutputCols(df.columns)
    imputer.setStrategy(imputer_strategy)

    df = imputer.fit(df).transform(df)

    return df
        
na_threshold = .6
label = 'HINCP'
imputer_strategy = 'mean'

df = preprocess(df, na_threshold, label, imputer_strategy)

We reduced the number of attributes from 231 to 215.


Preprocessing is almost done. We may want to:

- Remove column SERIALNO (it is just a unique number for every observation);
- Tune the threshold for the feature dropping based on the percentage of missing values.

### Labeled points

Let us now cast all the data point into `LabeledPoint`. The label is the one we select from the 5 different possible labels given by the project, while all the other possible labels have to be removed from the feature vector.

In [8]:
from pyspark.mllib.regression import LabeledPoint

def parsePoint(row):
    """Converts a row of a pyspark dataframe into a `LabeledPoint`.

    Args:
        row: Row of a dataframe composed all of double values. One of those values corresponds to the label, while
            other possible lables have to be removed.

    Returns:
        LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
            features.
    """
    
    # cast row into a dictionary
    row_dict = row.asDict()
    label_value = row_dict[label]
    
    del row_dict[label]
    feature_list = list(row_dict.values())
    
    return LabeledPoint(label_value, feature_list)

#parsedSamplePoints = list(map(parsePoint, df.take(5)))
#firstPointFeatures = parsedSamplePoints[0].features
#firstPointLabel = parsedSamplePoints[0].label
#print('{}, {}'.format(firstPointFeatures, firstPointLabel))

parsedData = df.rdd.map(lambda s: parsePoint(s))

parsedExamplePoint = parsedData.take(1)
examplePointFeatures = parsedExamplePoint[0].features
examplePointLabel = parsedExamplePoint[0].label
print('{}, {}'.format(examplePointFeatures, examplePointLabel))

d = len(examplePointFeatures)
print(d)

[76.0,4.0,1802.0,2.0,29.0,1000000.0,1007549.0,229.0,2.0,1.0,3.0,1.3062936394368012,1.0,2.0,7.0,1.5566063335988354,1.9844443820931135,2.0,1.9596121838007086,1.6977955999890137,80.0,1.890910489164767,2.0,2.0,3.0,1.0,3.0,910.6082502817558,1.0,1.44533933917438,1.7879980881910393,1.9832184350023345,1.0,4.0,1.0,9.0,1.932411217006784,1.0,1.0,1.0,3.0,1.0,248086.86971966035,2.0,360.0,5.0,3.123169246051848,0.0,1.0,3.134906236307153,0.31369008632643547,1.0,5.0,0.0,4.0,4.0,4.0,1.0,1.0,1.0,1.0,0.0,3.0029736555778195,0.0,1.0,0.0,24.084478191098956,0.0,1.0,0.0,0.0,0.0,0.0,2.0,1227.739407457235,1.0,0.0,0.0,32.50750385017082,1.4557422569258112,6.382029815328111,5.843664053098351,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,383.0,63.0,431.0,340.0,211.0,212.0,84.0,68.0,326.0,168.0,192.0,243.0,352.0,250.0,291.0,202.0,84.0,281.0,274.0,76.0,418.0,

In [65]:
#parsedData.take(5)

## Learning algorithm

### Training, validation and test sets

In [9]:
# lentissimo (può darsi che sia solo per i count)

weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed = seed)
parsedTrainData.cache()
parsedValData.cache()
parsedTestData.cache()
#nTrain = parsedTrainData.count()
#nVal = parsedValData.count()
#nTest = parsedTestData.count()

#print('{} {} {} {}'.format(nTrain, nVal, nTest, nTrain + nVal + nTest))
#print(parsedData.count())

PythonRDD[77] at RDD at PythonRDD.scala:53

In [10]:
parsedTrainData.take(1)

[LabeledPoint(37100.0, [76.0,4.0,1802.0,2.0,29.0,1000000.0,1007549.0,229.0,2.0,1.0,3.0,1.3062936394368012,1.0,2.0,7.0,1.5566063335988354,1.9844443820931135,2.0,1.9596121838007086,1.6977955999890137,80.0,1.890910489164767,2.0,2.0,3.0,1.0,3.0,910.6082502817558,1.0,1.44533933917438,1.7879980881910393,1.9832184350023345,1.0,4.0,1.0,9.0,1.932411217006784,1.0,1.0,1.0,3.0,1.0,248086.86971966035,2.0,360.0,5.0,3.123169246051848,0.0,1.0,3.134906236307153,0.31369008632643547,1.0,5.0,0.0,4.0,4.0,4.0,1.0,1.0,1.0,1.0,0.0,3.0029736555778195,0.0,1.0,0.0,24.084478191098956,0.0,1.0,0.0,0.0,0.0,0.0,2.0,1227.739407457235,1.0,0.0,0.0,32.50750385017082,1.4557422569258112,6.382029815328111,5.843664053098351,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,383.0,63.0,431.0,340.0,211.0,212.0,84.0,68.0,326.0,168.0,192.0,243.0,352.0,250.0,291.0,202.0,84.0,

In [11]:
parsedValData.take(1)

[LabeledPoint(94050.0, [695.0,4.0,2001.0,2.0,29.0,1000000.0,1007549.0,74.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,80.0,2.0,2.0,2.0,3.0,1.0,3.0,1600.0,1.0,2.0,1.0,2.0,1.0,5.0,1.0,9.0,2.0,1.0,1.0,1.0,1.0,1.0,95000.0,2.0,60.0,6.0,1.0,0.0,0.0,4.0,0.0,1.0,1.0,0.0,4.0,4.0,4.0,1.0,1.0,1.0,6.0,0.0,2.0,0.0,0.0,0.0,10.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,808.0,0.0,0.0,1.0,19.0,2.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,116.0,20.0,22.0,61.0,74.0,95.0,126.0,95.0,23.0,67.0,22.0,23.0,123.0,65.0,102.0,68.0,69.0,159.0,85.0,82.0,27.0,123.0,143.0,73.0,66.0,69.0,21.0,70.0,116.0,76.0,117.0,137.0,25.0,70.0,29.0,79.0,69.0,21.0,76.0,61.0,26.0,134.0,135.0,82.0,62.0,61.0,22.0,71.0,120.0,75.0,121.0,110.0,26.0,86.0,25.0,101.0,71.0,20.0,71.0,74.0,122.0,23.0,23.0,74.0,80.0,71.0,131.0,72.0,23.0,82.0,19.0,23.0,152.0,83.0,120.0,78.0,74.0,125

In [12]:
parsedTestData.take(1)

[LabeledPoint(41000.0, [802.0,4.0,1004.0,2.0,29.0,1000000.0,1007549.0,73.0,3.0,1.0,1.0,1.0,1.0,4.0,2.0,2.0,2.0,2.0,2.0,1.0,360.0,2.0,2.0,2.0,50.0,2.0,3.0,1300.0,1.0,2.0,1.0,2.0,1.0,10.0,1.0,9.0,2.0,1.0,1.0,1.0,1.0,1.0,300000.0,2.0,50.0,13.0,2.0,1.0,1.0,4.0,0.0,1.0,1.0,0.0,4.0,4.0,4.0,1.0,1.0,1.0,2.0,0.0,3.0,0.0,0.0,0.0,68.0,0.0,1.0,0.0,0.0,1.0,0.0,3.0,2314.0,0.0,0.0,1.0,68.0,2.0,7.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,104.0,23.0,67.0,147.0,69.0,62.0,62.0,114.0,66.0,104.0,14.0,83.0,22.0,88.0,69.0,63.0,30.0,19.0,122.0,68.0,134.0,71.0,25.0,80.0,20.0,23.0,124.0,78.0,89.0,67.0,86.0,97.0,69.0,108.0,21.0,21.0,76.0,93.0,64.0,14.0,60.0,123.0,84.0,18.0,92.0,90.0,67.0,17.0,74.0,32.0,126.0,83.0,93.0,77.0,88.0,78.0,130.0,119.0,24.0,76.0,20.0,65.0,136.0,62.0,115.0,121.0,26.0,63.0,22.0,94.0,95.0,19.0,59.0,22.0,108.0,122.0,68.0,

Maybe we should shift labels in order to make them begin from 0 (?)

In [14]:
# lento

onlyLabels = parsedData.map(lambda p: p.label)
minLbl = onlyLabels.min()
maxLbl = onlyLabels.max()
print(maxLbl, minLbl)

2090000.0 -19770.0


### Average baseline model

We predict for every data point the avarage label of the points in training set.

In [15]:
averageTrainLbl = (parsedTrainData
                    .map(lambda p: p.label)
                    .mean())

print(averageTrainLbl)

75229.84229664435


#### Square Loss

\begin{equation}
    l(\underline{w}) = \sum_{j=1}^n (\hat{y}^{(j)} - y^{(j)})^2
\end{equation}

Where:

- $\underline{w}$ is the weight vector;
- $\hat{y}^{(j)} = \underline{w} \cdot \underline{x}^{(j)}$ is our prediction with respect to the $j$-th observation;
- $y^{(j)}$ is the actual label of the $j$-th observation.

\begin{equation}
    l(\underline{w}) = \sum_{j=1}^n (\underline{w} \cdot \underline{x}^{(j)} - y^{(j)})^2
\end{equation}

In [13]:
def squaredError(label, prediction):
    """Calculates the the squared error for a single prediction.

    Args:
        label (float): The correct value for this observation.
        prediction (float): The predicted value for this observation.

    Returns:
        float: The difference between the `label` and `prediction` squared.
    """
    
    return (label - prediction) ** 2

def calcRMSE(labelsAndPreds):
    """Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.

    Args:
        labelsAndPred (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.

    Returns:
        float: The square root of the mean of the squared errors.
    """
    
    return np.sqrt(labelsAndPreds.map(lambda p: squaredError(*p)).mean())

In [15]:
labelsAndPreds = sc.parallelize([(3., 1.), (1., 2.), (2., 2.)])
# RMSE = sqrt[((3-1)^2 + (1-2)^2 + (2-2)^2) / 3] = 1.291
exampleRMSE = calcRMSE(labelsAndPreds)
print(exampleRMSE)

1.2909944487358056


In [14]:
labelsAndPredsTrain = parsedTrainData.map(lambda p: (p.label, averageTrainLbl))
rmseTrainBase = calcRMSE(labelsAndPredsTrain)

labelsAndPredsVal = parsedValData.map(lambda p: (p.label, averageTrainLbl))
rmseValBase = calcRMSE(labelsAndPredsVal)

labelsAndPredsTest = parsedTestData.map(lambda p: (p.label, averageTrainLbl))
rmseTestBase = calcRMSE(labelsAndPredsTest)

print('Baseline Train RMSE = {0:.3f}'.format(rmseTrainBase))
print('Baseline Validation RMSE = {0:.3f}'.format(rmseValBase))
print('Baseline Test RMSE = {0:.3f}'.format(rmseTestBase))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 65, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1065, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/statcounter.py", line 42, in __init__
    for v in values:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-14-4267e37cc023>", line 1, in <lambda>
NameError: name 'averageTrainLbl' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1065, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/statcounter.py", line 42, in __init__
    for v in values:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-14-4267e37cc023>", line 1, in <lambda>
NameError: name 'averageTrainLbl' is not defined

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$15.apply(RDD.scala:990)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


### Ridge regression

Learn the best weight vector $\underline{w}^{*}$:

\begin{equation}
\underline{w}^{*} = \operatorname*{argmin}_{\underline{w}} ||X \cdot \underline{w} - \underline{y}||^2 + \lambda ||\underline{w}||^2
\end{equation}

where:

- $X$ is a matrix where each row is a data point;
- $\underline{y}$ is the vector of the labels;
- $\lambda > 0$ is the regularization parameter (it has to be chosen before hands).

We can find the optimal value $\underline{w}^{*}$ computing the gradient of the function:

\begin{equation}
\underline{w}^{*} = (X^T X + \lambda I)^{-1} X^T \underline{y}
\end{equation}

#### Gradient descent

If the number $d$ of features and the number $n$ of data points in our dataset are too big, the complexity in time and space of our algorithm may be too high and we may not be able to compute the optimal value for $\underline{w}$. To solve this problem, we can rely on the gradient descent procedure:

1. choose $\underline{w}_0 \in R^d$
2. $i = 0$
3. while (!stop) {
    1. $\underline{w}_{i+1} = \underline{w}_i - \xi_i \left[ \sum_{j=1}^n (\underline{w}_i \cdot \underline{x}^{(j)} - y^{(j)}) \underline{x}^{(j)} + \lambda \underline{w}_i \right]$
    2. i++
4. }
5. return $\underline{w}_i$

Where:

- $\underline{w_i}$ is the approximated optimal weight vector;
- stop is a termination criterion of our choice;
- $\xi_i$ is the learning rate:
    \begin{equation}
        \xi_i = \frac{\xi_0}{n \sqrt{i}}
    \end{equation}

Let us focus on the update:
\begin{equation}
    \underline{w}_{i+1} = \underline{w}_i - \xi_i \left[ \sum_{j=1}^n (\underline{w}_i \cdot \underline{x}^{(j)} - y^{(j)}) \underline{x}^{(j)} + \lambda \underline{w}_i \right]
\end{equation}

Let us begin writing a function that computes the summand of the update: $(\underline{w}_i \cdot \underline{x}^{(j)} - y^{(j)}) \underline{x}^{(j)}$. 

In [15]:
from pyspark.mllib.linalg import DenseVector

def gradientSummand(weights, lp):
    """Calculates the gradient summand for a given weight and `LabeledPoint`.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        weights (DenseVector): An array of model weights (betas).
        lp (LabeledPoint): The `LabeledPoint` for a single observation.

    Returns:
        DenseVector: An array of values the same length as `weights`.  The gradient summand.
    """
    
    return (weights.dot(DenseVector(lp.features)) - lp.label) * lp.features

In [17]:
exampleW = DenseVector([1, 1, 1])
exampleLP = LabeledPoint(2.0, [3, 1, 4])
# gradientSummand = (dot([1 1 1], [3 1 4]) - 2) * [3 1 4] = (8 - 2) * [3 1 4] = [18 6 24]
summandOne = gradientSummand(exampleW, exampleLP)
print(summandOne)

exampleW = DenseVector([.24, 1.2, -1.4])
exampleLP = LabeledPoint(3.0, [-1.4, 4.2, 2.1])
summandTwo = gradientSummand(exampleW, exampleLP)
print(summandTwo)

[18.0,6.0,24.0]
[1.7304000000000002,-5.191200000000001,-2.5956000000000006]


Next, we write the function that makes the prediction, based on the weights vector $\underline{w}$ and the observation $\underline{x}^{(j)}$:

\begin{equation}
    \hat{y}^{(j)} = \underline{w} \cdot \underline{x}^{(j)}
\end{equation}

In [16]:
def getLabeledPrediction(weights, observation):
    """Calculates predictions and returns a (label, prediction) tuple.

    Note:
        The labels should remain unchanged as we'll use this information to calculate prediction
        error later.

    Args:
        weights (np.ndarray): An array with one weight for each features in `trainData`.
        observation (LabeledPoint): A `LabeledPoint` that contain the correct label and the
            features for the data point.

    Returns:
        tuple: A (label, prediction) tuple.
    """
    
    return (observation.label, weights.dot(DenseVector(observation.features)))

In [19]:
weights = np.array([1.0, 1.5])
predictionExample = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])),
                                    LabeledPoint(1.5, np.array([.5, .5]))])
labelsAndPredsExample = predictionExample.map(lambda lp: getLabeledPrediction(weights, lp))
print(labelsAndPredsExample.collect())

[(2.0, 1.75), (1.5, 1.25)]


Finally, we can write the code that learns the best weight vector $\underline{w}^*$ using the gradient descent procedure.

In [17]:
def ridgeRegression(trainData, nIters, regFactor):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        trainData (RDD of LabeledPoint): The labeled data for use in training the model.
        nIters (int): The number of iterations of gradient descent to perform.
        regFactor (float greater then 0): Regularization factor used in ridge regression.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """    
    
    # number of data points in training set
    # TODO: count è lenta
    n = trainData.count()
    
    # number of dimensions
    d = len(trainData.take(1)[0].features)
    
    # initialize weight vector with a vector of d zero components
    w = np.zeros(d)
    
    # initialize learning rate
    csi = 1
    
    # we will compute e store the training error after each iteration in order to evaluate the process
    errorTrain = np.zeros(nIters)
    
    for i in range(nIters):
        # make the prediction with the current learnt weight vector
        labelsAndPredsTrain = trainData.map(lambda p: getLabeledPrediction(w, p))
        
        # compute the error and add it to the list
        errorTrain[i] = calcRMSE(labelsAndPredsTrain)
        
        # compute the gradient
        # TODO: collect è lenta
        gradient = sum([DenseVector(gradientSummand(w, lp)) for lp in trainData.collect()])
        #+ (regFactor * w)
        
        # update the learning rate
        csi = csi / (n * np.sqrt(i + 1))
        
        # update the weights
        w -= csi * gradient
        
    return w, errorTrain

In [18]:
# create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent
# note: the resulting model will not be useful; the goal here is to verify that
# linregGradientDescent is working properly
exampleN = 10
exampleD = 3
exampleData = (sc
               .parallelize(parsedTrainData.take(exampleN))
               .map(lambda lp: LabeledPoint(lp.label, lp.features[0:exampleD])))
print(exampleData.take(2))
exampleNumIters = 5
exampleWeights, exampleErrorTrain = ridgeRegression(exampleData, exampleNumIters, 0)
print(exampleWeights)

[LabeledPoint(37100.0, [76.0,4.0,1802.0]), LabeledPoint(76000.0, [86.0,4.0,1600.0])]
[1.30412730e+21 1.36480985e+19 6.28608319e+21]


Now, we can train our ridge regressor on the trainig data we have and evaluate it on our validation set.

In [None]:
numIters = 500
regFactor = .5

weightsLR0, errorTrainLR0 = ridgeRegression(parsedTrainData, numIters, regFactor)

labelsAndPreds = parsedValData.map(lambda p: getLabeledPrediction(weightsLR0, p))
rmseValLR0 = calcRMSE(labelsAndPreds)

print('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmseValBase,
                                                                       rmseValLR0))