In [None]:
from IPython.display import clear_output
clear_output()

In [None]:
%%writefile task.py
import torch
import numpy as np
import time

# ------------------------------------------------------------------------------------------------
# Your Task 1.1 code here
# ------------------------------------------------------------------------------------------------

def distance_cosine(X, Y):
    # Normalize vectors for cosine distance calculation
    X_norm = torch.nn.functional.normalize(X, dim=1)
    Y_norm = torch.nn.functional.normalize(Y, dim=1)
    # Calculate cosine similarity
    similarity = torch.matmul(X_norm, Y_norm.T)
    # Clamp values to avoid numerical issues
    similarity = torch.clamp(similarity, min=-1.0, max=1.0)
    # Convert similarity to distance (1 - similarity)
    return 1.0 - similarity

def distance_l2(X, Y):
    # Euclidean distance calculation using matrix operations
    # ||x-y||^2 = ||x||^2 + ||y||^2 - 2x·y
    X_squared = (X ** 2).sum(dim=1, keepdim=True)
    Y_squared = (Y ** 2).sum(dim=1, keepdim=True).T
    XY = torch.matmul(X, Y.T)
    distances = X_squared + Y_squared - 2 * XY
    # Clamp to prevent negative values due to numerical precision
    distances = torch.clamp(distances, min=1e-12)
    return torch.sqrt(distances)

def distance_dot(X, Y):
    # Negative dot product as distance (higher dot product = lower distance)
    return -torch.matmul(X, Y.T)

def distance_manhattan(X, Y):
    # Optimization: Use batch processing for large datasets
    batch_size = 1024
    n_x = X.shape[0]
    n_y = Y.shape[0]

    # For large matrices, use batched computation to save memory
    if n_x * n_y > 10000000 and max(n_x, n_y) > batch_size:
        result = torch.zeros((n_x, n_y), device=X.device)
        for i in range(0, n_x, batch_size):
            end_i = min(i + batch_size, n_x)
            # Process one batch at a time
            result[i:end_i] = torch.cdist(X[i:end_i], Y, p=1)
        return result
    else:
        # For smaller matrices, compute all at once
        return torch.cdist(X, Y, p=1)

# ------------------------------------------------------------------------------------------------
# Your Task 1.2 code here
# ------------------------------------------------------------------------------------------------

def custom_topk(distances, k):
    # Implementation of top-k algorithm for finding k smallest distances
    n_queries = distances.shape[0]
    top_indices = torch.zeros((n_queries, k), dtype=torch.int64, device=distances.device)

    for i in range(n_queries):
        dist_row = distances[i]
        for j in range(k):
            if j == 0:
                # For first neighbor, simply find minimum
                _, min_idx = torch.min(dist_row, dim=0)
            else:
                # For subsequent neighbors, mask previously found indices
                masked = dist_row.clone()
                masked[top_indices[i, :j]] = float('inf')
                _, min_idx = torch.min(masked, dim=0)
            top_indices[i, j] = min_idx

    return top_indices

def our_knn(N, D, A, X, K, metric="L2"):
    # Use GPU if available for acceleration
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Convert inputs to tensors if needed and move to device
    A_is_tensor = torch.is_tensor(A)
    X_is_tensor = torch.is_tensor(X)

    # Optimization: For large datasets, use batching to avoid memory issues
    large_dataset = N > 100000  # Consider datasets with >100k vectors as large

    # Handle input formatting for X (single or multiple queries)
    if X_is_tensor:
        if X.dim() == 1:
            X = X.unsqueeze(0)  # Convert to 2D tensor for single query
        num_queries = X.shape[0]
    else:
        X = np.asarray(X, dtype=np.float32)
        if X.ndim == 1:
            X = X.reshape(1, -1)  # Convert to 2D array for single query
        num_queries = X.shape[0]

    # Initialize result tensor on CPU to save GPU memory
    result_indices = torch.zeros((num_queries, K), dtype=torch.int64, device='cpu')

    # Configure batch sizes for processing
    query_batch_size = min(64, num_queries)  # Process up to 64 queries at once
    db_batch_size = min(10000, N) if large_dataset else N  # Use smaller batches for large datasets

    # Process queries in batches for memory efficiency
    for q_start in range(0, num_queries, query_batch_size):
        q_end = min(q_start + query_batch_size, num_queries)

        # Create query batch tensor on device
        if X_is_tensor:
            X_batch = X[q_start:q_end].to(device)
        else:
            X_batch = torch.tensor(X[q_start:q_end], dtype=torch.float32, device=device)

        if large_dataset:
            # Optimization: For large datasets, process database in batches
            # Initialize top K tracking for each query in batch
            topk_values = torch.full((q_end - q_start, K), float('inf'), device=device)
            topk_indices = torch.zeros((q_end - q_start, K), dtype=torch.int64, device=device)

            # Process database in batches
            for db_start in range(0, N, db_batch_size):
                db_end = min(db_start + db_batch_size, N)

                # Create database batch tensor on device
                if A_is_tensor:
                    A_batch = A[db_start:db_end].to(device)
                else:
                    A_batch = torch.tensor(A[db_start:db_end], dtype=torch.float32, device=device)

                # Calculate distances using the appropriate metric
                if metric.lower() == "l2":
                    batch_dist = distance_l2(X_batch, A_batch)
                elif metric.lower() == "cosine":
                    batch_dist = distance_cosine(X_batch, A_batch)
                elif metric.lower() == "dot":
                    batch_dist = distance_dot(X_batch, A_batch)
                elif metric.lower() == "manhattan":
                    batch_dist = distance_manhattan(X_batch, A_batch)
                else:
                    raise ValueError(f"Unknown metric: {metric}")

                # Update top-K for each query by combining results from each batch
                batch_size = q_end - q_start
                for i in range(batch_size):
                    # Combine current results with new batch results
                    curr_values = topk_values[i]
                    curr_indices = topk_indices[i]

                    # Get current batch distances and indices
                    new_values = batch_dist[i]
                    new_indices = torch.arange(db_start, db_end, device=device)

                    # Combine and get new top-K
                    if db_start > 0:  # Not the first batch
                        all_values = torch.cat([curr_values, new_values])
                        all_indices = torch.cat([curr_indices, new_indices])

                        # Get top-K (smallest for L2, cosine, manhattan; largest for dot)
                        if metric.lower() == "dot":
                            topk = torch.topk(all_values, min(K, len(all_values)), largest=True)
                        else:
                            topk = torch.topk(all_values, min(K, len(all_values)), largest=False)

                        topk_values[i] = topk.values
                        topk_indices[i] = all_indices[topk.indices]
                    else:  # First batch, just take top-K
                        if metric.lower() == "dot":
                            topk = torch.topk(new_values, min(K, len(new_values)), largest=True)
                        else:
                            topk = torch.topk(new_values, min(K, len(new_values)), largest=False)

                        topk_values[i, :len(topk.values)] = topk.values
                        topk_indices[i, :len(topk.indices)] = new_indices[topk.indices]

                # Memory optimization: clear batch data after processing
                del A_batch, batch_dist
                if device.type == 'cuda':
                    torch.cuda.empty_cache()

            # Store results for this query batch
            result_indices[q_start:q_end] = topk_indices.cpu()

        else:
            # For smaller datasets, process entire database at once (more efficient)
            A_device = A.to(device) if A_is_tensor else torch.tensor(A, dtype=torch.float32, device=device)

            # Calculate distances using the appropriate metric
            if metric.lower() == "l2":
                distances = distance_l2(X_batch, A_device)
            elif metric.lower() == "cosine":
                distances = distance_cosine(X_batch, A_device)
            elif metric.lower() == "dot":
                distances = distance_dot(X_batch, A_device)
            elif metric.lower() == "manhattan":
                distances = distance_manhattan(X_batch, A_device)
            else:
                raise ValueError(f"Unknown metric: {metric}")

            # Get top-K indices
            if metric.lower() == "dot":
                _, indices = torch.topk(distances, K, dim=1, largest=True)
            else:
                _, indices = torch.topk(distances, K, dim=1, largest=False)

            # Store results
            result_indices[q_start:q_end] = indices.cpu()

            # Memory optimization: clear data when done with non-tensor input
            if not A_is_tensor:
                del A_device

        # Memory optimization: clear batch data after processing
        del X_batch
        if device.type == 'cuda':
            torch.cuda.empty_cache()

    return result_indices

