# Dimension Independent Matrix Square using MapReduce

Le projet est basé sur les travaux  de Reza Bosagh Zadeh and Gunnar Carlsson qui développent une nouvelle manière de calculer le produit d’une matrice et de sa transposée. Nous considérons donc une matrice $A$ de taille $n \times p$ dont les entrées ont valeur dans $\left[-1;1\right]$ avec $n > p$. Nous cherchons à calculer la matrice $A^T A$. Dans ce cas précis, MapReduce est l’outil idéal pour manipuler ce genre de données. 

## 1. Calculs préliminaires

In [1]:
# Importation des packages
import numpy as np
import pandas
import pyspark
import math
import time

from operator import add

In [2]:
# Création de la matrice n lignes et p colonnes
mat= np.random.uniform(-1,1,size=(100,10))
n=mat.shape[0]
p=mat.shape[1]
nb=range(n)
mat=np.insert(mat, 0, range(n), axis = 1)

In [3]:
mat

array([[  0.00000000e+00,   8.60217130e-01,  -4.43845173e-01, ...,
          6.65345387e-01,  -2.50346925e-01,   6.77912027e-01],
       [  1.00000000e+00,   3.73621335e-01,  -8.33269714e-01, ...,
          6.89988918e-01,   8.47182291e-02,   9.41167457e-01],
       [  2.00000000e+00,  -5.13759800e-01,  -1.96709185e-01, ...,
         -8.12848010e-01,   3.23638518e-01,   5.43348625e-01],
       ..., 
       [  9.70000000e+01,  -1.75130472e-01,  -4.13002449e-01, ...,
         -2.11882133e-01,  -1.89418284e-01,  -2.24150152e-01],
       [  9.80000000e+01,   9.46566542e-01,   5.45271335e-01, ...,
         -1.36524068e-01,   8.32476919e-01,   6.61663116e-01],
       [  9.90000000e+01,  -7.46032352e-01,   6.04904536e-01, ...,
         -5.77618509e-01,  -6.05690717e-02,   5.70018559e-01]])

In [4]:
np.savetxt("A.csv",mat,fmt='%1.3f',delimiter=',')
A=sc.textFile("A.csv").map(lambda line: line.split(",")).map(lambda line: [float(i) for i in line])

On modifie la matrice en un tableau de trois colonnes (index ligne, index colonne, valeur).

In [5]:
def process_mat_row(row):
    index = int(row[0])
    row = row[1:]
    return [[index, j, v] for j, v in enumerate(row)]

In [6]:
new_A = A.flatMap(process_mat_row)
new_A.take(12)

[[0, 0, 0.86],
 [0, 1, -0.444],
 [0, 2, -0.156],
 [0, 3, 0.86],
 [0, 4, -0.131],
 [0, 5, 0.312],
 [0, 6, -0.613],
 [0, 7, 0.665],
 [0, 8, -0.25],
 [0, 9, 0.678],
 [1, 0, 0.374],
 [1, 1, -0.833]]

## 2. Méthode naïve

### Algorithme 1

In [7]:
# On récupère tout d'abord les a_ij: on a ligne, (colonne,valeur): la clé est sur la ligne
def a_ij(row):
    return row[0], (row[1], row[2])

In [8]:
# On récupère ensuite les deux valeurs a_ij et a_ik associé au même i(clé) pour tous les j et k
mat_join_naive = new_A.map(a_ij).join(new_A.map(a_ij))
mat_join_naive.take(15)

[(0, ((0, 0.86), (0, 0.86))),
 (0, ((0, 0.86), (1, -0.444))),
 (0, ((0, 0.86), (2, -0.156))),
 (0, ((0, 0.86), (3, 0.86))),
 (0, ((0, 0.86), (4, -0.131))),
 (0, ((0, 0.86), (5, 0.312))),
 (0, ((0, 0.86), (6, -0.613))),
 (0, ((0, 0.86), (7, 0.665))),
 (0, ((0, 0.86), (8, -0.25))),
 (0, ((0, 0.86), (9, 0.678))),
 (0, ((1, -0.444), (0, 0.86))),
 (0, ((1, -0.444), (1, -0.444))),
 (0, ((1, -0.444), (2, -0.156))),
 (0, ((1, -0.444), (3, 0.86))),
 (0, ((1, -0.444), (4, -0.131)))]

