In [1]:
import os
import numpy as np
import cupy as cp
import cupyx.scipy.sparse as csp
from PIL import Image
from tqdm import tqdm
from typing import List
import package.myUtil as myUtil

In [2]:
# Restrict CUDA to device 0.
# NOTE(review): this runs after `import cupy` in the previous cell; it only
# helps if CUDA has not been initialised yet -- confirm, or move it above the
# imports.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Capture date of the measurement set, keyed by image side length.
cap_dates = {128: "241114", 256: "241205"}

# --- Tunable experiment parameters -----------------------------------------
n = 256              # image side length; n * n entries are passed to get_use_list
LAMBDA = 100         # regularisation weight handed to fista()
RATIO = 0.05         # ratio handed to myUtil.get_use_list
DO_THIN_OUT = False  # when True, captures are loaded with thin_out=True

# --- Paths and labels ---------------------------------------------------------
DATA_PATH = "../data"
IMG_NAME = "hadamard"
CAP_DATE = cap_dates[n]
EXP_DATE = "241206"
DIRECTORY = f"{DATA_PATH}/{EXP_DATE}"

# Label embedded in output file names. int() truncates, so a ratio whose
# percentage is not exactly representable (e.g. 0.29 -> 28) is labelled one
# lower than expected; RATIO = 0.05 yields 5.
SETTING = f"{n}_p-{int(100*RATIO)}_lmd-{LAMBDA}"
if DO_THIN_OUT:
    SETTING = SETTING + "to"

# One call creates DIRECTORY and its systemMatrix sub-folder; exist_ok makes
# re-running the cell harmless and avoids the exists()/makedirs() race.
os.makedirs(DIRECTORY + "/systemMatrix", exist_ok=True)

use_list = myUtil.get_use_list(n * n, RATIO)

In [None]:
# Load the captured images (G) and the corresponding input patterns (F).
# NOTE(review): presumably each column is one selected pattern and each row one
# pixel -- confirm against myUtil.images2matrix.
G = myUtil.images2matrix(f"{DATA_PATH}/{IMG_NAME}{n}_cap_{CAP_DATE}/", use_list, thin_out=DO_THIN_OUT)
F = myUtil.images2matrix(f"{DATA_PATH}/{IMG_NAME}{n}_input/", use_list).astype(cp.int8)
M, K = G.shape
N = F.shape[0]
# Both matrices must have one column per pattern; fail here rather than deep
# inside the solver with an opaque shape error.
if F.shape[1] != K:
    raise ValueError(f"G has {K} columns but F has {F.shape[1]}; they must match")
print("G shape:", G.shape, "F shape:", F.shape, "M=", M, "N=", N, "K=", K)
print("G max:", G.max(), "G min:", G.min(), "F max:", F.max(), "F min:", F.min())

# Subtract the per-row bias from every column. Broadcasting a column vector
# gives the same values as tiling it to (M, K) without allocating that matrix.
black = myUtil.calculate_bias(M, DATA_PATH, CAP_DATE)
G = G - black[:, None]

# Bias-corrected capture of the all-white pattern, scaled to [0, 1].
white_img = Image.open(f"{DATA_PATH}/capture_{CAP_DATE}/White.png").convert("L")
white = (cp.asarray(white_img) / 255).astype(cp.float32)
if DO_THIN_OUT:
    # Keep every second pixel in both directions.
    white = white[::2, ::2]
white = white.ravel() - black

# Affine remapping of both sides: F -> 2F - 1 and G -> 2G - white.
# NOTE(review): presumably F is 0/1 so that F_hat is +/-1 -- confirm.
F_hat = 2 * F - 1
G_hat = 2 * G - white[:, None]

# Release the large intermediates and hand the freed blocks back to the device.
del F, G
cp.get_default_memory_pool().free_all_blocks()

In [5]:
def prox_l122(Y: cp.ndarray, gamma: float, N: int) -> csp.csr_matrix:
    """
    Row-wise soft-thresholding step used by the FISTA solver.

    Each row of ``Y`` is shrunk towards zero by a threshold proportional to
    that row's l1 norm: ``2 * gamma / (1 + 2 * gamma * N) * ||row||_1``.
    Entries whose magnitude does not exceed the threshold become zero.

    Args:
        Y: Dense 2-D array to shrink.
        gamma: Step size times regularisation weight.
        N: Constant used in the denominator of the threshold factor.

    Returns:
        The shrunk matrix converted to CSR format.
    """
    magnitude = cp.absolute(Y)
    # One threshold per row, kept as a column so it broadcasts across the row.
    row_l1 = cp.sum(magnitude, axis=1)[:, None]
    scale = (2 * gamma) / (1 + 2 * gamma * N)
    shrunk = cp.maximum(magnitude - scale * row_l1, 0)
    return csp.csr_matrix(cp.sign(Y) * shrunk)