def benchmark_knn(N, D, K, metric="L2"):
    # Generate random data for benchmarking
    A = np.random.randn(N, D).astype(np.float32)
    X = np.random.randn(100, D).astype(np.float32)

    # Time the KNN query execution
    start = time.time()
    _ = our_knn(N, D, A, X, K, metric)
    return time.time() - start

# ------------------------------------------------------------------------------------------------
# Your Task 2.1 code here
# ------------------------------------------------------------------------------------------------

def distance_kernel(X, Y, metric="L2"):
    # Optimized distance calculation between matrix X and Y
    if metric == "L2":
        X2 = (X ** 2).sum(dim=1, keepdim=True)
        Y2 = (Y ** 2).sum(dim=1, keepdim=True).T
        return torch.sqrt(torch.clamp(X2 + Y2 - 2 * X @ Y.T, min=0.0))
    elif metric == "cosine":
        # Normalize vectors and compute cosine distance
        X_norm = X / (X.norm(dim=1, keepdim=True) + 1e-8)  # Add epsilon to avoid division by zero
        Y_norm = Y / (Y.norm(dim=1, keepdim=True) + 1e-8)
        return 1 - torch.mm(X_norm, Y_norm.T)
    else:
        raise ValueError("metric must be 'L2' or 'cosine'")

def our_kmeans(N, D, A, K, metric="L2", max_iters=100, tol=1e-4, use_gpu=True):
    # Select device based on availability and user preference
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"
    print(f"Running K-Means on PyTorch ({device.upper()} backend), metric={metric}")

    # Convert input to tensor and move to device
    A_tensor = torch.tensor(A, dtype=torch.float32).contiguous()
    if use_gpu and A_tensor.device.type == "cpu":
        # Use pin_memory for faster host-to-device transfer
        A_tensor = A_tensor.pin_memory().to(device, non_blocking=True)
    else:
        A_tensor = A_tensor.to(device)

    # Initialize centroids randomly from data points
    # centroids = A_tensor[torch.randperm(N)[:K]].clone()
    centroids = A_tensor[torch.randperm(N, device=device)[:K]].clone()

    # Main K-means loop
    for i in range(max_iters):
        # Calculate distances from each point to each centroid
        distances = distance_kernel(A_tensor, centroids, metric=metric)

        # Assign each point to nearest centroid
        labels = torch.argmin(distances, dim=1)

        # Update centroids as mean of assigned points
        new_centroids = torch.zeros_like(centroids, device=device)
        ones = torch.ones_like(labels, dtype=torch.float32, device=device)

        # Count points per cluster
        counts = torch.zeros(K, dtype=torch.float32, device=device).scatter_add_(0, labels, ones)

        # Sum points per cluster
        new_centroids.scatter_add_(0, labels.unsqueeze(1).expand(-1, D), A_tensor)

        # Divide by count to get mean (avoid division by zero with clamp_min)
        new_centroids /= counts.unsqueeze(1).clamp_min(1)

        # Check for convergence
        if torch.norm(new_centroids - centroids, p=2, dim=1).max().item() < tol:
            print(f"K-Means converged at iteration {i+1}")
            break

        centroids = new_centroids

    return labels.cpu()



