In [1]:
import findspark
findspark.init('/home/duynguyen/spark-master')

In [2]:
from pyspark import SparkContext
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
# $example off$

In [168]:
from numpy import zeros, max, sqrt, isnan, isinf, dot, diag, count_nonzero, where
from numpy.linalg import svd, linalg, LinAlgError, norm
from scipy.linalg import svd as scipy_svd


class FrequentDirections(object):
    
    def __init__(self ,sc , rows, columns, op='fd'):
        """
        Matrix Sketching using Frequent Direction.
        Choose 'fd' for normal Frequent Direction, 'ssd' for Space Saving Direction, 'cfd' for Compensative Frequent Direction, 'isvd' for iterative SVD, and a number between 0 and 1 for Parameterized Frequent Direction
        """
        self.class_name = 'FrequentDirections'
        self.op = op
        self.columns = columns
        self.rows = rows
        self.localSketchMatrix = zeros((self.rows, self.columns)) 
        self.distributedSketchMatrix = RowMatrix(sc.parallelize(self.localSketchMatrix))
        self.S = zeros(self.rows)
        self.U = []
        self.V = []
        self.step = 1
        self.nextZeroRow = 0
        self.emptyRows = self.rows

        # Parsing the operation parameter
        if self.op == 'fd':
            print("Matrix Sketching Using Frequent Direction")
            self.reduceRank = self.__FDOperate__
        elif self.op == 'ssd':
            print("Matrix Sketching Using Space Saving Direction")
            self.op = 2
            self.reduceRank = self.__SSDOperate__
        elif self.op == 'cfd':
            print("Matrix Sketching Using Compensative Frequent Direction")
            self.reduceRank = self.__CFDperate__
        elif self.op == 'isvd':
            print("Matrix Sketching Using iSVD")
            self.reduceRank = self.__iSVDOperate__
        elif type(self.op) != str and self.op > 0 and self.op < 1:
            print("Matrix Sketching Using Parameterized Frequent Direction")
            self.reduceRank = self.__PFDOperate__
            self.DELTA = 0
        else:
            print("Type of Reduce Rank algorithm is not correct")
            raise ValueError
    
    # Add new vector to the sketch matrix
    def add(self,vector):     
        if count_nonzero(vector) == 0:
            return
        
        # If the approximate matrix is full, call the operate method to free half of the columns
        if self.emptyRows <= 0:
