In [2]:
import os
import json
import numpy as np
import cupy as cp
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_embeddings_from_jsonl(filename):
    vectors = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            obj = json.loads(line)
            vectors.append(obj['output_embeddings'][0])
    return np.array(vectors)

def sample_embeddings_from_files(file_list, n_samples=10000):
    """Randomly sample embeddings from multiple files for quantizer training."""
    samples = []
    total_collected = 0
    random.shuffle(file_list)
    for file in file_list:
        with open(file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            n = min(len(lines), n_samples - total_collected)
            if n <= 0:
                break
            chosen = random.sample(range(len(lines)), n)
            for idx in chosen:
                obj = json.loads(lines[idx])
                samples.append(obj['output_embeddings'][0])
                total_collected += 1
                if total_collected >= n_samples:
                    return np.array(samples)
    return np.array(samples)

def train_scalar_quantizer(embeddings):
    from cuvs.preprocessing.quantize import scalar
    embeddings_gpu = cp.asarray(embeddings)
    params = scalar.QuantizerParams(quantile=0.99)
    quantizer = scalar.train(params, embeddings_gpu)
    return quantizer

def train_binary_quantizer(embeddings):
    # For most binary quantization, explicit training is not needed.
    # This function is here for symmetry and future extensibility.
    return None

def binary_quantize_cuvs(embeddings, quantizer=None):
    try:
        from cuvs.preprocessing.quantize import binary
        start_time = time.time()
        embeddings_gpu = cp.asarray(embeddings)
        transformed = binary.transform(embeddings_gpu)
        quantization_time = time.time() - start_time
        # Free GPU memory
        del embeddings_gpu
        cp.get_default_memory_pool().free_all_blocks()
        return cp.asnumpy(transformed), quantization_time, True
    except Exception as e:
        import traceback
        print(f"Error during binary quantization: {str(e)}")
        print("Traceback:")
        print(traceback.format_exc())
        return None, 0.0, False

def scalar_quantize_cuvs(embeddings, quantizer):
    try:
        from cuvs.preprocessing.quantize import scalar
        embeddings_gpu = cp.asarray(embeddings)
        start_time = time.time()
        transformed = scalar.transform(quantizer, embeddings_gpu)
        quantization_time = time.time() - start_time
        # Free GPU memory
        del embeddings_gpu
        cp.get_default_memory_pool().free_all_blocks()
        return cp.asnumpy(transformed), quantization_time, True
    except Exception as e:
        import traceback
        print(f"Error during scalar quantization: {str(e)}")
        print("Traceback:")
        print(traceback.format_exc())
        return None, 0.0, False

def process_file(
    filepath, idx, output_dirs, quantization, quantize_from="full_precision", 
    full_precision_folder=None, scalar_quantizer=None, binary_quantizer=None
):
    try:
        print(f"Processing {filepath} ...")
        base = f"{idx:06d}"

        embeddings = None
        if "full_precision" in quantization or quantize_from == "raw":
            embeddings = load_embeddings_from_jsonl(filepath)

        if "full_precision" in quantization:
            fp_path = os.path.join(full_precision_folder, f"{base}_full.npy")
            np.save(fp_path, embeddings)
            print(f"Saved full-precision to {fp_path}")

        if quantize_from == "full_precision":
            fp_path = os.path.join(full_precision_folder, f"{base}_full.npy")
            if not os.path.exists(fp_path):
                if embeddings is None:
                    embeddings = load_embeddings_from_jsonl(filepath)
                np.save(fp_path, embeddings)
            quant_embeddings = np.load(fp_path)
        elif quantize_from == "raw":
            quant_embeddings = embeddings
        else:
            raise ValueError("quantize_from must be 'full_precision' or 'raw'")

        if "scalar" in quantization:
            sq_path = os.path.join(output_dirs["scalar"], f"{base}_scalar.npy")
            quantized, quant_time, success = scalar_quantize_cuvs(quant_embeddings, scalar_quantizer)
            if success:
                np.save(sq_path, quantized)
                print(f"Saved scalar quantized to {sq_path} (quantization time: {quant_time:.2f}s)")
            else:
                print(f"Scalar quantization failed for {filepath}")

        if "binary" in quantization:
            bq_path = os.path.join(output_dirs["binary"], f"{base}_binary.npy")
            quantized, quant_time, success = binary_quantize_cuvs(quant_embeddings, binary_quantizer)
            if success:
                np.save(bq_path, quantized)
                print(f"Saved binary quantized to {bq_path} (quantization time: {quant_time:.2f}s)")
            else:
                print(f"Binary quantization failed for {filepath}")

    except Exception as e:
        print(f"Failed to process {filepath}: {e}")

def embed_data(
    input_folder,
    output_base_folder,
    quantization=["full_precision", "scalar", "binary"],
    max_workers=2,
    quantize_from="full_precision",  # "full_precision" or "raw"
    n_samples=10000
):
    output_dirs = {}
    for q in quantization:
        if q != "full_precision":
            out_dir = os.path.join(output_base_folder, q)
            os.makedirs(out_dir, exist_ok=True)
            output_dirs[q] = out_dir

    full_precision_folder = os.path.join(output_base_folder, "full_precision")
    os.makedirs(full_precision_folder, exist_ok=True)

    files = sorted([os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.json')])

    # Train quantizers on random samples
    print("Sampling embeddings for quantizer training...")
    sample_array = sample_embeddings_from_files(files, n_samples=n_samples)
    print(f"Sampled {sample_array.shape[0]} embeddings for quantizer training.")

    scalar_quantizer = None
    binary_quantizer = None
    if "scalar" in quantization:
        print("Training scalar quantizer...")
        scalar_quantizer = train_scalar_quantizer(sample_array)
        print("Scalar quantizer trained.")
    if "binary" in quantization:
        print("Training binary quantizer (if needed)...")
        binary_quantizer = train_binary_quantizer(sample_array)
        print("Binary quantizer ready.")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_file, f, idx, output_dirs, quantization, quantize_from, 
                full_precision_folder, scalar_quantizer, binary_quantizer
            )
            for idx, f in enumerate(files)
        ]
        for fut in as_completed(futures):
            fut.result()  # to raise exceptions if any



In [3]:
embed_data(
    input_folder="/home/ubuntu/amazon_ads/amazon_ads_data",
    output_base_folder="/home/ubuntu/amazon_ads/embeddings",
    quantization=["scalar"],
    max_workers=2,  # Keep this low to avoid memory issues
    quantize_from="full_precision",
    n_samples=500000
)

Sampling embeddings for quantizer training...
Sampled 500000 embeddings for quantizer training.
Training scalar quantizer...
Scalar quantizer trained.
Processing /home/ubuntu/amazon_ads/amazon_ads_data/part-04614-6c18e020-9565-4603-a05c-42cdd450a8d5-c000.json ...
Processing /home/ubuntu/amazon_ads/amazon_ads_data/part-04378-6c18e020-9565-4603-a05c-42cdd450a8d5-c000.json ...
Saved scalar quantized to /home/ubuntu/amazon_ads/embeddings/scalar/000000_scalar.npy (quantization time: 0.01s)
Processing /home/ubuntu/amazon_ads/amazon_ads_data/part-01383-6c18e020-9565-4603-a05c-42cdd450a8d5-c000.json ...
Saved scalar quantized to /home/ubuntu/amazon_ads/embeddings/scalar/000002_scalar.npy (quantization time: 0.00s)
Processing /home/ubuntu/amazon_ads/amazon_ads_data/part-00244-6c18e020-9565-4603-a05c-42cdd450a8d5-c000.json ...
Saved scalar quantized to /home/ubuntu/amazon_ads/embeddings/scalar/000003_scalar.npy (quantization time: 0.00s)
Processing /home/ubuntu/amazon_ads/amazon_ads_data/part-08

In [None]:
# Write another code base to compare HNSW FAISS (FP, SQ) and CAGRA (FP, SQ) to create full sweep paretos on a subset of data
# (index build time, query throughput, query latency)


In [None]:
# Build a cagra index with quantized embeddings and save them-


# Build an hnsw index with faiss BQ vectors and quantize them-



In [12]:
import numpy as np
np.load("/home/ubuntu/amazon_ads/embeddings/binary/000001_bq.npy")

array([[131, 218, 191, ..., 223,  12, 184],
       [231,  74,  37, ..., 220,   5, 248],
       [135,  89, 189, ..., 126,   4, 112],
       ...,
       [215, 219,  25, ..., 214,  74, 176],
       [180, 214, 159, ..., 214,  14, 169],
       [174,  82, 189, ..., 126,   4, 189]], shape=(27547, 8), dtype=uint8)

In [12]:
import numpy as np
np.load("/home/ubuntu/data/home/ubuntu/amazon_ads/embeddings/half_precision/000004_fp16.npy").shape, np.load("/home/ubuntu/data/home/ubuntu/amazon_ads/embeddings/half_precision/000004_fp16.npy").dtype

((27548, 64), dtype('float16'))

In [10]:
np.load("/home/ubuntu/data/home/ubuntu/amazon_ads/embeddings/full_precision/000004_full.npy").shape, np.load("/home/ubuntu/data/home/ubuntu/amazon_ads/embeddings/full_precision/000004_full.npy").dtype

((27548, 64), dtype('float64'))

247932000