def our_kmeans_modified(N, D, A, num_clusters, metric="L2", max_iters=100, tol=1e-4, use_gpu=True):
    """
    KMeans clustering with GPU/CPU support.

    Parameters:
      N: Number of data points.
      D: Data dimension.
      A: Numpy array of shape (N, D).
      num_clusters: Number of clusters to form.
      metric: "L2" or "cosine" used to compute distances.
      max_iters: Maximum iterations.
      tol: Convergence threshold.
      use_gpu: If True, use CUDA if available.

    Returns:
      labels: Tensor of shape (N,) with cluster assignments.
      centroids: Tensor of shape (num_clusters, D) representing cluster centers.
    """
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"

    # Check if A is already a torch tensor and move to correct device
    if torch.is_tensor(A):
        A_tensor = A
        if A_tensor.device.type != device:
            A_tensor = A_tensor.to(device)
    else:
        A_tensor = torch.tensor(A, dtype=torch.float32).contiguous()
        if use_gpu and torch.cuda.is_available():
            A_tensor = A_tensor.to(device)

    # Initialize centroids using k-means++ initialization for better clustering quality
    centroids = torch.zeros((num_clusters, D), dtype=torch.float32, device=device)

    # Choose first centroid randomly
    # first_idx = torch.randint(0, N, (1,)).item()
    first_idx = torch.randint(0, N, (1,), device=device).item()
    centroids[0] = A_tensor[first_idx].clone()

    # Choose remaining centroids using k-means++ algorithm (weighted sampling based on distance)
    for i in range(1, num_clusters):
        # Calculate distances to closest existing centroid
        min_dists = torch.min(distance_kernel(A_tensor, centroids[:i], metric=metric), dim=1)[0]

        # Square distances and normalize to create a probability distribution
        # Points farther from existing centroids have higher probability
        probs = min_dists ** 2
        probs /= probs.sum()

        # Sample next centroid based on probability distribution
        next_idx = torch.multinomial(probs, 1).item()
        centroids[i] = A_tensor[next_idx].clone()

    # Main K-means loop
    for i in range(max_iters):
        distances = distance_kernel(A_tensor, centroids, metric=metric)  # (N, num_clusters)
        labels = torch.argmin(distances, dim=1)  # (N,)

        # Update centroids
        new_centroids = torch.zeros_like(centroids, device=device)
        ones = torch.ones_like(labels, dtype=torch.float32, device=device)
        counts = torch.zeros(num_clusters, dtype=torch.float32, device=device)
        counts = counts.scatter_add_(0, labels, ones)
        new_centroids = new_centroids.scatter_add_(0, labels.unsqueeze(1).expand(-1, D), A_tensor)
        counts = counts.unsqueeze(1).clamp_min(1)  # Prevent division by zero
        new_centroids /= counts

        # Check for convergence
        shift = torch.norm(new_centroids - centroids, p=2, dim=1).max().item()
        centroids = new_centroids
        if shift < tol:
            print(f"K-Means converged at iteration {i + 1}")
            break

    return labels.cpu(), centroids.cpu()


# ------------------------------------------------------------------------------------------------
# Your Task 2.2 code here
# ------------------------------------------------------------------------------------------------

# Optimized Approximate Nearest Neighbors Implementation with Recall Guarantees

def calculate_clusters_for_recall(K, desired_recall=0.7):
    """
    Calculate how many clusters need to be searched to achieve the desired recall rate.
    Uses the formula L ≥ (K-1)/(1-r) where:
    - K is the number of nearest neighbors we want
    - r is the desired recall rate (0.0 to 1.0)
    - L is the number of clusters to search

    Returns: Number of clusters to search (L)
    """
    if desired_recall >= 1.0:
        # If perfect recall is desired, we'd need to search all clusters
        return float('inf')

    # Mathematical formula to guarantee recall rate
    L = (K - 1) / (1 - desired_recall)
    return max(int(np.ceil(L)), 1)  # Ensure at least 1 cluster is searched

