In [49]:
import numpy as np
import matplotlib.pyplot as plt
# from jlinops import CGPseudoinverse, MatrixOperator
from jlinops import MatrixOperator, QRPseudoInverseOperator
from scipy.linalg import null_space

from scipy.sparse.linalg import cg as scipy_cg
from jlinops import cg

from scipy.linalg import qr as scipy_qr
from scipy.linalg import solve_triangular as scipy_solve_triangular
from scipy.sparse.linalg._interface import MatrixLinearOperator, _CustomLinearOperator
import scipy.sparse as sps

In [77]:
# Seed the global NumPy RNG so the random test matrix below (and the random
# test vectors drawn with np.random in later cells) are reproducible under
# Restart Kernel -> Run All.
np.random.seed(0)

# Build an 8 x 5 Gaussian test matrix and make it rank deficient on purpose
# by duplicating two of its columns.
m, n = 8, 5
Amat = np.random.normal(size=(m, n))
Amat[:, -1] = Amat[:, -2]  # last column := second-to-last column
Amat[:, 2] = Amat[:, 1]    # column 2 := column 1

# Wrap the dense matrix as a jlinops operator and compute a basis of null(Amat).
A = MatrixOperator(Amat)
W = null_space(Amat)

# Basic pseudoinverse

In [78]:
class CGPseudoinverseOperator(_CustomLinearOperator):
    """Linear operator applying an approximate pseudoinverse of ``operator`` via CG.

    Writing ``A`` for ``operator``:

    * ``matvec(x)`` runs conjugate gradients on ``(A^T A) y = A^T x``.
    * ``rmatvec(x)`` runs conjugate gradients on ``(A A^T) y = A x``.

    Parameters
    ----------
    operator :
        Operator of shape ``(m, n)``. It must expose ``.shape`` and ``.T`` and
        support ``@`` with vectors and with its own transpose.
    warmstart_prev : bool
        If True, each solve starts from the solution returned by the previous
        solve in the same direction; otherwise every solve starts from zeros.
    which : {"jlinops", "scipy"}
        Conjugate gradient backend to call.
    *args, **kwargs :
        Forwarded unchanged to the selected conjugate gradient routine.
    """

    def __init__(self, operator, warmstart_prev=False, which="jlinops", *args, **kwargs):

        assert which in ["jlinops", "scipy"], "Invalid choice for which!"

        # Wrapped operator and solver options
        self.original_op = operator
        self.which = which

        # The pseudoinverse maps length-m vectors to length-n vectors
        n_rows = self.original_op.shape[0]
        n_cols = self.original_op.shape[1]
        self.in_shape = n_rows
        self.out_shape = n_cols

        # Initial guesses for the forward (length n) and transpose (length m) solves
        self.prev_eval = np.zeros(n_cols)
        self.prev_eval_t = np.zeros(n_rows)
        self.warmstart_prev = warmstart_prev

        # Normal-equation operators for the two directions
        self.AtA = self.original_op.T @ self.original_op
        self.AAt = self.original_op @ self.original_op.T

        def _via_scipy(system, rhs, start):
            # scipy's cg returns (solution, info); info == 0 signals success.
            result, info = scipy_cg(system, rhs, x0=start, *args, **kwargs)
            assert info == 0, "CG algorithm did not converge!"
            return result

        def _via_jlinops(system, rhs, start):
            # jlinops' cg returns a dict; the solution is stored under "x".
            return cg(system, rhs, x0=start, *args, **kwargs)["x"]

        def _backend():
            # Resolve the backend before any right-hand side is formed.
            if self.which == "scipy":
                return _via_scipy
            if self.which == "jlinops":
                return _via_jlinops
            raise ValueError

        def _matvec(x):
            solve = _backend()
            sol = solve(self.AtA, self.original_op.T @ x, self.prev_eval)
            if self.warmstart_prev:
                self.prev_eval = sol.copy()
            return sol

        def _rmatvec(x):
            solve = _backend()
            sol = solve(self.AAt, self.original_op @ x, self.prev_eval_t)
            if self.warmstart_prev:
                self.prev_eval_t = sol.copy()
            return sol

        super().__init__( (self.out_shape, self.in_shape), _matvec, _rmatvec )


In [79]:
# Build the CG-based pseudoinverse of A with the constructor defaults
# (which="jlinops", warmstart_prev=False).
Apinv = CGPseudoinverseOperator(A)

In [80]:
# Forward check: 2-norm of the difference between the CG-based matvec and the
# dense NumPy pseudoinverse applied to the same random vector (should be ~0).
z = np.random.normal(size=8)
np.linalg.norm( Apinv.matvec( z ) - np.linalg.pinv(Amat) @ z )

7.580880525408319e-16

In [81]:
# Transpose check: compare rmatvec against the transposed dense pseudoinverse
# applied to the same random vector (should be ~0).
z = np.random.normal(size=5)
np.linalg.norm( Apinv.rmatvec( z ) - np.linalg.pinv(Amat).T @ z )

7.480440612324231e-16

