Depricate previous code.

Generate a dictionary of top k eigenvectors for each epoch.

In [1]:
import numpy as np

# 设置 NumPy 随机种子
np.random.seed(42)

def generate_eigenvector_dict():
    num_epochs = 3  # 迭代次数
    dim = 10  # 向量维度
    num_vectors = 5  # 始终保持 5 个特征向量
    recorded_steps_top_eigenvectors = {}

    # 生成一个初始的正交基（使用 QR 分解确保正交）
    base_vectors, _ = np.linalg.qr(np.random.randn(dim, num_vectors))  # 10 × 5 的正交矩阵
    
    for k in range(num_epochs):
        if k == 0:
            recorded_steps_top_eigenvectors[k] = base_vectors.copy()
        else:
            # 计算要保留的列数
            num_shared = max(num_vectors - k, 0)    # 计算需要保留的特征向量数量
            num_new = num_vectors - num_shared  # 计算需要替换的特征向量数量
            
            # 继承前一个 epoch 的部分特征向量
            shared_vectors = recorded_steps_top_eigenvectors[k-1][:, :num_shared] if num_shared > 0 else np.empty((dim, 0))
            
            # 生成新的特征向量
            new_vectors = np.random.randn(dim, num_new)
            
            # 组合最终的特征矩阵
            recorded_steps_top_eigenvectors[k], _ = np.linalg.qr(np.hstack((shared_vectors, new_vectors)))  # QR分解，前几列已经正交，不会改变

    return recorded_steps_top_eigenvectors

# 生成特征向量字典
recorded_steps_top_eigenvectors = generate_eigenvector_dict()

# 打印前 ? 个 epoch 的特征矩阵
for epoch in range(3):
    print(f"Epoch {epoch} 特征矩阵:\n", recorded_steps_top_eigenvectors[epoch], "\n")


Epoch 0 特征矩阵:
 [[-0.20433962  0.01774928 -0.21280398  0.35336356  0.21267685]
 [ 0.0963199  -0.52254612 -0.37396241 -0.200287   -0.09229253]
 [ 0.19064203  0.18541408 -0.13898791 -0.56814509  0.1919557 ]
 [ 0.23131538  0.37702749 -0.16801856 -0.27        0.05892778]
 [-0.60294259 -0.00950348  0.13019713 -0.53376948 -0.02739447]
 [-0.04563164  0.38436437 -0.1200868  -0.23065281 -0.40864833]
 [ 0.24753171 -0.59366849 -0.09533138 -0.268181   -0.06648195]
 [ 0.50223399  0.00085284  0.67768789 -0.10739636  0.16728233]
 [-0.3037924  -0.10161923  0.12161616 -0.12750935  0.73237177]
 [ 0.29613147  0.1987628  -0.5027993   0.03881013  0.41160129]] 