def our_ann_improved(N, D, A, X, K, num_clusters=None, desired_recall=0.7, metric="L2", max_iters=100, tol=1e-4, use_gpu=True):
    """
    Optimized Approximate Nearest Neighbor (ANN) algorithm with mathematical recall guarantees.

    Parameters:
      N: Number of vectors.
      D: Dimension of each vector.
      A: Numpy array of shape (N, D).
      X: Query vector (e.g. numpy array of shape (D,)).
      K: Number of nearest neighbors to return.
      num_clusters: Number of clusters for KMeans (auto-calculated if None).
      desired_recall: Target recall rate (0.0 to 1.0) for the algorithm.
      metric: "L2" or "cosine" distance metric.
      max_iters, tol, use_gpu: Parameters for KMeans.

    Returns:
      A list of indices (of length K) indicating the top K nearest neighbors in A.
    """
    # Set device first and be consistent
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"

    # Auto-tune parameters based on dataset size and desired recall
    if num_clusters is None:
        # Calculate optimal number of clusters based on data size
        # Optimization: use sqrt(N) as a heuristic but cap min/max values
        num_clusters = min(max(int(np.sqrt(N)), 10), 200)

    # Calculate K1 (number of clusters to search) based on recall guarantee formula
    K1 = calculate_clusters_for_recall(K, desired_recall)
    K1 = min(K1, num_clusters)  # Cannot search more clusters than exist

    # Calculate K2 (candidates per cluster) based on dimensionality scaling
    # Optimization: Higher dimensions need more candidates due to curse of dimensionality
    dim_factor = max(1.0, np.log10(D) / 2)
    K2 = int(K * max(5, dim_factor))
    K2 = min(K2, 200)  # Cap to avoid excessive computation

    print(f"ANN parameters (recall guarantee): num_clusters={num_clusters}, K1={K1}, K2={K2}, desired_recall={desired_recall}")

    # Step 1: Convert inputs to tensors and move to the correct device
    # Convert A to tensor if it's not already
    if torch.is_tensor(A):
        A_tensor = A.to(device)
    else:
        A_tensor = torch.tensor(A, dtype=torch.float32, device=device)

    # Convert X to tensor if it's not already
    if torch.is_tensor(X):
        X_tensor = X.to(device)
    else:
        X_tensor = torch.tensor(X, dtype=torch.float32, device=device)

    # Ensure X has the right shape
    if X_tensor.dim() == 1:
        X_tensor = X_tensor.unsqueeze(0)  # shape (1, D)

    # Step 2: Cluster A into num_clusters clusters using KMeans
    labels, centroids = our_kmeans_modified(N, D, A_tensor, num_clusters,
                                        metric=metric, max_iters=max_iters, tol=tol,
                                        use_gpu=use_gpu)

    # Convert labels and centroids to tensors on the correct device
    if not torch.is_tensor(labels):
        labels = torch.tensor(labels, dtype=torch.long, device=device)
    else:
        labels = labels.to(device)

    if not torch.is_tensor(centroids):
        centroids = torch.tensor(centroids, dtype=torch.float32, device=device)
    else:
        centroids = centroids.to(device)

    # Step 3: Calculate cluster statistics for density-aware search
    cluster_sizes = torch.zeros(num_clusters, dtype=torch.float32, device=device)
    for c in range(num_clusters):
        cluster_sizes[c] = torch.sum(labels == c).float()

    # Compute density-based weights for each cluster
    # Optimization: Consider cluster density in priority calculation
    total_points = torch.sum(cluster_sizes).float()
    cluster_weights = cluster_sizes / total_points

    # Step 4: Calculate distance from query to each centroid
    if metric == "L2":
        centroid_dist = torch.cdist(X_tensor, centroids, p=2).squeeze(0)
    elif metric == "cosine":
        X_norm = X_tensor / (torch.norm(X_tensor, dim=1, keepdim=True) + 1e-8)
        centroids_norm = centroids / (torch.norm(centroids, dim=1, keepdim=True) + 1e-8)
        centroid_dist = 1 - torch.matmul(X_norm, centroids_norm.t()).squeeze(0)
    else:
        raise ValueError(f"Unsupported metric: {metric}")

    # Step 5: Create a priority score for each cluster combining distance and density
    # Optimization: Adaptive radius factor based on dimensionality
    dim_radius_factor = 1.0 + (D / 1000)

    # Softmax-normalize distances to get probability distribution
    exp_neg_dist = torch.exp(-centroid_dist / dim_radius_factor)
    softmax_probs = exp_neg_dist / torch.sum(exp_neg_dist)

    # Combine distance probability with density weight
    # Optimization: Balance between distance and density with alpha parameter
    alpha = 0.7  # Weight for distance vs density
    cluster_scores = alpha * softmax_probs + (1 - alpha) * cluster_weights

    # Get top K1 clusters based on combined score
    _, ranked_clusters = torch.topk(cluster_scores, min(K1, num_clusters))
    selected_clusters = ranked_clusters.cpu().numpy().tolist()

    # Step 6: Process selected clusters to find candidate neighbors
    unique_candidate_indices = set()
    candidate_with_dist = []

    for c in selected_clusters:
        # Get indices of all points in this cluster
        cluster_indices = torch.nonzero(labels == c, as_tuple=True)[0]

        if cluster_indices.numel() == 0:
            continue

        # Calculate distances between query and points in this cluster
        cluster_points = A_tensor[cluster_indices]

        if metric == "L2":
            dists = torch.cdist(X_tensor, cluster_points, p=2).squeeze(0)
        elif metric == "cosine":
            X_norm = X_tensor / (torch.norm(X_tensor, dim=1, keepdim=True) + 1e-8)
            cluster_norm = cluster_points / (torch.norm(cluster_points, dim=1, keepdim=True) + 1e-8)
            dists = 1 - torch.matmul(X_norm, cluster_norm.t()).squeeze(0)

        # Get top K2 candidates from this cluster
        topk = min(K2, cluster_points.size(0))
        _, cluster_topk = torch.topk(-dists, topk, largest=True)

        # Add candidates with their distances to our list
        for i in range(len(cluster_topk)):
            idx = cluster_indices[cluster_topk[i]].item()
            dist = dists[cluster_topk[i]].item()
            if idx not in unique_candidate_indices:
                unique_candidate_indices.add(idx)
                candidate_with_dist.append((dist, idx))

    # Step 7: Final candidate selection
    if len(candidate_with_dist) < K:
        # Not enough candidates, perform exact search
        # Fallback strategy to guarantee results even with insufficient candidates
        print("Warning: Not enough candidates found in selected clusters, performing exact search")

        if metric == "L2":
            dists = torch.cdist(X_tensor, A_tensor, p=2).squeeze(0)
        elif metric == "cosine":
            X_norm = X_tensor / (torch.norm(X_tensor, dim=1, keepdim=True) + 1e-8)
            A_norm = A_tensor / (torch.norm(A_tensor, dim=1, keepdim=True) + 1e-8)
            dists = 1 - torch.matmul(X_norm, A_norm.t()).squeeze(0)

        _, indices = torch.topk(-dists, min(K, N), largest=True)
        return indices.cpu().numpy().tolist()

    # Sort candidates by distance and return top K
    candidate_with_dist.sort()  # Sort by distance (first element of tuple)
    top_candidates = [idx for _, idx in candidate_with_dist[:K]]

    return top_candidates

