# Eléments logiciels pour le traitement des données massives (ENSAE 3A)

### Sholom Schechtman et Nicolas Schreuder

Article : https://arxiv.org/pdf/1307.0048.pdf

L'idée du projet est d'implémenter un algorithme MapReduce pour le problème de régression linéaire pénalisée, quand la matrice de features $X \in \mathbb{R}^{n \times p}$ avec $p << n$.

Cela correspond à un type de problème ou le nombre de features $p$ (les caractéristiques d'un individu ou d'un produit) est assez petit et il est envisageable de les stocker en mémoire, alors que la taille du dataset $n$ et très grande, ce qui nous pousse à passer par une étape MapReduce.

L'idée de l'agorithme est alors d'exprimer la quantité à minimiser en fonction des matrices ou des vecteurs dont la dimension est une fonction de $p$ ($p\times p$ ou $p$) mais pas de $n$. Puis de calculer ces quantitées à partir de $X \in \mathbb{R}^{n \times p}$ en faisant une reduction sur $n$ (qui est la taille de notre dataset)

In [2]:
import numpy as np
from numpy.random import multivariate_normal
from scipy.linalg.special_matrices import toeplitz

# Algorithme 1

Le problème de régression linéaire pénalisée et de la forme : 

$$\min_\beta ||Y - \alpha \mathbb{1} - X \beta||_2^2 + p_\lambda(\beta) $$

avec $Y$ le vecteur d'étiquettes, $\alpha$ un biais, $X$ la matrice de features, $\beta$ le vecteur des coefficients (à estimer) et une fonction de pénalisation $p_\lambda$ (par exemple la norme euclidienne pour la régression Ridge ou la norme $l_1$ pour le Lasso).

En notant $X_c \in \mathbb{R}^{n \times p }$ la matrice centrée réduite de $X$. Nous aavons :
$$X_c = (X - \mathbb{1} (\bar{X_1}, \dots, \bar{X_p}))D^{-1}$$

avec $D$ la matrice diagonale des écarts types.

Ainsi nous pouvons réécrire la fonction objectif :

$$\begin{align} ||Y - \alpha \mathbb{1} - X \beta||_2^2 + p_\lambda(\beta)
= & \ ||Y - \alpha \mathbb{1} - (X_cD + \mathbb{1} (\bar{X_1}, \dots, \bar{X_p})) \beta||_2 + p_\lambda(\beta) \\
= & \ ||Y - (\alpha + (\bar{X_1}, \dots, \bar{X_p})) \beta) \mathbb{1} - X_c D \beta ||_2 + p_\lambda(\beta)
\end{align}$$

Nous avons donc que minimiser en $(\alpha, \beta)$ :

$$||Y - \alpha \mathbb{1} - X \beta||_2^2 + p_\lambda(\beta) $$

revient à minimiser en $(\hat{\alpha}, \hat{\beta})$:

$$\begin{align}
||Y - \hat{\alpha} \mathbb{1} - X_c \hat{\beta}||_2^2 + p_\lambda(\hat{\beta})
\end{align}$$

Avec le changement de variable:

$$\begin{align} 
\hat{\alpha}&= \alpha + \left(\bar{X_1}, \dots, \bar{X_p}\right) \beta \\
\hat{\beta} &= D \beta
\end{align}
 $$

 
En développant la dernière équation nous obtenons :

 $$\begin{align}
||Y - \hat{\alpha} \mathbb{1} - X_c \hat{\beta}||_2^2 + p_\lambda(\hat{\beta}) = \ & Y^TY  + n \alpha^2 + \hat{\beta}^TX_c^TX_c\beta - 2 n \alpha \bar{Y} + 2 \alpha \mathbb{1}^X_c\beta - 2Y^TX_c\hat{\beta} + p_\lambda(\hat{\beta})\\
\end{align}$$
 
 Comme $X_c$ est centrée  le minimum est atteint pour $\hat{\alpha} = \bar{Y}$ et $\alpha \mathbb{1}^X_c = 0$.
 
 L'expression à minimiser devient finalement :
 
 $$\begin{align}
  &\ \hat{\beta} = \arg \min_{\beta}  Y^TY - n \bar{Y}^2 + \beta^TX_c^TX_c\beta - 2Y^TX_c\beta + p_\lambda(\beta) \\
   \Leftrightarrow &\ \hat{\beta} =  \arg \min_{\beta} \beta^TX_c^TX_c\beta - 2Y^TX_c\beta + p_\lambda(\beta) \\
   \Leftrightarrow &\ \hat{\beta} =  \arg \min_{\beta} \beta^TD^{-1}(X^TX - n(\bar{X_1}, \dots, \bar{X_p})^T (\bar{X_1}, \dots, \bar{X_p}))D^{-1}\beta - 2(Y^TX - n \bar{Y}(\bar{X_1}, \dots, \bar{X_p}))D^{-1}\beta + p_\lambda(\beta)
 \end{align}
 $$
 
 $X^TX, Y^TX, D^{-1}, (\bar{X_1}, \dots, \bar{X_p})^T (\bar{X_1}, \dots, \bar{X_p})$ sont des matrices de taille $p \times p$. On peutn par hypothèse, les stocker en mémoire et résoudre ce problème par une des méthodes d'optimisation classiques (coordinate descent par exemple).
 