def fista(
    F: cp.ndarray,
    G: cp.ndarray,
    lmd: float,
    N: int,
    M: int,
    chunk_size: int = 3000,
    max_iter: int = 150,
    num_chunks: int = 44,
) -> csp.csr_matrix:
    """
    Solve the optimization problem using FISTA:
    min_h ||G - HF||_F^2 + lambda * ||H||_1,2^2

    The rows of H are independent of each other in the update, so H is solved
    in blocks of ``chunk_size`` rows and the blocks are stacked at the end.

    Args:
        F: Dense matrix of shape (N, K).
        G: Dense matrix of shape (M, K); rows are consumed block by block.
        lmd: Regularisation weight (lambda).
        N: Number of columns of H; also used as the step-size constant L.
        M: Total number of rows available in G.
        chunk_size: Number of rows solved per block. Must be positive.
        max_iter: FISTA iterations run for every block.
        num_chunks: Maximum number of leading blocks to solve. The default
            of 44 keeps the previously hard-coded behaviour (the original
            note states M / chunk_size = 88 for the notebook's data, i.e. only
            the first half of the rows is solved). Pass any value
            >= ceil(M / chunk_size) to solve every row.

    Returns:
        CSR matrix with N columns and one row per solved row of G
        (``min(num_chunks * chunk_size, M)`` rows).

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    L = N
    print("L:", L)
    gamma = 1.0 / (L * 3)

    # Never iterate past the rows G actually has: ceil(M / chunk_size) blocks.
    available_chunks = -(-M // chunk_size)
    chunks_to_run = max(0, min(num_chunks, available_chunks))

    chunks: List[csp.csr_matrix] = []

    for c in range(chunks_to_run):
        start = c * chunk_size
        end = min(start + chunk_size, M)
        # The last block may be shorter than chunk_size; size the iterates to
        # the real row count so they line up with the slice of G.
        rows = end - start
        G_chunk = G[start:end, :]

        t = 1.0
        H_chunk = csp.csr_matrix((rows, N), dtype=cp.float32)
        Y_chunk = cp.zeros((rows, N), dtype=cp.float32)

        for _ in tqdm(range(max_iter)):
            t_old = t
            # H_chunk is rebound below, never modified in place, so keeping a
            # reference is enough -- no copy needed.
            H_chunk_old = H_chunk

            # Gradient step on the data term followed by the proximal step.
            H_chunk = prox_l122(Y_chunk - gamma * (Y_chunk @ F - G_chunk) @ F.T, gamma * lmd, N)
            # Momentum update of the extrapolation point.
            t = (1 + np.sqrt(1 + 4 * t_old**2)) / 2
            Y_chunk = H_chunk + ((t_old - 1) / t) * (H_chunk - H_chunk_old)

        chunks.append(H_chunk)
        # Return cached device memory before starting the next block.
        cp.get_default_memory_pool().free_all_blocks()

    if not chunks:
        # Nothing to solve (M == 0 or num_chunks <= 0): return an empty result
        # with the right number of columns instead of stacking an empty list.
        return csp.csr_matrix((0, N), dtype=cp.float32)

    H = csp.vstack(chunks).tocsr()

    return H

In [None]:
# Run the solver on the preprocessed matrices; arguments are passed by name
# so the mapping to fista()'s parameters is explicit.
H = fista(F=F_hat, G=G_hat, lmd=LAMBDA, N=N, M=M)
print("H shape:", H.shape)

In [None]:
# Report the size and fill ratio of the estimated matrix.
print(f"shape: {H.shape}, nnz: {H.nnz}({H.nnz / H.shape[0] / H.shape[1] * 100:.2f}%)")

# Copy the three CSR buffers from the GPU to host memory, then record the
# shape so the matrix can be rebuilt from the archive.
H_np = {part: cp.asnumpy(getattr(H, part)) for part in ("data", "indices", "indptr")}
H_np["shape"] = H.shape

np.savez(f"{DIRECTORY}/systemMatrix/H1_matrix_{SETTING}.npz", **H_np)
print(f"Saved {DIRECTORY}/systemMatrix/H1_matrix_{SETTING}.npz")