# Main Benchmark Runner
if __name__ == "__main__":
    print("Speed benchmark for D=2:")
    print(f"Time for 10,000 vectors with D=2: {benchmark_knn(10000, 2, 10):.4f} seconds")
    print("\nSpeed benchmark for D=2^15:")
    print(f"Time for 1,000 vectors with D=2^15: {benchmark_knn(1000, 2**15, 10):.4f} seconds")
    print("\nProcessing 4,000 vectors:")
    print(f"Time for 4,000 vectors: {benchmark_knn(4000, 128, 10):.4f} seconds")
    print("\nProcessing simulation for 4,000,000 vectors:")
    sample_time = benchmark_knn(40000, 128, 10)
    print(f"Estimated time for 4,000,000 vectors: {sample_time * (4000000 / 40000):.2f} seconds")

Overwriting task.py


In [None]:
%%writefile test.py
import sys
import time
import numpy as np
import torch
from task import our_knn, our_kmeans, our_ann_improved

def generate_data(size, for_knn=False):
    """
    Generate synthetic data for testing KNN or K-means algorithms
    Parameters:
        size: String indicating dataset size ("small", "medium", "large", etc.)
        for_knn: Boolean indicating if data is for KNN (includes query vectors)
    Returns:
        N, D, A, [X], K: Dataset parameters, data matrix, [query vectors], number of neighbors/clusters
    """
    if size == "small":
        N, D, K = 100, 2, 5      # Small dataset: 100 vectors, 2 dimensions, 5 neighbors/clusters
        M = 10                   # 10 query vectors
    elif size == "medium":
        N, D, K = 4000, 128, 10  # Medium: 4,000 vectors, 128 dimensions
        M = 100
    elif size == "large":
        N, D, K = 10000, 1024, 20  # Large: 10,000 vectors, 1024 dimensions
        M = 100
    elif size == "large_dim2":
        N, D, K = 10000, 2, 10   # Testing with very low dimension but large number of vectors
        M = 100
    elif size == "large_dim32k":
        N, D, K = 1000, 2**15, 10  # Testing with very high dimension (32,768)
        M = 10
    elif size == "huge":
        # Simulate large dataset performance without using full 4M vectors
        N, D, K = 40000, 128, 10  # Use 40,000 as representative for 4,000,000
        M = 100
        print("Note: Using 40,000 vectors to extrapolate performance for 4,000,000 vectors")

    # Generate random data with normal distribution (float32 for GPU compatibility)
    A = np.random.randn(N, D).astype(np.float32)

    if for_knn:
        # For KNN we need query vectors
        X = np.random.randn(M, D).astype(np.float32)
        return N, D, A, X, K
    else:
        # For K-means we only need the data matrix
        return N, D, A, K


def run_knn(N, D, A, X, K, metric, use_gpu, label):
    """
    Run and benchmark the KNN implementation
    """
    print(f"\n--- Running KNN with {metric.upper()} on {label.upper()} ({N} db, {X.shape[0]} queries, {D} dimensions) ---")

    # Ensure GPU environment is properly set before running KNN
    if use_gpu:
        if torch.cuda.is_available():
            # Synchronize and clear cache for consistent benchmarking
            torch.cuda.synchronize()
            torch.cuda.empty_cache()
        else:
            print("Warning: GPU requested but not available. Falling back to CPU.")

    # Measure execution time
    start = time.time()
    indices = our_knn(N, D, A, X, K, metric=metric)

    # Ensure all GPU operations complete before stopping timer
    if use_gpu and torch.cuda.is_available():
        torch.cuda.synchronize()

    end = time.time()
    print(f"First 3 query results:\n{indices[:3]}")
    print(f"Time: {end - start:.4f} sec")

def compare_cpu_gpu(size, metric="L2"):
    """
    Directly compare CPU and GPU performance on the same dataset
    """
    print(f"\n======= Comparing CPU vs GPU on {size.upper()} dataset with {metric} =======")

    # Generate data for KNN test
    N, D, A, X, K = generate_data(size, for_knn=True)

    # Run on CPU first
    torch.set_default_tensor_type('torch.FloatTensor')
    cpu_start = time.time()
    our_knn(N, D, A, X, K, metric=metric)
    cpu_end = time.time()
    cpu_time = cpu_end - cpu_start

    # Run on GPU if available
    if torch.cuda.is_available():
        # Set default tensor type to CUDA for GPU operations
        torch.set_default_tensor_type('torch.cuda.FloatTensor')
        torch.cuda.synchronize()  # Ensure GPU is synchronized before timing
        gpu_start = time.time()
        our_knn(N, D, A, X, K, metric=metric)
        torch.cuda.synchronize()  # Ensure all GPU operations complete
        gpu_end = time.time()
        gpu_time = gpu_end - gpu_start

        # Calculate and display speedup
        speedup = cpu_time / gpu_time
        print(f"CPU time: {cpu_time:.4f} sec")
        print(f"GPU time: {gpu_time:.4f} sec")
        print(f"GPU speedup: {speedup:.2f}x")
    else:
        print("GPU not available, skipping comparison")

def extrapolate_large_dataset():
    """
    Estimate performance for very large datasets by extrapolating from smaller samples
    """
    print("\n======= Extrapolating performance for 4,000,000 vectors =======")

    # Test with 40,000 vectors to estimate performance for 4M
    N, D, A, X, K = generate_data("huge", for_knn=True)

    if torch.cuda.is_available():
        # Use GPU for extrapolation
        torch.set_default_tensor_type('torch.cuda.FloatTensor')
        torch.cuda.synchronize()
        start = time.time()
        our_knn(N, D, A, X, K, metric="L2")
        torch.cuda.synchronize()
        end = time.time()
        measured_time = end - start

        # Linear extrapolation - assuming O(n) scaling which is optimistic
        # Actual scaling may be worse depending on algorithm implementation
        estimated_time = measured_time * (4000000 / N)
        print(f"Time for {N} vectors: {measured_time:.4f} sec")
        print(f"Estimated time for 4,000,000 vectors: {estimated_time:.2f} sec")
        print(f"Optimizations needed for 4M vectors: batching, memory management, results streaming")
    else:
        print("GPU not available, skipping extrapolation")