Lors de l'étape map-reduce on calculera $X^TX, Y^TX, \bar{Y}, (\bar{X_1}, \dots, \bar{X_p}), COV(X)$ à partir desquelles on retrouvera les quantités citées précédemment.

Il faut en général fixer un hyper-paramètre, $\lambda$, régissant l'impact de la pénalisation sur la fonction de coût. 
Ce paramètre est le plus souvent fixé par validation croisée. 
Nous incorporons cette validation croisée dans notre implémentation en partitionant l'ensemble des observations en $K$ classes (de façon aléatoire) grâce à la fonction $\textit{reducebyKey}$.

In [3]:
from scipy.optimize import minimize 
def PenalizedLR_MR(Data, k, lambdas, penalizer="ridge"):
    """
    Data: a RDD, each rows is a tuple (x, y)
    k: number of partitions for splitting (k needs to be strictly greater than 3)
    lambdas: list of lambdas (penalization parameter) to compare by CV 
    penalizer: penalization term (only "ridge" is available for now)
    """
    
    def map_statistics(row):
        # Compate statistics for one row [size, mean(x), mean(y), Y^TY, y * x, cov(x)]
        x = row[0]
        y = row[1]
        return (np.random.randint(k), [1, x, y, y**2, y * x, np.zeros((len(row[0]), len(row[0])))])


    def reduce_statistics(row1, row2):
        # Combined with map_statistics, returns [size, mean(X), mean(Y), Y^TY, Y^TX, Cov(X)]
        s_1 = row1[0]
        s_2 = row2[0]
        mean_x = s_1 / (s_1 + s_2) * row1[1] + s_2 / (s_1 + s_2) * row2[1]
        mean_y = s_1 / (s_1 + s_2) * row1[2] + s_2 / (s_1 + s_2) * row2[2]
        
        mean_substraction = (row1[1] - row2[1]).reshape((1, -1))
        cov = s_1 / (s_1 + s_2) * row1[5] + s_2 / (s_1 + s_2) * row2[5] + s_1 * s_2 / (
            s_1 + s_2)**2 * (mean_substraction).T.dot(mean_substraction)
        emit = [s_1 + s_2, mean_x, mean_y, row1[3] +
                row2[3], row1[4] + row2[4], cov]
        return emit

    statistics = Data.map(map_statistics)
    statistics = statistics.reduceByKey(reduce_statistics)

    # Cross validation
    test_errors = []
    for lmbda in lambdas:
        error = 0
        for i in range(k):
            # Split in train/test
            statistics_train = statistics.filter(lambda row: row[0] != i)
            statistics_test = statistics.filter(lambda row: row[0] == i).collect()[0][1]
            
            # Compute the dot products we need for our train dataset
            size_train, means_X_train, mean_Y_train, YT_Y_train, YT_X_train, COV_X_train  = statistics_train.map(lambda row: row[1]).reduce(reduce_statistics)
            
            # Reshape our vectors
            means_X_train = means_X_train.reshape(-1, 1)
            YT_X_train = YT_X_train.reshape(-1, 1)

            p = COV_X_train.shape[0]
            XT_X_train = size_train * (COV_X_train + means_X_train.dot(means_X_train.T))
            D_inv_train = np.diag([1 / np.sqrt(COV_X_train[i,i]) for i in range(p)])
            D_train = np.diag([np.sqrt(COV_X_train[i,i]) for i in range(p)])
            
            # Compute statistics for our test dataset:
            size_test = statistics_test[0]
            means_X_test = statistics_test[1].reshape(-1, 1)
            mean_Y_test = statistics_test[2]
            YT_Y_test = statistics_test[3]
            YT_X_test = statistics_test[4].reshape(-1, 1)
            COV_X_test = statistics_test[5]
            
            XT_X_test = size_test * (COV_X_test + means_X_test.T.dot(means_X_test))
            D_inv_test = np.diag([1 / np.sqrt(COV_X_test[i,i]) for i in range(p)])
            D_test = np.diag([np.sqrt(COV_X_train[i,i]) for i in range(p)])
            
            def beta_objective(beta):
                pen_term = 0
                if penalizer == "Ridge":
                    pen_term = lmbda + np.linalg.norm(beta**2)
                    
                # Define the simplified objective function for beta
                linear_term = -(YT_X_train - size_train * mean_Y_train * means_X_train).T.dot(D_inv_train).dot(beta)
                quadratic_term = (1 /2 * beta.dot(D_inv_train).dot(XT_X_train - size_train * 
                                                                   means_X_train.dot(means_X_train.T)).dot(D_inv_train).dot(beta))
                return linear_term + quadratic_term + pen_term
            
            
            alpha_hat = mean_Y_test
            beta_hat = minimize(beta_objective, np.zeros(p), method="CG").x
            
            def test_error(alpha, beta):
                quadratic_term = YT_Y_test - size_test * alpha**2 + beta.dot(D_inv_test).dot(XT_X_test - size_test * means_X_test.T.dot(means_X_test)).dot(D_inv_test).dot(beta)
                double_term = -2 * alpha * mean_Y_test -2 * (YT_X_test - size_test * mean_Y_test * means_X_test).T.dot(D_inv_test).dot(beta)
                return quadratic_term + double_term
            
            error += test_error(alpha_hat, beta_hat)
        test_errors.append(error)
    
    # Choose best lambda according to mean error
    best_i = np.argmin(test_errors)
    best_lambda = lambdas[best_i]

    # Compute the dot products we need
    size, means_X, mean_Y, YT_Y, YT_X, COV_X  = statistics.map(lambda x : x[1]).reduce(reduce_statistics)
    
    # Reshape our vectors
    means_X = means_X.reshape(-1, 1)
    YT_X = YT_X.reshape(-1, 1)
    
    p = COV_X.shape[0]
    XT_X = size * (COV_X + means_X.dot(means_X.T))
    D_inv = np.diag([1 / np.sqrt(COV_X[i,i]) for i in range(p)])
    D = np.diag([np.sqrt(COV_X[i,i]) for i in range(p)])
    
    def beta_objective(beta):
        pen_term = 0
        if penalizer == "Ridge":
            pen_term = lmbda + np.linalg.norm(beta**2)
        
        # Change of variables 
        beta = D.dot(beta)
        
        # Compute simplified objective function for beta
        linear_term = -(YT_X - size * mean_Y * means_X).T.dot(D_inv).dot(beta)
        quadratic_term = 1 /2 * beta.dot(D_inv).dot(XT_X - size * means_X.dot(means_X.T)).dot(D_inv).dot(beta)
        return linear_term + quadratic_term + pen_term
    
    # Compute final alpha, beta
    beta = minimize(beta_objective, np.zeros(p), method="CG").x
    alpha_hat = mean_Y
    alpha = alpha_hat - means_X.T.dot(beta)
    
    return (alpha, beta, best_lambda)

