Skip to content

Commit

Permalink
[Java] Added the ability to do a zero copy publish by enabling the cl…
Browse files Browse the repository at this point in the history
…aim of a range in the publication log buffer.
  • Loading branch information
mjpt777 committed Nov 14, 2014
1 parent 09e415d commit 8ed70c4
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 53 deletions.
76 changes: 52 additions & 24 deletions aeron-client/src/main/java/uk/co/real_logic/aeron/Publication.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.co.real_logic.aeron;

import uk.co.real_logic.aeron.common.concurrent.logbuffer.BufferClaim;
import uk.co.real_logic.agrona.DirectBuffer;
import uk.co.real_logic.aeron.common.TermHelper;
import uk.co.real_logic.aeron.common.concurrent.logbuffer.LogAppender;
Expand Down Expand Up @@ -109,7 +110,18 @@ public int sessionId()

public void close()
{
release();
synchronized (clientConductor)
{
if (--refCount == 0)
{
clientConductor.releasePublication(this);

for (final ManagedBuffer managedBuffer : managedBuffers)
{
managedBuffer.close();
}
}
}
}

/**
Expand All @@ -129,11 +141,11 @@ public boolean offer(final DirectBuffer buffer)
* @param buffer containing message.
* @param offset offset in the buffer at which the encoded message begins.
* @param length in bytes of the encoded message.
* @return true if the message can be published otherwise false.
* @return true if the message was published otherwise false.
*/
public boolean offer(final DirectBuffer buffer, final int offset, final int length)
{
boolean offerSucceeded = false;
boolean succeeded = false;
final LogAppender logAppender = logAppenders[activeIndex];
final int currentTail = logAppender.tailVolatile();

Expand All @@ -142,7 +154,7 @@ public boolean offer(final DirectBuffer buffer, final int offset, final int leng
switch (logAppender.append(buffer, offset, length))
{
case SUCCESS:
offerSucceeded = true;
succeeded = true;
break;

case TRIPPED:
Expand All @@ -154,39 +166,55 @@ public boolean offer(final DirectBuffer buffer, final int offset, final int leng
}
}

return offerSucceeded;
return succeeded;
}

long registrationId()
/**
* Try to claim a range in the publication log for writing a message into with zero copy semantics.
*
* <b>Note:</b> This method can only be used for message lengths less than MTU size minus header.
*
* @param length of the range to claim.
* @param bufferClaim to be populate if the claim succeeds.
* @return true of the claim was successful otherwise false.
* @throws IllegalArgumentException if the length is greater than max payload size within an MTU.
*/
public boolean tryClaim(final int length, final BufferClaim bufferClaim)
{
return registrationId;
}
boolean succeeded = false;
final LogAppender logAppender = logAppenders[activeIndex];
final int currentTail = logAppender.tailVolatile();

void incRef()
{
synchronized (clientConductor)
if (isWithinFlowControlLimit(currentTail))
{
++refCount;
switch (logAppender.claim(length, bufferClaim))
{
case SUCCESS:
succeeded = true;
break;

case TRIPPED:
nextTerm();
break;

case FAILURE:
break;
}
}

return succeeded;
}

private void release()
long registrationId()
{
synchronized (clientConductor)
{
if (--refCount == 0)
{
clientConductor.releasePublication(this);
closeBuffers();
}
}
return registrationId;
}

private void closeBuffers()
void incRef()
{
for (final ManagedBuffer managedBuffer : managedBuffers)
synchronized (clientConductor)
{
managedBuffer.close();
++refCount;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.verification.VerificationMode;
import uk.co.real_logic.aeron.common.concurrent.logbuffer.BufferClaim;
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
import uk.co.real_logic.aeron.common.concurrent.logbuffer.LogAppender;
import uk.co.real_logic.aeron.common.concurrent.logbuffer.LogBufferDescriptor;
Expand All @@ -34,7 +35,7 @@
import static uk.co.real_logic.aeron.common.TermHelper.BUFFER_COUNT;
import static uk.co.real_logic.aeron.common.TermHelper.termIdToBufferIndex;
import static uk.co.real_logic.agrona.concurrent.broadcast.RecordDescriptor.RECORD_ALIGNMENT;
import static uk.co.real_logic.aeron.common.concurrent.logbuffer.LogAppender.AppendStatus.*;
import static uk.co.real_logic.aeron.common.concurrent.logbuffer.LogAppender.ActionStatus.*;
import static uk.co.real_logic.aeron.common.concurrent.logbuffer.LogBufferDescriptor.MIN_LOG_SIZE;

public class PublicationTest
Expand Down Expand Up @@ -71,6 +72,7 @@ public void setUp()
final byte[] header = new byte[DataHeaderFlyweight.HEADER_LENGTH];
headers[i] = header;
when(appenders[i].append(any(), anyInt(), anyInt())).thenReturn(SUCCESS);
when(appenders[i].claim(anyInt(), any())).thenReturn(SUCCESS);
when(appenders[i].defaultHeader()).thenReturn(header);
when(appenders[i].capacity()).thenReturn(MIN_LOG_SIZE);
}
Expand Down Expand Up @@ -128,6 +130,30 @@ public void shouldRotateWhenAppendTrips()
assertThat(dataHeaderFlyweight.termId(), is(TERM_ID_1 + 1));
}

@Test
public void shouldRotateWhenClaimTrips()
{
when(appenders[termIdToBufferIndex(TERM_ID_1)].claim(anyInt(), any())).thenReturn(TRIPPED);
when(appenders[termIdToBufferIndex(TERM_ID_1)].tailVolatile()).thenReturn(MIN_LOG_SIZE - RECORD_ALIGNMENT);
when(limit.position()).thenReturn(Long.MAX_VALUE);

final BufferClaim bufferClaim = new BufferClaim();
assertFalse(publication.tryClaim(SEND_BUFFER_CAPACITY, bufferClaim));
assertTrue(publication.tryClaim(SEND_BUFFER_CAPACITY, bufferClaim));

final InOrder inOrder = inOrder(appenders[0], appenders[1], appenders[2]);
inOrder.verify(appenders[termIdToBufferIndex(TERM_ID_1 + 1)]).status();
inOrder.verify(appenders[termIdToBufferIndex(TERM_ID_1 + 2)]).statusOrdered(LogBufferDescriptor.NEEDS_CLEANING);

// written data to the next record
inOrder.verify(appenders[termIdToBufferIndex(TERM_ID_1 + 1)])
.claim(SEND_BUFFER_CAPACITY, bufferClaim);

// updated the term id in the header
dataHeaderFlyweight.wrap(headers[termIdToBufferIndex(TERM_ID_1 + 1)]);
assertThat(dataHeaderFlyweight.termId(), is(TERM_ID_1 + 1));
}

@Test
public void shouldUnmapBuffersWhenReleased() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class LogAppender extends LogBuffer
{
public enum AppendStatus
public enum ActionStatus
{
SUCCESS,
TRIPPED,
Expand All @@ -51,7 +51,7 @@ public enum AppendStatus
private final int headerLength;
private final int maxMessageLength;
private final int maxFrameLength;
private final int maxPayload;
private final int maxPayloadLength;

/**
* Construct a view over a log buffer and state buffer for appending frames.
Expand All @@ -73,7 +73,7 @@ public LogAppender(
this.headerLength = defaultHeader.length;
this.maxFrameLength = maxFrameLength;
this.maxMessageLength = FrameDescriptor.calculateMaxMessageLength(capacity());
this.maxPayload = maxFrameLength - headerLength;
this.maxPayloadLength = maxFrameLength - headerLength;
}

/**
Expand All @@ -93,7 +93,7 @@ public int maxMessageLength()
*/
public int maxPayloadLength()
{
return maxPayload;
return maxPayloadLength;
}

/**
Expand Down Expand Up @@ -122,23 +122,74 @@ public byte[] defaultHeader()
* @param srcBuffer containing the encoded message.
* @param srcOffset at which the encoded message begins.
* @param length of the message in bytes.
* @return SUCCESS if appended in the log, FAILURE if not appended in the log, TRIPPED if first failure.
* @return SUCCESS if append was successful, FAILURE if beyond end of the log in the log, TRIPPED if first failure.
* @throws IllegalArgumentException if the length is greater than {@link #maxMessageLength()}
*/
public AppendStatus append(final DirectBuffer srcBuffer, final int srcOffset, final int length)
public ActionStatus append(final DirectBuffer srcBuffer, final int srcOffset, final int length)
{
checkMessageLength(length);

if (length <= maxPayload)
if (length <= maxPayloadLength)
{
return appendUnfragmentedMessage(srcBuffer, srcOffset, length);
}

return appendFragmentedMessage(srcBuffer, srcOffset, length);
}

private AppendStatus appendUnfragmentedMessage(final DirectBuffer srcBuffer, final int srcOffset, final int length)
/**
* Claim a range within the buffer for recording a message payload.
*
* @param length of the message payload
* @param bufferClaim to be completed for the claim if successful.
* @return SUCCESS if claim was successful, FAILURE if beyond end of the log in the log, TRIPPED if first failure.
*/
public ActionStatus claim(final int length, final BufferClaim bufferClaim)
{
if (length > maxPayloadLength)
{
final String s = String.format("claim exceeds maxPayloadLength of %d, length=%d", maxPayloadLength, length);
throw new IllegalArgumentException(s);
}

final int headerLength = this.headerLength;
final int frameLength = length + headerLength;
final int alignedLength = align(frameLength, FRAME_ALIGNMENT);
final int frameOffset = getTailAndAdd(alignedLength);

final UnsafeBuffer logBuffer = logBuffer();
final int capacity = capacity();
if (isBeyondLogBufferCapacity(frameOffset, alignedLength, capacity))
{
if (frameOffset < capacity)
{
appendPaddingFrame(logBuffer, frameOffset);
return ActionStatus.TRIPPED;
}
else if (frameOffset == capacity)
{
return ActionStatus.TRIPPED;
}

return ActionStatus.FAILURE;
}

logBuffer.putBytes(frameOffset, defaultHeader, 0, headerLength);
frameFlags(logBuffer, frameOffset, UNFRAGMENTED);
frameTermOffset(logBuffer, frameOffset, frameOffset);

bufferClaim.buffer(logBuffer)
.offset(frameOffset + headerLength)
.length(length)
.frameLengthOffset(lengthOffset(frameOffset))
.frameLength(frameLength);

return ActionStatus.SUCCESS;
}

private ActionStatus appendUnfragmentedMessage(final DirectBuffer srcBuffer, final int srcOffset, final int length)
{
final int headerLength = this.headerLength;
final int frameLength = length + headerLength;
final int alignedLength = align(frameLength, FRAME_ALIGNMENT);
final int frameOffset = getTailAndAdd(alignedLength);
Expand All @@ -150,14 +201,14 @@ private AppendStatus appendUnfragmentedMessage(final DirectBuffer srcBuffer, fin
if (frameOffset < capacity)
{
appendPaddingFrame(logBuffer, frameOffset);
return AppendStatus.TRIPPED;
return ActionStatus.TRIPPED;
}
else if (frameOffset == capacity)
{
return AppendStatus.TRIPPED;
return ActionStatus.TRIPPED;
}

return AppendStatus.FAILURE;
return ActionStatus.FAILURE;
}

logBuffer.putBytes(frameOffset, defaultHeader, 0, headerLength);
Expand All @@ -167,13 +218,14 @@ else if (frameOffset == capacity)
frameTermOffset(logBuffer, frameOffset, frameOffset);
frameLengthOrdered(logBuffer, frameOffset, frameLength);

return AppendStatus.SUCCESS;
return ActionStatus.SUCCESS;
}

private AppendStatus appendFragmentedMessage(final DirectBuffer srcBuffer, final int srcOffset, final int length)
private ActionStatus appendFragmentedMessage(final DirectBuffer srcBuffer, final int srcOffset, final int length)
{
final int numMaxPayloads = length / maxPayload;
final int remainingPayload = length % maxPayload;
final int numMaxPayloads = length / maxPayloadLength;
final int remainingPayload = length % maxPayloadLength;
final int headerLength = this.headerLength;
final int requiredCapacity =
align(remainingPayload + headerLength, FRAME_ALIGNMENT) + (numMaxPayloads * maxFrameLength);
int frameOffset = getTailAndAdd(requiredCapacity);
Expand All @@ -185,21 +237,21 @@ private AppendStatus appendFragmentedMessage(final DirectBuffer srcBuffer, final
if (frameOffset < capacity)
{
appendPaddingFrame(logBuffer, frameOffset);
return AppendStatus.TRIPPED;
return ActionStatus.TRIPPED;
}
else if (frameOffset == capacity)
{
return AppendStatus.TRIPPED;
return ActionStatus.TRIPPED;
}

return AppendStatus.FAILURE;
return ActionStatus.FAILURE;
}

byte flags = BEGIN_FRAG;
int remaining = length;
do
{
final int bytesToWrite = Math.min(remaining, maxPayload);
final int bytesToWrite = Math.min(remaining, maxPayloadLength);
final int frameLength = bytesToWrite + headerLength;
final int alignedLength = align(frameLength, FRAME_ALIGNMENT);

Expand All @@ -210,7 +262,7 @@ else if (frameOffset == capacity)
srcOffset + (length - remaining),
bytesToWrite);

if (remaining <= maxPayload)
if (remaining <= maxPayloadLength)
{
flags |= END_FRAG;
}
Expand All @@ -225,7 +277,7 @@ else if (frameOffset == capacity)
}
while (remaining > 0);

return AppendStatus.SUCCESS;
return ActionStatus.SUCCESS;
}

private boolean isBeyondLogBufferCapacity(final int frameOffset, final int alignedFrameLength, final int capacity)
Expand All @@ -252,11 +304,7 @@ private void checkMessageLength(final int length)
{
if (length > maxMessageLength)
{
final String s = String.format(
"encoded message exceeds maxMessageLength of %d, length=%d",
maxMessageLength,
length);

final String s = String.format("encoded message exceeds maxMessageLength of %d, length=%d", maxMessageLength, length);
throw new IllegalArgumentException(s);
}
}
Expand Down

0 comments on commit 8ed70c4

Please sign in to comment.