In [5]:
import json
import urllib.request
import time

In [6]:
BEACON_URL = "https://beacon.nist.gov/beacon/2.0/chain/last/pulse/last"

In [7]:
def fetch_latest_pulse():
    """Fetch latest NIST beacon pulse as dict."""
    req = urllib.request.Request(
        BEACON_URL,
        headers={"Accept": "application/json"}
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        data = r.read().decode("utf-8")
    obj = json.loads(data)

    pulse = obj.get("pulse")
    if not pulse:
        raise ValueError("Response missing 'pulse'")

    return pulse


def wait_for_next_pulse_int(poll_interval=1.0):
    start_time = time.time()

    last = fetch_latest_pulse()
    last_id = last.get("pulseIndex")

    while True:
        cur = fetch_latest_pulse()
        cur_id = cur.get("pulseIndex")
        if cur_id != last_id and cur_id is not None:
            hex_out = cur.get("outputValue") or cur.get("randOut")
            if not hex_out:
                raise ValueError("New pulse missing outputValue/randOut")
            return int(hex_out, 16)
        
        now_time = time.time()
        elaps_time = now_time - start_time
        print(f"\rWaiting for next pulse, elapsed time: {round(elaps_time):4.0f}s ", end="", flush=True)
        time.sleep(poll_interval)


class SinglePulseBitPool:
    """A fixed 512-bit pool. No refills allowed."""
    def __init__(self, pulse_int_512):
        self.pool = pulse_int_512
        self.pool_bits = 512

    def get_bits(self, k):
        if k <= 0:
            return 0
        if k > self.pool_bits:
            raise RuntimeError(
                f"Single pulse exhausted: need {k} more bits, "
                f"only {self.pool_bits} left."
            )
        mask = (1 << k) - 1
        val = self.pool & mask
        self.pool >>= k
        self.pool_bits -= k
        return val


def bits_needed_for_span(span: int) -> int:
    """Ceiling log2(span), with exact handling for powers of two."""
    if span <= 1:
        return 0
    bl = span.bit_length()
    if span == 1 << (bl - 1):
        return bl - 1
    return bl


def quantum_randint(low, high, n_samples=1):
    if high < low:
        raise ValueError("high must be >= low")
    if n_samples < 1:
        raise ValueError("n_samples must be >= 1")

    span = high - low + 1
    if span == 1:
        return low if n_samples == 1 else [low] * n_samples

    bits_per_sample = bits_needed_for_span(span)

    # start with one pulse
    pool = SinglePulseBitPool(wait_for_next_pulse_int())

    def one_sample():
        k = bits_per_sample
        while True:
            try:
                u = pool.get_bits(k)
            except RuntimeError:
                # refill with next pulse
                pool.__init__(wait_for_next_pulse_int())
                u = pool.get_bits(k)
            if u < span:
                return low + u

    if n_samples == 1:
        return one_sample()
    return [one_sample() for _ in range(n_samples)]

In [8]:
# FUNCTIONALITY TEST
coin_flips = quantum_randint(0, 1, n_samples=8)
print("coin flips:", coin_flips)
dice_rolls = quantum_randint(1, 6, n_samples=5)
print("dice:", dice_rolls)
randint64 = quantum_randint(-(1 << 63), (1 << 63) - 1)
print("int64:", randint64)


Waiting for next pulse, elapsed time:   35s coin flips: [1, 0, 0, 1, 1, 1, 1, 1]
Waiting for next pulse, elapsed time:   58s dice: [6, 6, 5, 6, 4]
Waiting for next pulse, elapsed time:   60s int64: -9098039026923359170
