In [28]:
import numpy as np
import matplotlib.pyplot as plt
from tensor.operation.kruskal import kruskal
from tensor.operation.khatri_rao import khatri_rao
from tensor.operation.matricize import matricize
from tensor.operation.sampled_khatri_rao_prod import SKR
from tensor.operation.sampled_matricize import Sampled_Matricize
from tensor.operation.right_pseudo_inverse import right_pseudo_inverse
import tensorly as tly
from tensorly.tenalg import mode_dot
from scipy.linalg import hadamard
import math

Algorithm: CP RAND

$\text{Input:}$ $\mathcal{X} \in \mathbb{R}^{I_1 \times \cdots \times I_N}$, $\mathbf{R}$, $\mathbf{S}$

$\text{Output:}$ $\boldsymbol{\lambda}$, $\{\mathbf{A}^{(n)}\}$

$
\textbf{CPRAND} (\mathcal{X}, \mathbf{R}, \mathbf{S}):\\
\quad \text{Initialize factor matrices } \mathbf{A}^{(2)}, \dots, \mathbf{A}^{(N)}\\
\quad \textbf{repeat}:\\
\quad \quad \text{for } n = 1, \dots, N \text{ do}:\\
\quad \quad \quad \text{Define sampling operator} \mathbf{S} \in \mathbb{R}^{S \times \prod_{m \neq n} I_m}\\
\quad \quad \quad \mathbf{Z}_S \gets \text{SKR}(\mathbf{S}, \mathbf{A}^{(1)}, \dots, \mathbf{A}^{(n-1)}, \mathbf{A}^{(n+1)}, \dots, \mathbf{A}^{(N)})\\
\quad \quad \quad \mathbf{X}^T_S \gets \mathbf{S} \mathbf{X}^T_{(n)}\\
\quad \quad \quad \mathbf{A}^{(n)} \gets \underset{\mathbf{A}}{\arg \min} \lVert \mathbf{Z}_S \mathbf{A}^T - \mathbf{X}^T_S \rVert_F\\
\quad \quad \quad \text{Normalize columns of } \mathbf{A}^{(n)} \text{ and update } \boldsymbol{\lambda}\\
\quad \quad \textbf{end for}\\
\quad \textbf{until convergence}\\
\quad \text{Return } \boldsymbol{\lambda}, \{\mathbf{A}^{(n)}\}\\
$


