# Data Structures

## Local Vector
A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

MLlib recognizes the following types as dense vectors:

NumPy’s [array](http://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html)

Python’s list, e.g., [1, 2, 3]

In [1]:
import numpy as np
from pyspark.mllib.linalg import Vectors

# create through numpy array
v1 = Vectors.dense(np.array([1.0, 2.0, 3.0]))
# create through Python's list
v2 = Vectors.dense([0, 1, 2])
# create through 
v3 = Vectors.dense(0, 1, 2)
v1, v2, v3

(DenseVector([1.0, 2.0, 3.0]),
 DenseVector([0.0, 1.0, 2.0]),
 DenseVector([0.0, 1.0, 2.0]))

and the following as sparse vectors:

MLlib’s [SparseVector](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseVector).

SciPy’s [csc_matrix (Compressed Sparse Column matrix)](http://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html#scipy.sparse.csc_matrix) with a single column

In [2]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Create a SparseVector through factory methods.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Create a SparseVector through scipy's csc_matrix
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
sv1, sv2

(SparseVector(3, {0: 1.0, 2: 3.0}),
 <3x1 sparse matrix of type '<class 'numpy.float64'>'
 	with 2 stored elements in Compressed Sparse Column format>)

**We recommend using NumPy arrays over lists for efficiency, and using the factory methods implemented in Vectors to create sparse vectors.**

## LabeledPoint
A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

In [3]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [1.0, 3.0]))
pos, neg

(LabeledPoint(1.0, [1.0,0.0,3.0]), LabeledPoint(0.0, (3,[0,2],[1.0,3.0])))

## Local matrix
A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, the following dense matrix

\begin{pmatrix}
1.0 & 2.0 \\
3.0 & 4.0 \\
5.0 & 6.0
\end{pmatrix}

is stored in a one-dimensional array [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] with the matrix size (3, 2).

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

In [4]:
from pyspark.ml.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
dm2, sm

(DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False),
 SparseMatrix(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 6.0, 8.0], False))

## Distributed Matrix
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

The basic type is called [RowMatrix](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix). A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node. 

An [IndexedRowMatrix](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRowMatrix) is similar to a RowMatrix but with row indices, which can be used for identifying rows and executing joins. 

A [CoordinateMatrix](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.CoordinateMatrix) is a distributed matrix stored in coordinate list (COO) format, backed by an RDD of its entries. 

A [BlockMatrix](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix) is a distributed matrix backed by an RDD of MatrixBlock which is a tuple of (Int, Int, Matrix).

### RowMatrix

A RowMatrix is a row-oriented distributed matrix without meaningful row indices, backed by an RDD of its rows, where each row is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.

A [RowMatrix](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) can be created from an RDD of vectors.

Refer to the [RowMatrix Python docs](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) for more details on the API.

In [5]:
from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

## DataFrame

**This is different from `pandas.DataFrame`!**

[DataFrame](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes) is a new data structure introduced to support `spark.ml` library. I find a good summary of the differences between `ml` and `mllib` in http://yuqli.com/?p=2330. 

In [6]:
# Note that we're using the ml version of Vectors
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1.218, Vectors.dense(1.560, -0.605)),
    (2.949, Vectors.dense(0.346, 2.158)),
    (3.627, Vectors.dense(1.380, 0.231)),
    (0.273, Vectors.dense(0.520, 1.151)),
    (4.199, Vectors.dense(0.795, -0.226))], ["label", "features"])
df.show()

+-----+--------------+
|label|      features|
+-----+--------------+
|1.218| [1.56,-0.605]|
|2.949| [0.346,2.158]|
|3.627|  [1.38,0.231]|
|0.273|  [0.52,1.151]|
|4.199|[0.795,-0.226]|
+-----+--------------+



# A Regression Example

In this tutorial, we will be using the same data set used by The Elements of Statistical Machine Learning, which comes from a study by Stamey et al. (1989) that examined the correlation between the level of prostate specific antigen (PSA) and a number of clinical measures, in 97 men who were about to receive a radical prostatectonmy.