def compare_all():
    import traceback

    print("\n======= [KNN] Comparing CPU vs GPU =======")
    knn_metrics = ["L2", "cosine", "dot", "manhattan"]
    knn_datasets = ["small", "medium", "large", "large_dim2", "large_dim32k", "huge"]
    for size in knn_datasets:
        for metric in knn_metrics:
            try:
                print(f"\n--- [KNN] Comparing on {size.upper()} with {metric.upper()} ---")
                compare_cpu_gpu(size, metric)
            except Exception as e:
                print(f"[KNN COMPARISON] Skipped {size.upper()} with {metric.upper()} due to error: {e}")
                traceback.print_exc()

    print("\n======= [KMeans] Comparing CPU vs GPU =======")
    kmeans_metrics = ["L2", "cosine"]
    kmeans_datasets = ["small", "large"]
    for size in kmeans_datasets:
        for metric in kmeans_metrics:
            try:
                print(f"\n--- [KMEANS] Running on {size.upper()} with {metric.upper()} ---")
                N, D, A, K = generate_data(size)

                # CPU run
                cpu_start = time.time()
                our_kmeans(N, D, A, K, metric=metric, use_gpu=False)
                cpu_time = time.time() - cpu_start

                # GPU run
                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                    gpu_start = time.time()
                    our_kmeans(N, D, A, K, metric=metric, use_gpu=True)
                    torch.cuda.synchronize()
                    gpu_time = time.time() - gpu_start
                    print(f"CPU time: {cpu_time:.4f} sec")
                    print(f"GPU time: {gpu_time:.4f} sec")
                    print(f"GPU speedup: {cpu_time / gpu_time:.2f}x")
                else:
                    print("GPU not available, skipped GPU comparison")

            except Exception as e:
                print(f"[KMEANS COMPARISON] Skipped {size.upper()} with {metric.upper()} due to error: {e}")
                traceback.print_exc()

    print("\n======= [ANN] Comparing CPU vs GPU =======")
    ann_metrics = ["L2", "cosine"]
    ann_datasets = ["small", "large"]
    for size in ann_datasets:
        for metric in ann_metrics:
            try:
                print(f"\n--- [ANN] Running on {size.upper()} with {metric.upper()} ---")
                N, D, A, X, K = generate_data(size, for_knn=True)

                # CPU run
                cpu_start = time.time()
                our_ann_improved(N, D, A, X[0], K, metric=metric, use_gpu=False)
                cpu_time = time.time() - cpu_start

                # GPU run
                if torch.cuda.is_available():
                    torch.cuda.synchronize()
                    gpu_start = time.time()
                    our_ann_improved(N, D, A, X[0], K, metric=metric, use_gpu=True)
                    torch.cuda.synchronize()
                    gpu_time = time.time() - gpu_start
                    print(f"CPU time: {cpu_time:.4f} sec")
                    print(f"GPU time: {gpu_time:.4f} sec")
                    print(f"GPU speedup: {cpu_time / gpu_time:.2f}x")
                else:
                    print("GPU not available, skipped GPU comparison")

            except Exception as e:
                print(f"[ANN COMPARISON] Skipped {size.upper()} with {metric.upper()} due to error: {e}")
                traceback.print_exc()

    print("\n======= Extrapolating for HUGE dataset =======")
    extrapolate_large_dataset()


def run_kmeans(N, D, A, K, metric, use_gpu, label):
    """
    Run and benchmark the K-means implementation
    """
    print(f"\n--- Running metric: {metric} on {label.upper()} ({N} samples) ---")
    start = time.time()
    labels = our_kmeans(N, D, A, K, metric=metric, use_gpu=use_gpu)
    end = time.time()
    print(f"First 10 labels: {labels[:10].tolist()}")
    print(f"Time on {label.upper()} ({metric}): {end - start:.4f} sec")