In [29]:
class CP_rand_mix:
    def __init__(
        self,
        tensor: np.ndarray,
        rank: int,
        max_iter=10000,
        eps=0.01,
        initalisation: str = "random",
    ):
        """
        PARAFAC decompositon, it initializes the maximum amount of iterations that
        are done along with the tolerance value for the convergence
        it also takes the size of the core tensor rank
        """
        self.tensor = tensor
        self.mixed_tensor = None

        self.rank = rank
        self.max_iter = max_iter
        self.eps = eps
        self.init_type = initalisation
        self.errors = []

        self.factors = []
        self.mixed_factors = [None for _ in range(self.tensor.ndim)]

        self.lamda = np.ones(self.rank)

        # Diagonal matrices for sign-flip operation, diagnol entries = 1 or -1 randomly, size: (tensor.shape[i], tensor.shape[i])
        self.D = []
        for i in range(self.tensor.ndim):
            diag_signs = np.random.choice([-1, 1], size=self.tensor.shape[i])
            self.D.append(np.diag(diag_signs))
            assert self.D[-1].shape == (self.tensor.shape[i], self.tensor.shape[i])

        self.F = []
        for i in range(self.tensor.ndim):
            self.F.append(np.random.rand(self.tensor.shape[i], self.tensor.shape[i]))
            self.F.append(hadamard(self.tensor.shape[i]))
            # assert self.F[-1].shape == (self.tensor.shape[i], self.tensor.shape[i])

    def fit(self, check_convergence=True):
        self._init_factors()
        for i in range(self.max_iter):
            for mode in range(self.tensor.ndim):
                self._update_factors(mode)
            if check_convergence and self._is_converged():
                break
        print(f"Converged in {i+1} iterations")

    def _init_factors(self):
        """
        initialize the factors using the `rank` many left singular
        vectors of the corresponding mode-n matricization of the input tensor
        """
        if self.init_type == "random":
            self.factors = [
                np.random.rand(self.tensor.shape[i], self.rank)
                for i in range(self.tensor.ndim)
            ]
        # elif self.init_type == 'hosvd':
        #     self.factors = []
        #     for i in range(self.tensor.ndim):
        #         M = np.linalg.svd(matricize(self.tensor, i))[0]
        #         if M.shape[1] < self.rank:
        #             M_ = np.zeros((M.shape[0], self.rank - M.shape[1]))
        #             M = np.concatenate((M, M_), axis=1)
        #         else:
        #             M = M[:, :self.rank]
        #         self.factors.append(M)
        else:
            raise Exception("Invalid initialisation method")

        # mix the factor matrices
        for i in range(self.tensor.ndim):
            self.mixed_factors[i] = self.F[i] @ self.D[i] @ self.factors[i]
            assert self.mixed_factors[i].shape == (self.tensor.shape[i], self.rank)

        # mix the tensor
        self.mixed_tensor = self.tensor
        for i in range(self.tensor.ndim):
            self.mixed_tensor = mode_dot(self.mixed_tensor, self.F[i] @ self.D[i], i)
        assert self.mixed_tensor.shape == self.tensor.shape

    def _update_factors(self, mode):
        """
        Update the factors per iteration for the (mode+1)th Factor
        """
        tot_row = 1  # total number of rows in the mode-{mode} matricization
        for x in range(self.tensor.ndim):
            if x != mode:
                tot_row *= self.tensor.shape[x]
        S = 10 * int(math.log(self.rank)) * self.rank

        sel_rows = np.random.choice(tot_row, S, replace=True)

        Z_s_hat = SKR(sel_rows, self.mixed_factors, mode)
        assert Z_s_hat.shape == (S, self.rank)

        # complex conjugate of F[mode]
        F_conj = np.conj(self.F[mode])
        assert F_conj.shape == (self.tensor.shape[mode], self.tensor.shape[mode])

        X_s_hat = self.D[mode] @ F_conj @ Sampled_Matricize(sel_rows, self.mixed_tensor, mode)
        assert X_s_hat.shape == (self.tensor.shape[mode], S)

        rpi = right_pseudo_inverse(Z_s_hat.T)
        assert rpi.shape == (S, self.rank)
        self.factors[mode] = X_s_hat @ rpi
        assert self.factors[mode].shape == (self.tensor.shape[mode], self.rank)

        col_norms = np.linalg.norm(self.factors[mode], axis=0)

        self.factors[mode] = self.factors[mode] / col_norms # normalize the self.factors[mode]
        self.mixed_factors[mode] = self.F[mode] @ self.D[mode] @ self.factors[mode]

        self.lamda = col_norms

    def _is_converged(self):
        """
        check if the algorithm has converged
        by computing the Frobenius norm of the difference between the current tensor
        and the tensor obtained by multiplying the factors
        """
        tmp = self.factors[0]

        self.factors[0] = self.factors[0] * self.lamda
        estim = kruskal(*self.factors)
        error = np.linalg.norm(self.tensor - estim)
        print("Error = ", error)
        self.errors.append(error)
        self.factors[0] = tmp
        return error < self.eps

    def plot_errors(self):
        plt.plot(self.errors)
        plt.legend(["Errors"])
        plt.xlabel("Iterations")
        plt.ylabel("Frobenius norm")
        plt.show()

In [30]:
X = np.random.rand(2, 2, 2)
R = 4
tly_ans = tly.decomposition.parafac(X, rank=R)
X_tly = tly.cp_to_tensor(tly_ans)
print(f"Error is: {np.linalg.norm(X - X_tly)}")
solver = CP_rand_mix(X, R, int(1e4), 0.01, "random")
solver.fit()
# print(factors[0].shape, factors[1].shape, factors[2].shape)

Error is: 1.0005808618753766e-14


UFuncTypeError: Cannot cast ufunc 'multiply' output from dtype('complex128') to dtype('float64') with casting rule 'same_kind'