In [1]:
# Python reference for a Unified Random Sampler (Uniform, Ternary, Discrete Gaussian via CDT)
# Author: Muhammad Ogin Hasanuddin
# Requirements: numpy

import numpy as np

In [2]:
qp = np.array([281474976317441, 140737518764033, 140737470791681, 140737513783297,
        140737471578113, 140737513259009, 140737471971329, 140737509851137,
        140737480359937, 140737509457921, 140737481801729, 140737508671489,
        140737482981377, 140737506705409, 140737483898881, 140737504608257,
        140737484685313, 140737499496449, 140737485864961, 140737493729281,
        140737486520321, 140737490976769, 140737487306753, 140737488486401,
        281474975662081, 281474974482433, 281474966880257, 281474962554881,
        281474960326657, 281474957180929, 281474955476993, 281474952462337],
       dtype=object)

In [None]:
# choose smallest 48-bit prime q such that (1<<64)//q=65536 
q = 281470681808977

In [2]:
# choose smallest 47-bit prime q such that (1<<64)//q=131072
q = 140736414621701

In [15]:
def barrett(x: int, q: int) -> int:
    """
    Barrett reduction for 0 <= x < 2^63 and ~48-bit q.
    Choose k so that 2^k > q (k=64 is fine).
    r = x - floor((x*mu)/2^k) * q, then correct to [0,q).
    """
    k = 64
    mu = (1 << k) // q                      # floor(2^k / q)
    print(mu)
    t  = (x  >> 47)    # floor((x*mu)/2^k)
    print(t)
    r  = (x - t * q)          # r in [-(q-1), 2q-1]

    # Correct r into [0, q)
    r = r - q if r >= q else r

    return r


In [16]:
barrett(8214229796008494596, 140736414621701)

131072
58365


8220198294030

In [3]:
# Reference: https://ieeexplore.ieee.org/abstract/document/11043509
# ------------------------------
# Helpers (Barrett reduce, etc.)
# ------------------------------
import numpy as np

def barrett_classic(x: np.ndarray, q: int) -> np.ndarray:
    """
    Barrett reduction for 0 <= x < 2^63 and ~48-bit q.
    Choose k so that 2^k > q (k=64 is fine).
    r = x - floor((x*mu)/2^k) * q, then correct to [0,q).
    """
    k = 64
    mu = (1 << k) // q                      # floor(2^k / q)

    xo = x.astype(object)                   # Python big-int arithmetic
    t  = ((xo * mu) >> k).astype(object)    # floor((x*mu)/2^k)
    r  = (xo - t * q).astype(int)           # r in [-(q-1), 2q-1]

    # Correct r into [0, q)
    r = np.where(r >= q, r - q, r)
    r = np.where(r <  0, r + q, r)

    return r.astype(np.int64)


In [12]:
import numpy as np

def barrett_reduce(x: np.ndarray, q: int) -> np.ndarray:
    """
    Barrett reduction for 0 <= x < 2^63 and ~48-bit q.
    Choose k so that 2^k > q (k=64 is fine).
    r = x - floor((x*mu)/2^k) * q, then correct to [0,q).
    """
    k = 64
    mu = (1 << k) // q  # Precomputed value: floor(2^k / q)

    # Ensure x is handled with sufficient precision for the multiplication x * mu
    # Using .astype(object) allows Python's arbitrary precision integers for the calculation
    x_obj = x.astype(object)
    mu_obj = int(mu) # Ensure mu is a standard int

    # Calculate t = floor((x * mu) / 2^k) = (x * mu) >> k
    # Use big-integer arithmetic for the multiplication and shift
    # t_obj = (x_obj << mu_obj) >> k
    t_obj = (x_obj >> 48)

    # Calculate r = x - t * q
    # Use big-integer arithmetic for the multiplication t * q
    r_obj = x_obj - (t_obj * int(q))

    # Convert back to standard integer type for numpy operations
    # Ensure the result fits in int64
    r = r_obj.astype(np.int64)

    # Correct r into [0, q) if necessary
    # These operations should also be safe with int64 values if r is in the expected range
    r = np.where(r >= q, r - q, r)
    # Check for potential slight underflow after first correction, though unlikely with k=64
    # r = np.where(r < 0, r + q, r)

    return r

