# **MVA toolbox**

## **1. Data reading and preprocessing**

Run the following cell to load the MNIST data as LabelPoint elements.

Note: we have explicitely defined the number of features to be sure that the sparse feature vectors are created with all the dimensions

In [1]:
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "file:///export/usuarios_ml4ds/vanessa/codigo/mnist", numFeatures= 784)

### **Data normalization**

Now, let's normalize the data:
1. Remove the mean of each feature (we have to center the input data, but this is not efficient for sparse data)
2. Reescale each feature to make them have unitary standard deviation

Prepare output variable:
1. Convert $Y$ to 1 vs all codification
2. Normalize Y (remove means)

In [2]:
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from sklearn.preprocessing import label_binarize
from pyspark.sql import Row

# Create two new RDD by extracting the labels and features of the data
label = data.map(lambda x: x.label)
features = data.map(lambda x: Vectors.dense(x.features.toArray()))

# Define the StandardScaler() object and fit it with the data features 
scaler = StandardScaler(withMean=True, withStd=True).fit(features)

# Normalize the data features
features_norm = scaler.transform(features)

# Codify labels
set_classes = label.distinct().collect()
#codedLabel = label.map(lambda x: (x, label_binarize([x], classes=set_classes)))
codedLabel = label.map(lambda x: label_binarize([x], classes=set_classes))

# Normalize coded labes
scaler_labels = StandardScaler(withMean=True, withStd=False).fit(codedLabel)
codedLabel_norm = scaler_labels.transform(codedLabel)

In [3]:
# Create a new RDD of LabeledPoint data using the normalized features
LabeledPointMulticlass = Row("label", "codedLabel", "features")
# 1. Construct a RDD of tuples (label, featatures): check zip() method of RDD objects
data_norm = label.zip(codedLabel_norm).zip(features_norm)
# 2. Create the label point object
data_LP = data_norm.map(lambda x: LabeledPointMulticlass(x[0][0], x[0][1], x[1]))

### **Create training and validation partitions**

In [4]:
# Create trainign and validation partitions
(trainingData, valData) = data_LP.randomSplit([0.4, 0.6], seed=0)

# Our learning algorithms will make several passes over these datasets, so let’s cache these RDD in memory
trainingData.cache()
valData.cache()


PythonRDD[28] at RDD at PythonRDD.scala:53

### **Can we create a kernel matrix?**

In [26]:
from sklearn.metrics.pairwise import euclidean_distances
import numpy as np

trainingData_aux=trainingData.zipWithIndex()
K_training = trainingData_aux.cartesian(trainingData_aux)\
            .map(lambda (x1,x2): (x1[1], (x2[1], np.squeeze(euclidean_distances(x1[0].features, x2[0].features, squared=True)))))\
            .groupByKey()


In [27]:
K_training.take(5)

[(3088, <pyspark.resultiterable.ResultIterable at 0x7f7ec82d1ed0>),
 (7204, <pyspark.resultiterable.ResultIterable at 0x7f7ecc805fd0>),
 (19496, <pyspark.resultiterable.ResultIterable at 0x7f7ecc805e90>),
 (18480, <pyspark.resultiterable.ResultIterable at 0x7f7eca14d690>),
 (13576, <pyspark.resultiterable.ResultIterable at 0x7f7eca14d650>)]

## MVA 

### Define some parameters

In [None]:
from pyspark.mllib.linalg.distributed import RowMatrix
import numpy as np

M = len(set_classes)
R = M -1 # Number of projection vectors

# Precompute uselful matrixes
# Cyx (MxD)!!!!!!
Cyx = trainingData.map(lambda x : np.dot(x.codedLabel[:,np.newaxis],x.features[:,np.newaxis].T)).mean()
# Omega (MxM)
# PCA y OPLS
Omega = np.eye(M) 
Omega_1 = Omega 
# CCA  (TODO)
#Omega = np.diag(np.power((1./N)*(np.diag(np.dot(Y.T, Y))), -.5))
#Omega_1 

### MVA library

In [None]:
# Create MVA library
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
import numpy as np

# This function has to implement Y' = W^T Omega Y
def createPseudoY(Y, W, Omega):
    return np.squeeze(np.dot(np.dot(W.T, Omega), Y.T))


def stepU(W, trainingData, Omega, R, regType='l1', regParam=0.1):
    numFeatures = trainingData.first().features.shape[0]
    U = np.empty((R,numFeatures))

    for r in range(R):
        Wr = W[:,r][:,np.newaxis]
        PseudoY = trainingData.map (lambda x : createPseudoY(x.codedLabel, Wr, Omega))
        Datar = trainingData.zip(PseudoY).map(lambda x: LabeledPoint(x[1], x[0].features))
        # Build the model
        model = LinearRegressionWithSGD.train(Datar, iterations=100, regType=regType, regParam=regParam, step=0.00000001)
        U[r,:] = model.weights
    return U

def stepW(U, Cyx, Omega, Omega_1):
    A = np.dot(Omega, np.dot(Cyx, U.T))  
    # Sergio tiene esta linea:  A = np.dot(np.dot(R, np.dot(Cyx, pinvXY)), R)
    V, D, V2 = np.linalg.svd(A,full_matrices=False)
    W = np.dot(Omega_1,V)
    return W

def computeMSE(U, W, trainingData):
    return trainingData.map(lambda x: np.mean(np.array(x.codedLabel - np.dot(W,np.dot(x.features, U.T)))**2)).mean()

In [None]:
# W initialization  (MxR)
W = np.eye(M,R)
numFeatures = trainingData.first().features.shape[0]
U = np.empty((R, numFeatures))
# Compute MSE 
print computeMSE(U, W, trainingData)
for iter in range(4):

    # Run the iterative process
    U = stepU(W, trainingData, Omega, R, regType='l1', regParam=0.05)
    W = stepW(U, Cyx, Omega, Omega_1)
    print computeMSE(U, W, trainingData)
    

# PLOT the eigenvectors

In [None]:
import matplotlib.pyplot as plt
from pyspark.mllib.linalg import Vectors

def plot_data(images, h, w, n_row=1, n_col=10):
    """Plots the set of images provided in images

    Args:
        images (list of sparse vectors or numpy arrays): list of images where each image contains the 
            features corresponding to the pixels of an image.  
        h: heigth of the image (in number of pixels).
        w: width of the image (in number of pixels).
        n_row: Number of rows to use when plotting all the images
        n_col: Number of columns to use when plotting all the images

    """
    plt.figure(figsize=(1.8 * n_col, 2.4 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    for i in range(len(images)):
        plt.subplot(n_row, n_col, i + 1)      
        try:
            img = images[i].toArray()
        except:
            img = images[i]
            
        plt.imshow(img.reshape((h, w)), cmap=plt.cm.jet)
        plt.xticks(())
        plt.yticks(())

In [None]:
%matplotlib inline
h= 28
w =28
plot_data(U,h,w)

# TODO
* Actualizar MLLIB regresor a Spark 2.0
* Comprobar con PCA
* Comprobar (hacer Omega) para CCA