# RPCA on Apache Spark

In [1]:
import pyspark
import numpy as np
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors

In [2]:
pyspark.__version__

'2.3.1'

In [3]:
conf = SparkConf()
conf.set('spark.app.name', 'rpca_cpm')
conf.set('spark.master', 'local[*]')
conf.set('spark.driver.memory', '4g')
conf.set('spark.driver.maxResultSize', '3g')
conf.set('spark.executor.cores', '1')
conf.set('spark.executor.instances', '3')
conf.set('spark.executor.memory', '2g');

In [4]:
sc = (SparkSession
      .builder
      .config(conf=conf)
      .getOrCreate()
      .sparkContext)

In [5]:
sc

In [6]:
cpm_df = (sc
          .textFile('./data/cpm_weekly_clean.csv')
          .map(lambda line: line.split(','))
          .map(lambda values: (values[0], Vectors.dense(np.asarray(values[1:]).astype(np.float64))))
          .toDF(['id', 'features']))

In [7]:
cpm_df

DataFrame[id: string, features: vector]

In [8]:
pd.DataFrame(cpm_df.take(5), columns=cpm_df.columns)

Unnamed: 0,id,features
0,0,"[0.179941670478, 0.192673380292, 0.16633547213..."
1,1,"[0.139389470106, 0.140954515364, 0.09225391828..."
2,2,"[0.225183226125, 0.161582089438, 0.15019315241..."
3,3,"[0.17807286211, 0.135347010517, 0.114142604504..."
4,4,"[0.183854930206, 0.146498617435, 0.13645942448..."


## RPCA

In [9]:
!ls

anomaly-detection-on-time-series-via-rpca.ipynb  rpca-spark.ipynb
data						 rpca.svg
motion_detection.svg				 run_spark.sh
rpca						 spark-warehouse
rpca_spark


In [10]:
!mkdir rpca_spark

mkdir: cannot create directory ‘rpca_spark’: File exists


In [11]:
!touch rpca_spark/__init__.py

In [12]:
%%writefile ./rpca_spark/transformer.py

import numpy as np

from pyspark import keyword_only
from pyspark.ml import Estimator
from pyspark.ml.param import Params
from pyspark.ml.param import TypeConverters
from pyspark.ml.param.shared import HasInputCol
from pyspark.ml.param.shared import HasOutputCol
from pyspark.ml.param.shared import Param
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
from pyspark.mllib.linalg.distributed import IndexedRow