def test_barrett_reduction():
    """
    Driver function to test the corrected barrett_reduce function.
    """
    # Provided moduli
    qp = np.array([281474976317441, 140737518764033, 140737470791681, 140737513783297,
                   140737471578113, 140737513259009, 140737471971329, 140737509851137,
                   140737480359937, 140737509457921, 140737481801729, 140737508671489,
                   140737482981377, 140737506705409, 140737483898881, 140737504608257,
                   140737484685313, 140737499496449, 140737485864961, 140737493729281,
                   140737486520321, 140737490976769, 140737487306753, 140737488486401,
                   281474975662081, 281474974482433, 281474966880257, 281474962554881,
                   281474960326657, 281474957180929, 281474955476993, 281474952462337],
                  dtype=object)

    num_tests_per_q = 1000 # Number of random tests to perform per modulus
    max_x = (1 << 63) - 1 # Maximum allowed x value (2^63 - 1)

    print(f"Testing CORRECTED Barrett Reduction with {len(qp)} moduli...")
    print(f"Each modulus will be tested with {num_tests_per_q} random values x, where 0 <= x < 2^63.")
    print("-" * 50)

    all_passed = True
    for i, q in enumerate(qp):
        print(f"Testing q = {q} (index {i+1}/{len(qp)})...")
        # Generate random x values in the range [0, 2^63 - 1]
        # Using numpy's random integer generator for efficiency
        # Note: numpy int64 max is 2^63 - 1, so we generate uint64 and cast carefully if needed,
        # but since max_x < 2^63, standard int64 generation should be fine if max_x is within int64 range.
        # max_x = 2^63 - 1 = 9223372036854775807, which is exactly int64 max.
        # Using uint64 is safer to guarantee the full range [0, 2^63-1] can be represented before modulo check.
        x_raw = np.random.randint(0, high=max_x+1, size=num_tests_per_q, dtype=np.uint64)
        # Ensure x_raw values are within [0, 2^63 - 1] explicitly (though randint should handle this)
        x_raw = np.clip(x_raw, 0, max_x).astype(np.uint64)
        # Convert to standard int64 array for input to the function
        x_test = x_raw.astype(np.int64)

        # Calculate expected result using standard modulo
        expected_results = np.mod(x_test, q).astype(np.int64)

        # Calculate result using the CORRECTED Barrett function
        try:
            barrett_results = barrett_reduce(x_test, int(q)) # Ensure q is a standard int for the function
        except Exception as e:
            print(f"  ERROR in barrett_reduce call for q={q}: {e}")
            all_passed = False
            continue

        # Check for correctness
        # Use numpy's array_equal for integer comparison
        if np.array_equal(barrett_results, expected_results):
            print(f"  PASS: All {num_tests_per_q} results matched for q={q}")
        else:
            print(f"  FAIL: Mismatch found for q={q}")
            # Optional: Print first few mismatched values for debugging
            mismatch_indices = np.where(barrett_results != expected_results)[0]
            if len(mismatch_indices) > 0:
                first_mismatch_idx = mismatch_indices[0]
                print(f"    First mismatch at index {first_mismatch_idx}: x={x_test[first_mismatch_idx]}, Barrett={barrett_results[first_mismatch_idx]}, Expected={expected_results[first_mismatch_idx]}")
            all_passed = False

    print("-" * 50)
    if all_passed:
        print("OVERALL RESULT: All tests PASSED! Corrected Barrett reduction function seems correct.")
    else:
        print("OVERALL RESULT: Some tests FAILED! Check the output above for details.")

if __name__ == "__main__":
    # Set a random seed for reproducible testing (optional)
    # np.random.seed(42)
    test_barrett_reduction()

Testing CORRECTED Barrett Reduction with 32 moduli...
Each modulus will be tested with 1000 random values x, where 0 <= x < 2^63.
--------------------------------------------------
Testing q = 281474976317441 (index 1/32)...
  PASS: All 1000 results matched for q=281474976317441
Testing q = 140737518764033 (index 2/32)...
  FAIL: Mismatch found for q=140737518764033
    First mismatch at index 0: x=3477984934839428000, Barrett=1738891415472272219, Expected=79371142644504
Testing q = 140737470791681 (index 3/32)...
  FAIL: Mismatch found for q=140737470791681
    First mismatch at index 0: x=614347460809006153, Barrett=307117562070766530, Expected=28400803318588