Through this simple example, we hope you can learn the following topics:
  * Read text data into RDD of `LabeledPoint`s and `DataFrame`;
  * View basic statistics such as correlation matrix;
  * Transform data using `StandardScaler`;
  * Train a regression model using `LinearRegression`;
  * Train a regression model with regularization (e.g. Ridge), and use cross-validation to choose the best model.


## Download and Read Data

As the first step, we will download the data file and save it locally. The file has the following format: 
```
        lcavol  lweight age     lbph    svi     lcp     gleason pgg45   lpsa    train
1       -0.579818495    2.769459        50      -1.38629436     0       -1.38629436     6         0     -0.4307829      T
2       -0.994252273    3.319626        58      -1.38629436     0       -1.38629436     6         0     -0.1625189      T
3       -0.510825624    2.691243        74      -1.38629436     0       -1.38629436     7        20     -0.1625189      T
4       -1.203972804    3.282789        58      -1.38629436     0       -1.38629436     6         0     -0.1625189      T
5        0.751416089    3.432373        62      -1.38629436     0       -1.38629436     6         0      0.3715636      T
6       -1.049822124    3.228826        50      -1.38629436     0       -1.38629436     6         0      0.7654678      T
```

There're 8 predictors (column 1--8) and the outcome is `lpsa` (column 9). This last column indicates which 67 observations were used as the 
"training set" and which 30 as the test set, as described on page 48
in the book.

In [7]:
from urllib.request import urlretrieve
from pyspark.mllib.regression import LabeledPoint

# retrieve the data and save it locally
f = urlretrieve("https://web.stanford.edu/~hastie/ElemStatLearn/datasets/prostate.data", 
                "prostate.data")

# read data as LabeledPoint RDDs
def parse_data_point(row):
    values = row.split('\t')
    is_train = values[-1] == 'T'
    label = float(values[-2])
    features = [float(v) for v in values[1:-2]] # skip the id column
    return is_train, LabeledPoint(label, features)

# read psa data into RDD of strings
psa_file = sc.textFile('prostate.data')
# skip the header row
header = psa_file.first()
# split the first row to get the list of header names
feature_labels = header.split('\t')[1:-2]
print('Predictors: %s' % (feature_labels))

psa_data = psa_file.filter(lambda x: x != header).map(parse_data_point)
print('Total rows: %s' % psa_data.count())

Predictors: ['lcavol', 'lweight', 'age', 'lbph', 'svi', 'lcp', 'gleason', 'pgg45']
Total rows: 97


### Feature transformation

Standardizes features by scaling to unit variance and/or removing the mean using column summary statistics on the samples in the training set. This is a very common pre-processing step.

In [8]:
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors

label = psa_data.map(lambda x: x[1].label)
features = psa_data.map(lambda x: x[1].features)
is_train = psa_data.map(lambda x: x[0])

scaler = StandardScaler(withMean=True, withStd=True).fit(features)
# use the scaler to transform the original data
std_psa_data = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray())))) \
               .map(lambda x: LabeledPoint(x[0], x[1]))
# add the is_train data back
std_psa_data = is_train.zip(std_psa_data)

### Split training/test data set

Normally we would use `data.randomSplit` to split the data into training and test sets. Since the data contains the split information, we'll just use it. 

In [9]:
from pyspark.ml.linalg import Vectors

data_training = std_psa_data.filter(lambda x: x[0]).map(lambda x: x[1])
data_test = std_psa_data.filter(lambda x: not x[0]).map(lambda x: x[1])

df_data_training = data_training.map(
    lambda x: [x.label, Vectors.dense(x.features.toArray())]
).toDF(['label', 'features'])

df_data_test = data_test.map(
    lambda x: [x.label, Vectors.dense(x.features.toArray())]
).toDF(['label', 'features'])


data_training.count(), data_test.count()

(67, 30)

### Basic Statistics

In [10]:
import numpy as np
from pyspark.mllib.stat import Statistics

np.round(Statistics.corr(data_training.map(lambda x: x.features)), 3)