def run_ann_with_guarantees(N, D, A, X, K, metric, use_gpu, label):
    """
    Run and benchmark the improved ANN implementation with recall guarantees

    This is an optimization over traditional ANN by providing mathematical
    guarantees on recall rates, ensuring a minimum quality of results.
    """
    from task import our_ann_improved, our_knn
    import numpy as np
    import time
    import torch

    print(f"\n--- Running ANN (with recall guarantees) using {metric.upper()} on {label.upper()} ---")
    print(f"Dataset: {N} vectors, {D} dimensions, {X.shape[0]} queries, K={K}")

    # Set the appropriate device (CPU/GPU)
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"

    # Optimization: Convert data to tensors and move to device once
    # This avoids repeated data transfers during processing
    if torch.is_tensor(A):
        A_tensor = A.to(device)
    else:
        A_tensor = torch.tensor(A, dtype=torch.float32, device=device)

    # Test different target recall rates (percentage of true nearest neighbors found)
    recall_targets = [0.7]  # Can be expanded to test multiple target recalls
    num_test_queries = min(10, X.shape[0])

    for target_recall in recall_targets:
        print(f"\n=== Testing with target recall: {target_recall:.2f} ===")

        total_actual_recall = 0.0
        total_ann_time = 0.0
        total_knn_time = 0.0

        # Optimization: Determine optimal number of clusters based on dataset size
        num_clusters = min(max(int(np.sqrt(N)), 10), 200)

        # Test multiple queries to get reliable statistics
        for i in range(num_test_queries):
            # Extract single query and ensure it's properly formatted
            if X.ndim > 1:
                query = X[i].reshape(1, -1)
            else:
                query = X.reshape(1, -1)

            # Optimization: Move query to the correct device
            if torch.is_tensor(query):
                query_tensor = query.to(device)
            else:
                query_tensor = torch.tensor(query, dtype=torch.float32, device=device)

            # Run improved ANN with recall guarantee
            start = time.time()
            ann_indices = our_ann_improved(N, D, A_tensor, query_tensor[0], K,
                                         num_clusters=num_clusters,
                                         desired_recall=target_recall,
                                         metric=metric, use_gpu=use_gpu)

            # Ensure timing captures all GPU operations
            if device == "cuda":
                torch.cuda.synchronize()

            ann_time = time.time() - start
            total_ann_time += ann_time

            # Run exact KNN for comparison (ground truth)
            start = time.time()
            knn_indices = our_knn(N, D, A_tensor, query_tensor, K, metric=metric)

            if device == "cuda":
                torch.cuda.synchronize()

            knn_time = time.time() - start
            total_knn_time += knn_time

            # Calculate actual recall by comparing ANN results with exact KNN results
            if torch.is_tensor(knn_indices):
                knn_set = set(knn_indices[0].cpu().tolist())
            else:
                knn_set = set(knn_indices[0].tolist())

            ann_set = set(ann_indices)
            correct = len(ann_set.intersection(knn_set))
            actual_recall = correct / K if K > 0 else 0
            total_actual_recall += actual_recall

            print(f"Query {i+1}: Recall={actual_recall:.2f} ({correct}/{K}), "
                  f"ANN: {ann_time:.4f}s, KNN: {knn_time:.4f}s")

        # Calculate and report average statistics
        avg_recall = total_actual_recall / num_test_queries
        avg_ann_time = total_ann_time / num_test_queries
        avg_knn_time = total_knn_time / num_test_queries
        avg_speedup = avg_knn_time / avg_ann_time if avg_ann_time > 0 else 0

        print(f"\nTarget recall: {target_recall:.2f}")
        print(f"Actual average recall: {avg_recall:.2f}")
        print(f"Average times - ANN: {avg_ann_time:.4f}s, KNN: {avg_knn_time:.4f}s")
        print(f"Average speedup: {avg_speedup:.2f}x")

        # Verify if recall guarantee was met
        if avg_recall >= target_recall:
            print(f"✓ SUCCESS: Average recall ({avg_recall:.2f}) >= Target recall ({target_recall:.2f})")
        else:
            print(f"✗ FAILED: Average recall ({avg_recall:.2f}) < Target recall ({target_recall:.2f})")

def test_ann_with_guarantees(use_gpu):
    """
    Run comprehensive tests for the improved ANN implementation across different datasets
    """
    import torch

    print("\n======= Testing ANN with Mathematical Recall Guarantees =======")

    label = "gpu" if use_gpu else "cpu"

    # Set up device environment
    device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"
    if device == "cuda":
        torch.cuda.empty_cache()  # Clear GPU memory before tests

    # Test on multiple dataset sizes
    for size in ["small", "large"]:
        print(f"\n--- ANN Test on {size.upper()} dataset ---")
        N, D, A, X, K = generate_data(size, for_knn=True)

        # Optimization: Pre-convert data to tensors and move to device once
        # This avoids repeated data transfers during benchmarking
        A_tensor = torch.tensor(A, dtype=torch.float32, device=device)
        X_tensor = torch.tensor(X, dtype=torch.float32, device=device)

        # Test different distance metrics
        for metric in ["L2", "cosine"]:
            try:
                run_ann_with_guarantees(N, D, A_tensor, X_tensor, K, metric, use_gpu, label)
            except Exception as e:
                print(f"[ANN] Error with {metric.upper()} on {size} dataset: {e}")
                import traceback
                traceback.print_exc()

def main():
    """
    Main function to run tests based on command line argument
    """
    if len(sys.argv) != 2 or sys.argv[1] not in ["gpu", "cpu", "comparison", "all"]:
        print("Usage: python test.py [gpu|cpu|comparison|all]")
        return

    mode = sys.argv[1]

    if mode == "gpu" or mode == "all":
        # Run tests on GPU
        use_gpu = True
        label = "gpu"

        # Set up GPU environment if available
        if torch.cuda.is_available():
            device = torch.device("cuda")
            print(f"Using GPU: {torch.cuda.get_device_name(0)}")
            torch.set_default_tensor_type('torch.cuda.FloatTensor')  # Default to CUDA tensors
        else:
            device = torch.device("cpu")
            print("Warning: GPU requested but not available. Using CPU.")
            torch.set_default_tensor_type('torch.FloatTensor')

        metrics = ["L2", "cosine", "dot", "manhattan"]

        # Test KNN on different dataset sizes
        for size in ["medium", "large_dim2", "large_dim32k"]:
            print(f"\n======= Testing on {size.upper()} dataset =======")

            # Generate data for KNN
            N, D, A, X, K = generate_data(size, for_knn=True)
            for metric in metrics:
                try:
                    run_knn(N, D, A, X, K, metric, use_gpu, label)
                except Exception as e:
                    print(f"[KNN] Skipped {metric.upper()} due to error: {e}")

        # Test K-Means on GPU
        for size in ["small", "large"]:
            print(f"\n======= [KMeans] Testing on {size.upper()} dataset =======")
            N, D, A, K = generate_data(size)
            for metric in ["L2", "cosine"]:
                run_kmeans(N, D, A, K, metric, use_gpu, label)

        # Test ANN with recall guarantees on GPU
        test_ann_with_guarantees(use_gpu=True)

    if mode == "cpu" or mode == "all":
        # Run tests on CPU
        use_gpu = False
        label = "cpu"

        device = torch.device("cpu")
        print("Using CPU")
        torch.set_default_tensor_type('torch.FloatTensor')

        metrics = ["L2", "cosine", "dot", "manhattan"]

        # Test KNN on different dataset sizes
        for size in ["medium", "large_dim2", "large_dim32k"]:
            print(f"\n======= Testing on {size.upper()} dataset =======")

            # Generate data for KNN
            N, D, A, X, K = generate_data(size, for_knn=True)
            for metric in metrics:
                try:
                    run_knn(N, D, A, X, K, metric, use_gpu, label)
                except Exception as e:
                    print(f"[KNN] Skipped {metric.upper()} due to error: {e}")

        # Test K-Means on CPU
        for size in ["small", "large"]:
            print(f"\n======= [KMeans] Testing on {size.upper()} dataset =======")
            N, D, A, K = generate_data(size)
            for metric in ["L2", "cosine"]:
                run_kmeans(N, D, A, K, metric, use_gpu, label)

        # Test ANN with recall guarantees on CPU
        test_ann_with_guarantees(use_gpu=False)

    if mode == "comparison" or mode == "all":
      compare_all()
      extrapolate_large_dataset()

