## CP_APR, CP_ALS, HOSVD, & TUCKER_ALS Profiling

In [None]:
import cProfile
import glob
import os
import pstats
from typing import Callable, Optional, Union

from pyttb import cp_als, cp_apr, hosvd, import_data, tucker_als, tensor, sptensor

In [None]:
def get_algorithm_func(algorithm_name: str) -> Callable:
    """
    Returns the corresponding function for the user-supplied algorithm name.

    The lookup is case-insensitive: the name is lowercased before matching.

    Parameters
    ----------
    algorithm_name:
        The algorithm to profile. Should be 'cp_apr', 'cp_als', 'tucker_als', or 'hosvd'.

    Returns
    -------
    alg_func:
        The function corresponding to the algorithm.

    Raises
    ------
    ValueError
        If algorithm_name is not a string or does not name one of the
        supported algorithms.
    """

    # supported algorithms, keyed by their lowercase name.
    func_handler = {
        "cp_apr": cp_apr,
        "cp_als": cp_als,
        "tucker_als": tucker_als,
        "hosvd": hosvd,
    }

    # A non-string name can never match; route it to the same ValueError as
    # any other unknown name instead of failing on `.lower()`.
    key = algorithm_name.lower() if isinstance(algorithm_name, str) else None

    alg_func = func_handler.get(key)
    if alg_func is None:
        raise ValueError(f"'{algorithm_name}' is not a recognized algorithm.")
    return alg_func

In [None]:
def profile_alg(
    alg_func: Callable,
    input_tensor: Union[sptensor, tensor],
    test_file: str,
    algorithm_name: str,
    rank: Optional[int] = None,
    tol: Optional[float] = None,
    verbosity: Optional[int] = None,
) -> None:
    """
    Profiles a single run of the specified algorithm and prints the statistics.

    When a rank is supplied the call is ``alg_func(input_tensor, rank)``;
    otherwise it is ``alg_func(input_tensor, tol=tol, verbosity=verbosity)``,
    which is the form used for hosvd. The ten entries with the largest
    cumulative time are printed after a one-line header.

    Parameters
    ----------
    alg_func:
        The function to profile.
    input_tensor:
        The input data tensor provided to the alg_func.
    test_file:
        The name of the tensor file (used only in the printed header).
    algorithm_name:
        The name of the user-supplied algorithm (used only in the printed header).
    rank:
        The rank of the tensor decomposition. Leave as None for hosvd.
    tol:
        Relative error to stop at for hosvd.
    verbosity:
        Print level for hosvd.

    Raises
    ------
    Exception
        Anything raised by alg_func is propagated unchanged, after the
        profiler has been disabled.
    """

    # `is not None` rather than truthiness, so an explicit rank of 0 is still
    # forwarded to the algorithm instead of silently taking the hosvd path.
    has_rank = rank is not None

    profiler = cProfile.Profile()
    profiler.enable()
    try:
        # hosvd will not have a provided rank in its function call.
        if has_rank:
            alg_func(input_tensor, rank)
        else:
            alg_func(input_tensor, tol=tol, verbosity=verbosity)
    finally:
        # Always stop profiling, even when alg_func raises. A profiler left
        # enabled would keep recording and can interfere with the next
        # profiler that is enabled.
        profiler.disable()

    # sort by cumulative time spent in each function and its callees.
    stats = pstats.Stats(profiler).sort_stats("cumulative")
    if has_rank:  # rank isn't used for hosvd
        print(f"Test file: {test_file}, Rank: {rank}, Algorithm: {algorithm_name}")
    else:
        print(f"Test file: {test_file}, Algorithm: {algorithm_name}")
    stats.print_stats(10)

