In [17]:
import numpy as np
import matplotlib.pyplot as plt
import time

# LinUCB Agent (Baseline - Full rank)
class LinUCBAgent:
    def __init__(self, d, alpha, lambda_reg):
        self.d = d
        self.alpha = alpha 
        self.lambda_reg = lambda_reg
        self.theta_hat = np.zeros(d, dtype=np.float64)
        self.G = lambda_reg * np.eye(d, dtype=np.float64)
        self.S = np.zeros(d, dtype=np.float64)

    def select(self, action_set_t):  
        P = action_set_t @ self.theta_hat
        # Compute uncertainty: sqrt(x^T G^{-1} x) for each action
        V = np.linalg.solve(self.G, action_set_t.T)
        uncertainties = np.sqrt(np.maximum(0, np.sum(action_set_t * V.T, axis=1)))
        ucb_scores = P + self.alpha * uncertainties
        return np.argmax(ucb_scores)

    def update_all(self, a, reward):
        self.G += np.outer(a, a)
        self.S += reward * a
        self.theta_hat = np.linalg.solve(self.G, self.S)


In [18]:
# SketchedLinUCB Agent (Rank-r approximation using Frequent Directions)
class SketchedLinUCBAgent:
    """
    SketchedLinUCB using Frequent Directions algorithm for rank-r approximation.
    """
    def __init__(self, d, m, gamma, lambda_reg=1.0):
        """
        d: Context dimension
        m: Sketch rank (approximation rank)
        gamma: Exploration parameter
        lambda_reg: Regularization parameter
        """
        self.d = d
        self.m = m  # Sketch rank
        self.gamma = gamma
        self.lambda_reg = lambda_reg
        
        # Initialize sketch buffer (2m x d)
        self.Z = np.zeros((2 * m, d), dtype=np.float64)
        # Reward vector
        self.b = np.zeros(d, dtype=np.float64)
        # Spectral tail / adaptive regularizer
        self.alpha = float(lambda_reg)
        # Current row index in sketch buffer
        self.row_idx = 0
        
    def _compute_theta_hat(self):
        """Compute theta_hat using the sketched covariance approximation."""
        if self.row_idx == 0:
            # No data yet, return zeros
            return np.zeros(self.d, dtype=np.float64)
        
        # Get the active part of the sketch
        Z_active = self.Z[:self.row_idx, :]
        
        ZZt = Z_active @ Z_active.T
        M = ZZt + max(self.alpha, 1e-8) * np.eye(self.row_idx, dtype=np.float64)
        Zb = Z_active @ self.b
        y = np.linalg.solve(M, Zb)
        theta_hat = (1.0 / max(self.alpha, 1e-8)) * (self.b - Z_active.T @ y)
        theta_hat = np.nan_to_num(theta_hat, nan=0.0, posinf=1e6, neginf=-1e6)
        
        return theta_hat
    
    def _compute_uncertainty(self, x):
        """Compute uncertainty bonus for action x."""
        if self.row_idx == 0:
            # No data yet, use simple uncertainty
            x_norm_sq = np.dot(x, x)
            alpha_safe = max(self.alpha, 1e-8)
            return np.sqrt(max(0, x_norm_sq / alpha_safe))
        
        Z_active = self.Z[:self.row_idx, :]
        ZZt = Z_active @ Z_active.T
        M = ZZt + max(self.alpha, 1e-8) * np.eye(self.row_idx, dtype=np.float64)
        Zx = Z_active @ x
        y = np.linalg.solve(M, Zx)
        xZtM_invZx = x @ Z_active.T @ y
        width_sq = (1.0 / max(self.alpha, 1e-8)) * (np.dot(x, x) - xZtM_invZx)
        width = np.sqrt(max(0, width_sq))
        width = np.nan_to_num(width, nan=0.0, posinf=1e6)
        
        return width
    
    def select(self, action_set_t):
        theta_hat = self._compute_theta_hat()
        P = action_set_t @ theta_hat
        uncertainties = np.array([self._compute_uncertainty(x) for x in action_set_t])
        ucb_scores = P + self.gamma * uncertainties
        return np.argmax(ucb_scores)
    
    def _update_sketch(self):
        """Update sketch using Frequent Directions when buffer is full."""
        if self.row_idx < 2 * self.m:
            return
        
        # Compute SVD of current sketch
        Z_full = self.Z[:self.row_idx, :]  # (2m x d)
        U, Sigma, Vt = np.linalg.svd(Z_full, full_matrices=False)
        # U: (2m x min(2m, d)), Sigma: (min(2m, d),), Vt: (min(2m, d) x d)
        
        # Get water level: m-th squared singular value
        # If we have fewer than m singular values, use 0
        if len(Sigma) < self.m:
            delta = 0.0
        else:
            delta = Sigma[self.m - 1] ** 2
        
        self.alpha = max(self.alpha + delta, 1e-8)
        Sigma_new = np.sqrt(np.maximum(0, Sigma ** 2 - delta))
        Sigma_new = np.nan_to_num(Sigma_new, nan=0.0, posinf=0.0)
        
        # Reconstruct Z with top m components
        if len(Sigma_new) >= self.m:
            Z_new = Sigma_new[:self.m, np.newaxis] * Vt[:self.m, :]
        else:
            Sigma_padded = np.zeros(self.m, dtype=np.float64)
            Sigma_padded[:len(Sigma_new)] = Sigma_new
            Vt_padded = np.zeros((self.m, self.d), dtype=np.float64)
            Vt_padded[:Vt.shape[0], :] = Vt
            Z_new = Sigma_padded[:, np.newaxis] * Vt_padded
        Z_new = np.nan_to_num(Z_new, nan=0.0, posinf=1e6, neginf=-1e6)
        
        # Reset sketch buffer
        self.Z = np.zeros((2 * self.m, self.d), dtype=np.float64)
        self.Z[:self.m, :] = Z_new
        self.row_idx = self.m
    
    def update_all(self, a, reward):
        self.b += reward * a
        self.Z[self.row_idx, :] = a
        self.row_idx += 1
        self._update_sketch()

