In [None]:

"""
IS-PCA (Week 1) — synthetic data + block-aware IS-PCA

Implements:
- generate_block_data: synthetic HDLSS generator with block-diagonal Σ
- ispca_known_blocks: IS-PCA given known blocks (Procedure 2 step 2)
- basic_pca: PCA baseline

Notes:
- Week 1 scope avoids automatic block detection. Blocks are known from the generator.
- Columns are centered. No scaling by default (aligns with paper's mean-zero setup).
- Block covariance per paper: Σ_i = (1-ω_i) I + 2 ω_i 1 1^T  (compound symmetry with spike).
"""
import numpy as np
from dataclasses import dataclass
from typing import List, Tuple, Optional

@dataclass
class SyntheticBlocks:
    X: np.ndarray                 # shape (n, p), column-centered
    blocks: List[np.ndarray]      # list of index arrays per block
    omegas: np.ndarray            # shape (b,), ω_i per block
    Sigma: np.ndarray             # population covariance, shape (p, p)

def _make_block_cov(p_i: int, omega: float) -> np.ndarray:
    # Σ_i = (1-ω) I + 2 ω 1 1^T
    I = np.eye(p_i)
    one = np.ones((p_i, 1))
    return (1.0 - omega) * I + 2.0 * omega * (one @ one.T)

def generate_block_data(
    n: int,
    p: int,
    b: int,
    block_sizes: Optional[List[int]] = None,
    omega_range: Tuple[float, float] = (0.1, 0.3),
    random_state: Optional[int] = 0,
) -> SyntheticBlocks:
    """
    Create HDLSS synthetic data with block-diagonal Σ.
    Blocks are contiguous by construction and returned as index arrays.
    """
    rng = np.random.default_rng(random_state)

    if block_sizes is None:
        # equal-sized blocks, last block gets the remainder
        base_size = p // b
        block_sizes = [base_size] * b
        block_sizes[-1] += p - base_size * b
    assert sum(block_sizes) == p, "block_sizes must sum to p"

    # Draw ω_i per block
    omegas = rng.uniform(omega_range[0], omega_range[1], size=b)

    # Build Σ block-diagonal
    blocks = []
    Sigma_blocks = []
    start = 0
    for sz, omega in zip(block_sizes, omegas):
        Sigma_i = _make_block_cov(sz, omega)
        Sigma_blocks.append(Sigma_i)
        idx = np.arange(start, start + sz, dtype=int)
        blocks.append(idx)
        start += sz
    Sigma = np.block([[Sigma_blocks[i] if i==j else np.zeros((block_sizes[i], block_sizes[j]))
                       for j in range(b)] for i in range(b)])

    # Sample X ~ N(0, Σ) row-wise (n samples)
    L = np.linalg.cholesky(Sigma + 1e-12 * np.eye(p))
    Z = rng.standard_normal(size=(n, p))
    X = Z @ L.T

    # Center columns
    X = X - X.mean(axis=0, keepdims=True)
    return SyntheticBlocks(X=X, blocks=blocks, omegas=omegas, Sigma=Sigma)

def ispca_known_blocks(
    X: np.ndarray,
    blocks: List[np.ndarray],
    k_per_block: Optional[List[int]] = None,
):
    """
    IS-PCA with known blocks.
    For each block, compute SVD(X_block) and pad V_i into a block-diagonal V_tilde.
    Return:
      Z, V_tilde, V_blocks, S_blocks
    Notes:
      - If k_per_block provided, truncate per-block right singular vectors to k_i.
      - Orthonormality holds within blocks; columns remain orthonormal overall.
    """
    n, p = X.shape
    V_tilde = np.zeros((p, p))
    V_blocks = []
    S_blocks = []

    total_cols = 0
    for bi, idx in enumerate(blocks):
        Xb = X[:, idx]  # shape (n, p_i)

        # economy SVD
        U, S, Vt = np.linalg.svd(Xb, full_matrices=False)
        V = Vt.T  # p_i x r

        if k_per_block is not None:
            k_i = int(min(k_per_block[bi], V.shape[1]))
            V = V[:, :k_i]
            S = S[:k_i]

        V_blocks.append(V.copy())
        S_blocks.append(S.copy())

        # place V into padded V_tilde columns
        cols = V.shape[1]
        V_tilde[np.ix_(idx, np.arange(total_cols, total_cols+cols))] = V
        total_cols += cols

    # Complete to an orthonormal basis if needed
    if total_cols < p:
        # Fill remaining columns with a QR orthonormal completion in each block
        for bi, idx in enumerate(blocks):
            p_i = idx.size
            used = V_blocks[bi].shape[1]
            if used < p_i:
                # Orthonormal complement within the block
                # Build block basis: [V | Q2]
                # Start with a random matrix and orthogonalize against V
                rng = np.random.default_rng(0)
                need = p_i - used
                Rnd = rng.standard_normal((p_i, need))
                # Orthogonalize
                Qv, _ = np.linalg.qr(V_blocks[bi])  # p_i x used
                # Remove projections on Qv
                R = Rnd.copy()
                if used > 0:
                    R = R - Qv @ (Qv.T @ R)
                Q2, _ = np.linalg.qr(R)
                V_tilde[np.ix_(idx, np.arange(total_cols, total_cols+need))] = Q2[:, :need]
                total_cols += need

    # Global orthonormalization for numerical stability
    V_tilde, _ = np.linalg.qr(V_tilde)

    Z = X @ V_tilde
    return Z, V_tilde, V_blocks, S_blocks

def basic_pca(X: np.ndarray, k: Optional[int] = None):
    """
    Center columns, compute economy SVD, return scores and loadings.
    """
    Xc = X - X.mean(axis=0, keepdims=True)
    U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
    if k is not None:
        U, S, Vt = U[:, :k], S[:k], Vt[:k, :]
    scores = U * S  # n x k
    loadings = Vt.T # p x k
    return scores, loadings, S