Testing q = 140737513783297 (index 4/32)...
  FAIL: Mismatch found for q=140737513783297
    First mismatch at index 0: x=8568090417902203619, Barrett=4284040498338642939, Expected=131316288865556
Testing q = 140737471578113 (index 5/32)...
  FAIL: Mismatch found for q=140737471578113
    First mismatch at index 0: x=10124067255

In [12]:
# ------------------------------
# Testbench
# ------------------------------
def test_barrett_reduce():
    # Choose a modulus q (e.g., a 30-bit prime commonly used in FHE/RNS)
    q = qp[0]  # Example: 2^30 - 35, a common FHE-friendly prime

    # Generate random test values: 0 <= x < q^2
    np.random.seed(42)
    num_tests = 1000
    max_x = q * q - 1
    x_vals = np.random.randint(0, min(max_x, 2**63 - 1), size=num_tests, dtype=np.int64)

    # Compute expected results using built-in modulo
    expected = x_vals % q

    # Compute Barrett reduction
    actual = barrett_reduce(x_vals, q)

    # Compare
    if np.array_equal(actual, expected):
        print("✅ All tests passed!")
    else:
        # Find first mismatch
        mismatches = np.where(actual != expected)[0]
        if len(mismatches) > 0:
            i = mismatches[0]
            print(f"❌ Mismatch at index {i}:")
            print(f"  x = {x_vals[i]}")
            print(f"  Expected: {expected[i]}")
            print(f"  Got:      {actual[i]}")
            print(f"  Modulus q = {q}")
        assert False, "Barrett reduction failed!"

if __name__ == "__main__":
    test_barrett_reduce()

✅ All tests passed!


In [10]:
6908764162452635058%281474976317441

242343717363154

In [None]:
# ------------------------------
# Uniform mod-q sampler
# ------------------------------
def sample_uniform_mod_q(n: int, q: int, use_barrett: bool = False,
                         rng: np.random.Generator | None = None) -> np.ndarray:
    """
    Uniform residues in [0, q). The hardware typically uses Barrett; for
    Python we can use x % q or enable barrett for parity with RTL.
    """
    if rng is None:
        rng = np.random.default_rng()
    # Draw 64-bit randoms (as if from a PRNG/TRNG) and reduce mod q
    raw = rng.integers(0, 1 << 63, size=n, dtype=np.int64)
    if use_barrett:
        return barrett_reduce(raw, q)
    else:
        return (raw % q).astype(np.int64)

In [2]:
import numpy as np
import math
from typing import Union

# --- Constants (Typically defined elsewhere in OpenFHE) ---
DUG_CHUNK_WIDTH = 32  # Standard 32-bit chunks
DUG_CHUNK_MIN = 0     # Minimum value for a chunk (typically 0)
DUG_CHUNK_MAX = 2**DUG_CHUNK_WIDTH - 1  # Maximum value for a chunk (0xFFFFFFFF for 32-bit)