array([[ 1.   ,  0.3  ,  0.286,  0.063,  0.593,  0.692,  0.426,  0.483],
       [ 0.3  ,  1.   ,  0.317,  0.437,  0.181,  0.157,  0.024,  0.074],
       [ 0.286,  0.317,  1.   ,  0.287,  0.129,  0.173,  0.366,  0.276],
       [ 0.063,  0.437,  0.287,  1.   , -0.139, -0.089,  0.033, -0.03 ],
       [ 0.593,  0.181,  0.129, -0.139,  1.   ,  0.671,  0.307,  0.481],
       [ 0.692,  0.157,  0.173, -0.089,  0.671,  1.   ,  0.476,  0.663],
       [ 0.426,  0.024,  0.366,  0.033,  0.307,  0.476,  1.   ,  0.757],
       [ 0.483,  0.074,  0.276, -0.03 ,  0.481,  0.663,  0.757,  1.   ]])


### Linear Regression

Since [LinearRegressionWithSGD](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LinearRegressionWithSGD) has been deprecated in spark 2.0.0, we will need to use `ml.regression.LinearRegression` instead, which requires us to convert the input into `DataFrame` format. 

We will then show how to use `LinearRegression` to train the model.

In [102]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# no regularization
reg_param = 0.1
lr = LinearRegression(maxIter=10, regParam=reg_param)

# Fit the model
lrModel = lr.fit(df_data_training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(np.round(lrModel.coefficients, 3)))
print("Intercept: %s" % str(round(lrModel.intercept, 3)))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print('\n')
print('Training Summary:')
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
print('MSE = %f' % trainingSummary.meanSquaredError)
print("RMSE = %f" % trainingSummary.rootMeanSquaredError)
print("R-squared = %f" % trainingSummary.r2)
print("MAE = %f" % trainingSummary.meanAbsoluteError)
print("Explained variance = %f" % trainingSummary.explainedVariance)

