diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b2dd94..2c49ba1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,22 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Fixed - -- **`FirstFitBStackAllocator::new` — recovery triggered with `recovery_needed == 0` could fail spuriously under the `atomic` feature** (`alloc` + `set` features): When `new` decides to run recovery from an out-of-range `free_head` rather than from the flag itself (so the on-disk flag is still `0`), the recovery routine's final clear under the `atomic` feature performed `BStack::cas(1 → 0)` and surfaced a "recovery_needed was not set when clearing" error, making `new` return `Err` on a stack it could otherwise have repaired. Recovery now resets the flag with a direct `BStack::set` of zeros: it is authoritative and must run regardless of the prior flag value. Non-atomic behaviour is unchanged (the helper already did a plain set). -- **`FirstFitBStackAllocator::realloc` — tail-grow missing `recovery_needed` guard** (`alloc` + `set` features): A multi-step tail grow (`BStack::extend`, then zero, header, footer) without the recovery flag could leave an unrecoverable mid-arena layout on crash: for a grow delta of ≥ 24 bytes, a crash after `extend` but before the header write left the old block's valid header followed by zero bytes that recovery would read as a corrupt block (`"recovery: corrupted block header ... manual repair required"`), refusing to proceed. Tail-grow now sets `recovery_needed` before `extend` and clears it after the footer write in both atomic and non-atomic builds. - ### Added +- `BStack::try_extend_zeros` (`atomic`): appends zeros only if current size matches; conditional atomic extend. +- `BStack::get_batched` (`atomic`): reads multiple byte ranges under one lock. +- `BStack::get_batched_into` (`atomic`): like `get_batched` but into caller-provided buffers. 
+- `BStack::get_batched_gen` (`atomic`): like `get_batched_into` but with a generator closure for dependent reads. +- `BStack::cross_exchange` (`set` + `atomic`): atomically swaps two non-overlapping byte regions. +- `BStack::copy` (`set` + `atomic`): copies a byte region to another offset under one lock. +- `BStack::eq_crds` (`set` + `atomic`): writes region B only if region A equals an expected value (compare-and-swap across two regions). +- `BStack::ne_crds` (`set` + `atomic`): like `eq_crds` but writes when region A does not match. +- `BStack::masked_eq_crds` (`set` + `atomic`): like `eq_crds` with a bitmask applied to the comparison. +- `BStack::masked_ne_crds` (`set` + `atomic`): like `ne_crds` with a bitmask applied to the comparison. - **`CheckedSlabBStackAllocator`** (`alloc` + `set` features) *(Experimental)*: New crash-recoverable fixed-block slab allocator. Every block carries an 8-byte overhead prefix: zero when free (`data[0..8]` holds the next-free block offset, sentinel `0`), high bit set with the block count in the low bits when in use. The double-free guard reads the overhead and returns `InvalidInput` before touching the free list. Leaked blocks are recoverable by a linear scan. Constructor takes `data_size` (usable bytes per block, ≥ 8); the on-disk `block_size` is `data_size + 8`. Magic: `ALCK\x00\x01\x00\x00`. Multi-block allocations always extend the tail (the free list holds single blocks only). Crash-consistency model: block payloads are written before `free_head` is updated, and the in-use high bit is flipped last, so a crash at any step leaks at most the current block without corrupting the rest of the list. ### Changed -- **`FirstFitBStackAllocator` version bumped to 0.1.3** (`alloc` + `set` features): Magic number updated from `ALFF\x00\x01\x02\x00` to `ALFF\x00\x01\x03\x00`. This reflects the crash-recovery fix by adding the missing recovery guard around `realloc` tail-grow. 
This version new version of `FirstFitBStackAllocator` is also thread-safe when used with the `atomic` feature. Existing 0.1.x files remain fully compatible (only the first 6 bytes are checked on open). +- **`FirstFitBStackAllocator` version bumped to 0.1.3** (`alloc` + `set` features): Magic number updated from `ALFF\x00\x01\x02\x00` to `ALFF\x00\x01\x03\x00`. This reflects the crash-recovery fix that adds the missing recovery guard around `realloc` tail-grow. This new version of `FirstFitBStackAllocator` is also thread-safe when used with the `atomic` feature. Existing 0.1.x files remain fully compatible (only the first 6 bytes are checked on open). - **`FirstFitBStackAllocator` is `Send + Sync` with the `atomic` feature** (`alloc` + `set` features): Without `atomic`, `FirstFitBStackAllocator` remains `Send` but not `Sync` — operations mutate the on-disk free list in several steps, a data race under concurrent `&self` access. With the `atomic` feature it gains `Sync`: an internal `std::sync::Mutex` serializes the two compound operations not already made atomic by `BStack`'s per-call write lock — free-list mutation and stack extension/discard — while in-place writes within an already-allocated block (in-bucket grow, same-block zeroing) stay lock-free. The `recovery_needed` flag is now updated with a compare-and-swap (no extra cost over the disk write it performs regardless), which also rejects operating on a stack left in a needs-recovery state. Unlike `LinearBStackAllocator`'s optimistic `try_extend`/`try_discard` (which reports a lost tail race as `Unsupported`), a contended `FirstFit` operation blocks on the mutex. Structurally, the `#[cfg(not(feature = "atomic"))] PhantomData>` field opts out of `Sync`; under `atomic` that field is replaced by the `Mutex`, which makes the type `Sync` without an `unsafe impl`. Documentation updated across type-level docs, module overview, crate overview, and README.
-- **`FirstFitBStackAllocator::alloc` and `realloc` - optimizes heap memory allocation out of the if statement and lock scope** (`alloc` + `set` features): Both methods now compute the aligned length and allocate the return buffer before acquiring the write lock, so the critical section only covers the actual stack mutation. This reduces contention and latency under concurrent access. +- **`FirstFitBStackAllocator::alloc` and `realloc` — optimizes heap memory allocation out of the if statement and lock scope** (`alloc` + `set` features): Both methods now compute the aligned length and allocate the return buffer before acquiring the write lock, so the critical section only covers the actual stack mutation. This reduces contention and latency under concurrent access. - **`FirstFitBStackAllocator::cascade_discard_free_tail` — remove unneeded recovery needed flag operations** (`alloc` + `set` features): This helper function is only called from `dealloc` when the freed block is at the tail, so the caller is already responsible for setting and clearing `recovery_needed` around the `discard` call. The helper no longer touches the flag, eliminating redundant writes and reentry into non-reentrant lock in atomic builds that is created by cas of `recovery_needed` in the loop. +### Fixed + +- **`FirstFitBStackAllocator::new` — recovery triggered with `recovery_needed == 0` could fail spuriously under the `atomic` feature** (`alloc` + `set` features): When `new` decides to run recovery from an out-of-range `free_head` rather than from the flag itself (so the on-disk flag is still `0`), the recovery routine's final clear under the `atomic` feature performed `BStack::cas(1 → 0)` and surfaced a "recovery_needed was not set when clearing" error, making `new` return `Err` on a stack it could otherwise have repaired. Recovery now resets the flag with a direct `BStack::set` of zeros: it is authoritative and must run regardless of the prior flag value. 
Non-atomic behaviour is unchanged (the helper already did a plain set). +- **`FirstFitBStackAllocator::realloc` — tail-grow missing `recovery_needed` guard** (`alloc` + `set` features): A multi-step tail grow (`BStack::extend`, then zero, header, footer) without the recovery flag could leave an unrecoverable mid-arena layout on crash: for a grow delta of ≥ 24 bytes, a crash after `extend` but before the header write left the old block's valid header followed by zero bytes that recovery would read as a corrupt block (`"recovery: corrupted block header ... manual repair required"`), refusing to proceed. Tail-grow now sets `recovery_needed` before `extend` and clears it after the footer write in both atomic and non-atomic builds. + ## [0.2.2] - 2026-05-26 ### Added diff --git a/Cargo.toml b/Cargo.toml index 3241576..3c220b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,18 @@ required-features = ["atomic"] name = "atomic_race" required-features = ["atomic"] +[[example]] +name = "move_and_cow" +required-features = ["set", "atomic"] + +[[example]] +name = "linked_list" +required-features = ["atomic"] + +[[example]] +name = "checksummed_cache" +required-features = ["set", "atomic"] + [dev-dependencies] rand = "0.10.1" criterion = { version = "0.5", features = ["html_reports"] } diff --git a/Makefile b/Makefile index c9a1313..226419f 100644 --- a/Makefile +++ b/Makefile @@ -267,7 +267,8 @@ clean-zip: rm -rf $(BUILD)/*.tar.gz $(BUILD)/*.zip clean-data: - rm -rf **/*.bstack + @rm -rf **/*.bstack + @rm -rf *.bstack # ── Zip ─────────────────────────────────────────────────────────────────────── zip: $(BUILD) diff --git a/c/Makefile b/c/Makefile index b3e69d3..5e1069c 100644 --- a/c/Makefile +++ b/c/Makefile @@ -62,14 +62,18 @@ LIB_BYTEVEC_SET = libbstack-bytevec-set.a TEST_BV_BIN = test_bstack_bytevec$(EXE_EXT) TEST_BV_OBJ = test_bstack_bytevec.o -EXAMPLE_NAMES = basic buffer_reuse journal reading vec_store -EXAMPLE_BINS = $(addprefix ../examples/,$(addsuffix 
$(EXE_EXT),$(EXAMPLE_NAMES))) -HASHMAP_BIN = ../examples/hashmap$(EXE_EXT) -ATOMIC_BIN = ../examples/atomic_ops$(EXE_EXT) +EXAMPLE_NAMES = basic buffer_reuse journal reading vec_store +EXAMPLE_BINS = $(addprefix ../examples/,$(addsuffix $(EXE_EXT),$(EXAMPLE_NAMES))) +HASHMAP_BIN = ../examples/hashmap$(EXE_EXT) +ATOMIC_BIN = ../examples/atomic_ops$(EXE_EXT) +MOVE_COW_BIN = ../examples/move_and_cow$(EXE_EXT) +LINKED_LIST_BIN = ../examples/linked_list$(EXE_EXT) +CHECKSUMMED_CACHE_BIN = ../examples/checksummed_cache$(EXE_EXT) .PHONY: all test test-set test-atomic test-set-atomic test-first-fit test-first-fit-atomic test-ghost-tree test-slab test-checked-slab test-bytevec leaks clean alloc \ example example-basic example-buffer_reuse example-journal \ - example-reading example-vec_store example-hashmap example-atomic_ops + example-reading example-vec_store example-hashmap example-atomic_ops example-move_and_cow \ + example-linked_list example-checksummed_cache all: $(LIB) @@ -241,6 +245,15 @@ endif ../examples/atomic_ops$(EXE_EXT): ../examples/atomic_ops.c $(LIB_SET_ATOMIC) $(CC) $(CFLAGS) -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC -I. -o $@ $< -L. -lbstack-set-atomic -lpthread +../examples/move_and_cow$(EXE_EXT): ../examples/move_and_cow.c $(LIB_SET_ATOMIC) + $(CC) $(CFLAGS) -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC -I. -o $@ $< -L. -lbstack-set-atomic -lpthread + +../examples/linked_list$(EXE_EXT): ../examples/linked_list.c $(LIB_ATOMIC) + $(CC) $(CFLAGS) -DBSTACK_FEATURE_ATOMIC -I. -o $@ $< -L. -lbstack-atomic -lpthread + +../examples/checksummed_cache$(EXE_EXT): ../examples/checksummed_cache.c $(LIB_SET_ATOMIC) + $(CC) $(CFLAGS) -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC -I. -o $@ $< -L. 
-lbstack-set-atomic -lpthread + example-basic: ../examples/basic$(EXE_EXT) (cd ../examples && $(RUN)basic$(EXE_EXT)) @@ -262,7 +275,16 @@ example-hashmap: ../examples/hashmap$(EXE_EXT) example-atomic_ops: ../examples/atomic_ops$(EXE_EXT) (cd ../examples && $(RUN)atomic_ops$(EXE_EXT)) -example: $(EXAMPLE_BINS) $(HASHMAP_BIN) $(ATOMIC_BIN) +example-move_and_cow: ../examples/move_and_cow$(EXE_EXT) + (cd ../examples && $(RUN)move_and_cow$(EXE_EXT)) + +example-linked_list: ../examples/linked_list$(EXE_EXT) + (cd ../examples && $(RUN)linked_list$(EXE_EXT)) + +example-checksummed_cache: ../examples/checksummed_cache$(EXE_EXT) + (cd ../examples && $(RUN)checksummed_cache$(EXE_EXT)) + +example: $(EXAMPLE_BINS) $(HASHMAP_BIN) $(ATOMIC_BIN) $(MOVE_COW_BIN) $(LINKED_LIST_BIN) $(CHECKSUMMED_CACHE_BIN) @echo "==> basic" && (cd ../examples && $(RUN)basic$(EXE_EXT)) @echo "==> buffer_reuse" && (cd ../examples && $(RUN)buffer_reuse$(EXE_EXT)) @echo "==> journal" && (cd ../examples && $(RUN)journal$(EXE_EXT)) @@ -270,6 +292,9 @@ example: $(EXAMPLE_BINS) $(HASHMAP_BIN) $(ATOMIC_BIN) @echo "==> vec_store" && (cd ../examples && $(RUN)vec_store$(EXE_EXT)) @echo "==> hashmap" && (cd ../examples && $(RUN)hashmap$(EXE_EXT)) @echo "==> atomic_ops" && (cd ../examples && $(RUN)atomic_ops$(EXE_EXT)) + @echo "==> move_and_cow" && (cd ../examples && $(RUN)move_and_cow$(EXE_EXT)) + @echo "==> linked_list" && (cd ../examples && $(RUN)linked_list$(EXE_EXT)) + @echo "==> checksummed_cache" && (cd ../examples && $(RUN)checksummed_cache$(EXE_EXT)) clean: rm -f $(OBJ) $(TEST_OBJ) $(OBJ_SET) $(LIB) $(LIB_SET) $(TEST_BIN) \ @@ -281,4 +306,4 @@ clean: $(TEST_GT_OBJ) $(TEST_GT_BIN) \ $(TEST_SL_OBJ) $(TEST_SL_BIN) \ $(OBJ_BYTEVEC) $(LIB_BYTEVEC_SET) $(TEST_BV_OBJ) $(TEST_BV_BIN) \ - $(EXAMPLE_BINS) $(HASHMAP_BIN) $(ATOMIC_BIN) + $(EXAMPLE_BINS) $(HASHMAP_BIN) $(ATOMIC_BIN) $(MOVE_COW_BIN) $(LINKED_LIST_BIN) $(CHECKSUMMED_CACHE_BIN) diff --git a/c/bstack.c b/c/bstack.c index c4519cb..d92c3cb 100644 --- 
a/c/bstack.c +++ b/c/bstack.c @@ -1425,6 +1425,134 @@ int bstack_replace(bstack_t *bs, size_t n, return -1; } +int bstack_try_extend_zeros(bstack_t *bs, uint64_t s, size_t n, int *ok) +{ + BS_WRLOCK(bs); + + uint64_t raw_size; + if (file_size(bs->fd, &raw_size) != 0) + goto fail_unlock; + + uint64_t data_size = raw_size - HEADER_SIZE; + if (data_size != s) { + BS_WRUNLOCK(bs); + if (ok) *ok = 0; + return 0; + } + if (n == 0) { + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + } + + uint64_t new_len = data_size + (uint64_t)n; + if (plat_ftruncate(bs->fd, HEADER_SIZE + new_len) != 0 || + write_committed_len(bs->fd, new_len) != 0 || + plat_durable_sync(bs->fd) != 0) + { + /* Best-effort rollback. */ + plat_ftruncate(bs->fd, raw_size); + write_committed_len(bs->fd, data_size); + goto fail_unlock; + } + + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_get_batched(bstack_t *bs, + const bstack_iovec_t *entries, size_t n_entries) +{ + if (n_entries == 0) + return 0; + + BS_RDLOCK(bs); + + uint64_t raw_size; + if (file_size(bs->fd, &raw_size) != 0) + goto fail_unlock; + + uint64_t data_size = raw_size - HEADER_SIZE; + + for (size_t i = 0; i < n_entries; i++) { + if ((uint64_t)entries[i].len > UINT64_MAX - entries[i].offset) { + BS_RDUNLOCK(bs); + errno = EINVAL; + return -1; + } + uint64_t end = entries[i].offset + (uint64_t)entries[i].len; + if (end > data_size) { + BS_RDUNLOCK(bs); + errno = EINVAL; + return -1; + } + if (entries[i].len > 0) { + if (plat_pread(bs->fd, entries[i].buf, entries[i].len, + HEADER_SIZE + entries[i].offset) != 0) + goto fail_unlock; + } + } + + BS_RDUNLOCK(bs); + return 0; + +fail_unlock: + { int sv = errno; BS_RDUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_get_batched_gen(bstack_t *bs, + int (*gen)(uint64_t *out_offset, uint8_t **out_buf, + size_t *out_len, void *ctx), + void *ctx) +{ + BS_RDLOCK(bs); + + uint64_t raw_size; + if 
(file_size(bs->fd, &raw_size) != 0) + goto fail_unlock; + + uint64_t data_size = raw_size - HEADER_SIZE; + + for (;;) { + uint64_t offset = 0; + uint8_t *buf = NULL; + size_t len = 0; + int r = gen(&offset, &buf, &len, ctx); + if (r == 0) + break; + if (r < 0) + goto fail_unlock; + if ((uint64_t)len > UINT64_MAX - offset) { + BS_RDUNLOCK(bs); + errno = EINVAL; + return -1; + } + uint64_t end = offset + (uint64_t)len; + if (end > data_size) { + BS_RDUNLOCK(bs); + errno = EINVAL; + return -1; + } + if (len > 0) { + if (plat_pread(bs->fd, buf, len, HEADER_SIZE + offset) != 0) + goto fail_unlock; + } + } + + BS_RDUNLOCK(bs); + return 0; + +fail_unlock: + { int sv = errno; BS_RDUNLOCK(bs); errno = sv; } + return -1; +} + #endif /* BSTACK_FEATURE_ATOMIC */ /* ------------------------------------------------------------------------- @@ -1616,6 +1744,355 @@ int bstack_process(bstack_t *bs, uint64_t start, uint64_t end, return -1; } +int bstack_cross_exchange(bstack_t *bs, uint64_t a, uint64_t b, uint64_t n) +{ + if (n > UINT64_MAX - a) { errno = EINVAL; return -1; } + if (n > UINT64_MAX - b) { errno = EINVAL; return -1; } + uint64_t a_end = a + n; + uint64_t b_end = b + n; + + /* Overlap check (only meaningful when n > 0). */ + if (n > 0) { + uint64_t lo = a < b ? a : b; + uint64_t hi = a < b ? 
b : a; + if (lo + n > hi) { + errno = EINVAL; + return -1; + } + } + + BS_WRLOCK(bs); + + uint64_t locked = ATOMIC_LOAD_ACQUIRE(&bs->locked); + if (a < locked) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + if (b < locked) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + + uint64_t raw_size; + if (file_size(bs->fd, &raw_size) != 0) + goto fail_unlock; + + uint64_t data_size = raw_size - HEADER_SIZE; + if (a_end > data_size) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + if (b_end > data_size) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + + if (n == 0) { + BS_WRUNLOCK(bs); + return 0; + } + +#if UINT64_MAX > SIZE_MAX + if (n > (uint64_t)SIZE_MAX) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } +#endif + uint8_t *buf_a = (uint8_t *)malloc((size_t)n); + if (!buf_a) goto fail_unlock; + uint8_t *buf_b = (uint8_t *)malloc((size_t)n); + if (!buf_b) { free(buf_a); goto fail_unlock; } + + if (plat_pread(bs->fd, buf_a, (size_t)n, HEADER_SIZE + a) != 0 || + plat_pread(bs->fd, buf_b, (size_t)n, HEADER_SIZE + b) != 0 || + plat_pwrite(bs->fd, buf_b, (size_t)n, HEADER_SIZE + a) != 0 || + plat_pwrite(bs->fd, buf_a, (size_t)n, HEADER_SIZE + b) != 0 || + plat_durable_sync(bs->fd) != 0) + { + free(buf_a); + free(buf_b); + goto fail_unlock; + } + free(buf_a); + free(buf_b); + + BS_WRUNLOCK(bs); + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_copy(bstack_t *bs, uint64_t from, uint64_t to, uint64_t n) +{ + if (n > UINT64_MAX - from) { errno = EINVAL; return -1; } + if (n > UINT64_MAX - to) { errno = EINVAL; return -1; } + uint64_t from_end = from + n; + uint64_t to_end = to + n; + + BS_WRLOCK(bs); + + uint64_t locked = ATOMIC_LOAD_ACQUIRE(&bs->locked); + if (to < locked) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + + uint64_t raw_size; + if (file_size(bs->fd, &raw_size) != 0) + goto fail_unlock; + + uint64_t data_size = raw_size - HEADER_SIZE; + if (from_end > data_size) { BS_WRUNLOCK(bs); errno = EINVAL; 
return -1; } + if (to_end > data_size) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } + + if (n == 0) { + BS_WRUNLOCK(bs); + return 0; + } + +#if UINT64_MAX > SIZE_MAX + if (n > (uint64_t)SIZE_MAX) { BS_WRUNLOCK(bs); errno = EINVAL; return -1; } +#endif + uint8_t *buf = (uint8_t *)malloc((size_t)n); + if (!buf) goto fail_unlock; + + if (plat_pread(bs->fd, buf, (size_t)n, HEADER_SIZE + from) != 0 || + plat_pwrite(bs->fd, buf, (size_t)n, HEADER_SIZE + to) != 0 || + plat_durable_sync(bs->fd) != 0) + { + free(buf); + goto fail_unlock; + } + free(buf); + + BS_WRUNLOCK(bs); + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +/* Shared helper: compare a_len bytes at a_offset against a_expected using a + * 256-byte stack buffer. Returns 1 if all bytes match, 0 if any differ, -1 + * on I/O error (errno set). Caller holds the write lock. */ +static int crds_compare_a(bstack_fd_t fd, + uint64_t a_offset, const uint8_t *a_expected, + size_t a_len) +{ + if (a_len == 0) + return 1; + uint8_t chunk[256]; + size_t remaining = a_len; + uint64_t file_off = HEADER_SIZE + a_offset; + const uint8_t *exp = a_expected; + while (remaining > 0) { + size_t batch = remaining < sizeof chunk ? remaining : sizeof chunk; + if (plat_pread(fd, chunk, batch, file_off) != 0) + return -1; + if (memcmp(chunk, exp, batch) != 0) + return 0; + exp += batch; + file_off += batch; + remaining -= batch; + } + return 1; +} + +/* Shared helper: like crds_compare_a but applies a bitwise mask to both sides + * before comparing: (A[i] & mask[i]) == (expected[i] & mask[i]). + * Returns 1 if all masked bytes match, 0 if any differ, -1 on I/O error. 
*/ +static int crds_compare_a_masked(bstack_fd_t fd, + uint64_t a_offset, const uint8_t *mask, + const uint8_t *a_expected, size_t a_len) +{ + if (a_len == 0) + return 1; + uint8_t chunk[256]; + size_t remaining = a_len; + uint64_t file_off = HEADER_SIZE + a_offset; + const uint8_t *exp = a_expected; + const uint8_t *msk = mask; + while (remaining > 0) { + size_t batch = remaining < sizeof chunk ? remaining : sizeof chunk; + if (plat_pread(fd, chunk, batch, file_off) != 0) + return -1; + for (size_t j = 0; j < batch; j++) { + if ((chunk[j] & msk[j]) != (exp[j] & msk[j])) + return 0; + } + exp += batch; + msk += batch; + file_off += batch; + remaining -= batch; + } + return 1; +} + +/* Shared body for CRDS functions after the comparison result is known. + * When matched == 1, reads b_len bytes at b_offset into b_old_buf, writes + * b_new_buf, and syncs. Caller holds the write lock. + * Returns 0 on success, -1 on I/O error. */ +static int crds_do_swap(bstack_fd_t fd, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len) +{ + if (b_len == 0) + return 0; + if (plat_pread(fd, b_old_buf, b_len, HEADER_SIZE + b_offset) != 0) + return -1; + if (plat_pwrite(fd, b_new_buf, b_len, HEADER_SIZE + b_offset) != 0) + return -1; + return plat_durable_sync(fd); +} + +/* Common setup for all CRDS operations: validates bounds and locked region, + * computes a_end / b_end, and acquires the write lock. + * Returns -1 (errno set) on validation failure; lock is NOT held on -1. + * On success the write lock is held and data_size is filled. 
*/ +static int crds_setup(bstack_t *bs, + uint64_t a_offset, size_t a_len, + uint64_t b_offset, size_t b_len, + uint64_t *out_a_end, uint64_t *out_b_end, + uint64_t *out_data_size) +{ + if ((uint64_t)a_len > UINT64_MAX - a_offset || + (uint64_t)b_len > UINT64_MAX - b_offset) { + errno = EINVAL; + return -1; + } + *out_a_end = a_offset + (uint64_t)a_len; + *out_b_end = b_offset + (uint64_t)b_len; + + BS_WRLOCK(bs); + + uint64_t locked = ATOMIC_LOAD_ACQUIRE(&bs->locked); + if (b_len > 0 && b_offset < locked) { + BS_WRUNLOCK(bs); + errno = EINVAL; + return -1; + } + + uint64_t raw_size; + if (file_size(bs->fd, &raw_size) != 0) { + int sv = errno; + BS_WRUNLOCK(bs); + errno = sv; + return -1; + } + *out_data_size = raw_size - HEADER_SIZE; + + if (a_len > 0 && *out_a_end > *out_data_size) { + BS_WRUNLOCK(bs); + errno = EINVAL; + return -1; + } + if (b_len > 0 && *out_b_end > *out_data_size) { + BS_WRUNLOCK(bs); + errno = EINVAL; + return -1; + } + return 0; +} + +int bstack_eq_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok) +{ + uint64_t a_end, b_end, data_size; + if (crds_setup(bs, a_offset, a_len, b_offset, b_len, + &a_end, &b_end, &data_size) != 0) + return -1; + (void)a_end; (void)b_end; (void)data_size; + + int cmp = crds_compare_a(bs->fd, a_offset, a_expected, a_len); + if (cmp < 0) goto fail_unlock; + if (cmp == 0) { BS_WRUNLOCK(bs); if (ok) *ok = 0; return 0; } + + if (crds_do_swap(bs->fd, b_offset, b_old_buf, b_new_buf, b_len) != 0) + goto fail_unlock; + + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_ne_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok) +{ + uint64_t a_end, b_end, data_size; + if (crds_setup(bs, 
a_offset, a_len, b_offset, b_len, + &a_end, &b_end, &data_size) != 0) + return -1; + (void)a_end; (void)b_end; (void)data_size; + + int cmp = crds_compare_a(bs->fd, a_offset, a_expected, a_len); + if (cmp < 0) goto fail_unlock; + if (cmp == 1) { BS_WRUNLOCK(bs); if (ok) *ok = 0; return 0; } + + if (crds_do_swap(bs->fd, b_offset, b_old_buf, b_new_buf, b_len) != 0) + goto fail_unlock; + + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_masked_eq_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *mask, + const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok) +{ + uint64_t a_end, b_end, data_size; + if (crds_setup(bs, a_offset, a_len, b_offset, b_len, + &a_end, &b_end, &data_size) != 0) + return -1; + (void)a_end; (void)b_end; (void)data_size; + + int cmp = crds_compare_a_masked(bs->fd, a_offset, mask, a_expected, a_len); + if (cmp < 0) goto fail_unlock; + if (cmp == 0) { BS_WRUNLOCK(bs); if (ok) *ok = 0; return 0; } + + if (crds_do_swap(bs->fd, b_offset, b_old_buf, b_new_buf, b_len) != 0) + goto fail_unlock; + + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + +int bstack_masked_ne_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *mask, + const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok) +{ + uint64_t a_end, b_end, data_size; + if (crds_setup(bs, a_offset, a_len, b_offset, b_len, + &a_end, &b_end, &data_size) != 0) + return -1; + (void)a_end; (void)b_end; (void)data_size; + + int cmp = crds_compare_a_masked(bs->fd, a_offset, mask, a_expected, a_len); + if (cmp < 0) goto fail_unlock; + if (cmp == 1) { BS_WRUNLOCK(bs); if (ok) *ok = 0; return 0; } + + if (crds_do_swap(bs->fd, b_offset, b_old_buf, b_new_buf, b_len) != 0) + 
goto fail_unlock; + + BS_WRUNLOCK(bs); + if (ok) *ok = 1; + return 0; + +fail_unlock: + { int sv = errno; BS_WRUNLOCK(bs); errno = sv; } + return -1; +} + #endif /* BSTACK_FEATURE_ATOMIC && BSTACK_FEATURE_SET */ #ifdef __cplusplus diff --git a/c/bstack.h b/c/bstack.h index 28acf7e..90074a6 100644 --- a/c/bstack.h +++ b/c/bstack.h @@ -26,11 +26,15 @@ * On Unix a pthread_rwlock protects each handle; on Windows an SRWLOCK is * used. bstack_push / bstack_extend / bstack_pop / bstack_discard / * bstack_set / bstack_zero / bstack_atrunc / bstack_splice / - * bstack_try_extend / bstack_try_discard(s, n>0) / bstack_swap / bstack_cas / - * bstack_replace / bstack_process + * bstack_try_extend / bstack_try_extend_zeros / bstack_try_discard(s, n>0) / + * bstack_swap / bstack_cas / bstack_replace / bstack_process / + * bstack_cross_exchange / bstack_copy / + * bstack_eq_crds / bstack_ne_crds / + * bstack_masked_eq_crds / bstack_masked_ne_crds * hold a write lock. bstack_try_discard(s, 0) holds a read lock. - * bstack_peek / bstack_get / bstack_len hold a read lock and may run - * concurrently with each other on both platforms. + * bstack_peek / bstack_get / bstack_get_batched / bstack_get_batched_gen / + * bstack_len hold a read lock and may run concurrently with each other on + * both platforms. * * Multi-process safety * -------------------- @@ -43,8 +47,11 @@ * ------------- * Compile with -DBSTACK_FEATURE_SET to enable bstack_set and bstack_zero. * Compile with -DBSTACK_FEATURE_ATOMIC to enable bstack_atrunc, bstack_splice, - * bstack_try_extend, bstack_try_discard, and bstack_replace. Both flags - * together also enable bstack_swap, bstack_cas, and bstack_process. + * bstack_try_extend, bstack_try_extend_zeros, bstack_try_discard, + * bstack_replace, bstack_get_batched, and bstack_get_batched_gen. 
Both + * flags together also enable bstack_swap, bstack_cas, bstack_process, + * bstack_cross_exchange, bstack_copy, bstack_eq_crds, bstack_ne_crds, + * bstack_masked_eq_crds, and bstack_masked_ne_crds. */ typedef struct bstack bstack_t; @@ -203,6 +210,16 @@ int bstack_set(bstack_t *bs, uint64_t offset, int bstack_zero(bstack_t *bs, uint64_t offset, size_t n); #endif /* BSTACK_FEATURE_SET */ +/* + * Descriptor for one entry in a batched read: logical byte offset, destination + * buffer pointer, and number of bytes to read. + */ +typedef struct { + uint64_t offset; + uint8_t *buf; + size_t len; +} bstack_iovec_t; + #ifdef BSTACK_FEATURE_ATOMIC /* * Atomically cut n bytes off the tail then append buf_len bytes from buf. @@ -288,6 +305,62 @@ int bstack_replace(bstack_t *bs, size_t n, uint8_t **new_buf, size_t *new_len, void *ctx), void *ctx); + +/* + * Append n zero bytes only if the current logical payload size equals s. + * + * *ok (if non-NULL) is set to 1 when the condition matched and n zero bytes + * were appended, or 0 when the size did not match (no-op). + * n = 0 with the size condition matching sets *ok = 1 without I/O. + * Returns 0 on success (condition-matched or not), -1 on I/O error. + * + * Only available when compiled with -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_try_extend_zeros(bstack_t *bs, uint64_t s, size_t n, int *ok); + +/* + * Read multiple logical ranges into caller-provided buffers in a single + * lock acquisition. + * + * entries is an array of n_entries bstack_iovec_t descriptors. For each + * entry, entries[i].len bytes are read from logical offset entries[i].offset + * into entries[i].buf. n_entries == 0 is a valid no-op. + * + * All reads happen under the same shared read lock, so no write can + * interleave between them. + * + * Returns EINVAL if any entry has offset + len overflowing uint64_t or + * exceeding the current payload size. + * + * Only available when compiled with -DBSTACK_FEATURE_ATOMIC. 
+ */ +int bstack_get_batched(bstack_t *bs, + const bstack_iovec_t *entries, size_t n_entries); + +/* + * Read a dependent chain of logical ranges in a single lock acquisition. + * + * gen is called repeatedly. Each call should populate *out_offset, + * *out_buf, and *out_len to request the next read, then return 1. When + * gen is called, the buffer supplied by the previous call (if any) has + * already been filled with its data. To stop the chain, gen returns 0. + * gen may return -1 on error (errno must be set); the operation aborts. + * + * The generator callback signature: + * int gen(uint64_t *out_offset, uint8_t **out_buf, size_t *out_len, + * void *ctx) + * + * All reads happen under the same shared read lock. + * + * Returns EINVAL if any yielded offset + len overflows uint64_t or exceeds + * the current payload size. Returns -1 (errno set) if gen returns -1. + * + * Only available when compiled with -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_get_batched_gen(bstack_t *bs, + int (*gen)(uint64_t *out_offset, uint8_t **out_buf, + size_t *out_len, void *ctx), + void *ctx); #endif /* BSTACK_FEATURE_ATOMIC */ #if defined(BSTACK_FEATURE_ATOMIC) && defined(BSTACK_FEATURE_SET) @@ -343,6 +416,112 @@ int bstack_cas(bstack_t *bs, uint64_t offset, int bstack_process(bstack_t *bs, uint64_t start, uint64_t end, int (*cb)(uint8_t *buf, size_t len, void *ctx), void *ctx); + +/* + * Atomically swap two equal-size, non-overlapping regions within the file. + * + * Bytes at [a, a+n) and [b, b+n) are exchanged under a single write lock. + * The file size is never changed. n = 0 is a valid no-op (bounds are still + * checked). + * + * Returns EINVAL if either a+n or b+n overflows uint64_t, if the regions + * overlap, if either region exceeds the payload size, or if either region + * start lies within the locked prefix. + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. 
+ */ +int bstack_cross_exchange(bstack_t *bs, uint64_t a, uint64_t b, uint64_t n); + +/* + * Copy n bytes from [from, from+n) to [to, to+n) under a single write lock. + * + * Overlapping regions are handled correctly (source is read into a temporary + * buffer before writing). The file size is never changed. n = 0 is a valid + * no-op (bounds are still checked). + * + * Returns EINVAL if either from+n or to+n overflows uint64_t, if either + * region exceeds the payload size, or if to lies within the locked prefix. + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_copy(bstack_t *bs, uint64_t from, uint64_t to, uint64_t n); + +/* + * Cross-Region Dependent Swap — equal condition. + * + * Reads a_len bytes from a_offset and compares them to a_expected. If they + * are equal, reads b_len bytes from b_offset into b_old_buf and writes + * b_new_buf there. *ok (if non-NULL) is set to 1 when the swap was + * performed or 0 when the comparison failed. + * + * All operations happen under one write lock. a_len == 0 trivially matches. + * b_len == 0 skips the B swap when the condition passes (*ok = 1, no I/O). + * + * Returns EINVAL if either range overflows uint64_t, exceeds the payload + * size, or if b_offset lies within the locked prefix (when b_len > 0). + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_eq_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok); + +/* + * Cross-Region Dependent Swap — not-equal condition. + * + * Like bstack_eq_crds but performs the B swap only when the a_len bytes at + * a_offset are NOT equal to a_expected. *ok = 0 if the bytes compare equal + * (swap suppressed). + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. 
+ */ +int bstack_ne_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok); + +/* + * Cross-Region Dependent Swap — masked-equal condition. + * + * Like bstack_eq_crds but compares (A[i] & mask[i]) == (a_expected[i] & + * mask[i]) for every byte i. mask and a_expected must both have length + * a_len. + * + * Returns EINVAL if mask is NULL when a_len > 0. + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_masked_eq_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *mask, + const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok); + +/* + * Cross-Region Dependent Swap — masked-not-equal condition. + * + * Like bstack_masked_eq_crds but performs the B swap only when at least one + * masked byte differs: (A[i] & mask[i]) != (a_expected[i] & mask[i]). + * *ok = 0 if all masked bytes compare equal (swap suppressed). + * + * Only available when compiled with both -DBSTACK_FEATURE_SET and + * -DBSTACK_FEATURE_ATOMIC. + */ +int bstack_masked_ne_crds(bstack_t *bs, + uint64_t a_offset, const uint8_t *mask, + const uint8_t *a_expected, size_t a_len, + uint64_t b_offset, uint8_t *b_old_buf, + const uint8_t *b_new_buf, size_t b_len, + int *ok); #endif /* BSTACK_FEATURE_ATOMIC && BSTACK_FEATURE_SET */ #ifdef __cplusplus diff --git a/examples/checksummed_cache.c b/examples/checksummed_cache.c new file mode 100644 index 0000000..c4621db --- /dev/null +++ b/examples/checksummed_cache.c @@ -0,0 +1,255 @@ +/* + * Checksummed block with per-thread in-memory cache protected by CRDS. + * + * The bstack_t pointer is shared across threads. The per-thread cache + * (has_checksum, cached_block) lives in each thread's own + * checksummed_block_t, constructed inside the thread function.
+ * + * A mutex and condition variable make thread 2 attempt its write only after + * thread 1 has committed, guaranteeing that thread 2's empty cache is stale. + * + * Requires: -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC + * + * Build and run: + * make -C ../c example-checksummed_cache + */ + +#if !defined(BSTACK_FEATURE_SET) || !defined(BSTACK_FEATURE_ATOMIC) +# error "checksummed_cache.c requires -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC" +#endif + +#include "../c/bstack.h" + +#include <pthread.h> +#include <stdint.h> +#include <stdio.h> +#include <string.h> + +#define BLOCK_SIZE 64 +#define PAYLOAD_OFFSET 8 +#define PAYLOAD_SIZE 56 + +/* ── Little-endian helper ────────────────────────────────────────────────── */ + +static void write_le64(uint8_t *p, uint64_t v) +{ + p[0] = (uint8_t)(v); p[1] = (uint8_t)(v >> 8); + p[2] = (uint8_t)(v >> 16); p[3] = (uint8_t)(v >> 24); + p[4] = (uint8_t)(v >> 32); p[5] = (uint8_t)(v >> 40); + p[6] = (uint8_t)(v >> 48); p[7] = (uint8_t)(v >> 56); +} + +/* ── ChecksummedBlock ────────────────────────────────────────────────────── */ + +typedef struct { + bstack_t *stack; /* shared across threads */ + int has_checksum; + uint8_t cached_block[BLOCK_SIZE]; /* [0..8) checksum | [8..64) payload */ +} checksummed_block_t; + +static uint64_t compute_checksum(const uint8_t *payload, size_t len) +{ + uint64_t cs = 0; + for (size_t i = 0; i < len; i++) + cs ^= (uint64_t)payload[i]; + return cs; +} + +/* + * Write new_payload atomically via eq_crds. + * + * *committed is set to 1 if the write was committed, or 0 if the cached + * checksum was stale. On a stale miss the cache is refreshed via bstack_get + * so the caller can retry immediately.
+ */ +static int cb_write(checksummed_block_t *cb, + const uint8_t *new_payload, size_t len, + int *committed) +{ + uint64_t new_checksum = compute_checksum(new_payload, len); + + uint8_t block[BLOCK_SIZE] = {0}; + write_le64(block, new_checksum); + memcpy(block + PAYLOAD_OFFSET, new_payload, len); + + /* Use cached_block[0..8] as expected checksum (zeroes if no cache yet). + * cached_block also serves as b_old_buf: on match, eq_crds fills it with + * the old on-disk block before writing the new one. */ + static const uint8_t zeroes[8] = {0}; + const uint8_t *expected = cb->has_checksum ? cb->cached_block : zeroes; + + int ok = 0; + if (bstack_eq_crds(cb->stack, + 0, expected, 8, + 0, cb->cached_block, block, BLOCK_SIZE, + &ok) < 0) + return -1; + + if (ok) { + /* Committed — overwrite cached_block with the new state. */ + memcpy(cb->cached_block, block, BLOCK_SIZE); + cb->has_checksum = 1; + *committed = 1; + return 0; + } + + /* Stale — read current disk state directly into cached_block; no copy. */ + if (bstack_get(cb->stack, 0, BLOCK_SIZE, cb->cached_block) < 0) + return -1; + cb->has_checksum = 1; + *committed = 0; + return 0; +} + +/* ── Read generator ──────────────────────────────────────────────────────── */ + +typedef struct { + checksummed_block_t *cb; + int need_checksum; + uint8_t read_checksum[8]; +} read_ctx_t; + +static int read_gen(uint64_t *out_offset, uint8_t **out_buf, size_t *out_len, + void *userctx) +{ + read_ctx_t *ctx = userctx; + checksummed_block_t *cb = ctx->cb; + + if (ctx->need_checksum) { + ctx->need_checksum = 0; + *out_offset = 0; + *out_buf = ctx->read_checksum; + *out_len = 8; + return 1; + } + + /* Cache hit — checksum unchanged, skip payload fetch. */ + if (cb->has_checksum && + memcmp(ctx->read_checksum, cb->cached_block, 8) == 0) + return 0; + + /* Cache miss — update checksum bytes and fetch payload directly into + * cached_block; no copy needed for the payload. 
*/ + memcpy(cb->cached_block, ctx->read_checksum, 8); + cb->has_checksum = 1; + *out_offset = PAYLOAD_OFFSET; + *out_buf = cb->cached_block + PAYLOAD_OFFSET; + *out_len = PAYLOAD_SIZE; + return 1; +} + +static int cb_read(checksummed_block_t *cb, const uint8_t **out_payload) +{ + read_ctx_t ctx = { .cb = cb, .need_checksum = 1, .read_checksum = {0} }; + if (bstack_get_batched_gen(cb->stack, read_gen, &ctx) < 0) + return -1; + *out_payload = cb->cached_block + PAYLOAD_OFFSET; + return 0; +} + +/* ── Helpers ─────────────────────────────────────────────────────────────── */ + +static void print_trimmed(const uint8_t *payload, size_t len) +{ + size_t trimmed = 0; + for (size_t i = 0; i < len; i++) + if (payload[i] != 0) trimmed = i + 1; + printf("\"%.*s\"", (int)trimmed, (const char *)payload); +} + +/* ── Thread functions ────────────────────────────────────────────────────── */ + +typedef struct { + bstack_t *stack; + pthread_mutex_t mtx; + pthread_cond_t cond; + int t1_written; /* set to 1 after thread 1 commits its write */ +} thread_arg_t; + +static void *thread1(void *arg) +{ + thread_arg_t *a = arg; + checksummed_block_t cb = { .stack = a->stack, .has_checksum = 0, .cached_block = {0} }; + + int committed = 0; + if (cb_write(&cb, (const uint8_t *)"Written by thread 1.", 20, &committed) < 0) { + perror("[t1] write"); return NULL; + } + printf("[t1] write → committed=%d\n", committed); + + const uint8_t *payload = NULL; + if (cb_read(&cb, &payload) < 0) { perror("[t1] read"); return NULL; } + printf("[t1] read → hit "); print_trimmed(payload, PAYLOAD_SIZE); putchar('\n'); + + /* Signal thread 2 that the write has landed. 
*/ + pthread_mutex_lock(&a->mtx); + a->t1_written = 1; + pthread_cond_signal(&a->cond); + pthread_mutex_unlock(&a->mtx); + return NULL; +} + +static void *thread2(void *arg) +{ + thread_arg_t *a = arg; + checksummed_block_t cb = { .stack = a->stack, .has_checksum = 0, .cached_block = {0} }; + + /* Wait until thread 1 has committed its write; our cache (None) is stale. */ + pthread_mutex_lock(&a->mtx); + while (!a->t1_written) + pthread_cond_wait(&a->cond, &a->mtx); + pthread_mutex_unlock(&a->mtx); + + int committed = 0; + if (cb_write(&cb, (const uint8_t *)"Written by thread 2.", 20, &committed) < 0) { + perror("[t2] write (stale)"); return NULL; + } + printf("[t2] write (stale) → committed=%d, cache refreshed\n", committed); + + /* Retry — cache is now current. */ + if (cb_write(&cb, (const uint8_t *)"Written by thread 2.", 20, &committed) < 0) { + perror("[t2] write (retry)"); return NULL; + } + printf("[t2] write (retry) → committed=%d\n", committed); + + const uint8_t *payload = NULL; + if (cb_read(&cb, &payload) < 0) { perror("[t2] read"); return NULL; } + printf("[t2] read → hit "); print_trimmed(payload, PAYLOAD_SIZE); putchar('\n'); + + return NULL; +} + +/* ── Main ────────────────────────────────────────────────────────────────── */ + +int main(void) +{ + const char *path = "checksummed_cache_example.bstack"; + remove(path); + + bstack_t *stack = bstack_open(path); + if (!stack) { perror("bstack_open"); return 1; } + + /* Initialise with a zeroed block. 
*/ + uint8_t zeros[BLOCK_SIZE] = {0}; + if (bstack_push(stack, zeros, BLOCK_SIZE, NULL) < 0) { + perror("bstack_push"); bstack_close(stack); return 1; + } + + thread_arg_t arg = { + .stack = stack, + .mtx = PTHREAD_MUTEX_INITIALIZER, + .cond = PTHREAD_COND_INITIALIZER, + .t1_written = 0, + }; + + pthread_t t1, t2; + pthread_create(&t1, NULL, thread1, &arg); + pthread_create(&t2, NULL, thread2, &arg); + pthread_join(t1, NULL); + pthread_join(t2, NULL); + + pthread_mutex_destroy(&arg.mtx); + pthread_cond_destroy(&arg.cond); + bstack_close(stack); + return 0; +} diff --git a/examples/checksummed_cache.rs b/examples/checksummed_cache.rs new file mode 100644 index 0000000..bb91946 --- /dev/null +++ b/examples/checksummed_cache.rs @@ -0,0 +1,216 @@ +//! Checksummed block with in-memory cache protected by CRDS operations. +//! +//! ## Layout +//! +//! A single 64-byte block stored at offset 0: +//! - Bytes [0..8): 8-byte XOR checksum (LE u64) +//! - Bytes [8..64): 56-byte payload +//! +//! ## Write path +//! +//! Use `eq_crds` to compare the cached checksum against the on-disk checksum +//! and, if they match, atomically replace the entire block. On a mismatch the +//! stale cache is refreshed via `get_into` and the call returns `false` so the +//! caller can retry. +//! +//! ## Read path +//! +//! Use `get_batched_gen` to read the checksum and, only on a mismatch with the +//! cached value, fetch the new payload — both under a single read lock. +//! +//! ## Thread model +//! +//! The underlying `BStack` is wrapped in an `Arc` and shared across threads. +//! The per-thread cache (`cached_checksum`, `cached_payload`) lives in each +//! thread's own `ChecksummedBlock` instance, constructed inside the thread. +//! +//! ## How to run +//! +//! ```text +//! cargo run --example checksummed_cache --features "set,atomic" +//! 
``` + +#[cfg(all(feature = "set", feature = "atomic"))] +use bstack::BStack; +#[cfg(all(feature = "set", feature = "atomic"))] +use std::io; +#[cfg(all(feature = "set", feature = "atomic"))] +use std::sync::{Arc, Barrier}; +#[cfg(all(feature = "set", feature = "atomic"))] +use std::thread; + +#[cfg(all(feature = "set", feature = "atomic"))] +const BLOCK_SIZE: usize = 64; +#[cfg(all(feature = "set", feature = "atomic"))] +const PAYLOAD_OFFSET: u64 = 8; +#[cfg(all(feature = "set", feature = "atomic"))] +const PAYLOAD_SIZE: usize = 56; + +#[cfg(all(feature = "set", feature = "atomic"))] +struct ChecksummedBlock { + stack: Arc<BStack>, + /// Per-thread cache — each thread owns its own ChecksummedBlock. + cached_checksum: Option<[u8; 8]>, + cached_payload: [u8; PAYLOAD_SIZE], +} + +#[cfg(all(feature = "set", feature = "atomic"))] +impl ChecksummedBlock { + /// Initialise the backing file and return a shared stack handle. + fn open(path: &str) -> io::Result<Arc<BStack>> { + let _ = std::fs::remove_file(path); + let stack = BStack::open(path)?; + stack.push(&[0u8; BLOCK_SIZE])?; + Ok(Arc::new(stack)) + } + + /// Create a per-thread view over a shared stack. + fn new(stack: Arc<BStack>) -> Self { + Self { + stack, + cached_checksum: None, + cached_payload: [0u8; PAYLOAD_SIZE], + } + } + + /// Compute XOR checksum of the payload. + fn compute_checksum(payload: &[u8]) -> u64 { + payload.iter().fold(0u64, |acc, &byte| acc ^ (byte as u64)) + } + + /// Write `new_payload` atomically. + /// + /// Returns `true` if the write was committed, or `false` if the cached + /// checksum was stale. On a stale miss the cache is refreshed from the + /// file via `get_into` so the caller can retry immediately.
+ fn write(&mut self, new_payload: &[u8]) -> io::Result<bool> { + assert!(new_payload.len() <= PAYLOAD_SIZE); + + let new_checksum = Self::compute_checksum(new_payload); + let mut block = [0u8; BLOCK_SIZE]; + block[0..8].copy_from_slice(&new_checksum.to_le_bytes()); + block[PAYLOAD_OFFSET as usize..PAYLOAD_OFFSET as usize + new_payload.len()] + .copy_from_slice(new_payload); + + // Verify cached checksum (or expect zeroed checksum for first write). + let expected_checksum = self.cached_checksum.unwrap_or([0u8; 8]); + + if self + .stack + .eq_crds(0, &expected_checksum, 0, block)? + .is_some() + { + // Committed — update the cache to reflect the new file state. + self.cached_checksum = Some(new_checksum.to_le_bytes()); + self.cached_payload[..new_payload.len()].copy_from_slice(new_payload); + self.cached_payload[new_payload.len()..].fill(0); + return Ok(true); + } + + // Cache is stale — refresh it so the caller can retry. + let mut disk = [0u8; BLOCK_SIZE]; + self.stack.get_into(0, &mut disk)?; + self.cached_checksum = Some(disk[0..8].try_into().unwrap()); + self.cached_payload + .copy_from_slice(&disk[PAYLOAD_OFFSET as usize..]); + Ok(false) + } + + fn read(&mut self) -> io::Result<&[u8]> { + let mut need_checksum_read = true; + let mut read_checksum = [0u8; 8]; + self.stack.get_batched_gen(|| { + if need_checksum_read { + need_checksum_read = false; + Some(( + 0, + // SAFETY: `read_checksum` lives for the duration of get_batched_gen + unsafe { std::slice::from_raw_parts_mut(read_checksum.as_mut_ptr(), 8) }, + )) + } else { + if let Some(old_checksum) = self.cached_checksum + && read_checksum == old_checksum + { + return None; + } + self.cached_checksum = Some(read_checksum); + Some(( + PAYLOAD_OFFSET, + // SAFETY: `cached_payload` lives for the duration of get_batched_gen + unsafe { + std::slice::from_raw_parts_mut( + self.cached_payload.as_mut_ptr(), + PAYLOAD_SIZE, + ) + }, + )) + } + })?; + Ok(&self.cached_payload) + } +} + +/// Return the payload bytes up to the
last non-zero byte. +#[cfg(all(feature = "set", feature = "atomic"))] +fn trim(payload: &[u8]) -> &str { + let len = payload.iter().rposition(|&b| b != 0).map_or(0, |i| i + 1); + std::str::from_utf8(&payload[..len]).unwrap_or("") +} + +#[cfg(all(feature = "set", feature = "atomic"))] +fn main() -> io::Result<()> { + let stack = ChecksummedBlock::open("checksummed_cache_example.bstack")?; + + // A barrier lets thread 1 commit its write before thread 2 attempts its own, + // guaranteeing that thread 2's cache is stale when it first tries to write. + let barrier = Arc::new(Barrier::new(2)); + + // ── Thread 1: write first, then release thread 2 ────────────────────────── + let stack1 = Arc::clone(&stack); + let barrier1 = Arc::clone(&barrier); + let t1 = thread::spawn(move || -> io::Result<()> { + let mut block = ChecksummedBlock::new(stack1); + + assert!(block.write(b"Written by thread 1.")?); + println!("[t1] write → committed"); + + let payload = block.read()?; + println!("[t1] read → hit {:?}", trim(payload)); + + barrier1.wait(); // release thread 2 + Ok(()) + }); + + // ── Thread 2: wait for thread 1, then write with a stale cache ──────────── + let stack2 = Arc::clone(&stack); + let barrier2 = Arc::clone(&barrier); + let t2 = thread::spawn(move || -> io::Result<()> { + let mut block = ChecksummedBlock::new(stack2); + + barrier2.wait(); // thread 1 has written by now; our cache (None) is stale + + // First attempt: stale — eq_crds sees a checksum mismatch, cache is + // refreshed via get_into, returns false. + let committed = block.write(b"Written by thread 2.")?; + println!("[t2] write (stale) → committed={committed}, cache refreshed"); + + // Retry: cache is now current, write succeeds. 
+ let committed = block.write(b"Written by thread 2.")?; + println!("[t2] write (retry) → committed={committed}"); + + let payload = block.read()?; + println!("[t2] read → hit {:?}", trim(payload)); + + Ok(()) + }); + + t1.join().unwrap()?; + t2.join().unwrap()?; + Ok(()) +} + +#[cfg(not(all(feature = "set", feature = "atomic")))] +fn main() { + eprintln!("This example requires the 'set' and 'atomic' features."); + eprintln!("Run: cargo run --example checksummed_cache --features \"set,atomic\""); +} diff --git a/examples/linked_list.c b/examples/linked_list.c new file mode 100644 index 0000000..83a0df2 --- /dev/null +++ b/examples/linked_list.c @@ -0,0 +1,235 @@ +/* + * Traversing a variable-sized linked list stored in bstack using + * bstack_get_batched_gen. + * + * Demonstrates the generator pattern for reading linked lists where nodes have + * different sizes. The generator reads all data payloads into a single + * contiguous buffer under one lock by: + * 1. Reading a node's header (next_ptr + data_size) into a stack-allocated slot + * 2. Parsing data_size to determine the payload length + * 3. Issuing the next read for the data directly into the buffer + * + * Headers are never stored in the buffer — only data payloads are accumulated. + * This avoids an interleaved header/data layout and eliminates the need to skip + * over headers during reconstruction. 
+ * + * Layout (each node): + * next_offset : u64 LE (8 bytes) — offset to next node, or UINT64_MAX + * data_size : u64 LE (8 bytes) — payload length in bytes + * data : [u8] (variable) — node payload + * + * Build and run: + * make -C ../c example-linked_list + */ + +#ifndef BSTACK_FEATURE_ATOMIC +# error "linked_list.c requires -DBSTACK_FEATURE_ATOMIC" +#endif + +#include "../c/bstack.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#define SENTINEL UINT64_MAX +#define HEADER_SIZE 16 /* next_offset (8) + data_size (8) */ +#define MAX_NODES 100 + +/* ── Little-endian helpers ───────────────────────────────────────────────── */ + +static uint64_t read_le64(const uint8_t *p) +{ + return (uint64_t)p[0] + | ((uint64_t)p[1] << 8) + | ((uint64_t)p[2] << 16) + | ((uint64_t)p[3] << 24) + | ((uint64_t)p[4] << 32) + | ((uint64_t)p[5] << 40) + | ((uint64_t)p[6] << 48) + | ((uint64_t)p[7] << 56); +} + +static void write_le64(uint8_t *p, uint64_t v) +{ + p[0] = (uint8_t)(v); + p[1] = (uint8_t)(v >> 8); + p[2] = (uint8_t)(v >> 16); + p[3] = (uint8_t)(v >> 24); + p[4] = (uint8_t)(v >> 32); + p[5] = (uint8_t)(v >> 40); + p[6] = (uint8_t)(v >> 48); + p[7] = (uint8_t)(v >> 56); +} + +/* ── Generator state ─────────────────────────────────────────────────────── */ + +typedef struct { + size_t data_size; + uint64_t next_offset; +} node_info_t; + +typedef struct { + uint8_t header[HEADER_SIZE]; /* stack-allocated, reused each iteration */ + uint8_t *buf; /* data payloads only */ + size_t buf_cap; + size_t buf_len; + /* nodes records (data_size, next_offset) so the post-traversal loop can + * print each node individually. In a real application you would consume + * each payload inside the generator itself and skip this array entirely.
*/ + node_info_t nodes[MAX_NODES]; + uint64_t current_offset; /* file offset of the next node header */ + size_t current_pos; /* write cursor into buf */ + int node_count; + int reading_header; /* 1 = next yield is a header read */ +} traversal_ctx_t; + +static int ensure_capacity(traversal_ctx_t *ctx, size_t needed) +{ + if (needed <= ctx->buf_cap) return 0; + size_t cap = ctx->buf_cap ? ctx->buf_cap : 64; + while (cap < needed) cap *= 2; + uint8_t *p = realloc(ctx->buf, cap); + if (!p) return -1; + ctx->buf = p; + ctx->buf_cap = cap; + return 0; +} + +/* + * Generator callback for bstack_get_batched_gen. + * + * On each invocation the buffer slice requested by the previous call has + * already been filled. Alternates between yielding the stack-allocated + * header slot and a data slot in the heap buffer for each node. + */ +static int list_gen(uint64_t *out_offset, uint8_t **out_buf, size_t *out_len, + void *userctx) +{ + traversal_ctx_t *ctx = userctx; + + if (ctx->current_offset == SENTINEL || ctx->node_count >= MAX_NODES) + return 0; + + if (ctx->reading_header) { + *out_offset = ctx->current_offset; + *out_buf = ctx->header; + *out_len = HEADER_SIZE; + ctx->reading_header = 0; + return 1; + } else { + /* Header has been filled; parse it. 
*/ + uint64_t next_offset = read_le64(ctx->header); + uint64_t data_size = read_le64(ctx->header + 8); + uint64_t data_file_off = ctx->current_offset + HEADER_SIZE; + + if (ensure_capacity(ctx, ctx->current_pos + (size_t)data_size) < 0) return -1; + + ctx->nodes[ctx->node_count].data_size = (size_t)data_size; + ctx->nodes[ctx->node_count].next_offset = next_offset; + + *out_offset = data_file_off; + *out_buf = ctx->buf + ctx->current_pos; + *out_len = (size_t)data_size; + + ctx->current_offset = next_offset; + ctx->current_pos += (size_t)data_size; + ctx->buf_len = ctx->current_pos; + ctx->node_count++; + ctx->reading_header = 1; + return 1; + } +} + +/* ── Main ────────────────────────────────────────────────────────────────── */ + +int main(void) +{ + const char *path = "linked_list_example.bstack"; + remove(path); + + bstack_t *stack = bstack_open(path); + if (!stack) { perror("bstack_open"); return 1; } + + /* Build a linked list of text blocks by appending each as the new head. + * Iterate in reverse so that the final head points to the first block. 
*/ + const char *blocks[] = { + "This example ", + "demonstrates ", + "the usage of ", + "get_batched_gen ", + "generator pattern", + }; + int n_blocks = (int)(sizeof(blocks) / sizeof(blocks[0])); + + uint64_t head_offset = SENTINEL; + for (int i = n_blocks - 1; i >= 0; i--) { + size_t data_len = strlen(blocks[i]); + uint64_t data_size = (uint64_t)data_len; + size_t node_len = HEADER_SIZE + data_len; + + uint8_t *node = malloc(node_len); + if (!node) { perror("malloc"); bstack_close(stack); return 1; } + + write_le64(node, head_offset); /* next pointer */ + write_le64(node + 8, data_size); /* payload size */ + memcpy(node + HEADER_SIZE, blocks[i], data_len); + + if (bstack_push(stack, node, node_len, &head_offset) < 0) { + perror("bstack_push"); free(node); bstack_close(stack); return 1; + } + free(node); + } + + printf("Built %d-node list; head at offset %llu\n", + n_blocks, (unsigned long long)head_offset); + + /* Traverse with bstack_get_batched_gen — entire chain under one lock. */ + traversal_ctx_t ctx = { + .header = {0}, + .buf = NULL, + .buf_cap = 0, + .buf_len = 0, + .current_offset = head_offset, + .current_pos = 0, + .node_count = 0, + .reading_header = 1, + }; + + if (bstack_get_batched_gen(stack, list_gen, &ctx) < 0) { + perror("bstack_get_batched_gen"); + free(ctx.buf); + bstack_close(stack); + return 1; + } + + printf("\nTraversed %d nodes under a single lock\n", ctx.node_count); + printf("Total buffer size: %zu bytes\n", ctx.buf_len); + + /* Print each node directly from the data-only buffer — no header-skipping needed. 
*/ + size_t pos = 0; + + for (int i = 0; i < ctx.node_count; i++) { + size_t dsize = ctx.nodes[i].data_size; + uint64_t next = ctx.nodes[i].next_offset; + const uint8_t *data = ctx.buf + pos; + + char next_str[32]; + if (next == SENTINEL) + snprintf(next_str, sizeof(next_str), "null"); + else + snprintf(next_str, sizeof(next_str), "%llu", (unsigned long long)next); + + printf(" Node %d: size=%zu, text=%.*s, next=%s\n", + i, dsize, (int)dsize, data, next_str); + + pos += dsize; + } + + printf("\nReconstructed text: \"%.*s\"\n", (int)ctx.buf_len, ctx.buf); + printf("Expected: \"This example demonstrates the usage of get_batched_gen generator pattern\"\n"); + + free(ctx.buf); + bstack_close(stack); + return 0; +} diff --git a/examples/linked_list.rs b/examples/linked_list.rs new file mode 100644 index 0000000..9072d06 --- /dev/null +++ b/examples/linked_list.rs @@ -0,0 +1,166 @@ +//! Traversing a variable-sized linked list stored in bstack using `get_batched_gen`. +//! +//! This example demonstrates the generator pattern for reading linked lists where +//! nodes have different sizes. The generator reads all data payloads into a single +//! contiguous buffer under one lock by: +//! 1. Reading a node's header (next_ptr + size) into a stack-allocated slot +//! 2. Parsing the size to determine data length +//! 3. Issuing the next read for the data directly into the buffer +//! +//! Headers are never stored in the buffer — only data payloads are accumulated. +//! This avoids interleaved header/data layout and eliminates the need to skip +//! over headers during reconstruction. +//! +//! ## Layout +//! +//! Each node has variable size: +//! - `next_offset: u64` (8 bytes, LE) — offset to the next node, or `u64::MAX` for tail +//! - `data_size: u64` (8 bytes, LE) — size of the data payload in bytes +//! - `data: [u8]` (variable bytes) — node payload +//! +//! ## Example data +//! +//! The string "This example demonstrates the usage of get_batched_gen generator pattern" +//! 
is split into blocks and stored as a linked list. +//! +//! ## How to run +//! +//! ```text +//! cargo run --example linked_list --features atomic +//! ``` + +#[cfg(feature = "atomic")] +use bstack::BStack; +#[cfg(feature = "atomic")] +use std::io; + +#[cfg(feature = "atomic")] +const SENTINEL: u64 = u64::MAX; +#[cfg(feature = "atomic")] +const HEADER_SIZE: usize = 16; // next_offset (8) + data_size (8) + +#[cfg(feature = "atomic")] +fn main() -> io::Result<()> { + let path = "linked_list_example.bstack"; + let _ = std::fs::remove_file(path); + let stack = BStack::open(path)?; + + // Build a linked list of text blocks by appending each as the head. + // Layout: [next_offset: u64 | data_size: u64 | data: [u8]] + let mut blocks = [ + "This example ", + "demonstrates ", + "the usage of ", + "get_batched_gen ", + "generator pattern", + ]; + + blocks.reverse(); + + let mut head_offset = SENTINEL; // initially empty + for block in blocks.iter() { + let data_bytes = block.as_bytes(); + let data_size = data_bytes.len() as u64; + + let mut node = Vec::with_capacity(HEADER_SIZE + data_bytes.len()); + node.extend_from_slice(&head_offset.to_le_bytes()); // next pointer + node.extend_from_slice(&data_size.to_le_bytes()); // data size + node.extend_from_slice(data_bytes); // payload + + head_offset = stack.push(&node)?; + } + + println!( + "Built {}-node list; head at offset {}", + blocks.len(), + head_offset + ); + + // Traverse the list using get_batched_gen. Headers are read into a single + // stack-allocated slot (reused each iteration) and never stored in the main + // buffer. Only data payloads are accumulated, so no header-skipping is + // needed during reconstruction. + let mut buffer = Vec::new(); // data payloads only + let mut header = [0u8; HEADER_SIZE]; // reused for every node header + // node_info records (data_size, next_offset) so the post-traversal loop can + // print each node individually. 
In a real application you would consume each + // payload inside the generator itself (e.g. build the string on the fly) and + // skip this array entirely. + let mut node_info: Vec<(usize, u64)> = Vec::new(); + let mut current_offset = head_offset; + let mut current_pos = 0usize; + let mut reading_header = true; + + stack.get_batched_gen(|| { + if current_offset == SENTINEL || node_info.len() >= 100 { + return None; + } + + if reading_header { + // Yield the stack-allocated header slot. + reading_header = false; + // SAFETY: `header` lives for the duration of get_batched_gen + let buf_slice = + unsafe { std::slice::from_raw_parts_mut(header.as_mut_ptr(), HEADER_SIZE) }; + Some((current_offset, buf_slice)) + } else { + // Header has been filled; parse it. + let next_offset = u64::from_le_bytes(header[..8].try_into().unwrap()); + let data_size = u64::from_le_bytes(header[8..].try_into().unwrap()) as usize; + let data_file_offset = current_offset + HEADER_SIZE as u64; + + buffer.resize(current_pos + data_size, 0); + let buf_ptr = buffer[current_pos..current_pos + data_size].as_mut_ptr(); + + node_info.push((data_size, next_offset)); + current_offset = next_offset; + current_pos += data_size; + reading_header = true; + + // SAFETY: buffer is resized to include this range; slice is within bounds + let buf_slice = unsafe { std::slice::from_raw_parts_mut(buf_ptr, data_size) }; + Some((data_file_offset, buf_slice)) + } + })?; + + let node_count = node_info.len(); + println!("\nTraversed {} nodes under a single lock", node_count); + println!("Total buffer size: {} bytes", buffer.len()); + + // Print each node directly from the data-only buffer — no header-skipping needed. 
+ let mut pos = 0usize; + for (i, &(data_size, next_offset)) in node_info.iter().enumerate() { + let data = &buffer[pos..pos + data_size]; + let text = std::str::from_utf8(data).unwrap(); + + println!( + " Node {}: size={}, text={:?}, next={}", + i, + data_size, + text, + if next_offset == SENTINEL { + "null".to_string() + } else { + next_offset.to_string() + } + ); + + pos += data_size; + } + + println!( + "\nReconstructed text: {:?}", + std::str::from_utf8(&buffer).unwrap() + ); + println!( + "Expected: \"This example demonstrates the usage of get_batched_gen generator pattern\"" + ); + + Ok(()) +} + +#[cfg(not(feature = "atomic"))] +fn main() { + eprintln!("This example requires the 'atomic' feature."); + eprintln!("Run: cargo run --example linked_list --features atomic"); +} diff --git a/examples/move_and_cow.c b/examples/move_and_cow.c new file mode 100644 index 0000000..89abab7 --- /dev/null +++ b/examples/move_and_cow.c @@ -0,0 +1,177 @@ +/* + * Move semantics with bstack_cross_exchange and copy-on-write with bstack_copy. + * + * Scenario 1: Move semantics with bstack_cross_exchange + * A fixed-size message queue where messages are 32-byte slots. When a + * consumer "takes" a message, we swap it with a sentinel value (all zeros) + * to mark it as consumed, atomically retrieving the old content. + * + * Scenario 2: Copy-on-write with bstack_copy + * A versioned key-value store where updates create new versions by copying + * the old record and appending the new one, leaving the original intact. 
+ * + * Build and run: + * make -C ../c example-move_and_cow + */ + +#if !defined(BSTACK_FEATURE_SET) || !defined(BSTACK_FEATURE_ATOMIC) +# error "move_and_cow.c requires -DBSTACK_FEATURE_SET -DBSTACK_FEATURE_ATOMIC" +#endif + +#include "../c/bstack.h" + +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#define MSG_SIZE 32 + +static void move_semantics_demo(void) +{ + printf("=== Move semantics with cross_exchange ===\n"); + + bstack_t *stack = bstack_open("move_example.bstack"); + if (!stack) { perror("bstack_open"); exit(1); } + + /* Create a message queue with 4 fixed-size slots. */ + for (int i = 0; i < 4; i++) { + uint8_t msg[MSG_SIZE]; + memset(msg, 0, MSG_SIZE); + snprintf((char *)msg, MSG_SIZE, "Message #%d", i + 1); + bstack_push(stack, msg, MSG_SIZE, NULL); + printf("Enqueued: %s\n", (char *)msg); + } + + /* "Take" message #2 (offset 32) by swapping it with zeros. */ + uint8_t sentinel[MSG_SIZE]; + memset(sentinel, 0, MSG_SIZE); + uint64_t sentinel_offset; + bstack_push(stack, sentinel, MSG_SIZE, &sentinel_offset); + + printf("\nTaking message #2 (offset 32)...\n"); + + /* bstack_cross_exchange swaps [a, a+n) with [b, b+n). */ + if (bstack_cross_exchange(stack, MSG_SIZE, sentinel_offset, MSG_SIZE) != 0) { + perror("bstack_cross_exchange"); + bstack_close(stack); + exit(1); + } + + /* Now slot #2 is zeros, and the sentinel area holds the old message. */ + uint8_t taken[MSG_SIZE]; + bstack_get(stack, sentinel_offset, sentinel_offset + MSG_SIZE, taken); + printf("Taken: %s\n", (char *)taken); + + /* Show the queue state. */ + printf("\nQueue after take:\n"); + for (int i = 0; i < 4; i++) { + uint8_t msg[MSG_SIZE + 1]; + uint64_t offset = (uint64_t)i * MSG_SIZE; + bstack_get(stack, offset, offset + MSG_SIZE, msg); + msg[MSG_SIZE] = '\0'; + + /* Trim trailing nulls for display.
*/ + size_t len = strlen((char *)msg); + if (len == 0) { + printf(" Slot %d: \n", i); + } else { + printf(" Slot %d: %s\n", i, (char *)msg); + } + } + + bstack_close(stack); +} + +static void copy_on_write_demo(void) +{ + printf("\n=== Copy-on-write with copy ===\n"); + + bstack_t *stack = bstack_open("cow_example.bstack"); + if (!stack) { perror("bstack_open"); exit(1); } + + /* Version 1: initial key-value record (16 bytes: 8-byte key + 8-byte value). */ + uint64_t key = 42; + uint64_t value_v1 = 1000; + uint8_t record[16]; + memcpy(record, &key, 8); + memcpy(record + 8, &value_v1, 8); + + uint64_t offset_v1; + bstack_push(stack, record, 16, &offset_v1); + printf("v1: key=%llu, value=%llu at offset %llu\n", + (unsigned long long)key, (unsigned long long)value_v1, + (unsigned long long)offset_v1); + + /* Version 2: copy the old record and append an updated one. */ + uint64_t value_v2 = 2000; + + /* First, extend the stack to make room for the copy. */ + uint8_t zeros[16] = {0}; + bstack_push(stack, zeros, 16, NULL); + uint64_t offset_v2 = offset_v1 + 16; /* the new copy location */ + + /* Now copy v1 to v2. */ + if (bstack_copy(stack, offset_v1, offset_v2, 16) != 0) { + perror("bstack_copy"); + bstack_close(stack); + exit(1); + } + + /* Now overwrite just the value field (bytes 8..16) in the new copy. */ + if (bstack_set(stack, offset_v2 + 8, (const uint8_t *)&value_v2, 8) != 0) { + perror("bstack_set"); + bstack_close(stack); + exit(1); + } + printf("v2: key=%llu, value=%llu at offset %llu (copied from v1)\n", + (unsigned long long)key, (unsigned long long)value_v2, + (unsigned long long)offset_v2); + + /* Version 3: another update. */ + uint64_t value_v3 = 3000; + + /* Extend for v3. */ + bstack_push(stack, zeros, 16, NULL); + uint64_t offset_v3 = offset_v2 + 16; + + /* Copy v2 to v3. 
*/ + if (bstack_copy(stack, offset_v2, offset_v3, 16) != 0) { + perror("bstack_copy"); + bstack_close(stack); + exit(1); + } + + if (bstack_set(stack, offset_v3 + 8, (const uint8_t *)&value_v3, 8) != 0) { + perror("bstack_set"); + bstack_close(stack); + exit(1); + } + printf("v3: key=%llu, value=%llu at offset %llu (copied from v2)\n", + (unsigned long long)key, (unsigned long long)value_v3, + (unsigned long long)offset_v3); + + /* All versions are preserved. */ + printf("\nAll versions in the stack:\n"); + uint64_t offsets[3] = {offset_v1, offset_v2, offset_v3}; + for (int i = 0; i < 3; i++) { + uint8_t rec[16]; + bstack_get(stack, offsets[i], offsets[i] + 16, rec); + + uint64_t k, v; + memcpy(&k, rec, 8); + memcpy(&v, rec + 8, 8); + + printf(" v%d: key=%llu, value=%llu\n", + i + 1, (unsigned long long)k, (unsigned long long)v); + } + + bstack_close(stack); +} + +int main(void) +{ + move_semantics_demo(); + copy_on_write_demo(); + return 0; +} diff --git a/examples/move_and_cow.rs b/examples/move_and_cow.rs new file mode 100644 index 0000000..f3d8642 --- /dev/null +++ b/examples/move_and_cow.rs @@ -0,0 +1,154 @@ +//! Move semantics with `cross_exchange` and copy-on-write with `copy`. +//! +//! ## Scenario 1: Move semantics with `cross_exchange` +//! +//! A file-backed message queue where messages are fixed-size slots. When a +//! consumer "takes" a message, we swap it with a sentinel value (all zeros) +//! to mark it as consumed, atomically retrieving the old content. +//! +//! ## Scenario 2: Copy-on-write with `copy` +//! +//! A versioned key-value store where updates create new versions by copying +//! the old record and appending the new one, leaving the original intact. +//! +//! ## How to run +//! +//! ```text +//! cargo run --example move_and_cow --features "set,atomic" +//! 
``` + +#[cfg(all(feature = "set", feature = "atomic"))] +use bstack::BStack; +#[cfg(all(feature = "set", feature = "atomic"))] +use std::io; + +#[cfg(all(feature = "set", feature = "atomic"))] +const MSG_SIZE: u64 = 32; + +#[cfg(all(feature = "set", feature = "atomic"))] +fn main() -> io::Result<()> { + move_semantics_demo()?; + println!(); + copy_on_write_demo()?; + Ok(()) +} + +#[cfg(all(feature = "set", feature = "atomic"))] +fn move_semantics_demo() -> io::Result<()> { + println!("=== Move semantics with cross_exchange ==="); + + let path = "move_example.bstack"; + let _ = std::fs::remove_file(path); + let stack = BStack::open(path)?; + + // Create a message queue with 4 fixed-size slots. + for i in 0..4 { + let mut msg = vec![0u8; MSG_SIZE as usize]; + let text = format!("Message #{}", i + 1); + msg[..text.len()].copy_from_slice(text.as_bytes()); + stack.push(&msg)?; + println!("Enqueued: {}", text); + } + + // "Take" message #2 (offset 32) by swapping it with zeros. + // We need a staging area for the zeros and the retrieved message. + let sentinel_offset = stack.push(&vec![0u8; MSG_SIZE as usize])?; + println!("\nTaking message #2 (offset 32)..."); + + // cross_exchange(a, b, n): swaps [a, a+n) with [b, b+n). + // We swap slot #2 (offset 32) with the sentinel (offset 128). + stack.cross_exchange(MSG_SIZE, sentinel_offset, MSG_SIZE)?; + + // Now slot #2 is zeros, and the sentinel area holds the old message. + let taken = stack.get(sentinel_offset, sentinel_offset + MSG_SIZE)?; + let msg_text = String::from_utf8_lossy(&taken); + let msg_text = msg_text.trim_end_matches('\0'); + println!("Taken: {}", msg_text); + + // Show the queue state. 
+ println!("\nQueue after take:"); + for i in 0..4 { + let offset = i * MSG_SIZE; + let msg = stack.get(offset, offset + MSG_SIZE)?; + let text = String::from_utf8_lossy(&msg) + .trim_end_matches('\0') + .to_string(); + if text.is_empty() { + println!(" Slot {}: ", i); + } else { + println!(" Slot {}: {}", i, text); + } + } + + Ok(()) +} + +#[cfg(all(feature = "set", feature = "atomic"))] +fn copy_on_write_demo() -> io::Result<()> { + println!("=== Copy-on-write with copy ==="); + + let path = "cow_example.bstack"; + let _ = std::fs::remove_file(path); + let stack = BStack::open(path)?; + + // Version 1: initial key-value record (16 bytes: 8-byte key + 8-byte value). + let key = 42u64; + let value_v1 = 1000u64; + let mut record = Vec::with_capacity(16); + record.extend_from_slice(&key.to_le_bytes()); + record.extend_from_slice(&value_v1.to_le_bytes()); + let offset_v1 = stack.push(&record)?; + println!( + "v1: key={}, value={} at offset {}", + key, value_v1, offset_v1 + ); + + // Version 2: copy the old record and append an updated one. + let value_v2 = 2000u64; + + // First, extend the stack to make room for the copy. + stack.push(&vec![0u8; 16])?; + let offset_v2 = offset_v1 + 16; // the new copy location + + // Now copy v1 to v2. + stack.copy(offset_v1, offset_v2, 16)?; + + // Now overwrite just the value field (bytes 8..16) in the new copy. + stack.set(offset_v2 + 8, &value_v2.to_le_bytes())?; + println!( + "v2: key={}, value={} at offset {} (copied from v1)", + key, value_v2, offset_v2 + ); + + // Version 3: another update. + let value_v3 = 3000u64; + + // Extend for v3. + stack.push(&vec![0u8; 16])?; + let offset_v3 = offset_v2 + 16; + + // Copy v2 to v3. + stack.copy(offset_v2, offset_v3, 16)?; + stack.set(offset_v3 + 8, &value_v3.to_le_bytes())?; + println!( + "v3: key={}, value={} at offset {} (copied from v2)", + key, value_v3, offset_v3 + ); + + // All versions are preserved. 
+ println!("\nAll versions in the stack:"); + for (i, offset) in [offset_v1, offset_v2, offset_v3].iter().enumerate() { + let rec = stack.get(*offset, *offset + 16)?; + let k = u64::from_le_bytes(rec[0..8].try_into().unwrap()); + let v = u64::from_le_bytes(rec[8..16].try_into().unwrap()); + println!(" v{}: key={}, value={}", i + 1, k, v); + } + + Ok(()) +} + +#[cfg(not(all(feature = "set", feature = "atomic")))] +fn main() { + eprintln!("This example requires the 'set' and 'atomic' features."); + eprintln!("Run: cargo run --example move_and_cow --features \"set,atomic\""); +} diff --git a/src/lib.rs b/src/lib.rs index d2dd17e..fc2fca1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,11 +135,13 @@ //! |-----------|-----------------------|--------------| //! | `push`, `extend`, `pop`, `pop_into`, `discard` | write | write | //! | `set`, `zero` *(feature)* | write | write | -//! | `atrunc`, `splice`, `splice_into`, `try_extend` *(feature: atomic)* | write | write | +//! | `atrunc`, `splice`, `splice_into`, `try_extend`, `try_extend_zeros` *(feature: atomic)* | write | write | //! | `try_discard(s, n > 0)` *(feature: atomic)* | write | write | //! | `try_discard(s, 0)` *(feature: atomic)* | **read** | **read** | +//! | `get_batched`, `get_batched_into`, `get_batched_gen` *(feature: atomic)* | **read** | write | //! | `swap`, `swap_into`, `cas` *(features: set+atomic)* | write | write | -//! | `process` *(features: set+atomic)* | write | write | +//! | `cross_exchange`, `copy`, `process` *(features: set+atomic)* | write | write | +//! | `eq_crds`, `ne_crds`, `masked_eq_crds`, `masked_ne_crds` *(features: set+atomic)* | write | write | //! | `replace` *(feature: atomic)* | write | write | //! | `peek`, `peek_into`, `get`, `get_into` | **read** | write | //! | `len` | read | read | @@ -190,9 +192,13 @@ //! //! * **Write protection.** [`set`](BStack::set), [`zero`](BStack::zero), //! [`swap`](BStack::swap), [`swap_into`](BStack::swap_into), -//! 
[`cas`](BStack::cas), and [`process`](BStack::process) return -//! [`io::ErrorKind::InvalidInput`] when their target range overlaps the -//! locked region. [`atrunc`](BStack::atrunc), [`splice`](BStack::splice), +//! [`cas`](BStack::cas), [`process`](BStack::process), +//! [`cross_exchange`](BStack::cross_exchange), [`copy`](BStack::copy) +//! (destination only), [`eq_crds`](BStack::eq_crds), +//! [`ne_crds`](BStack::ne_crds), [`masked_eq_crds`](BStack::masked_eq_crds), +//! and [`masked_ne_crds`](BStack::masked_ne_crds) (region B) return +//! [`io::ErrorKind::InvalidInput`] when their write target range overlaps +//! the locked region. [`atrunc`](BStack::atrunc), [`splice`](BStack::splice), //! [`splice_into`](BStack::splice_into), and [`replace`](BStack::replace) //! return the same error when the operation would modify bytes inside it. //! @@ -314,7 +320,7 @@ //! |---------|-------------| //! | `set` | Enables [`BStack::set`] and [`BStack::zero`] — in-place overwrite of existing payload bytes (or with zeros) without changing the file size. | //! | `alloc` | Enables [`BStackAllocator`], [`BStackBulkAllocator`], [`BStackSlice`], [`BStackSliceReader`], and [`LinearBStackAllocator`] — region-based allocation over a `BStack` payload. Combined with `set`, also enables [`BStackSliceWriter`], [`FirstFitBStackAllocator`], [`GhostTreeBstackAllocator`], and [`BStackByteVec`]. | -//! | `atomic` | Enables [`BStack::atrunc`], [`BStack::splice`], [`BStack::splice_into`], [`BStack::try_extend`], [`BStack::try_discard`], and [`BStack::replace`] — compound read-modify-write operations that hold the write lock across what would otherwise be separate calls. Combined with `set`, also enables [`BStack::swap`], [`BStack::swap_into`], [`BStack::cas`], and [`BStack::process`]. | +//! 
| `atomic` | Enables [`BStack::atrunc`], [`BStack::splice`], [`BStack::splice_into`], [`BStack::try_extend`], [`BStack::try_extend_zeros`], [`BStack::try_discard`], [`BStack::replace`], [`BStack::get_batched`], [`BStack::get_batched_into`], and [`BStack::get_batched_gen`] — compound read-modify-write operations that hold the lock across what would otherwise be separate calls. Combined with `set`, also enables [`BStack::swap`], [`BStack::swap_into`], [`BStack::cas`], [`BStack::process`], [`BStack::cross_exchange`], [`BStack::copy`], [`BStack::eq_crds`], [`BStack::ne_crds`], [`BStack::masked_eq_crds`], and [`BStack::masked_ne_crds`]. | //! //! Enable with: //! @@ -1720,6 +1726,47 @@ impl BStack { Ok(true) } + /// Append `n` zero bytes only if the current logical payload size equals `s`. + /// + /// Returns `Ok(true)` if the size matched and `n` zero bytes were appended + /// (or `n = 0` and no I/O was needed). Returns `Ok(false)` without + /// modifying the file if the size does not match. + /// + /// # Feature flag + /// + /// Only available when the `atomic` Cargo feature is enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if adding `n` to the current + /// payload size would overflow `u64`. Propagates any I/O error from + /// `set_len`, `write_committed_len`, or `durable_sync`. 
+ #[cfg(feature = "atomic")] + pub fn try_extend_zeros(&self, s: u64, n: u64) -> io::Result<bool> { + let mut file = self.lock.write().unwrap(); + let file_end = file.seek(SeekFrom::End(0))?; + let data_size = file_end - HEADER_SIZE; + if data_size != s { + return Ok(false); + } + if n == 0 { + return Ok(true); + } + let new_len = data_size.checked_add(n).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "try_extend_zeros: data_size + n overflows u64", + ) + })?; + file.set_len(HEADER_SIZE + new_len)?; + if let Err(e) = write_committed_len(&mut file, new_len).and_then(|_| durable_sync(&file)) { + let _ = file.set_len(file_end); + let _ = write_committed_len(&mut file, data_size); + return Err(e); + } + Ok(true) + } + /// Discard `n` bytes only if the current logical payload size equals `s`. /// /// Returns `Ok(true)` if the size matched and `n` bytes were removed (or @@ -1770,6 +1817,232 @@ impl BStack { Ok(true) } + /// Read multiple logical ranges in a single lock acquisition. + /// + /// Takes any iterator whose items are [`Range<u64>`](std::ops::Range) and + /// returns a [`Vec`] of owned byte buffers, one per input range, in the + /// same order. An empty iterator returns an empty `Vec`. An empty range + /// (`start == end`) produces an empty inner `Vec`. + /// + /// All reads happen under the same shared lock, so no write can interleave + /// between them. On Unix and Windows the shared read lock is taken once + /// for all non-locked ranges; on other platforms the write lock serialises + /// all reads. + /// + /// # Feature flag + /// + /// Only available when the `atomic` Cargo feature is enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if any range has `end < start` + /// or if any `end` exceeds the current payload size.
+ #[cfg(feature = "atomic")] + pub fn get_batched<I>(&self, ranges: I) -> io::Result<Vec<Vec<u8>>> + where + I: IntoIterator<Item = Range<u64>>, + { + let ranges: Vec<Range<u64>> = ranges.into_iter().collect(); + if ranges.is_empty() { + return Ok(Vec::new()); + } + for r in &ranges { + if r.end < r.start { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("get_batched: end ({}) < start ({})", r.end, r.start), + )); + } + } + #[cfg(any(unix, windows))] + { + let file = self.lock.read().unwrap(); + let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); + let mut results = Vec::with_capacity(ranges.len()); + for r in &ranges { + if r.end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "get_batched: end ({}) exceeds payload size ({data_size})", + r.end + ), + )); + } + results.push(pread_exact( + &file, + HEADER_SIZE + r.start, + (r.end - r.start) as usize, + )?); + } + Ok(results) + } + #[cfg(not(any(unix, windows)))] + { + let mut file = self.lock.write().unwrap(); + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + let mut results = Vec::with_capacity(ranges.len()); + for r in &ranges { + if r.end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "get_batched: end ({}) exceeds payload size ({data_size})", + r.end + ), + )); + } + file.seek(SeekFrom::Start(HEADER_SIZE + r.start))?; + let mut buf = vec![0u8; (r.end - r.start) as usize]; + file.read_exact(&mut buf)?; + results.push(buf); + } + Ok(results) + } + } + + /// Read multiple logical ranges into caller-provided buffers in a single lock acquisition. + /// + /// Takes any iterator whose items are `(u64, &mut [u8])` — a start offset + /// and a mutable buffer to fill. The number of bytes read for each entry + /// equals `buf.len()`. An empty iterator returns immediately. + /// + /// All reads happen under the same shared lock, so no write can interleave + /// between them.
On Unix and Windows the shared read lock is taken once + /// for all reads; on other platforms the write lock serialises all reads. + /// + /// # Feature flag + /// + /// Only available when the `atomic` Cargo feature is enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if any `offset + buf.len()` + /// overflows `u64` or if any read would extend beyond the current payload + /// size. + #[cfg(feature = "atomic")] + pub fn get_batched_into<'a, I>(&self, bufs: I) -> io::Result<()> + where + I: IntoIterator<Item = (u64, &'a mut [u8])>, + { + let bufs: Vec<(u64, &'a mut [u8])> = bufs.into_iter().collect(); + if bufs.is_empty() { + return Ok(()); + } + #[cfg(any(unix, windows))] + { + let file = self.lock.read().unwrap(); + let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); + for (ptr, buf) in bufs { + let end = ptr.checked_add(buf.len() as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "get_batched_into: offset + buf.len() overflows u64", + ) + })?; + if end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("get_batched_into: end ({end}) exceeds payload size ({data_size})",), + )); + } + pread_exact_into(&file, HEADER_SIZE + ptr, buf)?; + } + Ok(()) + } + #[cfg(not(any(unix, windows)))] + { + let mut file = self.lock.write().unwrap(); + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + for (ptr, buf) in bufs { + let end = ptr.checked_add(buf.len() as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "get_batched_into: offset + buf.len() overflows u64", + ) + })?; + if end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("get_batched_into: end ({end}) exceeds payload size ({data_size})",), + )); + } + file.seek(SeekFrom::Start(HEADER_SIZE + ptr))?; + file.read_exact(buf)?; + } + Ok(()) + } + } + + /// Read a dependent chain of logical ranges in a single lock acquisition.
+ /// + /// `f` is called once per read step. Each call returns `Some((offset, + /// buf))` to request a read of `buf.len()` bytes starting at `offset` into + /// `buf`, or `None` to stop. When `f` is called, the buffer supplied by + /// the *previous* call has already been filled with its data — the call + /// itself signals that the prior buffer is ready. + /// + /// All reads happen under the same shared lock (Unix/Windows: read lock; + /// other platforms: write lock), so no write can interleave between steps. + /// + /// # Feature flag + /// + /// Only available when the `atomic` Cargo feature is enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if any `offset + buf.len()` + /// overflows `u64` or exceeds the current payload size. + #[cfg(feature = "atomic")] + pub fn get_batched_gen<'a, F>(&self, mut f: F) -> io::Result<()> + where + F: FnMut() -> Option<(u64, &'a mut [u8])>, + { + #[cfg(any(unix, windows))] + { + let file = self.lock.read().unwrap(); + let data_size = file.metadata()?.len().saturating_sub(HEADER_SIZE); + while let Some((offset, buf)) = f() { + let end = offset.checked_add(buf.len() as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "get_batched_gen: offset + buf.len() overflows u64", + ) + })?; + if end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("get_batched_gen: end ({end}) exceeds payload size ({data_size})"), + )); + } + pread_exact_into(&file, HEADER_SIZE + offset, buf)?; + } + Ok(()) + } + #[cfg(not(any(unix, windows)))] + { + let mut file = self.lock.write().unwrap(); + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + while let Some((offset, buf)) = f() { + let end = offset.checked_add(buf.len() as u64).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "get_batched_gen: offset + buf.len() overflows u64", + ) + })?; + if end > data_size { + return Err(io::Error::new( +
io::ErrorKind::InvalidInput, + format!("get_batched_gen: end ({end}) exceeds payload size ({data_size})"), + )); + } + file.seek(SeekFrom::Start(HEADER_SIZE + offset))?; + file.read_exact(buf)?; + } + Ok(()) + } + } + /// Pop `n` bytes off the tail, pass them read-only to a callback that /// returns the new tail bytes, then write the new tail. /// @@ -2031,6 +2304,153 @@ impl BStack { Ok(true) } + /// Atomically swap two equal-size, non-overlapping regions within the file. + /// + /// Bytes at `[a, a + n)` and `[b, b + n)` are exchanged under a single + /// write lock, so no other thread can observe an intermediate state. + /// The file size is never changed. `n = 0` is a valid no-op (bounds are + /// still checked). + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if either `a + n` or `b + n` + /// overflows `u64`, if the regions overlap, if either region exceeds the + /// current payload size, or if either region overlaps the locked prefix. + /// Propagates any I/O error from `read_exact`, `write_all`, or + /// `durable_sync`. 
+ #[cfg(all(feature = "set", feature = "atomic"))] + pub fn cross_exchange(&self, a: u64, b: u64, n: u64) -> io::Result<()> { + let a_end = a.checked_add(n).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "cross_exchange: a + n overflows u64", + ) + })?; + let b_end = b.checked_add(n).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "cross_exchange: b + n overflows u64", + ) + })?; + if n > 0 { + let (lo, hi) = if a < b { (a, b) } else { (b, a) }; + if lo + n > hi { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("cross_exchange: regions [{a}, {a_end}) and [{b}, {b_end}) overlap"), + )); + } + } + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if a < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "cross_exchange: region [{a}, {a_end}) overlaps locked region [0, {locked})" + ), + )); + } + if b < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "cross_exchange: region [{b}, {b_end}) overlaps locked region [0, {locked})" + ), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if a_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("cross_exchange: region [{a}, {a_end}) exceeds payload size ({data_size})"), + )); + } + if b_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("cross_exchange: region [{b}, {b_end}) exceeds payload size ({data_size})"), + )); + } + if n == 0 { + return Ok(()); + } + file.seek(SeekFrom::Start(HEADER_SIZE + a))?; + let mut buf_a = vec![0u8; n as usize]; + file.read_exact(&mut buf_a)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b))?; + let mut buf_b = vec![0u8; n as usize]; + file.read_exact(&mut buf_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + a))?; + file.write_all(&buf_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b))?; + file.write_all(&buf_a)?; + 
durable_sync(&file) + } + + /// Copy `n` bytes from `from..from+n` to `to..to+n` under a single write lock. + /// + /// The source is read into a temporary buffer before writing, so overlapping + /// regions are handled correctly. `n = 0` is a valid no-op (bounds are + /// still checked). The file size is never changed. + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if either `from + n` or `to + n` + /// overflows `u64`, if either region exceeds the current payload size, or + /// if the destination region overlaps the locked prefix. + /// Propagates any I/O error from `read_exact`, `write_all`, or + /// `durable_sync`. + #[cfg(all(feature = "set", feature = "atomic"))] + pub fn copy(&self, from: u64, to: u64, n: u64) -> io::Result<()> { + let from_end = from.checked_add(n).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "copy: from + n overflows u64") + })?; + let to_end = to.checked_add(n).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "copy: to + n overflows u64") + })?; + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if to < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("copy: destination [{to}, {to_end}) overlaps locked region [0, {locked})"), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if from_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("copy: source [{from}, {from_end}) exceeds payload size ({data_size})"), + )); + } + if to_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("copy: destination [{to}, {to_end}) exceeds payload size ({data_size})"), + )); + } + if n == 0 { + return Ok(()); + } + file.seek(SeekFrom::Start(HEADER_SIZE + from))?; + let mut buf = vec![0u8; n as usize]; 
+ file.read_exact(&mut buf)?; + file.seek(SeekFrom::Start(HEADER_SIZE + to))?; + file.write_all(&buf)?; + durable_sync(&file) + } + /// Read bytes in the half-open logical range `[start, end)`, pass them to /// a callback that may mutate them in place, then write the modified bytes /// back. @@ -2092,6 +2512,411 @@ impl BStack { } Ok(()) } + + /// Cross-Region Dependent Swap — equal condition. + /// + /// Reads `a_expected.len()` bytes from logical offset `a_offset` and + /// compares them to `a_expected`. If they are **equal**, atomically swaps + /// region B: reads `b_buf.len()` bytes from `b_offset`, writes the current + /// contents of `b_buf` there, and returns the old region-B bytes as + /// `Ok(Some(Vec<u8>))`. If the comparison fails, returns `Ok(None)` + /// without modifying the file. + /// + /// The read of region A, the comparison, the read of region B, and the + /// write to region B all happen under the same write lock, so no other + /// thread can observe an intermediate state. The file size is never + /// changed. + /// + /// An empty `a_expected` trivially compares equal (zero bytes match zero + /// bytes). An empty `b_buf` skips the B swap and returns + /// `Ok(Some(Vec::new()))` when the condition passes. + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if either `a_offset + a_len` or + /// `b_offset + b_len` overflows `u64`, exceeds the current payload size, + /// or if region B overlaps the locked prefix. Propagates any I/O error + /// from `read_exact`, `write_all`, or `durable_sync`.
+ #[cfg(all(feature = "set", feature = "atomic"))] + pub fn eq_crds( + &self, + a_offset: u64, + a_expected: impl AsRef<[u8]>, + b_offset: u64, + b_buf: impl AsRef<[u8]>, + ) -> io::Result<Option<Vec<u8>>> { + let a_expected = a_expected.as_ref(); + let b_buf = b_buf.as_ref(); + let a_len = a_expected.len() as u64; + let b_len = b_buf.len() as u64; + let a_end = a_offset.checked_add(a_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "eq_crds: a_offset + a_len overflows u64", + ) + })?; + let b_end = b_offset.checked_add(b_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "eq_crds: b_offset + b_len overflows u64", + ) + })?; + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if !b_buf.is_empty() && b_offset < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "eq_crds: B range [{b_offset}, {b_end}) overlaps locked region [0, {locked})" + ), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if !a_expected.is_empty() && a_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "eq_crds: A range [{a_offset}, {a_end}) exceeds payload size ({data_size})" + ), + )); + } + if !b_buf.is_empty() && b_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "eq_crds: B range [{b_offset}, {b_end}) exceeds payload size ({data_size})" + ), + )); + } + let mut a_current = vec![0u8; a_expected.len()]; + if !a_expected.is_empty() { + file.seek(SeekFrom::Start(HEADER_SIZE + a_offset))?; + file.read_exact(&mut a_current)?; + } + if a_current != a_expected { + return Ok(None); + } + if b_buf.is_empty() { + return Ok(Some(Vec::new())); + } + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + let mut old_b = vec![0u8; b_buf.len()]; + file.read_exact(&mut old_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + file.write_all(b_buf)?; + durable_sync(&file)?; +
Ok(Some(old_b)) + } + + /// Cross-Region Dependent Swap — not-equal condition. + /// + /// Like [`eq_crds`](Self::eq_crds) but performs the region-B swap only + /// when the `a_expected.len()` bytes at `a_offset` are **not equal** to + /// `a_expected`. If the bytes are not equal, atomically swaps region B: + /// reads `b_buf.len()` bytes from `b_offset`, writes the contents of + /// `b_buf` there, and returns the old region-B bytes as + /// `Ok(Some(Vec<u8>))`. Returns `Ok(None)` if the bytes compare equal + /// (swap suppressed). + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Same conditions as [`eq_crds`](Self::eq_crds). + #[cfg(all(feature = "set", feature = "atomic"))] + pub fn ne_crds( + &self, + a_offset: u64, + a_expected: impl AsRef<[u8]>, + b_offset: u64, + b_buf: impl AsRef<[u8]>, + ) -> io::Result<Option<Vec<u8>>> { + let a_expected = a_expected.as_ref(); + let b_buf = b_buf.as_ref(); + let a_len = a_expected.len() as u64; + let b_len = b_buf.len() as u64; + let a_end = a_offset.checked_add(a_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "ne_crds: a_offset + a_len overflows u64", + ) + })?; + let b_end = b_offset.checked_add(b_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "ne_crds: b_offset + b_len overflows u64", + ) + })?; + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if !b_buf.is_empty() && b_offset < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "ne_crds: B range [{b_offset}, {b_end}) overlaps locked region [0, {locked})" + ), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if !a_expected.is_empty() && a_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "ne_crds: A range [{a_offset}, {a_end}) exceeds payload size ({data_size})" + ), + )); + } +
if !b_buf.is_empty() && b_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "ne_crds: B range [{b_offset}, {b_end}) exceeds payload size ({data_size})" + ), + )); + } + let mut a_current = vec![0u8; a_expected.len()]; + if !a_expected.is_empty() { + file.seek(SeekFrom::Start(HEADER_SIZE + a_offset))?; + file.read_exact(&mut a_current)?; + } + if a_current == a_expected { + return Ok(None); + } + if b_buf.is_empty() { + return Ok(Some(Vec::new())); + } + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + let mut old_b = vec![0u8; b_buf.len()]; + file.read_exact(&mut old_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + file.write_all(b_buf)?; + durable_sync(&file)?; + Ok(Some(old_b)) + } + + /// Cross-Region Dependent Swap — masked-equal condition. + /// + /// Like [`eq_crds`](Self::eq_crds) but the comparison applies a bitwise + /// AND mask before comparing: for each byte `i`, the condition is + /// `(A[i] & mask[i]) == (a_expected[i] & mask[i])`. `mask` and + /// `a_expected` must have the same length, which determines how many + /// bytes are read from region A. If the masked condition holds, + /// atomically swaps region B: reads `b_buf.len()` bytes from `b_offset`, + /// writes the contents of `b_buf` there, and returns the old region-B + /// bytes as `Ok(Some(Vec<u8>))`. Returns `Ok(None)` if the masked + /// condition does not hold. + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if `mask.len() != a_expected.len()`. + /// Same additional conditions as [`eq_crds`](Self::eq_crds).
+ #[cfg(all(feature = "set", feature = "atomic"))] + pub fn masked_eq_crds( + &self, + a_offset: u64, + mask: impl AsRef<[u8]>, + a_expected: impl AsRef<[u8]>, + b_offset: u64, + b_buf: impl AsRef<[u8]>, + ) -> io::Result<Option<Vec<u8>>> { + let mask = mask.as_ref(); + let a_expected = a_expected.as_ref(); + let b_buf = b_buf.as_ref(); + if mask.len() != a_expected.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_eq_crds: mask length ({}) != a_expected length ({})", + mask.len(), + a_expected.len() + ), + )); + } + let a_len = a_expected.len() as u64; + let b_len = b_buf.len() as u64; + let a_end = a_offset.checked_add(a_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "masked_eq_crds: a_offset + a_len overflows u64", + ) + })?; + let b_end = b_offset.checked_add(b_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "masked_eq_crds: b_offset + b_len overflows u64", + ) + })?; + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if !b_buf.is_empty() && b_offset < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_eq_crds: B range [{b_offset}, {b_end}) overlaps locked region [0, {locked})" + ), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if !a_expected.is_empty() && a_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_eq_crds: A range [{a_offset}, {a_end}) exceeds payload size ({data_size})" + ), + )); + } + if !b_buf.is_empty() && b_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_eq_crds: B range [{b_offset}, {b_end}) exceeds payload size ({data_size})" + ), + )); + } + let mut a_current = vec![0u8; a_expected.len()]; + if !a_expected.is_empty() { + file.seek(SeekFrom::Start(HEADER_SIZE + a_offset))?; + file.read_exact(&mut a_current)?; + } + let masked_match = a_current +
.iter() + .zip(mask.iter()) + .zip(a_expected.iter()) + .all(|((&a, &m), &e)| (a & m) == (e & m)); + if !masked_match { + return Ok(None); + } + if b_buf.is_empty() { + return Ok(Some(Vec::new())); + } + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + let mut old_b = vec![0u8; b_buf.len()]; + file.read_exact(&mut old_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + file.write_all(b_buf)?; + durable_sync(&file)?; + Ok(Some(old_b)) + } + + /// Cross-Region Dependent Swap — masked-not-equal condition. + /// + /// Like [`masked_eq_crds`](Self::masked_eq_crds) but performs the + /// region-B swap only when **any** masked byte differs: + /// `(A[i] & mask[i]) != (a_expected[i] & mask[i])` for at least one `i`. + /// If any masked byte differs, atomically swaps region B: reads + /// `b_buf.len()` bytes from `b_offset`, writes the contents of `b_buf` + /// there, and returns the old region-B bytes as `Ok(Some(Vec<u8>))`. + /// Returns `Ok(None)` if all masked bytes compare equal (swap suppressed). + /// + /// # Feature flags + /// + /// Only available when both the `set` and `atomic` Cargo features are + /// enabled. + /// + /// # Errors + /// + /// Returns [`io::ErrorKind::InvalidInput`] if `mask.len() != a_expected.len()`. + /// Same additional conditions as [`eq_crds`](Self::eq_crds).
+ #[cfg(all(feature = "set", feature = "atomic"))] + pub fn masked_ne_crds( + &self, + a_offset: u64, + mask: impl AsRef<[u8]>, + a_expected: impl AsRef<[u8]>, + b_offset: u64, + b_buf: impl AsRef<[u8]>, + ) -> io::Result<Option<Vec<u8>>> { + let mask = mask.as_ref(); + let a_expected = a_expected.as_ref(); + let b_buf = b_buf.as_ref(); + if mask.len() != a_expected.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_ne_crds: mask length ({}) != a_expected length ({})", + mask.len(), + a_expected.len() + ), + )); + } + let a_len = a_expected.len() as u64; + let b_len = b_buf.len() as u64; + let a_end = a_offset.checked_add(a_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "masked_ne_crds: a_offset + a_len overflows u64", + ) + })?; + let b_end = b_offset.checked_add(b_len).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "masked_ne_crds: b_offset + b_len overflows u64", + ) + })?; + let mut file = self.lock.write().unwrap(); + let locked = self.locked.load(Ordering::Acquire); + if !b_buf.is_empty() && b_offset < locked { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_ne_crds: B range [{b_offset}, {b_end}) overlaps locked region [0, {locked})" + ), + )); + } + let data_size = file.seek(SeekFrom::End(0))?.saturating_sub(HEADER_SIZE); + if !a_expected.is_empty() && a_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_ne_crds: A range [{a_offset}, {a_end}) exceeds payload size ({data_size})" + ), + )); + } + if !b_buf.is_empty() && b_end > data_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "masked_ne_crds: B range [{b_offset}, {b_end}) exceeds payload size ({data_size})" + ), + )); + } + let mut a_current = vec![0u8; a_expected.len()]; + if !a_expected.is_empty() { + file.seek(SeekFrom::Start(HEADER_SIZE + a_offset))?; + file.read_exact(&mut a_current)?; + } + let masked_match = a_current +
.iter() + .zip(mask.iter()) + .zip(a_expected.iter()) + .all(|((&a, &m), &e)| (a & m) == (e & m)); + if masked_match { + return Ok(None); + } + if b_buf.is_empty() { + return Ok(Some(Vec::new())); + } + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + let mut old_b = vec![0u8; b_buf.len()]; + file.read_exact(&mut old_b)?; + file.seek(SeekFrom::Start(HEADER_SIZE + b_offset))?; + file.write_all(b_buf)?; + durable_sync(&file)?; + Ok(Some(old_b)) + } } // --------------------------------------------------------------------------- diff --git a/src/test.rs b/src/test.rs index 86870c5..489e7ab 100644 --- a/src/test.rs +++ b/src/test.rs @@ -4416,6 +4416,604 @@ mod atomic_tests { assert_eq!(err.kind(), ErrorKind::InvalidInput); assert_eq!(s.len().unwrap(), 10); } + + // ----------------------------------------------------------------------- + // try_extend_zeros (require atomic) + + #[cfg(feature = "atomic")] + #[test] + fn try_extend_zeros_matching_size_appends_zeros() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let ok = s.try_extend_zeros(5, 3).unwrap(); + assert!(ok); + assert_eq!(s.len().unwrap(), 8); + assert_eq!(s.peek(0).unwrap(), b"hello\x00\x00\x00"); + } + + #[cfg(feature = "atomic")] + #[test] + fn try_extend_zeros_mismatching_size_returns_false() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let ok = s.try_extend_zeros(3, 3).unwrap(); + assert!(!ok); + assert_eq!(s.len().unwrap(), 5); + assert_eq!(s.peek(0).unwrap(), b"hello"); + } + + #[cfg(feature = "atomic")] + #[test] + fn try_extend_zeros_n_zero_is_noop_returns_true() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let ok = s.try_extend_zeros(5, 0).unwrap(); + assert!(ok); + assert_eq!(s.len().unwrap(), 5); + } + + #[cfg(feature = "atomic")] + #[test] + fn try_extend_zeros_content_is_zeros() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"ab").unwrap(); + s.try_extend_zeros(2, 
4).unwrap(); + assert_eq!(s.get(2, 6).unwrap(), b"\x00\x00\x00\x00"); + } + + #[cfg(feature = "atomic")] + #[test] + fn try_extend_zeros_persists_across_reopen() { + let (s, p) = mk_stack(); + let _g = Guard(p.clone()); + s.push(b"hi").unwrap(); + s.try_extend_zeros(2, 2).unwrap(); + drop(s); + let s2 = BStack::open(&p).unwrap(); + assert_eq!(s2.len().unwrap(), 4); + assert_eq!(s2.peek(0).unwrap(), b"hi\x00\x00"); + } + + // ----------------------------------------------------------------------- + // get_batched (require atomic) + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_reads_multiple_ranges() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let results = s.get_batched([0..5, 5..10]).unwrap(); + assert_eq!(results.len(), 2); + assert_eq!(results[0], b"hello"); + assert_eq!(results[1], b"world"); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_empty_input_returns_empty_vec() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let results = s + .get_batched(std::iter::empty::<std::ops::Range<u64>>()) + .unwrap(); + assert!(results.is_empty()); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_zero_length_range_returns_empty_buf() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let results = s.get_batched([3..3]).unwrap(); + assert_eq!(results.len(), 1); + assert!(results[0].is_empty()); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let err = s.get_batched([0..10]).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_end_less_than_start_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let err = s.get_batched([5..3]).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(feature
= "atomic")] + #[test] + fn get_batched_order_matches_input() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"abcde").unwrap(); + let results = s.get_batched([4..5, 0..2, 2..4]).unwrap(); + assert_eq!(results[0], b"e"); + assert_eq!(results[1], b"ab"); + assert_eq!(results[2], b"cd"); + } + + // ----------------------------------------------------------------------- + // get_batched_into (require atomic) + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_into_fills_buffers() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let mut a = [0u8; 5]; + let mut b = [0u8; 5]; + s.get_batched_into([(0, a.as_mut_slice()), (5, b.as_mut_slice())]) + .unwrap(); + assert_eq!(&a, b"hello"); + assert_eq!(&b, b"world"); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_into_empty_input_is_noop() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + s.get_batched_into(std::iter::empty::<(u64, &mut [u8])>()) + .unwrap(); + assert_eq!(s.len().unwrap(), 5); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_into_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hi").unwrap(); + let mut buf = [0u8; 10]; + let err = s.get_batched_into([(0, buf.as_mut_slice())]).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_into_matches_get() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"abcdefgh").unwrap(); + let mut buf1 = [0u8; 3]; + let mut buf2 = [0u8; 4]; + s.get_batched_into([(0, buf1.as_mut_slice()), (4, buf2.as_mut_slice())]) + .unwrap(); + assert_eq!(&buf1, &s.get(0, 3).unwrap()[..]); + assert_eq!(&buf2, &s.get(4, 8).unwrap()[..]); + } + + // ----------------------------------------------------------------------- + // get_batched_gen (require atomic) + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_gen_reads_chain() { + let (s, p) = mk_stack(); + 
let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let mut bufs: [Vec<u8>; 2] = [vec![0u8; 5], vec![0u8; 5]]; + let ptr0 = bufs[0].as_mut_ptr(); + let ptr1 = bufs[1].as_mut_ptr(); + let mut step = 0usize; + s.get_batched_gen(|| { + let r = match step { + 0 => Some((0u64, unsafe { std::slice::from_raw_parts_mut(ptr0, 5) })), + 1 => Some((5u64, unsafe { std::slice::from_raw_parts_mut(ptr1, 5) })), + _ => None, + }; + step += 1; + r + }) + .unwrap(); + assert_eq!(&bufs[0], b"hello"); + assert_eq!(&bufs[1], b"world"); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_gen_immediate_none_is_noop() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + s.get_batched_gen(|| None).unwrap(); + assert_eq!(s.len().unwrap(), 5); + } + + #[cfg(feature = "atomic")] + #[test] + fn get_batched_gen_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hi").unwrap(); + let mut buf = [0u8; 10]; + let ptr = buf.as_mut_ptr(); + let mut called = false; + let err = s + .get_batched_gen(|| { + if called { + return None; + } + called = true; + Some((0u64, unsafe { std::slice::from_raw_parts_mut(ptr, 10) })) + }) + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + // ----------------------------------------------------------------------- + // cross_exchange (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn cross_exchange_swaps_two_regions() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.cross_exchange(0, 5, 5).unwrap(); + assert_eq!(s.peek(0).unwrap(), b"worldhello"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn cross_exchange_n_zero_is_noop() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.cross_exchange(0, 5, 0).unwrap(); + assert_eq!(s.peek(0).unwrap(), b"helloworld"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn
cross_exchange_overlapping_regions_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + let err = s.cross_exchange(0, 3, 5).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(s.peek(0).unwrap(), b"helloworld"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn cross_exchange_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let err = s.cross_exchange(0, 3, 5).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn cross_exchange_locked_region_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.lock_up_to(5).unwrap(); + let err = s.cross_exchange(0, 5, 5).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(s.peek(0).unwrap(), b"helloworld"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn cross_exchange_persists_across_reopen() { + let (s, p) = mk_stack(); + let _g = Guard(p.clone()); + s.push(b"abXY").unwrap(); + s.cross_exchange(0, 2, 2).unwrap(); + drop(s); + let s2 = BStack::open(&p).unwrap(); + assert_eq!(s2.peek(0).unwrap(), b"XYab"); + } + + // ----------------------------------------------------------------------- + // copy (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_copies_bytes() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.copy(0, 5, 5).unwrap(); + assert_eq!(s.peek(0).unwrap(), b"hellohello"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_overlapping_source_to_dest_correct() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"abcde").unwrap(); + // Copy [0,3) → [1,4): the source "abc" is read in full before the write, so it lands at offset 1 and the result is "aabce" + s.copy(0, 1, 3).unwrap(); +
assert_eq!(s.peek(0).unwrap(), b"aabce"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_n_zero_is_noop() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + s.copy(0, 4, 0).unwrap(); + assert_eq!(s.peek(0).unwrap(), b"hello"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let err = s.copy(0, 0, 10).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_destination_in_locked_region_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.lock_up_to(5).unwrap(); + let err = s.copy(5, 0, 5).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + assert_eq!(s.peek(0).unwrap(), b"helloworld"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn copy_persists_across_reopen() { + let (s, p) = mk_stack(); + let _g = Guard(p.clone()); + s.push(b"abcd").unwrap(); + s.copy(0, 2, 2).unwrap(); + drop(s); + let s2 = BStack::open(&p).unwrap(); + assert_eq!(s2.peek(0).unwrap(), b"abab"); + } + + // ----------------------------------------------------------------------- + // eq_crds (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_match_swaps_b_returns_old() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"aabbcc").unwrap(); // A=[0,2)=aa, B=[2,4)=bb + let old = s.eq_crds(0, b"aa", 2, b"XX").unwrap(); + assert_eq!(old, Some(b"bb".to_vec())); + assert_eq!(s.get(2, 4).unwrap(), b"XX"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_no_match_returns_none() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"aabbcc").unwrap(); + let result = s.eq_crds(0, b"zz", 2, b"XX").unwrap(); + assert_eq!(result, None); + 
assert_eq!(s.get(2, 4).unwrap(), b"bb"); // unchanged + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_empty_a_always_matches() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let old = s.eq_crds(0, b"", 0, b"HH").unwrap(); + assert_eq!(old, Some(b"he".to_vec())); + assert_eq!(s.get(0, 2).unwrap(), b"HH"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_empty_b_buf_returns_some_empty_vec() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let old = s.eq_crds(0, b"hello", 0, b"").unwrap(); + assert_eq!(old, Some(Vec::new())); + assert_eq!(s.peek(0).unwrap(), b"hello"); // unchanged + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_b_in_locked_region_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.lock_up_to(5).unwrap(); + let err = s.eq_crds(5, b"world", 0, b"HELLO").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn eq_crds_out_of_bounds_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hi").unwrap(); + let err = s.eq_crds(0, b"hello", 0, b"world").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + // ----------------------------------------------------------------------- + // ne_crds (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn ne_crds_no_match_swaps_b_returns_old() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"aabbcc").unwrap(); + let old = s.ne_crds(0, b"zz", 2, b"XX").unwrap(); + assert_eq!(old, Some(b"bb".to_vec())); + assert_eq!(s.get(2, 4).unwrap(), b"XX"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn ne_crds_match_returns_none() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"aabbcc").unwrap(); + let result = 
s.ne_crds(0, b"aa", 2, b"XX").unwrap(); + assert_eq!(result, None); + assert_eq!(s.get(2, 4).unwrap(), b"bb"); // unchanged + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn ne_crds_empty_a_trivially_equal_returns_none() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let result = s.ne_crds(0, b"", 0, b"XX").unwrap(); + assert_eq!(result, None); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn ne_crds_b_in_locked_region_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"helloworld").unwrap(); + s.lock_up_to(5).unwrap(); + let err = s.ne_crds(5, b"XXXXX", 0, b"HELLO").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + // ----------------------------------------------------------------------- + // masked_eq_crds (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_eq_crds_match_swaps_b_returns_old() { + let (s, p) = mk_stack(); + let _g = Guard(p); + // A = [0xFF, 0x0F], expected = [0xFF, 0x0F], mask = [0xFF, 0xF0] + // masked A = [0xFF, 0x00], masked expected = [0xFF, 0x00] → equal + s.push(b"\xff\x0f--").unwrap(); + let old = s + .masked_eq_crds(0, b"\xff\xf0", b"\xff\x0f", 2, b"ZZ") + .unwrap(); + assert_eq!(old, Some(b"--".to_vec())); + assert_eq!(s.get(2, 4).unwrap(), b"ZZ"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_eq_crds_no_match_returns_none() { + let (s, p) = mk_stack(); + let _g = Guard(p); + // A = [0x0F], expected = [0xFF], mask = [0xFF] → 0x0F != 0xFF + s.push(b"\x0f--").unwrap(); + let result = s.masked_eq_crds(0, b"\xff", b"\xff", 1, b"ZZ").unwrap(); + assert_eq!(result, None); + assert_eq!(s.get(1, 3).unwrap(), b"--"); // unchanged + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_eq_crds_mask_len_mismatch_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let err = s + 
.masked_eq_crds(0, b"\xff\xff", b"\xff", 0, b"") + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_eq_crds_partial_mask_ignores_masked_out_bits() { + let (s, p) = mk_stack(); + let _g = Guard(p); + // A = 0xAB, expected = 0xCD, mask = 0x00 → all bits masked out → always matches + s.push(b"\xab--").unwrap(); + let old = s.masked_eq_crds(0, b"\x00", b"\xcd", 1, b"ZZ").unwrap(); + assert_eq!(old, Some(b"--".to_vec())); + } + + // ----------------------------------------------------------------------- + // masked_ne_crds (require set + atomic) + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_ne_crds_no_match_swaps_b_returns_old() { + let (s, p) = mk_stack(); + let _g = Guard(p); + // A = [0x0F], expected = [0xFF], mask = [0xFF] → 0x0F != 0xFF → swap + s.push(b"\x0f--").unwrap(); + let old = s.masked_ne_crds(0, b"\xff", b"\xff", 1, b"ZZ").unwrap(); + assert_eq!(old, Some(b"--".to_vec())); + assert_eq!(s.get(1, 3).unwrap(), b"ZZ"); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_ne_crds_match_returns_none() { + let (s, p) = mk_stack(); + let _g = Guard(p); + // A = [0xFF, 0x0F], expected = [0xFF, 0x0F], mask = [0xFF, 0xF0] + // masked equal → no swap + s.push(b"\xff\x0f--").unwrap(); + let result = s + .masked_ne_crds(0, b"\xff\xf0", b"\xff\x0f", 2, b"ZZ") + .unwrap(); + assert_eq!(result, None); + assert_eq!(s.get(2, 4).unwrap(), b"--"); // unchanged + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_ne_crds_mask_len_mismatch_returns_error() { + let (s, p) = mk_stack(); + let _g = Guard(p); + s.push(b"hello").unwrap(); + let err = s + .masked_ne_crds(0, b"\xff\xff", b"\xff", 0, b"") + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InvalidInput); + } + + #[cfg(all(feature = "set", feature = "atomic"))] + #[test] + fn masked_ne_crds_all_bits_masked_out_always_equal_returns_none() { + let 
(s, p) = mk_stack(); + let _g = Guard(p); + // mask = 0x00 → all bits masked, always equal → no swap + s.push(b"\xab--").unwrap(); + let result = s.masked_ne_crds(0, b"\x00", b"\xcd", 1, b"ZZ").unwrap(); + assert_eq!(result, None); + } } #[cfg(test)]