def sample_discrete_uniform_openfhe_style(modulus: Union[int, np.integer], rng: np.random.Generator | None = None) -> int:
    """
    Generates a single random integer uniformly in the range [0, modulus) using
    the chunked rejection sampling approach similar to OpenFHE's DiscreteUniformGeneratorImpl.

    Args:
        modulus: The upper bound (exclusive) for the generated integer.
        rng: A numpy random number generator. If None, numpy's default generator is used.

    Returns:
        A random integer r such that 0 <= r < modulus.

    Raises:
        ValueError: If modulus is not a positive integer.
    """
    if rng is None:
        rng = np.random.default_rng()

    if not isinstance(modulus, (int, np.integer)) or modulus <= 0:
        raise ValueError("Modulus must be a positive integer.")

    if modulus == 1:
        return 0 # Only option is 0

    # --- Pre-calculate values similar to SetModulus in OpenFHE ---
    # Get the number of bits needed to represent the modulus (excluding the sign bit for large ints)
    modulus_bits = int(math.ceil(math.log2(modulus)))
    
    # Calculate how many full chunks we need
    # The formula (modulus_bits - 1) // DUG_CHUNK_WIDTH is from the C++ code.
    # Example: modulus=7681 (13 bits). (13-1)//32 = 0. So m_chunksPerValue = 0.
    # Example: modulus=2^32+1 (33 bits). (33-1)//32 = 1. So m_chunksPerValue = 1.
    m_chunks_per_value = (modulus_bits - 1) // DUG_CHUNK_WIDTH
    
    # Calculate the shift amount for the top chunk
    m_shift_chunk = m_chunks_per_value * DUG_CHUNK_WIDTH

    # Calculate the upper bound for the top chunk
    # This is (m_modulus >> m_shift_chunk).ConvertToInt() from the C++ code.
    top_chunk_modulus_val = modulus >> m_shift_chunk
    # The bound for the uniform distribution is [DUG_CHUNK_MIN, top_chunk_modulus_val]
    # However, if top_chunk_modulus_val > DUG_CHUNK_MAX, we need to cap it.
    # In typical usage, top_chunk_modulus_val is much smaller than 2^32 - 1.
    m_bound = min(top_chunk_modulus_val, DUG_CHUNK_MAX)

    # --- Rejection Sampling Loop ---
    while True:
        result = 0
        
        # Generate full chunks
        for i in range(m_chunks_per_value):
            shift = i * DUG_CHUNK_WIDTH
            # Generate a random 32-bit integer in [DUG_CHUNK_MIN, DUG_CHUNK_MAX]
            chunk_val = rng.integers(DUG_CHUNK_MIN, DUG_CHUNK_MAX + 1, dtype=np.uint32)
            result += int(chunk_val) << shift # Cast to int to handle large shifts

        # Generate the top chunk
        # Generate a random integer in [DUG_CHUNK_MIN, m_bound]
        top_chunk_val = rng.integers(DUG_CHUNK_MIN, m_bound + 1, dtype=np.uint32)
        result += int(top_chunk_val) << m_shift_chunk

        # Rejection check
        if result < modulus:
            return result

# --- Example Usage ---
if __name__ == "__main__":
    # modulus_q = 7681 # Example modulus from OpenFHE context
    modulus_q = 281474976317441 # Another example modulus
    num_samples = 1 << 16
    seed = 42
    rng = np.random.default_rng(seed)

    samples = [sample_discrete_uniform_openfhe_style(modulus_q, rng) for _ in range(num_samples)]

    # Basic validation
    assert all(0 <= s < modulus_q for s in samples), "Sample out of range!"
    print(f"Generated {num_samples} samples modulo {modulus_q}")
    print(f"Min: {min(samples)}, Max: {max(samples)}")
    print(f"Mean (expected ~ {modulus_q/2:.2f}): {np.mean(samples):.2f}")
    print(f"Std (expected ~ {modulus_q/3.464:.2f}): {np.std(samples):.2f}")
    # The expected std dev for uniform [0, q-1] is (q^2 - 1)/12, std = sqrt((q^2-1)/12)
    expected_std = np.sqrt((modulus_q**2 - 1) / 12)
    print(f"Expected Std Dev: {expected_std:.2f}")

    # Plot histogram for visual check (optional, requires matplotlib)
    # import matplotlib.pyplot as plt
    # plt.hist(samples, bins=min(50, modulus_q), density=True, alpha=0.7)
    # plt.title(f'Distribution of {num_samples} Uniform Samples mod {modulus_q}')
    # plt.xlabel('Value')
    # plt.ylabel('Probability Density')
    # plt.axhline(y=1.0/modulus_q, color='r', linestyle='--', label='Expected Uniform Density')
    # plt.legend()
    # plt.show()


Generated 65536 samples modulo 281474976317441
Min: 285378300, Max: 281473699850184
Mean (expected ~ 140737488158720.50): 141045321367020.19
Std (expected ~ 81257210253302.83): 81217009103461.89
Expected Std Dev: 81254826673509.05


In [3]:
import numpy as np
import math
from typing import Union

# --- Constants (Typically defined elsewhere in OpenFHE) ---
DUG_CHUNK_WIDTH = 32  # Standard 32-bit chunks
DUG_CHUNK_MIN = 0     # Minimum value for a chunk (typically 0)
DUG_CHUNK_MAX = 2**DUG_CHUNK_WIDTH - 1  # Maximum value for a chunk (0xFFFFFFFF for 32-bit)