#             try:
#                 [self.U,self.S,self.V] = svd(self.localSketchMatrix , full_matrices=True)
#             except LinAlgError as err:
#                 [self.U,self.S,self.V] = scipy_svd(self.localSketchMatrix , full_matrices = True)
            self.svd = self.distributedSketchMatrix.computeSVD(self.rows, computeU=True)
            self.U = self.svd.U       # The U factor is a distributed RowMatrix.
            self.S[:] = self.svd.s[:]      # The singular values are stored in a local dense vector.
            self.V = self.svd.V       # The V factor is a local dense matrix.
            self.reduceRank()

        # Push the new vector to the next zero row and increase the next zero row index
        self.localSketchMatrix[self.nextZeroRow,:] = vector
        del(self.distributedSketchMatrix)
        self.distributedSketchMatrix = RowMatrix(sc.parallelize(self.localSketchMatrix))
        self.nextZeroRow += 1
        self.emptyRows -= 1


    # Shrink the approximate matrix using Frequent Direction
    def __FDOperate__(self):
        # Calculating matrix s
        delta = sqrt(self.S[:]**2 - self.S[len(self.S)-1]**2)
        self.S = delta
        self.S[len(self.S)-1] = 0
        #Shrink the sketch matrix
        self.localSketchMatrix[:,:] = dot(diag(self.S), self.V.toArray().transpose()[:len(self.S),:])
        self.nextZeroRow = (len(self.S)-1)
        self.emptyRows += 1
            
        
    # Shrink the approximate matrix using iterative SVD
    def __iSVDOperate__(self):
        # Calculating matrix s
        self.S[len(self.S)-1] = 0
        #Shrink the sketch matrix
        self.localSketchMatrix[:,:] = dot(diag(self.S), self.V.toArray().transpose()[:len(self.S),:])
        self.nextZeroRow = (len(self.S)-1)
        self.emptyRows += 1


   	# Shrink the approximate matrix using Parameterized FD
    def __PFDOperate__(self):
        #Shrink the sketch matrix
        # Calculating matrix s
        delta = self.S[-1]**2
        self.S[round(len(self.S)*(1-self.op)):] = sqrt(self.S[round(len(self.S)*(1-self.op)):]**2 - self.S[-1]**2)
        #Shrink the sketch matrix
        self.localSketchMatrix[:,:] = dot(diag(self.S), self.V.toArray().transpose()[:len(self.S),:])
        self.nextZeroRow = len(self.S) - 1
        self.emptyRows += 1
    

    # Shrink the approximate matrix using Space Saving Direction
    def __SSDOperate__(self):
        # Calculating matrix s
        self.S[-1] = sqrt(self.S[-1]**2 + self.S[-2]**2)
        self.S[len(self.S)-2] = 0
        #Shrink the sketch matrix
        self.localSketchMatrix[:,:] = dot(diag(self.S), self.V.toArray().transpose()[:len(self.S),:])
        self.nextZeroRow = len(self.S)-2
        self.emptyRows += 1
        
        
    # Shrink the approximate matrix using Compensative Direction
    def __CFDperate__(self):
        # Calculating SVD
        try:
            [U,self.S,Vt] = svd(self.sketchMatrix, full_matrices=False)
        except LinAlgError as err:
            [U,self.S,Vt] = scipy_svd(self.sketchMatrix, full_matrices = False)
        # Calculating matrix s
        self.DELTA += self.S[-1]**2
        self.S = sqrt(self.S[:len(self.S)]**2 + self.DELTA)
        delta = sqrt(self.S[:]**2 - self.S[len(self.S)-1]**2)
        self.S = delta
        self.S[len(self.S)-1] = 0
        #Shrink the sketch matrix
        self.localSketchMatrix[:,:] = dot(diag(self.S), self.V.toArray().transpose()[:len(self.S),:])
        self.nextZeroRow = len(self.S) - 1
        self.emptyRows += 1


    # Return the local sketch matrix
    def getLocalSketchMatrix(self):
        return self.localSketchMatrix
    
    # Return the distributed sketch matrix
    def getDistributedSketchMatrix(self):
        return self.distributedSketchMatrix
    
    # Return the S matrix
    def getS(self):
        return self.S
    
    # Return the U matrix
    def getU(self):
        return self.U
    
    # Return the Vt matrix
    def getV(self):
        return self.V


In [128]:
sc = SparkContext(appName="PythonSVDExample")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PythonSVDExample, master=local[*]) created by __init__ at <ipython-input-9-d6a060f3e95b>:1 

In [163]:
import numpy as np
A = 100* np.random.rand(100,20)
print(A)

[[ 74.17676246  56.48576044  61.90269881 ...,  13.81001226  60.90292572
   70.64630815]
 [ 65.23845528  56.85610287  17.73233222 ...,  20.18898103  77.39982244
   45.84270017]
 [ 55.17141272   7.68441521  99.83202449 ...,   2.94539277   1.32540591
   45.59873996]
 ..., 
 [ 15.8193658   21.3898864   62.86031705 ...,  71.73247605  39.41963616
   30.88483749]
 [ 67.41856555  28.24082072  52.43478114 ...,  86.46613207  18.36970851
   61.23992027]
 [ 41.99067863  80.7965796   58.5105537  ...,  57.32652329  95.4212792
   27.40444226]]


In [167]:
l = 10
fd =  FrequentDirections(sc=sc,rows=l,columns=20,op='fd')
for i in range(100):
    row = A[i,:]
    fd.add(row)

Matrix Sketching Using Frequent Direction


In [169]:
print(fd.getLocalSketchMatrix())
print(fd.getS())