# Modified CG pseudoinverse (transpose action uses only $A^T A$ solves)

In [82]:
class CGModPseudoinverseOperator(_CustomLinearOperator):
    """Linear operator applying an approximate pseudoinverse of ``operator`` via CG,
    arranged so that every conjugate gradient solve uses ``A^T A``.

    Writing ``A`` for ``operator``:

    * ``matvec(x)`` runs conjugate gradients on ``(A^T A) y = A^T x``.
    * ``rmatvec(x)`` first subtracts ``W @ (Wpinv @ x)`` from ``x``, runs
      conjugate gradients on ``(A^T A) y = x`` and returns ``A @ y``.

    Parameters
    ----------
    operator :
        Operator of shape ``(m, n)``. It must expose ``.shape`` and ``.T`` and
        support ``@`` with vectors and with its own transpose.
    W, Wpinv :
        Objects supporting ``@`` with vectors. Presumably the columns of ``W``
        span null(A) and ``Wpinv`` is its pseudoinverse, so that
        ``x - W @ (Wpinv @ x)`` lies in range(A^T) -- confirm at the call site.
    warmstart_prev : bool
        If True, each solve starts from the solution returned by the previous
        solve in the same direction; otherwise every solve starts from zeros.
    which : {"jlinops", "scipy"}
        Conjugate gradient backend to call.
    *args, **kwargs :
        Forwarded unchanged to the selected conjugate gradient routine.
    """

    def __init__(self, operator, W, Wpinv, warmstart_prev=False, which="jlinops", *args, **kwargs):

        assert which in ["jlinops", "scipy"], "Invalid choice for which!"

        # Wrapped operator plus the pieces of the null-space projector
        self.original_op = operator
        self.W = W
        self.Wpinv = Wpinv

        # Solver options and dimensions
        self.which = which
        n_rows = self.original_op.shape[0]
        n_cols = self.original_op.shape[1]
        self.in_shape = n_rows
        self.out_shape = n_cols

        # Both directions solve with A^T A, so both initial guesses have length n
        self.prev_eval = np.zeros(n_cols)
        self.prev_eval_t = np.zeros(n_cols)
        self.warmstart_prev = warmstart_prev

        # Normal-equation operators (AAt is stored but not used by the solves below)
        self.AtA = self.original_op.T @ self.original_op
        self.AAt = self.original_op @ self.original_op.T

        def _via_scipy(system, rhs, start):
            # scipy's cg returns (solution, info); info == 0 signals success.
            result, info = scipy_cg(system, rhs, x0=start, *args, **kwargs)
            assert info == 0, "CG algorithm did not converge!"
            return result

        def _via_jlinops(system, rhs, start):
            # jlinops' cg returns a dict; the solution is stored under "x".
            return cg(system, rhs, x0=start, *args, **kwargs)["x"]

        def _backend():
            if self.which == "scipy":
                return _via_scipy
            if self.which == "jlinops":
                return _via_jlinops
            raise ValueError

        def _matvec(x):
            solve = _backend()
            sol = solve(self.AtA, self.original_op.T @ x, self.prev_eval)
            if self.warmstart_prev:
                self.prev_eval = sol.copy()
            return sol

        def _rmatvec(x):
            # Remove the W-component so the right-hand side lies in range(A^T A) = range(A^T).
            projected = x - (self.W @ (self.Wpinv @ x))

            solve = _backend()
            sol = solve(self.AtA, projected, self.prev_eval_t)
            if self.warmstart_prev:
                self.prev_eval_t = sol.copy()

            return self.original_op @ sol

        super().__init__( (self.out_shape, self.in_shape), _matvec, _rmatvec )


In [83]:
# Null-space basis of Amat, wrapped directly as a jlinops operator, together
# with the jlinops pseudoinverse operator built from it.
W = MatrixOperator(null_space(Amat))
Wpinv = QRPseudoInverseOperator(W)

In [84]:
# Rebind Apinv to the modified operator (constructor defaults: jlinops backend,
# no warm start); W and Wpinv are used by its rmatvec to strip the W-component.
Apinv = CGModPseudoinverseOperator(A, W, Wpinv)

In [85]:
# Forward check for the modified operator against the dense NumPy
# pseudoinverse on a fresh random vector (should be ~0).
z = np.random.normal(size=8)
np.linalg.norm( Apinv.matvec( z ) - np.linalg.pinv(Amat) @ z )

5.689046731696577e-16

In [86]:
# Transpose check for the modified operator: rmatvec versus the transposed
# dense pseudoinverse on a fresh random vector (should be ~0).
z = np.random.normal(size=5)
np.linalg.norm( Apinv.rmatvec( z ) - np.linalg.pinv(Amat).T @ z )

1.4038533086086518e-15

In [87]:
# Inspect the Q_fac attribute stored on Wpinv (presumably the Q factor of a
# QR factorization of W -- confirm against jlinops).
Wpinv.Q_fac