if __name__ == "__main__":
    main()

Writing test.py


In [None]:
!python test.py gpu

Using GPU: Tesla T4
  _C._set_default_tensor_type(t)


--- Running KNN with L2 on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[  64, 3245, 1971, 2517, 3169,  640, 2389,  884,  969,  813],
        [1312, 1650, 2245, 2083, 2501, 1338, 3287,  679, 2753,   62],
        [2159, 2777, 1731, 2541, 2974, 3271, 1274,  474, 1098,  601]],
       device='cpu')
Time: 0.1571 sec

--- Running KNN with COSINE on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[  64, 1971, 3245,  884, 1255, 3834,  783, 2389, 1299, 1642],
        [1650, 3287,  282, 3732, 1312, 1091, 2821, 2738, 1540, 3643],
        [2777, 2159, 1098, 3271, 3691, 2541, 3722, 3924, 1291,  482]],
       device='cpu')
Time: 0.0349 sec

--- Running KNN with DOT on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[1507, 1209, 3173, 1683,  878, 1702, 2865, 3035, 3614, 2596],
        [3879, 3469,   50, 2535, 3980, 2582, 3780, 3385,  408, 1029],
        [

In [None]:
!python test.py cpu

Using CPU
  _C._set_default_tensor_type(t)


--- Running KNN with L2 on CPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[2419, 1893, 1429,  718, 3803, 3614, 2734, 1142, 2692,   24],
        [3698, 3513,  139, 1693, 2378, 2253,   67, 2342, 1369, 1229],
        [1044, 3106,  979, 2440, 2540, 2859,  579, 3587,   74, 2721]])
Time: 0.2562 sec

--- Running KNN with COSINE on CPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[1142, 1893, 2680, 2091, 2419,   24,  317, 2692,  668, 3069],
        [3698, 3513,  127, 1369, 2378, 3813, 2906, 2475,  660, 2253],
        [1044, 3106, 2859, 3506,  734,  177,  579,  979, 2670, 2440]])
Time: 0.0339 sec

--- Running KNN with DOT on CPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[1558, 3019, 3197,  463, 2490, 1198,  180,  177, 3816, 3237],
        [3582,  965, 1616,  192,  903, 2418, 2111, 3339,  448, 1104],
        [ 193,  760, 3204, 1604, 1106, 3143, 2353, 1256,  804

In [None]:
!python test.py comparison



--- [KNN] Comparing on SMALL with L2 ---

  _C._set_default_tensor_type(t)
CPU time: 0.2507 sec
GPU time: 0.0007 sec
GPU speedup: 369.83x

--- [KNN] Comparing on SMALL with COSINE ---

CPU time: 0.0308 sec
GPU time: 0.0005 sec
GPU speedup: 60.45x

--- [KNN] Comparing on SMALL with DOT ---

CPU time: 0.0105 sec
GPU time: 0.0003 sec
GPU speedup: 35.98x

--- [KNN] Comparing on SMALL with MANHATTAN ---

CPU time: 0.0018 sec
GPU time: 0.0003 sec
GPU speedup: 6.63x

--- [KNN] Comparing on MEDIUM with L2 ---

CPU time: 0.0080 sec
GPU time: 0.0029 sec
GPU speedup: 2.76x

--- [KNN] Comparing on MEDIUM with COSINE ---

CPU time: 0.0029 sec
GPU time: 0.0058 sec
GPU speedup: 0.50x

--- [KNN] Comparing on MEDIUM with DOT ---

CPU time: 0.0023 sec
GPU time: 0.0024 sec
GPU speedup: 0.98x

--- [KNN] Comparing on MEDIUM with MANHATTAN ---

CPU time: 0.0095 sec
GPU time: 0.0093 sec
GPU speedup: 1.02x

--- [KNN] Comparing on LARGE with L2 ---

CPU time: 0.0294 sec
GPU time: 0.0290 sec
GPU speedup: 1.01

In [None]:
!python test.py all

Using GPU: Tesla T4
  _C._set_default_tensor_type(t)


--- Running KNN with L2 on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[2226, 1682,   91, 3448, 3058, 1175, 1690, 1583,  883,  535],
        [3051,  413, 3761, 2909, 3406, 1224, 2843, 2698, 2386, 2938],
        [2698, 3527, 1284, 1550, 2591, 1922,  985, 3651, 1060, 2741]],
       device='cpu')
Time: 0.1446 sec

--- Running KNN with COSINE on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[2213, 2226, 3427, 2654, 2113, 1052, 1852, 1583,  468,    6],
        [3835,  503,  413, 3051, 1285,  551, 3927, 1224, 3773,  458],
        [1550, 1284, 1554, 2688, 1922, 2542, 2788, 3527,  911,  859]],
       device='cpu')
Time: 0.0347 sec

--- Running KNN with DOT on GPU (4000 db, 100 queries, 128 dimensions) ---
First 3 query results:
tensor([[ 705, 1197, 3509, 2405, 2630, 3674,  904, 2496, 1117, 3718],
        [  42,  494,  537,  568, 1326, 2780, 2559, 1139, 2891, 2201],
        [