[[ -4.91712589e+02  -5.03981499e+02  -4.96034743e+02  -5.19209159e+02
   -5.11456257e+02  -5.34266084e+02  -5.01024967e+02  -5.64475515e+02
   -4.97855116e+02  -5.05149183e+02  -4.71295796e+02  -5.11410958e+02
   -4.97726515e+02  -5.04353385e+02  -3.80995126e+02  -4.80383329e+02
   -4.78348766e+02  -4.63529147e+02  -4.90495888e+02  -4.94344596e+02]
 [  3.10217398e+01   2.20927653e+01  -3.13881585e+01   2.81183044e+00
   -4.82391127e+01   4.59935490e+01   5.18190734e+01  -5.34203591e+00
   -4.11975889e+01  -4.22560982e+01  -2.22121531e+01  -8.01049446e+01
    1.13737222e+01  -2.42345293e+01  -2.27650548e+01   1.98103704e+01
    7.80931589e+01  -4.04680037e+01   6.33665018e+01   2.87345346e+01]
 [ -1.38073129e+01  -2.81153138e+01   5.23140799e+00   9.16531677e+01
   -2.25718120e+01   3.15507667e+01   1.39762943e+01  -2.53101975e+01
    3.50878713e+01   8.09985281e+00   5.51600734e+00  -1.13834853e+01
    1.95520646e+00   4.09816533e+01  -4.23782058e+01  -8.33578963e+01
   -9.58726788e+00

In [30]:
fd.getDistributedSketchMatrix().rows.collect()

[DenseVector([3.8074, 27.7652, 84.5342, 62.3573, 23.0472, 70.5257, 73.9106, 89.8079, 89.447, 90.1831, 47.1219, 97.0055, 37.6688, 10.0116, 35.8555, 95.6983, 38.4716, 6.5108, 96.8757, 42.8443]),
 DenseVector([42.6074, 59.846, 91.0104, 41.3266, 87.6616, 56.7175, 47.8576, 10.7291, 61.5257, 58.284, 57.2444, 54.1114, 63.7084, 18.6313, 96.8052, 8.4404, 0.5815, 75.2998, 51.2543, 98.7061]),
 DenseVector([6.8133, 98.3658, 78.4205, 9.4251, 84.6307, 17.3883, 73.6359, 84.889, 49.9635, 34.1662, 20.4015, 19.1985, 90.6758, 63.8162, 48.6891, 13.0695, 42.0981, 59.9316, 87.613, 12.5752]),
 DenseVector([78.7361, 30.0491, 21.7485, 84.4172, 64.3682, 96.6846, 82.6476, 91.5166, 80.3817, 55.1016, 31.8799, 7.4774, 89.7606, 52.5243, 31.9271, 59.2847, 40.7302, 82.1948, 50.2091, 34.1954]),
 DenseVector([17.4635, 37.0829, 74.4209, 97.2032, 84.2433, 12.0505, 54.3987, 86.2278, 66.7769, 35.4716, 84.4427, 54.1307, 59.1003, 2.5274, 23.4833, 0.0646, 3.2015, 85.4729, 25.2708, 3.5244]),
 DenseVector([11.2552, 0.257, 70.459

In [77]:
Matrix = RowMatrix(sc.parallelize(A))
svd2 = Matrix.computeSVD(5, computeU=True)

In [68]:
print(svd2.V)

DenseMatrix([[-0.47940821,  0.81746502, -0.01760412,  0.27716641, -0.15744074],
             [-0.40110302, -0.26094608, -0.3207908 ,  0.44495359,  0.68566251],
             [-0.45004222, -0.02028785, -0.47969164, -0.75292758, -0.00681117],
             [-0.33332592, -0.45840826, -0.22249658,  0.3597372 , -0.70699372],
             [-0.54372785, -0.23045249,  0.78560458, -0.1699538 ,  0.07206128]])


In [78]:
[U,S,V] = svd(A , full_matrices=False)

In [79]:
print(V.transpose())

[[-0.47940821 -0.81746502  0.01760412 -0.27716641  0.15744074]
 [-0.40110302  0.26094608  0.3207908  -0.44495359 -0.68566251]
 [-0.45004222  0.02028785  0.47969164  0.75292758  0.00681117]
 [-0.33332592  0.45840826  0.22249658 -0.3597372   0.70699372]
 [-0.54372785  0.23045249 -0.78560458  0.1699538  -0.07206128]]


In [72]:
print(svd2.s)

[339.165712374,135.258445457,91.5167565005,77.063612949,35.0405021255]


In [73]:
print(S)

[ 339.16571237  135.25844546   91.5167565    77.06361295   35.04050213]


In [74]:
for vector in svd2.U.rows.collect():
    print(vector)

[-0.395959899238,0.290094393827,0.143401577937,-0.486912196255,-0.148746647467]
[-0.314467845435,-0.401832240529,-0.254387906223,-0.294130224232,-0.451110980468]
[-0.324944860716,-0.428145262238,-0.147702847387,-0.00197461022659,-0.225991743632]
[-0.255389421462,0.367563384128,-0.27807894927,-0.0570780990215,0.327836736141]
[-0.246460536897,0.167520427897,0.48941708739,0.229590956726,-0.298450001776]
[-0.380895813076,-0.312848513258,0.312415666963,0.572996715732,0.188116900846]
[-0.299845168176,-0.278350324092,0.256904016439,-0.380445267575,0.64270062284]
[-0.27759131508,0.423891943764,0.179744454603,0.0768327233427,-0.238110313782]
[-0.251838728867,0.168381816867,0.0161351144162,-0.0727289919241,0.0889972590462]
[-0.370542289317,0.156562706417,-0.616342460428,0.36901693314,0.116972767022]


In [80]:
print(U)

[[-0.3959599  -0.29009439 -0.14340158  0.4869122   0.14874665]
 [-0.31446785  0.40183224  0.25438791  0.29413022  0.45111098]
 [-0.32494486  0.42814526  0.14770285  0.00197461  0.22599174]
 [-0.25538942 -0.36756338  0.27807895  0.0570781  -0.32783674]
 [-0.24646054 -0.16752043 -0.48941709 -0.22959096  0.29845   ]
 [-0.38089581  0.31284851 -0.31241567 -0.57299672 -0.1881169 ]
 [-0.29984517  0.27835032 -0.25690402  0.38044527 -0.64270062]
 [-0.27759132 -0.42389194 -0.17974445 -0.07683272  0.23811031]
 [-0.25183873 -0.16838182 -0.01613511  0.07272899 -0.08899726]
 [-0.37054229 -0.15656271  0.61634246 -0.36901693 -0.11697277]]


In [81]:
dot(diag(svd2.s), svd2.V.toArray().transpose())

array([[-162.59882596, -136.04039172, -152.63889   , -113.05272171,
        -184.41384287],
       [ 110.56904782,  -35.29516168,   -2.74410243,  -62.00358914,
         -31.1706453 ],
       [  -1.61107215,  -29.35773324,  -43.89982342,  -20.36216556,
          71.89598308],
       [  21.35944517,   34.28973091,  -58.02331947,   27.72264844,
         -13.09725385],
       [  -5.51680261,   24.02595868,   -0.23866696,  -24.77341481,
           2.52506329]])

In [147]:
s2=[]
s2[:] = svd2.s[:]

In [150]:
s2[1] = 0

In [143]:
svd2.s[1]

135.2584454565349

In [170]:
svd2.U + svd2.U

TypeError: unsupported operand type(s) for +: 'RowMatrix' and 'RowMatrix'

In [173]:
s2 = svd2.s.array

In [175]:
s2[0] = 0

ValueError: assignment destination is read-only

In [177]:
dot(svd2.V,svd2.V)

TypeError: unsupported operand type(s) for *: 'DenseMatrix' and 'DenseMatrix'

In [None]:
svd2.V.