def sample_discrete_uniform_openfhe_style(modulus: int, rng: np.random.Generator | None = None) -> int:
    """
    Generates a single random integer uniformly in the range [0, modulus) using
    the chunked rejection sampling approach similar to OpenFHE's DiscreteUniformGeneratorImpl.

    Args:
        modulus: The upper bound (exclusive) for the generated integer (must be positive).
        rng: A numpy random number generator. If None, numpy's default generator is used.

    Returns:
        A random integer r such that 0 <= r < modulus.

    Raises:
        ValueError: If modulus is not a positive integer.
    """
    if rng is None:
        rng = np.random.default_rng()

    if not isinstance(modulus, int) or modulus <= 0:
        raise ValueError("Modulus must be a positive integer.")

    if modulus == 1:
        return 0 # Only option is 0

    # --- Pre-calculate values similar to SetModulus in OpenFHE ---
    # Get the number of bits needed to represent the modulus (excluding the sign bit for large ints)
    modulus_bits = modulus.bit_length() # More direct way than math.ceil(math.log2(modulus))
    
    # Calculate how many full chunks we need
    # The formula (modulus_bits - 1) // DUG_CHUNK_WIDTH is from the C++ code.
    m_chunks_per_value = (modulus_bits - 1) // DUG_CHUNK_WIDTH
    
    # Calculate the shift amount for the top chunk
    m_shift_chunk = m_chunks_per_value * DUG_CHUNK_WIDTH

    # Calculate the upper bound for the top chunk's distribution parameter
    # This is (m_modulus >> m_shift_chunk).
    top_chunk_modulus_val = modulus >> m_shift_chunk
    # The bound for the uniform distribution is [DUG_CHUNK_MIN, top_chunk_modulus_val]
    # However, if top_chunk_modulus_val > DUG_CHUNK_MAX, we need to cap it.
    # In typical usage, top_chunk_modulus_val is much smaller than 2^32 - 1.
    m_bound_upper = min(top_chunk_modulus_val, DUG_CHUNK_MAX)
    # For this Python version, we just store the upper bound value directly
    # In C++, m_bound is std::uniform_int_distribution<uint32_t>::param_type(DUG_CHUNK_MIN, m_bound_upper)
    # So, a.b() would be m_bound_upper, and a.a() would be DUG_CHUNK_MIN

    print("DiscreteUniformGeneratorImpl GenerateInteger")

    # --- Rejection Sampling Loop ---
    while True:
        result = 0
        
        # Generate full chunks
        for i in range(m_chunks_per_value):
            shift = i * DUG_CHUNK_WIDTH
            # Generate a random 32-bit integer in [DUG_CHUNK_MIN, DUG_CHUNK_MAX]
            chunk_val = rng.integers(DUG_CHUNK_MIN, DUG_CHUNK_MAX + 1, dtype=np.uint32)
            result += int(chunk_val) << shift # Cast to int to handle large shifts
            print("shift")
            print(shift)

        # Generate the top chunk
        # Generate a random integer in [DUG_CHUNK_MIN, m_bound_upper]
        top_chunk_val = rng.integers(DUG_CHUNK_MIN, m_bound_upper + 1, dtype=np.uint32)
        result += int(top_chunk_val) << m_shift_chunk

        print("m_shiftChunk")
        print(m_shift_chunk)
        print("m_bound upper limit (b):", m_bound_upper) # Access the 'b' member (upper bound)
        print("m_bound lower limit (a):", DUG_CHUNK_MIN) # Access the 'a' member (should be DUG_CHUNK_MIN)
        print(modulus)
        
        # Rejection check
        if result < modulus:
            print("result")
            print(result)
            return result
        # If result >= modulus, loop continues to generate a new candidate

# --- Example Usage ---
if __name__ == "__main__":
    modulus_q = 7681 # Example modulus from OpenFHE context
    # modulus_q = 281474976317441 # Another example modulus
    seed = 42
    rng = np.random.default_rng(seed)

    print(f"Sampling one value modulo {modulus_q}...")
    sample = sample_discrete_uniform_openfhe_style(modulus_q, rng)
    print(f"Final Sample: {sample}")
    print(f"Is sample < modulus? {sample < modulus_q}")

Sampling one value modulo 7681...
DiscreteUniformGeneratorImpl GenerateInteger
m_shiftChunk
0
m_bound upper limit (b): 7681
m_bound lower limit (a): 0
7681
result
685
Final Sample: 685
Is sample < modulus? True