In [19]:
def evaluate_agent(agent, action_set, true_theta, T, noise_std=0.1, seed=42):
    """Evaluate agent and return regret and latency."""
    np.random.seed(seed)
    K = action_set.shape[0]
    cumulative_regret = []
    total_regret = 0.0
    total_time = 0.0
    
    for t in range(T):
        start = time.perf_counter()
        arm_idx = agent.select(action_set)
        total_time += time.perf_counter() - start
        
        arm_features = action_set[arm_idx]
        true_reward = np.dot(arm_features, true_theta)
        observed_reward = true_reward + np.random.normal(0, noise_std)
        
        best_reward = max([np.dot(action_set[i], true_theta) for i in range(K)])
        regret = best_reward - observed_reward
        total_regret += regret
        cumulative_regret.append(total_regret)
        
        start = time.perf_counter()
        agent.update_all(arm_features, observed_reward)
        total_time += time.perf_counter() - start
    
    return cumulative_regret, total_time / T

In [20]:
def compare_methods(d=1024, K=10, T=1000, alpha=1.0, lambda_reg=1.0, 
                    sketch_ranks=[5, 10, 20, 50, 100, 200], noise_std=0.1, seed=42):
    """Compare LinUCB vs SketchedLinUCB at different ranks."""
    np.random.seed(seed)
    true_theta = np.random.randn(d)
    true_theta = true_theta / np.linalg.norm(true_theta)
    action_set = np.random.randn(K, d)
    action_set = action_set / np.linalg.norm(action_set, axis=1, keepdims=True)
    
    results = {}
    
    # Full LinUCB
    print("Full LinUCB...")
    agent = LinUCBAgent(d=d, alpha=alpha, lambda_reg=lambda_reg)
    regret, avg_time = evaluate_agent(agent, action_set, true_theta, T, noise_std, seed)
    results['full'] = {'regret': regret, 'final_regret': regret[-1], 
                       'avg_time': avg_time, 'rank': d}
    print(f"  Regret: {regret[-1]:.2f}, Time: {avg_time*1000:.2f} ms\n")
    
    # SketchedLinUCB
    for m in sketch_ranks:
        print(f"SketchedLinUCB (rank={m})...")
        agent = SketchedLinUCBAgent(d=d, m=m, gamma=alpha, lambda_reg=lambda_reg)
        regret, avg_time = evaluate_agent(agent, action_set, true_theta, T, noise_std, seed)
        results[f'm{m}'] = {'regret': regret, 'final_regret': regret[-1],
                           'avg_time': avg_time, 'rank': m}
        print(f"  Regret: {regret[-1]:.2f}, Time: {avg_time*1000:.2f} ms\n")
    
    return results

In [21]:
# Run comparison: d=1024, K=10 arms, T=1000 steps
results = compare_methods(d=1024, K=10, T=1000, alpha=1.0, lambda_reg=1.0,
                          sketch_ranks=[5, 10, 20, 50, 100, 200], noise_std=0.1, seed=42)

Full LinUCB...
  Regret: 19.19, Time: 12.29 ms

SketchedLinUCB (rank=5)...
  Regret: 8.52, Time: 0.34 ms

SketchedLinUCB (rank=10)...
  Regret: 19.19, Time: 0.51 ms

SketchedLinUCB (rank=20)...
  Regret: 19.19, Time: 0.69 ms

SketchedLinUCB (rank=50)...
  Regret: 19.19, Time: 0.96 ms

SketchedLinUCB (rank=100)...
  Regret: 19.19, Time: 2.07 ms

SketchedLinUCB (rank=200)...
  Regret: 19.19, Time: 5.72 ms



In [24]:


# Extract data
ranks, regrets, latencies = [], [], []
for key in ['full'] + [f'm{m}' for m in [5, 10, 20, 50, 100, 200]]:
    if key in results:
        data = results[key]
        ranks.append(data['rank'])
        regrets.append(data['final_regret'])
        latencies.append(data['avg_time'] * 1000)

# Sort by rank
idx = np.argsort(ranks)
ranks, regrets, latencies = np.array(ranks)[idx], np.array(regrets)[idx], np.array(latencies)[idx]

# Summary table
print("\nSummary:")
print(f"{'Method':<20} {'Rank':<8} {'Regret':<12} {'Latency (ms)':<12}")
print("-" * 52)
for key in ['full'] + [f'm{m}' for m in [5, 10, 20, 50, 100, 200]]:
    if key in results:
        d = results[key]
        name = 'Full LinUCB' if key == 'full' else f'Sketch (m={d["rank"]})'
        print(f"{name:<20} {d['rank']:<8} {d['final_regret']:<12.2f} {d['avg_time']*1000:<12.2f}")


Summary:
Method               Rank     Regret       Latency (ms)
----------------------------------------------------
Full LinUCB          1024     19.19        12.29       
Sketch (m=5)         5        8.52         0.34        
Sketch (m=10)        10       19.19        0.51        
Sketch (m=20)        20       19.19        0.69        
Sketch (m=50)        50       19.19        0.96        
Sketch (m=100)       100      19.19        2.07        
Sketch (m=200)       200      19.19        5.72        
