In [13]:
# Make it Python2 & Python3 compatible
from __future__ import print_function
import sys
if sys.version[0] == 3:
    xrange = range

The notebook deployment includes Spark automatically within each Python notebook kernel. This means that, upon kernel instantiation, there is an SparkContext object called sc immediatelly available in the Notebook, as in a PySpark shell. Let's take a look at it:


In [14]:
# Spark version we are using
print( sc.version )

2.0.2


Import Linear algebra librarires from ML lib

In [15]:
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

Import data from file system or create tmp

In [16]:
# Xrdd = sc.textFile("data.txt")
Xrdd = sc.parallelize([IndexedRow(0, [1, 1, 0]), IndexedRow(1, [1, 0, 0]),
                              IndexedRow(2, [1, 1, 0]), IndexedRow(3, [0, 0, 1])])
print(Xrdd)
#This data is now RDD structure 

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475


Create a distributed block matrix from input data. 
Different types of matrices, will be denoted with different suffixes
#X_bm : block matrix distributed rdd
#X_np : numpy local matrix
#X_rm : indexed row matrix
#X_local: local matrix MLlib structure
see: https://spark.apache.org/docs/latest/mllib-data-types.html

In [18]:
#converte RDD to IndexedRowMatrix
X_rm = IndexedRowMatrix(Xrdd)
#convert IndexedRowMatrix to Block distributed matrix

#define number of blocks for distributed matrix for each variable
nNumberPerBlock=3 
mNumberPerBlock=2
kNumberPerBlock=3
X_bm = X_rm.toBlockMatrix(mNumberPerBlock, nNumberPerBlock)
print(X_bm.toLocalMatrix())
X_bm.validate()

DenseMatrix([[ 1.,  1.,  0.],
             [ 1.,  0.,  0.],
             [ 1.,  1.,  0.],
             [ 0.,  0.,  1.]])


We are solving the following optimization problem \Phi(X) = \Phi(X) F H.
For start \Phi(X)=X.
We create two random matrices F and H.

In [19]:
import numpy as np

#matrix H : kxn
#number of bases for factorization
k = 2; 
n = X_bm.numCols()
m = X_bm.numRows()

Hindexedrows = []
for i in range(0,k):
	Hindexedrows.append((i, np.random.random_sample((n,))))

# Hindexedrows is a num py object -> we create RDD from him
H_rdd = sc.parallelize(Hindexedrows)
# Then we create IndexedRowMatrix , which is a distributed matrix object and create a block matrix
H_bm = IndexedRowMatrix(H_rdd).toBlockMatrix(kNumberPerBlock, nNumberPerBlock)
H_bm.validate()
H_local = H_bm.toLocalMatrix()

print(H_local)

DenseMatrix([[ 0.14933398,  0.40924514,  0.07532505],
             [ 0.47027156,  0.4806673 ,  0.6953724 ]])


In [20]:
#matrix F : nxk
Findexedrows = []
for i in range(0,n):
	Findexedrows.append((i, np.random.random_sample((k,))))

F_bm = IndexedRowMatrix(sc.parallelize(Findexedrows)).toBlockMatrix(nNumberPerBlock, kNumberPerBlock)
F_bm.validate()

print(F_bm.toLocalMatrix())

DenseMatrix([[ 0.79258059,  0.77966523],
             [ 0.75551724,  0.0253088 ],
             [ 0.46144306,  0.22886885]])


Create a Kernel matrix -> this matrix is the largest. All multiplications with this matrix are done with blockmatrix mulitplication function.
All other multiplactions can be done locally.

In [21]:
K = X_bm.transpose().multiply(X_bm)
K.validate()
print(K.toLocalMatrix())
# faster:  IndexedRowMatrixobject.computeGramianMatrix() -> then create block matrix

DenseMatrix([[ 3.,  2.,  0.],
             [ 2.,  2.,  0.],
             [ 0.,  0.,  1.]])


Set parameters for factorization

In [36]:
alpha = 10
mu = 100
num_iter = 20

In [37]:
# block_matrix.toLocalMatrix() converts for local rdd matrix
# local_matrix.toArray() converts to numpy matrix
# some matrix operations are not available for distributed objects
# we run large matrix multiplicaiton with K with distributed matrix operation
# operations on nxk matrices are done locally with numpy

for i in range(0,num_iter):
    #-----update for H -------- H= H.* A./B
    
    # multiplicaiton with K (largematrix)
    FK_bm = F_bm.transpose().multiply(K)

    # A = nominator for update rule
    A_np = np.add( alpha * FK_bm.toLocalMatrix().toArray()  ,2*mu*H_local.toArray())
    
    # denominator Fˆt*K*F*H
    FKFH_bm = FK_bm.multiply(F_bm).multiply(H_bm)
    # denominator H*H^T*H
    HHH_np = np.matmul(np.matmul(H_local.toArray(), H_local.toArray().transpose()), H_local.toArray())
    # denominator whole
    B_np = np.add( alpha*FKFH_bm.toLocalMatrix().toArray(), 2*mu*HHH_np)
    
    #update rule for H 
    H_np = np.multiply( np.divide(A_np, B_np), H_local.toArray())
    
    # convert numpy H object first to IndexedRowMatrix and then to blockmatrix 
    H_bm = IndexedRowMatrix(sc.parallelize( list(enumerate(H_np.tolist()) ))).toBlockMatrix(kNumberPerBlock, nNumberPerBlock)
    # convet also to local matrix, used in each iteration for numpy operations
    H_local = H_bm.toLocalMatrix()
    
    #----------update for F-------
    # nominator K*H^T
    KH_bm = K.multiply( H_bm.transpose() )
    # denominator: K*F*H*H^T
    KFHH_bm = K.multiply(F_bm).multiply(H_bm).multiply(H_bm.transpose())
    # udpate rule for F
    F_np = np.multiply( F_bm.toLocalMatrix().toArray() , np.divide( KH_bm.toLocalMatrix().toArray(), KFHH_bm.toLocalMatrix().toArray() ))
    # convert numpy F object to to IndexedRowMatrix and then to blockmatrix 
    F_bm = IndexedRowMatrix(sc.parallelize( list(enumerate(F_np.tolist()) ))).toBlockMatrix(nNumberPerBlock, kNumberPerBlock)
    
    print('.', end='')

print(' Finished ')

.................... Finished 


In [38]:
#recommendations Y= X*F*H
Y = np.matmul(X_bm.toLocalMatrix().toArray() , np.matmul(F_np, H_np))
print(Y)

[[  1.00526809e+00   9.99752422e-01   9.92589794e-29]
 [  9.89292777e-01   2.21286704e-02   9.76815963e-29]
 [  1.00526809e+00   9.99752422e-01   9.92589794e-29]
 [  9.87388149e-29   0.00000000e+00   9.74935356e-57]]


In [39]:
#print original data
print(X_bm.toLocalMatrix())

DenseMatrix([[ 1.,  1.,  0.],
             [ 1.,  0.,  0.],
             [ 1.,  1.,  0.],
             [ 0.,  0.,  1.]])