class RPCA(Estimator, HasInputCol, HasOutputCol):
    
    method = Param(Params._dummy(), 'method', 'Output of rpca', typeConverter=TypeConverters.toString)
    mu = Param(Params._dummy(), 'mu', 'Parameter from the Augmented Lagrange Multiplier form', typeConverter=TypeConverters.toFloat)
    l = Param(Params._dummy(), 'l', 'Parameter mix', typeConverter=TypeConverters.toFloat)
    tol = Param(Params._dummy(), 'tol', 'Tolerance accuracy of matrix reconstruction', typeConverter=TypeConverters.toFloat)
    max_iter = Param(Params._dummy(), 'max_iter', 'Maximum number of iterations', typeConverter=TypeConverters.toInt)
    indexCol = Param(Params._dummy(), 'indexCol', 'Column used to index', typeConverter=TypeConverters.toString)
    
    @keyword_only
    def __init__(self, method='sparse', mu=None, l=None, tol=1E-7, max_iter=1000, inputCol=None, outputCol=None):
        super(RPCA, self).__init__()
        self._setDefault(method='sparse', mu=None, l=None, tol=1E-7, max_iter=1000, inputCol=None, outputCol=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    
    @keyword_only
    def setParams(self, method='sparse', mu=None, l=None, tol=1E-7, max_iter=1000, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    
    def setMethod(self, value):
        return self._set(method=value)
    
    def getMethod(self):
        return self.getOrDefault(self.method)
      
    def setMu(self, value):
        return self._set(mu=value)
    
    def getMu(self):
        return self.getOrDefault(self.mu)
        
    def setL(self, value):
        return self._set(l=value)
    
    def getL(self):
        return self.getOrDefault(self.l)
        
    def setTol(self, value):
        return self._set(tol=value)
    
    def getTol(self):
        return self.getOrDefault(self.tol)
    
    def setMaxIter(self, value):
        return self._set(max_iter=value)
    
    def getMaxIter(self):
        return self.getOrDefault(self.max_iter)
        
    def _fit(self, dataset):
        
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()
        
        ds_rdd = dataset.select(inputCol).rdd
        
        m = IndexedRowMatrix(ds_rdd)
        
        mu = self.getMu()
        l = self.getL()
        
        if not mu: 
            mu = 1.25 * mat.computeSVD(1).s
        
        print('mu:', mu)
             
        if not l:
            n_cols = mat.numCols()
            n_rows = mat.numRows()
            l = 1.0 / np.sqrt(np.max((n_cols, n_rows))) 
            
        print('l:', l)
        
        pass

Overwriting ./rpca_spark/transformer.py


In [13]:
from rpca_spark.transformer import RPCA

In [14]:
rpca_model = RPCA(method='sparse', inputCol='features', outputCol='sparse')

In [15]:
# indexed row matrix

from pyspark.mllib.linalg.distributed import IndexedRowMatrix
from pyspark.mllib.linalg.distributed import IndexedRow

m = IndexedRowMatrix(cpm_df.rdd.map(lambda row: IndexedRow(row.id, row.features)))

n_cols = m.numCols()
n_rows = m.numRows()

n_rows, n_cols

(9, 168)

In [16]:
# mu

mu = 1.25 * m.computeSVD(1).s[0]
mu

17.64458037175552

In [17]:
# l

l = 1.0 / np.sqrt(np.max((n_cols, n_rows)))
l

0.077151674981045956

In [18]:
# sgn spectral

m_sign = IndexedRowMatrix(cpm_df.rdd.map(lambda row: IndexedRow(row.id, np.sign(row.features))))
m_sign_spectral = m_sign.computeSVD(1).s[0]

m_sign_spectral

38.884444190447162

In [19]:
# sgn max abs

m_sign_maxabs = l**-1 * m_sign.rows.map(lambda row: np.sum(np.abs(row.vector))).max()
m_sign_maxabs

2177.5288746650413

In [20]:
norm_max = np.max([m_sign_maxabs, m_sign_spectral])
norm_max

2177.5288746650413

In [21]:
MYAE = m.rows.map(lambda row: (row.index, row.vector, Vectors.dense(np.sign(row.vector)) / norm_max, row.vector * 0.0, row.vector *0.0)).toDF(['id', 'M', 'Y', 'A', 'E']).rdd

In [22]:
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg import DenseMatrix

In [39]:
tol = 1E-7
max_iter = 1000
err = np.inf
i = 0

t = mu**-1
mat = MYAE.cache()

def _shrink(M, t):
    return np.sign(M) * np.maximum((np.abs(M) - t), np.zeros(M.shape))

def _compute_low_rank(U, S, V, mu):
    pass

def _compute_sparse()

while err > tol and i < max_iter:
    # SVD(M - E + Y * t)
    USV = (IndexedRowMatrix(mat.map(lambda x: IndexedRow(x.id, (x.M - x.E + x.Y * t))))
        .computeSVD(k=n_cols, computeU=True))
    
    U = USV.U
    S = USV.s.toArray()
    V = USV.V.toArray()
    
    print(U)
    
    shrinkage = _shrink(S, mu)
    T = np.dot(np.diag(shrinkage), V).flatten()
    print(T)
    
    
    
    
    # A = U S_shrink V^T)
    
    #SV = np.dot(np.diag(_shrink(S, mu)), V)
    #SV_rows, SV_cols = SV.shape
    
    #SV_dense = DenseMatrix(SV_rows, SV_cols, SV.flatten())
    #print(K)
    #A = U.rows.map(lambda x: IndexedRow(x.index, (x.vector * K)))
    
    

    break
    


<pyspark.mllib.linalg.distributed.IndexedRowMatrix object at 0x7f81040d55c0>
[ 0.  0.  0. ...,  0.  0.  0.]