In [9]:
# On fait la multiplication a_ij*a_ik, on garde les j et k en clé
def produit_matrice(row):
    index, ((j, vij), (k, vik)) = row
    return j, k, vij*vik

In [10]:
mat_produit_naive = mat_join_naive.map(produit_matrice)
mat_produit_naive.take(12)

[(0, 0, 0.7395999999999999),
 (0, 1, -0.38184),
 (0, 2, -0.13416),
 (0, 3, 0.7395999999999999),
 (0, 4, -0.11266),
 (0, 5, 0.26832),
 (0, 6, -0.52718),
 (0, 7, 0.5719000000000001),
 (0, 8, -0.215),
 (0, 9, 0.58308),
 (1, 0, -0.38184),
 (1, 1, 0.197136)]

### Algorithme 2

In [11]:
# On crée une nouvelle clé
mat_produit_key_naive = mat_produit_naive.map(lambda row: ((row[0], row[1]), row[2]))

# On additionne par clé
mat_finale_naive = mat_produit_key_naive.reduceByKey(add)

# Matrice finale 
AT_A_naive = mat_finale_naive.collect()
AT_A_naive.sort()
#AT_A #On obtient bien une matrice 10*10

In [12]:
# On transforme la sortie en tableau
liste_naive=[]
for k in range(p):
    liste_naive.append([AT_A_naive[k*p + i][1] for i in range(p)])

AT_A_finale_naive = np.array(liste_naive)
AT_A_finale_naive

