Skip to content

Commit

Permalink
fix(broker): expand log block index if capacity is reached
Browse files Browse the repository at this point in the history
  • Loading branch information
menski committed Jan 18, 2019
1 parent 550b58f commit 1b6adfe
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@
* requested.
*/
public class LogBlockIndex implements SnapshotSupport {
protected final AtomicBuffer indexBuffer;
public static final double GROW_FACTOR = 1.25;

protected final int capacity;
private final Function<Integer, AtomicBuffer> bufferAllocator;

protected AtomicBuffer indexBuffer;
protected int capacity;

protected long lastVirtualPosition = -1;

public LogBlockIndex(int capacity, Function<Integer, AtomicBuffer> bufferAllocator) {
final int requiredBufferCapacity = dataOffset() + (capacity * entryLength());

this.indexBuffer = bufferAllocator.apply(requiredBufferCapacity);
this.bufferAllocator = bufferAllocator;
this.indexBuffer = allocateBuffer(capacity);
this.capacity = capacity;

reset();
Expand Down Expand Up @@ -164,9 +166,7 @@ public int addBlock(long logPosition, long storageAddr) {
final int newIndexSize = 1 + currentIndexSize;

if (newIndexSize > capacity) {
throw new RuntimeException(
String.format(
"LogBlockIndex capacity of %d entries reached. Cannot add new block.", capacity));
expandIndexBuffer();
}

if (lastVirtualPosition >= logPosition) {
Expand All @@ -189,6 +189,21 @@ public int addBlock(long logPosition, long storageAddr) {
return newIndexSize;
}

private AtomicBuffer allocateBuffer(int capacity) {
final int requiredBufferCapacity = dataOffset() + (capacity * entryLength());
return bufferAllocator.apply(requiredBufferCapacity);
}

private void expandIndexBuffer() {
final int newCapacity = Math.toIntExact(Math.round(capacity * GROW_FACTOR));
final AtomicBuffer newBuffer = allocateBuffer(newCapacity);

newBuffer.putBytes(0, indexBuffer.byteArray());

this.indexBuffer = newBuffer;
this.capacity = newCapacity;
}

/** @return the current size of the index */
public int size() {
return indexBuffer.getIntVolatile(indexSizeOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.zeebe.logstreams.log;

import static io.zeebe.logstreams.impl.log.index.LogBlockIndex.GROW_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;

import io.zeebe.logstreams.impl.log.index.LogBlockIndex;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -40,11 +42,7 @@ public void setup() {
}

protected LogBlockIndex createNewBlockIndex(int capacity) {
return new LogBlockIndex(
capacity,
c -> {
return new UnsafeBuffer(ByteBuffer.allocate(c));
});
return new LogBlockIndex(capacity, c -> new UnsafeBuffer(ByteBuffer.allocate(c)));
}

@Test
Expand Down Expand Up @@ -74,22 +72,28 @@ public void shouldAddBlock() {
}

@Test
public void shouldNotAddBlockIfCapacityReached() {
public void shouldExpandAndAddBlockIfCapacityReached() {
// given
final int capacity = blockIndex.capacity();
int elements = 0;
final Function<Integer, Long> addrFunc = x -> x * 10L;

while (capacity > blockIndex.size()) {
blockIndex.addBlock(blockIndex.size(), 0);
for (; elements <= capacity; elements++) {
blockIndex.addBlock(elements, addrFunc.apply(elements));
}

// when
for (; elements < capacity + 5; elements++) {
blockIndex.addBlock(elements, addrFunc.apply(elements));
}

// then
exception.expect(RuntimeException.class);
exception.expectMessage(
String.format(
"LogBlockIndex capacity of %d entries reached. Cannot add new block.", capacity));
assertThat(blockIndex.size()).isEqualTo(elements);
assertThat(blockIndex.capacity()).isEqualTo(Math.round(capacity * GROW_FACTOR));

// when
blockIndex.addBlock(blockIndex.size(), 0);
for (int i = 0; i < elements; i++) {
assertThat(blockIndex.getAddress(i)).isEqualTo(addrFunc.apply(i));
}
}

@Test
Expand Down

0 comments on commit 1b6adfe

Please sign in to comment.