Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/embedded_ring_queue.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ pub fn EmbeddedRingQueue(comptime TElement: type) type {

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

/// Returns true if the length of `storage` can be increased via `resizeNoCopy`.
pub fn canResize(self: *Self) bool {
if (self.len() == 0) return true;
const tail_index = self.tail % self.storage.len;
const head_index = self.head % self.storage.len;
return tail_index > head_index;
}

/// Replaces `storage` with `new_storage`. The caller guarantees that `new_storage`
/// contains the same data as the previous storage (ie. it's the same region of
/// memory but with a larger `len`, or the caller has copied the previous memory).
pub fn resize(self: *Self, new_storage: []Element) void {
// The backing storage can't be increased if the range of entries wraps past the end of the
// backing buffer, as we'd be adding invalid entries into the middle of the queue.
assert(new_storage.len >= self.storage.len and self.canResize());
const prev_len = self.len();
if (prev_len > 0) {
self.head = self.head % self.storage.len;
self.tail = self.head + prev_len;
}

self.storage = new_storage;
}

// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

pub fn enqueue(self: *Self, value: Element) Error!void {
if (self.enqueueIfNotFull(value)) {
return;
Expand Down
29 changes: 27 additions & 2 deletions src/pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,10 @@ pub fn Pool(

fn updateSlices(self: *Self) void {
var slice = self._storage.slice();
self._free_queue.storage = slice.items(.@"Pool._free_queue");

const free_queue_storage = slice.items(.@"Pool._free_queue");
self._free_queue.resize(free_queue_storage);

self._curr_cycle = slice.items(.@"Pool._curr_cycle");
inline for (column_fields, 0..) |column_field, i| {
const F = column_field.type;
Expand Down Expand Up @@ -659,7 +662,8 @@ pub fn Pool(

fn didGetNewHandleNoResize(self: *Self, handle: *AddressableHandle) bool {
if (self._storage.len < max_capacity and
self._storage.len < self._storage.capacity)
self._storage.len < self._storage.capacity and
self._free_queue.canResize())
{
const new_index = self._storage.addOneAssumeCapacity();
updateSlices(self);
Expand Down Expand Up @@ -1427,4 +1431,25 @@ test "Pool.setColumns() calls ColumnType.deinit()" {
try expectEqual(@as(u32, 6), deinit_count);
}

test "Adds and removes triggering resize" {
const TestPool = Pool(16, 16, void, struct {});

var pool = TestPool.init(std.testing.allocator);
defer pool.deinit();

var handles: std.ArrayListUnmanaged(TestPool.Handle) = .empty;
defer handles.deinit(std.testing.allocator);

for (0..16) |ix| {
for (0..5 * ix) |_| {
(try handles.addOne(std.testing.allocator)).* = try pool.add(.{});
}

for (0..3 * ix) |_| {
const handle = handles.orderedRemove(0);
try pool.remove(handle);
}
}
}

//------------------------------------------------------------------------------
Loading