# Prediction
lrPred = lrModel.transform(df_data_test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction")
mse = evaluator.evaluate(lrPred, {evaluator.metricName: "mse"})
sde = (lrPred.rdd.map(lambda x: (x.prediction - x.label)**2).variance() / (lrPred.count() - 1))**0.5
print('\n')
print('Test Summary:')
print("MSE = %g" % mse)
print('Std Error = %g' % sde)

Coefficients: [ 0.57   0.257 -0.107  0.199  0.279 -0.151  0.015  0.194]
Intercept: 2.467


Training Summary:
numIterations: 1
objectiveHistory: [0.0]
MSE = 0.449112
RMSE = 0.670158
R-squared = 0.687474
MAE = 0.506446
Explained variance = 0.883040


Test Summary:
MSE = 0.492781
Std Error = 0.162808


### Ridge Regression (with tuning)
Now let's introduce the Ridge regularizor and use cross-validation to tune the hyperparams. 
To get the list of regparam to iterate through, we will need to solve $\lambda$ in the formula 
$$df(\lambda) = \sum_{i=1}^p\frac{d_i^2}{d_i^2+\lambda}$$
where $d_i$ is the $i$th diagonal entry of the matrix $D$ in singular value decomposition of the features matrix $X$
$$X = UDV^T$$

In [103]:
get_optimal_lambdas(data_training.map(lambda x: x.features), multiple=1.0)

[1.5917807097034505e-15,
 4.87448572221687,
 12.325917405547605,
 24.176282072356294,
 44.21483690462158,
 81.5530281510686,
 164.0241749168161,
 431.60164542313055]

In [87]:
m = RowMatrix(data_training.map(lambda x: x.features))

In [109]:
data_training.map(lambda x: x.features).collect()

[DenseVector([-1.6374, -2.0062, -1.8624, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-1.989, -0.722, -0.7879, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-1.5788, -2.1888, 1.3612, -1.0247, -0.5229, -0.8632, 0.3426, -0.1553]),
 DenseVector([-2.1669, -0.808, -0.7879, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-0.5079, -0.4588, -0.2506, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-2.0361, -0.934, -1.8624, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-0.5573, -0.2088, -0.7879, 0.9901, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-0.9294, -0.0579, 0.1523, -1.0247, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([-2.2883, -0.0706, -0.1163, 0.8041, -0.5229, -0.8632, -1.0422, -0.8645]),
 DenseVector([0.2235, -1.4147, -0.1163, -1.0247, -0.5229, -0.2993, 0.3426, 0.1992]),
 DenseVector([0.1078, -1.4722, 0.4209, -1.0247, -0.5229, -0.8632, 0.3426, -0.6872]),
 DenseVector([0.1622, -1.3256, 0.2866, -1.0247,

In [88]:
svd = m.computeSVD(8)

In [94]:
sum((svd.s*svd.s) / (svd.s*svd.s + 12.325917405547605))

6.000000024611741

In [44]:
import numpy as np
from pyspark.mllib.linalg.distributed import RowMatrix

def get_optimal_lambdas(features, multiple=1):
    """Return a list of optimal lambdas for cross-validation"""
    p = len(features.first())
    svd = RowMatrix(features).computeSVD(p)
    dj = svd.s # the diagional elements d_j
    k_range = np.arange(p, 0, -1.0 / multiple)
    return [solve_lambda(dj, k) for k in k_range]
    
def solve_lambda(dj, df):
    """Given the diagonal elements d_j, and the degree of freedom, solve for lambda"""
    xn = 1.0 # initial guess
    f = np.sum(dj * dj / (dj * dj + xn)) - df
    fp = - np.sum( dj * dj / (( dj * dj + xn ) * ( dj * dj + xn )))
    xnp1 = xn - f / fp 
    
    while abs(xn - xnp1) / abs(xn) > 0.0001:
        xn = xnp1
        f = np.sum(dj * dj / (dj * dj + xn)) - df
        fp = - np.sum( dj * dj / (( dj * dj + xn ) * ( dj * dj + xn ))) 
        xnp1 = xn - f / fp 

    return xn

In [45]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Ridge regression (by setting elasticNetParam = 0)
rr = LinearRegression(maxIter=10, elasticNetParam=0.0)

lambdas = get_optimal_lambdas(data_training.map(lambda x: x.features))
print('Optimal lambdas: {}'.format(lambdas))

# Here we just make a few wild guesses of lambdas
# It's better to calculate the optimal lambdas by inferring them from effective degree of freeom
paramGrid = ParamGridBuilder() \
    .addGrid(rr.regParam, lambdas) \
    .build()
    
crossval = CrossValidator(estimator=rr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName='mse'),
                          numFolds=10)
# Fit the model
rrModel = crossval.fit(df_data_training)

Optimal lambdas: [1.5917807097034505e-15, 4.87448572221687, 12.325917405547605, 24.176282072356294, 44.21483690462158, 81.5530281510686, 164.0241749168161, 431.60164542313055]


In [47]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(np.round(rrModel.bestModel.coefficients, 3)))
print("Intercept: %s" % str(round(rrModel.bestModel.intercept, 3)))

# Prediction
rrPred = rrModel.transform(df_data_test)

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction")
mse = evaluator.evaluate(rrPred, {evaluator.metricName: "mse"})
sde = (rrPred.rdd.map(lambda x: (x.prediction - x.label)**2).variance() / rrPred.count())**0.5
print('\n')
print('Test Summary:')
print("MSE = %g" % mse)
print('Std Error = %g' % sde)

Coefficients: [ 0.68   0.263 -0.141  0.21   0.305 -0.288 -0.021  0.267]
Intercept: 2.465


Test Summary:
MSE = 0.521274
Std Error = 0.17572


# Pipelines

## Main concepts in Pipelines

MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.

[DataFrame](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#dataframe): This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

[Transformer](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#transformers): A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

[Estimator](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#estimators): An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

[Pipeline](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#pipeline): A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

[Parameter](https://spark.apache.org/docs/2.2.0/ml-pipeline.html#parameters): All Transformers and Estimators now share a common API for specifying parameters.

### Pipeline
![Pipeline](https://spark.apache.org/docs/2.2.0/img/ml-Pipeline.png)

### PipelineModel
![PipelineModel](https://spark.apache.org/docs/2.2.0/img/ml-PipelineModel.png)

# References
 * [Data Types - RDD-based API](https://spark.apache.org/docs/2.2.0/mllib-data-types.html)
 * [Linear Methods - RDD-based API](https://spark.apache.org/docs/2.2.0/mllib-linear-methods.html#mjx-eqn-eqregPrimal)