Epoch 1 特征矩阵:
 [[-0.20433962  0.01774928 -0.21280398  0.35336356  0.02923768]
 [ 0.0963199  -0.52254612 -0.37396241 -0.200287    0.13338619]
 [ 0.19064203  0.18541408 -0.13898791 -0.56814509 -0.61109847]
 [ 0.23131538  0.37702749 -0.16801856 -0.27        0.15475094]
 [-0.60294259 -0.00950348  0.13019713 -0.53376948  0.34686508]
 [-0.04563164  0.38436

In [2]:
print(len(recorded_steps_top_eigenvectors))

3


## Top Down Search

In [None]:
import numpy as np
from functools import wraps
# Binary Search for initial k_0
def Binary_Search(previous, current, k_0, tau=0.98):
    # tau = 0.95
    # Lower and upper bounds for k
    left, right = 0, k_0 - 1
    
    while left < right:
        mid = left + (right - left + 1) // 2  # 取较大中点，防止死循环
        matrix = np.matmul(previous[:, :mid].T, current[:, :mid])
        _, sigma, _ = np.linalg.svd(matrix)
        
        # Check if all singular values are within [0, 1]
        assert np.all((sigma >= -0.001) & (sigma <= 1.001)), "Eigenvectors are not ortho-normalized!"
    
        if np.min(sigma) < tau:
            right = mid - 1
        else:
            left = mid  
    
    return left + 1

# Define a decorator
def optimize_k0(func):
    @wraps(func)
    def wrapper(recorded_steps_top_eigenvectors, tau=0.98, k_0=None):
        sorted_steps = sorted(recorded_steps_top_eigenvectors.keys())
            
        if len(sorted_steps) < 2:
            raise ValueError("At least two recorded steps are required for Top-K search.")
        
        if k_0 is None:
        # 选择 epoch_0 和 epoch_1
            epoch_0 = recorded_steps_top_eigenvectors[sorted_steps[0]]
            epoch_1 = recorded_steps_top_eigenvectors[sorted_steps[1]]
            k_0 = epoch_0.shape[1]  # 先给 k_0 赋值
        
            # 计算初始 k_0
            optimized_k0 = Binary_Search(epoch_0, epoch_1, k_0, tau)
        else:
            optimized_k0 = k_0  # 直接使用提供的 k_0
            
        # 调用被装饰的 Top_Down 方法，并传入优化后的 k_0
        return func(recorded_steps_top_eigenvectors, optimized_k0, sorted_steps, tau)
    
    return wrapper

# Use the decorator
@optimize_k0

def Top_Down(recorded_steps_top_eigenvectors, tau=0.98, k_0=None, sorted_steps=None):
    if sorted_steps is None:  
        sorted_steps = sorted(recorded_steps_top_eigenvectors.keys())
        
    num_steps = len(sorted_steps) 
    # tau = 0.95 # Tolerance 
    k = int(k_0)  # Initialize k
    trajectory = {}
    trajectory[0] = k_0
    trajectory[1] = k_0 
    
    for i in range(2, num_steps):
        current_step = sorted_steps[i]
        current = recorded_steps_top_eigenvectors[current_step][:, :k]   # Current top k eigenvectors
        
        previous_step = sorted_steps[i - 1]
        previous = recorded_steps_top_eigenvectors[previous_step][:,:k]  # Previous top k eigenvectors
            
        for d in range(trajectory[i-1]):
                
            current_subspace = current[:, :trajectory[i-1]-d]  # Current subspace, top k-d eigenvectors
            previous_subspace = previous[:, :trajectory[i-1]-d]  # Previous subspace, top k-d eigenvectors
                
            # Compute the similarity between the current and previous subspaces
            matrix = np.matmul(current_subspace.T, previous_subspace)
            _, sigma, _ = np.linalg.svd(matrix)
                
            # Check if all singular values are within [0, 1]
            assert np.all((sigma >= -0.001) & (sigma <= 1.001)), "Eigenvectors are not ortho-normalized!"
                
            min_sigma = np.min(sigma)
                
            # If the minimal singular value is less then tau, then we reduce the dimensionality by d (one reduces dimension from top k to top k-d)
                
            # If the minimal singular value is greater than tau, which means the two subspaces are aligned, then we stop the loop and return the current k.
            if min_sigma > tau:
                trajectory[i] = trajectory[i-1] - d
                break
                
            
            
    return trajectory

In [6]:
tau = 0.98
k = Top_Down(recorded_steps_top_eigenvectors, tau, k_0=None)

print(k) 

{0: 5, 1: 5, 2: 3}


In [None]:
import numpy as np
# import wandb

def Top_K_Search(recorded_steps_top_eigenvectors):
    
    sorted_steps = sorted(recorded_steps_top_eigenvectors.keys())  # Epoch
    num_steps = len(sorted_steps)
    
    if num_steps < 2:
        raise ValueError("At least two recorded steps are required for Top-K search.")
    
    # Initialize k
    k = recorded_steps_top_eigenvectors[sorted_steps[0]].shape[1]  # Number of columns
    # k = k_0
    tau = 0.98 # Tolerance
    
    for i in range(num_steps):
        current_step = sorted_steps[i]
        current = recorded_steps_top_eigenvectors[current_step][:, :k]   # Current top k eigenvectors
        if i > 0:
            previous_step = sorted_steps[i - 1]
            previous = recorded_steps_top_eigenvectors[previous_step][:,:k]  # Previous top k eigenvectors
            
            for d in range(k):
                
                current_subspace = current[:, :k-d]  # Current subspace, top k-d eigenvectors
                previous_subspace = previous[:, :k-d]  # Previous subspace, top k-d eigenvectors
                
                # Compute the similarity between the current and previous subspaces
                matrix = np.matmul(current_subspace.T, previous_subspace)
                _, sigma, _ = np.linalg.svd(matrix)
                
                # Check if all singular values are within [0, 1]
                assert np.all((sigma >= -0.001) & (sigma <= 1.001)), "Eigenvectors are not ortho-normalized!"
                
                min_sigma = np.min(sigma)
                
                # If the minimal singular value is less then tau, then we reduce the dimensionality by d (one reduces dimension from top k to top k-d)
                
                # If the minimal singular value is greater than tau, which means the two subspaces are aligned, then we stop the loop and return the current k.
                if min_sigma > tau:
                    k = k-d
                    break

    return k

k = Top_K_Search(recorded_steps_top_eigenvectors)

print(k)    
        
        


3


## Bottom Up Search

In [None]:
import numpy as np
# import wandb

def Top_K_Search(recorded_steps_top_eigenvectors):
    
    sorted_steps = sorted(recorded_steps_top_eigenvectors.keys())  # Epoch
    num_steps = len(sorted_steps)
    
    if num_steps < 2:
        raise ValueError("At least two recorded steps are required for Top-K search.")
    
    # Initialize k
    k = recorded_steps_top_eigenvectors[sorted_steps[0]].shape[1]  # Number of columns
    # k = k_0
    tau = 0.98 # Tolerance
    
    for i in range(num_steps):
        current_step = sorted_steps[i]
        current = recorded_steps_top_eigenvectors[current_step][:, :k]   # Current top k eigenvectors
        if i > 0:
            previous_step = sorted_steps[i - 1]
            previous = recorded_steps_top_eigenvectors[previous_step][:,:k]  # Previous top k eigenvectors
            
            for d in range(k):
                
                current_subspace = current[:, :d+1]  # Current subspace
                previous_subspace = previous[:, :d+1]  # Previous subspace
                
                # Compute the similarity between the current and previous subspaces
                matrix = np.matmul(current_subspace.T, previous_subspace)
                _, sigma, _ = np.linalg.svd(matrix)
                
                # Check if all singular values are within [0, 1]
                assert np.all((sigma >= -0.001) & (sigma <= 1.001)), "Eigenvectors are not ortho-normalized!"
                
                # If the minimal singular value is less then tau, then we know that the two subspaces are not aligned, so we stop the loop and return the current k.
                min_sigma = np.min(sigma)
                
                if min_sigma < tau:
                    k = d
                    break

    return k

k = Top_K_Search(recorded_steps_top_eigenvectors)

print(k)    

3


这两个算法的主要区别是：

Bottom-up Search: 从低维空间开始，逐渐增加维度，直到找到两个子空间的某个维度不"对齐"，即它们最小奇异值小于某个阈值（如0.98）。
但是这个算法的要求很严格，举例来说：
$$
U = \begin{bmatrix} 
u_1, u_2, \dots, u_m
\end{bmatrix}
$$
$$
V = \begin{bmatrix} 
v_1, v_2, \dots, v_m
\end{bmatrix}
$$
我们从第一维开始，即$d=0$, $u_1$ 和 $v_1$，如果它们不"对齐"，那么我们就停止，返回当前维度 $k=d=0$。反之，如果 $u_1$ 和 $v_1$ "对齐"，那么我们就继续增加维度，然后比较 $d=1$， $u_1, u_2$ 和 $v_1, u_2$ 是否对齐，直到找到某个维度不"对齐"为止， 此时返回 $k=d$ 。

这个算法找到的 $k$ 满足以下性质:
- 对于任意 $d \leq k$， $u_1, u_2, \dots, u_d$ 和 $v_1, v_2, \dots, v_d$张成的子空间都是"对齐"的。

Top-down Search: 从高维空间开始，逐渐减少维度，直到找到两个子空间"对齐"，即它们最小奇异值大于某个阈值（如0.98）。
这个算法比较宽松，举例来说：
$$
U = \begin{bmatrix} 
u_1, u_2, \dots, u_m
\end{bmatrix}
$$
$$
V = \begin{bmatrix} 
v_1, v_2, \dots, v_m
\end{bmatrix}
$$

我们从第 $m$ 维开始，即$k=m$, 考察 $u_1, \dots, u_m$ 和 $v_1, \dots, v_m$ 张成的子空间。如果它们对齐，即它们最小奇异值大于某个阈值（如0.98），那么就停止，返回当前维度。反之，如果它们不对齐，则减小维度，即$k=m-1$，考察 $u_1, \dots, u_{m-1}$ 和 $v_1, \dots, v_{m-1}$ 张成的子空间，直到找到某个维度对齐为止。

这个算法找到的 $k$ 满足以下性质:
- 对于 $dim=k$， $u_1, u_2, \dots, u_k$ 和 $v_1, v_2, \dots, v_k$ 张成的子空间是"对齐"的。
- 但有可能发生对于某个 $d < k$， $u_1, u_2, \dots, u_d$ 和 $v_1, v_2, \dots, v_d$ 张成的子空间是 **"不对齐"** 的。

因此这个算法比较宽松。我们要考察的是空间的整体性质，因此 Top-down Search 是可能是更好的选择。

In [2]:
def binary_search_iterative(arr, target):
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = left + (right - left) // 2
        if arr[mid] == target:
            return mid
        elif arr[mid] > target:
            right = mid - 1
        else:
            left = mid + 1
    return -1  # 未找到

# 示例
arr = [1, 3, 5, 7, 9, 11]
target = 7
print(binary_search_iterative(arr, target))  # 输出 3


KeyboardInterrupt: 

In [3]:
import numpy as np

def First_Step(recorded_steps_top_eigenvectors):
    sorted_steps = sorted(recorded_steps_top_eigenvectors.keys())
    epoch_0 = recorded_steps_top_eigenvectors[sorted_steps[0]]
    epoch_1 = recorded_steps_top_eigenvectors[sorted_steps[1]]
    
    k_0 = epoch_0.shape[1]  # 列数，索引范围 0 到 k_0-1
    
    left, right = 0, k_0 - 1  # 搜索索引范围 s-1
    
    while left < right:
        mid = left + (right - left + 1) // 2  # 取较大中点
        # 这里取 mid+1 列，因为 mid 为 s-1
        matrix = np.matmul(epoch_0[:, :mid+1].T, epoch_1[:, :mid+1])
        _, sigma, _ = np.linalg.svd(matrix)
        
        # 检查奇异值是否近似处于 [0, 1]
        assert np.all((sigma >= -0.001) & (sigma <= 1.001)), "Eigenvectors are not ortho-normalized!"
    
        if np.min(sigma) >= 0.98:
            left = mid  # 当前 s = mid+1 满足条件，尝试更大 s
        else:
            right = mid - 1  # 当前 s 不满足，缩小范围
            
    return left + 1  # 返回实际列数 s

# 模拟测试数据
# 我们构造两个 3x4 矩阵，分别代表 epoch_0 和 epoch_1
# 对于 s=1,2: 模拟奇异值为1.0；对于 s=3,4: 模拟奇异值低于阈值。
# 这里我们通过硬编码的方式来模拟 SVD 返回的最小奇异值。

class FakeMatrix:
    def __init__(self, s):
        self.s = s  # 列数

    @property
    def shape(self):
        return (3, self.s)

# 构造两个字典，键为字符串，值为 FakeMatrix 实例
recorded_steps_top_eigenvectors = {
    'epoch1': FakeMatrix(4),
    'epoch2': FakeMatrix(4)
}

# 重写 np.linalg.svd 来模拟不同 s 的情况
original_svd = np.linalg.svd
def fake_svd(matrix):
    s = matrix.shape[0] if matrix.shape[0] < matrix.shape[1] else matrix.shape[1]
    # 模拟 s=mid+1 的情况：如果 s <= 2, min sigma = 1.0; 如果 s >= 3, min sigma = 0.97.
    if s <= 2:
        sigma = np.ones(s)
    else:
        sigma = np.array([1.0]* (s - 1) + [0.97])
    return None, sigma, None

np.linalg.svd = fake_svd  # 覆盖

s_found = First_Step(recorded_steps_top_eigenvectors)
print("找到的 s =", s_found)  # 预期输出 2

# 恢复原始的 svd
np.linalg.svd = original_svd


TypeError: 'FakeMatrix' object is not subscriptable