Skip to content

Commit

Permalink
[Java] Use putIntOrdered in OneToOneRingBuffer#tryClaim in order to e…
Browse files Browse the repository at this point in the history
…nsure that pre-zeroing of the next message header is done first. (#230)

* [Java] Write message body before the header.

* [Java] Use `putIntOrdered` to write negative length in the `tryClaim` to ensure that pre-zeroing of the next header happens before the current header is written.
  • Loading branch information
vyazelenko committed Dec 7, 2020
1 parent 357eb34 commit 9a6090c
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
Expand Up @@ -91,8 +91,8 @@ public boolean write(final int msgTypeId, final DirectBuffer srcBuffer, final in
buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);
UnsafeAccess.UNSAFE.storeFence();

buffer.putInt(typeOffset(recordIndex), msgTypeId);
buffer.putBytes(encodedMsgOffset(recordIndex), srcBuffer, offset, length);
buffer.putInt(typeOffset(recordIndex), msgTypeId);
buffer.putIntOrdered(lengthOffset(recordIndex), recordLength);

return true;
Expand Down
Expand Up @@ -115,10 +115,7 @@ public int tryClaim(final int msgTypeId, final int length)
}

buffer.putInt(typeOffset(recordIndex), msgTypeId);
// Note: putInt is used to write negative length of the message since we are not yet publishing the message and
// hence the order of writes of type field and negative length does not matter.
// It is safe to do so, because the header was pre-zeroed during the capacity claim.
buffer.putInt(lengthOffset(recordIndex), -recordLength);
buffer.putIntOrdered(lengthOffset(recordIndex), -recordLength);

return encodedMsgOffset(recordIndex);
}
Expand Down Expand Up @@ -445,6 +442,8 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
padding = toBufferEndLength;
}

buffer.putLongOrdered(tailPositionIndex, tail + alignedRecordLength + padding);

if (0 != padding)
{
buffer.putLong(0, 0L);
Expand All @@ -454,7 +453,6 @@ private int claimCapacity(final AtomicBuffer buffer, final int recordLength)
}

buffer.putLong(recordIndex + alignedRecordLength, 0L); // pre-zero next message header
buffer.putLongOrdered(tailPositionIndex, tail + alignedRecordLength + padding);

return recordIndex;
}
Expand Down
Expand Up @@ -77,8 +77,8 @@ public void shouldWriteToEmptyBuffer()

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), -recordLength);
inOrder.verify(buffer).putInt(typeOffset((int)tail), MSG_TYPE_ID);
inOrder.verify(buffer).putBytes(encodedMsgOffset((int)tail), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset((int)tail), MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), recordLength);
}

Expand Down Expand Up @@ -148,8 +148,8 @@ public void shouldInsertPaddingRecordPlusMessageOnBufferWrap()
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), HEADER_LENGTH);

inOrder.verify(buffer).putIntOrdered(lengthOffset(0), -recordLength);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
inOrder.verify(buffer).putBytes(encodedMsgOffset(0), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset(0), recordLength);
}

Expand Down Expand Up @@ -178,8 +178,8 @@ public void shouldInsertPaddingRecordPlusMessageOnBufferWrapWithHeadEqualToTail(
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), HEADER_LENGTH);

inOrder.verify(buffer).putIntOrdered(lengthOffset(0), -recordLength);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
inOrder.verify(buffer).putBytes(encodedMsgOffset(0), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset(0), recordLength);
}

Expand Down
Expand Up @@ -73,8 +73,8 @@ public void shouldWriteToEmptyBuffer()
assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, srcIndex, length));

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).putLong((int)tail + alignedRecordLength, 0L);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, tail + alignedRecordLength);
inOrder.verify(buffer).putLong((int)tail + alignedRecordLength, 0L);
inOrder.verify(buffer).putBytes(encodedMsgOffset((int)tail), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset((int)tail), MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), recordLength);
Expand Down Expand Up @@ -137,12 +137,13 @@ public void shouldInsertPaddingRecordPlusMessageOnBufferWrap()
assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, srcIndex, length));

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, tail + alignedRecordLength + HEADER_LENGTH);

inOrder.verify(buffer).putLong(0, 0L);
inOrder.verify(buffer).putInt(typeOffset((int)tail), PADDING_MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), HEADER_LENGTH);

inOrder.verify(buffer).putLong(alignedRecordLength, 0L);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, tail + alignedRecordLength + HEADER_LENGTH);

inOrder.verify(buffer).putBytes(encodedMsgOffset(0), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
Expand All @@ -167,12 +168,13 @@ public void shouldInsertPaddingRecordPlusMessageOnBufferWrapWithHeadEqualToTail(
assertTrue(ringBuffer.write(MSG_TYPE_ID, srcBuffer, srcIndex, length));

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, tail + alignedRecordLength + HEADER_LENGTH);

inOrder.verify(buffer).putLong(0, 0L);
inOrder.verify(buffer).putInt(typeOffset((int)tail), PADDING_MSG_TYPE_ID);
inOrder.verify(buffer).putIntOrdered(lengthOffset((int)tail), HEADER_LENGTH);

inOrder.verify(buffer).putLong(alignedRecordLength, 0L);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, tail + alignedRecordLength + HEADER_LENGTH);

inOrder.verify(buffer).putBytes(encodedMsgOffset(0), srcBuffer, srcIndex, length);
inOrder.verify(buffer).putInt(typeOffset(0), MSG_TYPE_ID);
Expand Down Expand Up @@ -395,10 +397,10 @@ public void tryClaimReturnsOffsetAtWhichMessageBodyCanBeWritten()
assertEquals(HEADER_LENGTH, index);

final InOrder inOrder = inOrder(buffer);
inOrder.verify(buffer).putLong(alignedRecordLength, 0L);
inOrder.verify(buffer).putLongOrdered(TAIL_COUNTER_INDEX, alignedRecordLength);
inOrder.verify(buffer).putLong(alignedRecordLength, 0L);
inOrder.verify(buffer).putInt(typeOffset(0), msgTypeId);
inOrder.verify(buffer).putInt(lengthOffset(0), -recordLength);
inOrder.verify(buffer).putIntOrdered(lengthOffset(0), -recordLength);
}

@Test
Expand Down

0 comments on commit 9a6090c

Please sign in to comment.