array([[ 0.00000000e+00, -3.92523115e-17],
       [ 7.07106781e-01, -1.94289029e-16],
       [-7.07106781e-01,  1.38777878e-16],
       [-7.21644966e-16, -7.07106781e-01],
       [-3.88578059e-16,  7.07106781e-01]])

In [88]:
# Inspect the R_fac attribute stored on Wpinv (presumably the triangular R
# factor of the same QR factorization -- confirm against jlinops).
Wpinv.R_fac

array([[-1.00000000e+00,  2.35513869e-16],
       [ 0.00000000e+00,  1.00000000e+00]])

In [89]:
# Show the operator's repr (reports its shape and dtype).
Wpinv

<2x5 QRPseudoInverseOperator with dtype=float64>

# Preconditioned CG pseudoinverse

In [90]:
class CGPreconditionedPseudoinverseOperator(_CustomLinearOperator):
    """Linear operator applying an approximate pseudoinverse of ``operator`` via
    conjugate gradients on a two-sided transformed normal-equation system.

    Writing ``A`` for ``operator`` and ``Q = Lpinv @ (A^T A) @ Ltpinv``:

    * ``matvec(x)`` solves ``Q y = Lpinv @ (A^T x)`` and returns ``Ltpinv @ y``.
    * ``rmatvec(x)`` first subtracts ``W @ (Wpinv @ x)`` from ``x``, solves
      ``Q y = Lpinv @ x`` and returns ``A @ (Ltpinv @ y)``.

    Parameters
    ----------
    operator :
        Operator of shape ``(m, n)``. It must expose ``.shape`` and ``.T`` and
        support ``@`` with vectors and with its own transpose.
    W, Wpinv :
        Objects supporting ``@`` with vectors. Presumably the columns of ``W``
        span null(A) and ``Wpinv`` is its pseudoinverse -- confirm at the call site.
    Lpinv, Ltpinv :
        Operators composed around ``A^T A`` to form ``Q``. The names suggest the
        pseudoinverses of a preconditioner factor ``L`` and of ``L^T`` -- confirm
        at the call site. ``Q`` must be square for conjugate gradients.
    warmstart_prev : bool
        If True, each solve starts from the solution (in the transformed
        variable ``y``) returned by the previous solve in the same direction;
        otherwise every solve starts from zeros.
    which : {"jlinops", "scipy"}
        Conjugate gradient backend to call.
    *args, **kwargs :
        Forwarded unchanged to the selected conjugate gradient routine.
    """

    def __init__(self, operator, W, Wpinv, Lpinv, Ltpinv, warmstart_prev=False, which="jlinops", *args, **kwargs):

        assert which in ["jlinops", "scipy"], "Invalid choice for which!"

        # Store operator
        self.original_op = operator
        self.W = W
        self.Wpinv = Wpinv
        self.Lpinv = Lpinv
        self.Ltpinv = Ltpinv

        # Setup
        self.which = which
        self.in_shape = self.original_op.shape[0]
        self.out_shape = self.original_op.shape[1]
        self.warmstart_prev = warmstart_prev

        # Build the operators we need
        self.AtA = self.original_op.T @ self.original_op
        self.Q = self.Lpinv @ self.AtA @ self.Ltpinv

        # CG iterates live in the domain of Q, whose dimension need not equal
        # the number of columns of A when the factors are not square. Size the
        # initial guesses from Q so that x0 always matches the system solved.
        system_dim = self.Q.shape[1]
        self.prev_eval = np.zeros(system_dim)
        self.prev_eval_t = np.zeros(system_dim)

        # Define matvec and rmatvec
        def _matvec(x):
            if self.which == "scipy":
                # scipy's cg returns (solution, info); info == 0 signals success.
                sol, converged = scipy_cg(self.Q, self.Lpinv @ (self.original_op.T @ x), x0=self.prev_eval, *args, **kwargs)
                assert converged == 0, "CG algorithm did not converge!"
            elif self.which == "jlinops":
                solver_data = cg(self.Q, self.Lpinv @ (self.original_op.T @ x), x0=self.prev_eval, *args, **kwargs)
                sol = solver_data["x"]
            else:
                raise ValueError

            if self.warmstart_prev:
                self.prev_eval = sol.copy()

            # Map the transformed solution back to the original variable.
            return self.Ltpinv @ sol

        def _rmatvec(x):

            # Remove the W-component so that x lies in range(A^T A) = range(A^T).
            x = x - (self.W @ (self.Wpinv @ x))

            if self.which == "scipy":
                sol, converged = scipy_cg(self.Q, self.Lpinv @ x, x0=self.prev_eval_t, *args, **kwargs)
                assert converged == 0, "CG algorithm did not converge!"
            elif self.which == "jlinops":
                solver_data = cg(self.Q, self.Lpinv @ x, x0=self.prev_eval_t, *args, **kwargs)
                sol = solver_data["x"]
            else:
                raise ValueError

            if self.warmstart_prev:
                self.prev_eval_t = sol.copy()

            # Undo the transformation, then apply A.
            return self.original_op @ (self.Ltpinv @ sol)

        super().__init__( (self.out_shape, self.in_shape), _matvec, _rmatvec )