# Quelques tests

Nous commençons par tester notre algorithme pour $n = 500$.

In [4]:
# Design random feature matrix
n, p = 500, 100
cov = toeplitz(0.5 ** np.arange(p))
X = multivariate_normal(np.zeros(p), np.eye(p), n)

# Set coefficients
idx = np.arange(p)
coefs = (idx % 2) * np.exp(-idx / 10.)
coefs[40:] = 0.
    
y = X.dot(coefs)

Data_array = [(X[i], y[i]) for i in range(n)]

Data_rdd = sc.parallelize(Data_array)

In [5]:
alpha, beta, lmbda = PenalizedLR_MR(Data=Data_rdd, k=5, lambdas=np.logspace(-2, 2, 5), penalizer="Ridge")

print("Estimated intercept : {}".format(alpha))
print("Euclidian distance between true coefficients and estimated coefficients : {}".format(np.linalg.norm(beta - coefs, ord=2)))
print("Best penalization parameter found by CV : {}".format(lmbda))

[4.81162156e-05] 0.004597685090530298 0.01


Notre algorithme parvient à estimer correctement le terme de biais et les coefficients. Nous le comparons avec ce qu'obtiens la régression Ridge implémentée dans scikit-learn avec CV.

In [7]:
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score

scores = []
lambdas = np.logspace(-2, 2, 5)
for lmbda in lambdas:
    clf = Ridge(lmbda, fit_intercept=True)
    score = cross_val_score(clf, X, y, cv=5).mean()
    scores.append(score)