array([[  3.24452250e+01,  -1.40396300e+00,   4.32892100e+00,
          3.00703100e+00,  -1.85982900e+00,  -6.89354000e-01,
         -7.47093600e+00,   2.05804200e+00,   8.69336000e-01,
          1.76682700e+00],
       [ -1.40396300e+00,   3.92347690e+01,  -1.48056300e+00,
          1.18330600e+00,  -3.59645000e-01,  -1.59124800e+00,
         -3.61449600e+00,  -3.97248500e+00,   1.84601900e+00,
          4.95970200e+00],
       [  4.32892100e+00,  -1.48056300e+00,   3.28025080e+01,
          2.84696200e+00,   1.22969700e+00,   2.30929700e+00,
         -3.28962000e-01,   3.74747900e+00,   7.84522000e-01,
         -1.75377600e+00],
       [  3.00703100e+00,   1.18330600e+00,   2.84696200e+00,
          3.35308470e+01,  -2.47654000e-01,  -1.10997300e+00,
         -2.74061200e+00,  -1.60585900e+00,  -4.18404200e+00,
          1.37096600e+00],
       [ -1.85982900e+00,  -3.59645000e-01,   1.22969700e+00,
         -2.47654000e-01,   3.55208030e+01,   5.06489400e+00,
          7.07268000e-01

### Méthode naïve : calcul de performance

In [13]:
time1=time.time()

mat_finale_naive = new_A.map(a_ij).join(new_A.map(a_ij)).map(produit_matrice).map(lambda row: ((row[0], row[1]), row[2])).reduceByKey(add)
AT_A_naive = mat_finale_naive.collect()
AT_A_naive.sort()

liste_naive=[]
for k in range(p):
    liste_naive.append([AT_A_naive[k*p + i][1] for i in range(p)])
AT_A_finale_naive = np.array(liste_naive)
    
time2=time.time()
print(time2-time1)


0.9346730709075928


## 3. Méthode DIMSUM

### Algorithme 3

In [14]:
def carre(row):
    r, c, aij = row
    return c,aij**2

def racine(row):
    c, aij = row
    return c,aij**1/2

In [15]:
norme=new_A.map(carre).reduceByKey(add).map(racine)
norme.take(10)

[(0, 16.222612500000004),
 (8, 16.279587999999997),
 (2, 16.401254),
 (4, 17.7604015),
 (6, 17.3081255),
 (1, 19.6173845),
 (3, 16.765423500000004),
 (9, 15.330870500000003),
 (5, 18.336724),
 (7, 17.568188)]

In [16]:
norme_c=norme.collect()
norme_c.sort()
n_c_df=sqlContext.createDataFrame(norme)
n_c=n_c_df.toPandas()

In [17]:
n_c.sort_values(by='_1',inplace=True)
n_c.reset_index(inplace=True,drop=True)
n_c

Unnamed: 0,_1,_2
0,0,16.222613
1,1,19.617385
2,2,16.401254
3,3,16.765424
4,4,17.760402
5,5,18.336724
6,6,17.308125
7,7,17.568188
8,8,16.279588
9,9,15.330871


In [18]:
# On récupère tout d'abord les a_ij: on a ligne, (colonne,valeur): la clé est sur la ligne
def a_ij(row):
    return row[0], (row[1], row[2])

In [19]:
# On récupère ensuite les deux valeurs a_ij et a_ik associé au même i(clé) pour tous les j et k
mat_join = new_A.map(a_ij).join(new_A.map(a_ij))
#mat_join.take(15)

In [20]:
# On fait la multiplication a_ij*a_ik comme expliqué dans l'algo 3, on garde les j et k en clé
def produit_matrice2(row):
    index, ((j, vij), (k, vik)) = row
    s=1
    gamma=2*math.log(p)/s
    norme=1/(n_c['_2'][j]*n_c['_2'][k])
    proba=np.random.binomial(1,min(1,norme*gamma))
    vjk = (vij*vik)*proba
    
    return j, k, vjk

In [21]:
mat_produit = mat_join.map(produit_matrice2)
mat_produit.take(10)

[(0, 0, 0.0),
 (0, 1, -0.0),
 (0, 2, -0.0),
 (0, 3, 0.0),
 (0, 4, -0.0),
 (0, 5, 0.0),
 (0, 6, -0.0),
 (0, 7, 0.0),
 (0, 8, -0.0),
 (0, 9, 0.0)]

### Algorithme 4

In [22]:
# On crée une nouvelle clé
mat_produit_key = mat_produit.map(lambda row: ((row[0], row[1]), (row[1],row[2])))
mat_produit_key.take(20)

[((0, 0), (0, 0.0)),
 ((0, 1), (1, -0.0)),
 ((0, 2), (2, -0.0)),
 ((0, 3), (3, 0.0)),
 ((0, 4), (4, -0.0)),
 ((0, 5), (5, 0.0)),
 ((0, 6), (6, -0.0)),
 ((0, 7), (7, 0.0)),
 ((0, 8), (8, -0.0)),
 ((0, 9), (9, 0.0)),
 ((1, 0), (0, -0.0)),
 ((1, 1), (1, 0.0)),
 ((1, 2), (2, 0.0)),
 ((1, 3), (3, -0.0)),
 ((1, 4), (4, 0.0)),
 ((1, 5), (5, -0.0)),
 ((1, 6), (6, 0.0)),
 ((1, 7), (7, -0.0)),
 ((1, 8), (8, 0.0)),
 ((1, 9), (9, -0.0))]

In [23]:
# On crée une nouvelle clé et on rajoute la valeur ci
#mat_produit_key2 = mat_produit.map(lambda row: ((row[0], row[1]), n_c['_2'][row[1]], row[2]))
#mat_produit_key2.take(20)

In [24]:
# On somme les vi
def somme_ligne(rowi,rowj):
    s=1
    gamma=2*math.log(p)/s
    norme=1/(n_c['_2'][rowi[0]]*n_c['_2'][rowj[0]])
    bij = (rowi[1]+rowj[1])*(min(1,norme*gamma)==1)/norme+(rowi[1]+rowj[1])*(min(1,norme*gamma)!=1)/gamma
    return (rowi[0],bij)

In [25]:
# On additionne par clé
mat_finale = mat_produit_key.reduceByKey(somme_ligne).map(lambda row: ((row[0][0], row[0][1]), row[1][1]))

In [26]:
# Matrice finale 
AT_A = mat_finale.collect()

In [27]:
AT_A.sort()
AT_A #On obtient bien une matrice 10*10

[((0, 0), 6.0267498752147675e-13),
 ((0, 1), 0.0),
 ((0, 2), 0.0),
 ((0, 3), 0.0055333079394415926),
 ((0, 4), 4.5247069571333816e-10),
 ((0, 5), 4.9064993640878087e-06),
 ((0, 6), 0.0),
 ((0, 7), -0.015559096369874268),
 ((0, 8), -0.0001859760592758199),
 ((0, 9), -5.4125734292993565e-10),
 ((1, 0), -2.2311500672715644e-06),
 ((1, 1), 0.0),
 ((1, 2), 0.0083130605457868816),
 ((1, 3), 0.0016886309615544589),
 ((1, 4), 0.0),
 ((1, 5), 1.7676987148189476e-05),
 ((1, 6), 5.9951084072195629e-06),
 ((1, 7), 0.0010187670965045004),
 ((1, 8), -5.1766030834216801e-07),
 ((1, 9), -2.54175627974606e-10),
 ((2, 0), -1.53753704039889e-07),
 ((2, 1), -3.4057918501230574e-11),
 ((2, 2), 0.0019374988886879262),
 ((2, 3), -0.00021879361633240469),
 ((2, 4), 0.0),
 ((2, 5), 7.8223857015304307e-09),
 ((2, 6), 0.00065434385081148338),
 ((2, 7), 0.00044232006953321979),
 ((2, 8), 8.0973743305338108e-08),
 ((2, 9), -5.4009369119483448e-11),
 ((3, 0), -5.1835717872871734e-08),
 ((3, 1), -1.4701861576975748e

In [28]:
liste=[]
for k in range(p):
    liste.append([AT_A[k*p + i][1] for i in range(p)])

AT_A_finale = np.array(liste)
AT_A_finale

array([[  6.02674988e-13,   0.00000000e+00,   0.00000000e+00,
          5.53330794e-03,   4.52470696e-10,   4.90649936e-06,
          0.00000000e+00,  -1.55590964e-02,  -1.85976059e-04,
         -5.41257343e-10],
       [ -2.23115007e-06,   0.00000000e+00,   8.31306055e-03,
          1.68863096e-03,   0.00000000e+00,   1.76769871e-05,
          5.99510841e-06,   1.01876710e-03,  -5.17660308e-07,
         -2.54175628e-10],
       [ -1.53753704e-07,  -3.40579185e-11,   1.93749889e-03,
         -2.18793616e-04,   0.00000000e+00,   7.82238570e-09,
          6.54343851e-04,   4.42320070e-04,   8.09737433e-08,
         -5.40093691e-11],
       [ -5.18357179e-08,  -1.47018616e-12,  -1.12800553e-03,
          1.77764638e-02,   0.00000000e+00,   2.60810303e-08,
          4.19078136e-04,   0.00000000e+00,  -3.89400829e-08,
          0.00000000e+00],
       [ -1.13288227e-17,  -1.56717442e-12,  -9.48788317e-13,
          2.83837575e-02,   0.00000000e+00,   1.39684912e-09,
         -9.51596681e-05

In [29]:
time1=time.time()

norme=new_A.map(carre).reduceByKey(add).map(racine)
norme_c=norme.collect()
norme_c.sort()
n_c_df=sqlContext.createDataFrame(norme)
n_c=n_c_df.toPandas()
n_c.sort_values(by='_1',inplace=True)
n_c.reset_index(inplace=True,drop=True)

mat_finale = new_A.map(a_ij).join(new_A.map(a_ij)).map(produit_matrice2).map(lambda row: ((row[0], row[1]), (row[1],row[2]))).reduceByKey(somme_ligne).map(lambda row: ((row[0][0], row[0][1]), row[1][1]))
AT_A = mat_finale.collect()
AT_A.sort()
liste=[]
for k in range(p):
    liste.append([AT_A[k*p + i][1] for i in range(p)])
AT_A_finale = np.array(liste)

time2=time.time()
print(time2-time1)

3.5056869983673096


## 4. Comparaison des deux méthodes

### Calcul de la matrice B avec la méthode DIMSUM

Nous allons comparer les deux matrices $DBD$ et $A^TA$ comme expliqué dans l'article.

In [30]:
# Création de la matrice D
D=pandas.DataFrame(np.diag(n_c['_2']))
B=pandas.DataFrame(AT_A_finale)
DBD=D.dot(B.dot(D))

ATA=AT_A_finale_naive

In [31]:
mat_erreur=(DBD-ATA)**2
erreur=sum(mat_erreur.sum())/(n*p)

In [32]:
erreur*100

1232.1811200230127