In [None]:
def profile(test_files, ranks, algorithm_name: str) -> None:
    """
    Profiles the performance of the cp and Tucker algorithms with a set of tensors from test_files and ranks.

    Each test file is loaded once. For 'hosvd' the algorithm is profiled a
    single time per file; for every other algorithm it is profiled once per
    entry in ranks. A failure to load a file, or a failure in one profiling
    run, is printed and the remaining work continues.

    Parameters
    ----------
    test_files:
        A list of strings representing the file paths to the test tensors.
    ranks:
        A list of integers representing the tensor testing ranks. Not used for 'hosvd'.
    algorithm_name:
        The algorithm to profile. Should be 'cp_apr', 'cp_als', 'tucker_als', or 'hosvd'
        (matched case-insensitively).
    """

    # obtain the appropriate function.
    alg_func = get_algorithm_func(algorithm_name)

    # get_algorithm_func matches names case-insensitively, so the
    # algorithm-specific branches below must compare the lowercased name too;
    # otherwise e.g. "HOSVD" would resolve to hosvd but be called with a rank.
    algorithm = algorithm_name.lower()
    is_hosvd = algorithm == "hosvd"

    # choose only 'integer' files for cp_apr.
    if algorithm == "cp_apr":
        test_files = [tf for tf in test_files if "integer" in tf]
    # TODO: bypassing a "TypeError: unsupported operand type(s) for ** or pow(): 'sptensor' and 'int'."
    if is_hosvd:
        test_files = [tf for tf in test_files if "sparse" not in tf]

    for test_file in test_files:
        print("*" * 80)

        # Keep the try body limited to the load, so that only genuine load
        # failures are reported as such.
        try:
            input_tensor = import_data(test_file)  # Load the tensor.
        except Exception as e:
            print(
                f"Error when loading {os.path.basename(test_file)} for Algorithm = {algorithm_name}: {type(e).__name__}: {e}"
            )
            continue

        # hosvd() doesn't accept 'ranks', so profile it once per file.
        if is_hosvd:
            try:
                profile_alg(
                    alg_func,
                    input_tensor,
                    test_file,
                    algorithm_name,
                    tol=1e-4,
                    verbosity=-1,
                )
            except Exception as e:
                print(
                    f"Error when testing {os.path.basename(test_file)} with Algorithm = {algorithm_name}: {type(e).__name__}: {e}"
                )
            continue

        # test across ranks; one failing rank does not stop the others.
        for rank in ranks:
            try:
                profile_alg(alg_func, input_tensor, test_file, algorithm_name, rank)
            except Exception as e:
                print(
                    f"Error when testing {os.path.basename(test_file)} with Rank = {rank} and Algorithm = {algorithm_name}: {type(e).__name__}: {e}"
                )

In [None]:
# def profile(test_files, ranks, algorithm):
#     """
#     Profiles the performance of the cp and Tucker algorithms with a set of tensors from test_files and ranks.

#     Parameters
#     ----------
#     test_files:
#         A list of strings representing the file paths to the test tensors.
#     ranks:
#         A list of integers representing the tensor testing ranks.
#     algorithm:
#         The algorithm to profile. Should be either 'cp_apr' or 'cp_als'.
#     """

#     # declare uninitialized 'rank' for hosvd() exception handling
#     rank = "*** No rank supplied for hosvd()***"

#     # obtain the appropriate function.
#     alg_func = get_algorithm_func(algorithm)

#     # choose only integer files for cp_apr.
#     if algorithm == "cp_apr":
#         test_files = [tf for tf in test_files if "integer" in tf]
#     # bypassing a "TypeError: unsupported operand type(s) for ** or pow(): 'sptensor' and 'int'."
#     if algorithm == "hosvd":
#         test_files = [tf for tf in test_files if "sparse" not in tf]

#     for test_file in test_files:
#         try:
#             print("*" * 50)
#             input_tensor = import_data(test_file)  # Load the tensor.

#             # initialize and enable a profiler.
#             profiler = cProfile.Profile()
#             profiler.enable()

#             # hosvd() accepts different inputs, so skip 'ranks' for it.
#             if algorithm != "hosvd":
#                 # test across ranks.
#                 for rank in ranks:
#                     try:
#                         alg_func(input_tensor, rank)
#                     # ensure the profiler is always disabled before the next profiler starts
#                     finally:
#                         profiler.disable()

#                     # sort the statistics based on cumulative time spent on funcs and sub-funcs
#                     stats = pstats.Stats(profiler).sort_stats("cumulative")
#                     print(
#                         f"Test file: {test_file}, Rank: {rank}, Algorithm: {algorithm}"
#                     )
#                     stats.print_stats(10)
#             else:
#                 try:
#                     disable_printing = -1
#                     alg_func(input_tensor, tol=1e-4, verbosity=disable_printing)
#                 # ensure the profiler is always disabled before the next profiler starts
#                 finally:
#                     profiler.disable()

#                 # sort the statistics based on cumulative time spent on funcs and sub-funcs
#                 stats = pstats.Stats(profiler).sort_stats("cumulative")
#                 print(f"Test file: {test_file}, Algorithm: {algorithm}")
#                 stats.print_stats(10)

#         except Exception as e:
#             print(
#                 f"Error when testing {os.path.basename(test_file)} with Rank = {rank} and Algorithm = {algorithm}: {type(e).__name__}: {e}"
#             )

In [None]:
# Ranks passed to profile() along with the test files.
ranks = [2, 3, 4]
# glob returns matches in a file-system-dependent order; sort them so the
# profiling order is the same on every run.
test_files = sorted(glob.glob("data/*.tns"))

In [None]:
# Runtimes -> cp_apr: ~5m40s; cp_als: ~9s; tucker_als: ~2m40s; hosvd: 4.6s
# profile(test_files, ranks, 'cp_apr')
# profile(test_files, ranks, "cp_als")
# profile(test_files, ranks, "tucker_als")
# profile(test_files, ranks, "hosvd")
# profile(test_files, ranks, "foo")