best_i = np.argmin(scores)
best_lambda = lambdas[best_i]
clf = Ridge(best_lambda, fit_intercept=True)
clf.fit(X, y)

print('Estimated intercept : {}'.format(clf.intercept_))
print("Euclidian distance between true coefficients and estimated coefficients : {}".format(np.linalg.norm(clf.coef_ - coefs, ord=2)))

0.008422742420145918 0.32487655733699944


In [5]:
%timeit PenalizedLR_MR(Data=Data_rdd, k=5, lambdas=np.logspace(-2, 2, 5), penalizer="Ridge")

1 loop, best of 3: 14.3 s per loop


In [8]:
%%timeit
scores = []
lambdas = np.logspace(-2, 2, 5)
for lmbda in lambdas:
    clf = Ridge(lmbda, fit_intercept=True)
    score = cross_val_score(clf, X, y, cv=5).mean()
    scores.append(score)

best_i = np.argmin(scores)
best_lambda = lambdas[best_i]
clf = Ridge(best_lambda, fit_intercept=True)
clf.fit(X, y)

10 loops, best of 3: 63.8 ms per loop


Nous essayons maintenant un problème plus grand avec $n = 10000$. Nous comparons notre algorithme avec celui de Scikit-learn.

In [9]:
n, p = 10000, 100
cov = toeplitz(0.5 ** np.arange(p))
X = multivariate_normal(np.zeros(p), np.eye(p), n)

idx = np.arange(p)
coefs = (idx % 2) * np.exp(-idx / 10.)
coefs[40:] = 0.
    
y = X.dot(coefs)

Data_array = [(X[i], y[i]) for i in range(n)]

Data_rdd = sc.parallelize(Data_array)

In [10]:
%timeit PenalizedLR_MR(Data=Data_rdd, k=5, lambdas=np.logspace(-2, 2, 5), penalizer="Ridge")

1 loop, best of 3: 20.3 s per loop


In [11]:
%%timeit
scores = []
lambdas = np.logspace(-2, 2, 5)
for lmbda in lambdas:
    clf = Ridge(lmbda, fit_intercept=True)
    score = cross_val_score(clf, X, y, cv=5).mean()
    scores.append(score)

best_i = np.argmin(scores)
best_lambda = lambdas[best_i]
clf = Ridge(best_lambda, fit_intercept=True)
clf.fit(X, y)

1 loop, best of 3: 603 ms per loop


Finalement nous testons notre algorithme avec $n = 100000$.

In [12]:
n, p = 100000, 100
cov = toeplitz(0.5 ** np.arange(p))
X = multivariate_normal(np.zeros(p), np.eye(p), n)


idx = np.arange(p)
coefs = (idx % 2) * np.exp(-idx / 10.)
coefs[40:] = 0.
    
y = X.dot(coefs)

Data_array = [(X[i], y[i]) for i in range(n)]

Data_rdd = sc.parallelize(Data_array)

In [13]:
%timeit PenalizedLR_MR(Data=Data_rdd, k=5, lambdas=np.logspace(-2, 2, 5), penalizer="Ridge")

1 loop, best of 3: 24.2 s per loop


In [14]:
%%timeit
scores = []
lambdas = np.logspace(-2, 2, 5)
for lmbda in lambdas:
    clf = Ridge(lmbda, fit_intercept=True)
    score = cross_val_score(clf, X, y, cv=5).mean()
    scores.append(score)

best_i = np.argmin(scores)
best_lambda = lambdas[best_i]
clf = Ridge(best_lambda, fit_intercept=True)
clf.fit(X, y)

1 loop, best of 3: 5.54 s per loop


### Conclusion

Malheureusement nous n'avons pu tester notre algorithme sur un cluster, la comparaison entre le Ridge de Scikit-learn et notre implémentation MapReduce n'est pas juste pour cette dernière, étant donné que MapReduce sert à traiter de grands volumes de données en distribuant les calculs sur un cluster.

Néanmoins, nous remarquons que pour $n$ petit, la régression Ridge de scikit-learn est beaucoup plus rapide que notre implémentation, ce qui est normal vu qu'elle est spécialement optimisée pour Python (probablement codée en cython même).

Cependant lorsque $n$ augmente l'écart diminue sensiblement passant de au moins 20 fois rapide pour $n=500$ à moins de 5 fois plus rapide pour $n=100000$. 

Ceci laisse supposer que notre méthode pourrait battre la régression Ridge de scikit-learn en temps d'éxécution si elle était executée sur un grand nombre de machines et que les fonctions hors MapReduce étaient codées en cython par exemple.