Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to reply asynchronously to a ResendRequest #505

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Helps IDEA users apply some of the formatting rules enforced by checkstyle

root = true

[*.java]
indent_size = 4
max_line_length = 120
ij_java_method_brace_style = next_line
ij_java_block_brace_style = next_line
ij_java_else_on_new_line = true
ij_java_class_brace_style = next_line
ij_java_space_after_type_cast = false
ij_any_catch_on_new_line = true
ij_any_spaces_around_equality_operators = true
ij_java_continuation_indent_size = 4
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* Customer interface to control whether resend requests are responded to or not.
Expand All @@ -33,11 +34,16 @@ public interface ResendRequestController
* (eg: begin sequence number > end sequence number or begin sequence number > last sent sequence number)
* then this callback won't be invoked.
*
* SessionProxy is now also notified immediately after this method is called, with additional parameters that
* allow to delay the processing of the ResendRequest. The SessionProxy can thus override the decision made by
* ResendRequestController.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
void onResend(
Session session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.builder.AbstractRejectEncoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

public class ResendRequestResponse
{
private boolean result;
private boolean resendNow;
private boolean delayProcessing;

private int refTagId;
private AbstractRejectEncoder rejectEncoder;
Expand All @@ -29,7 +31,8 @@ public class ResendRequestResponse
*/
public void resend()
{
result = true;
resendNow = true;
delayProcessing = false;
}

/**
Expand All @@ -41,14 +44,16 @@ public void reject(final int refTagId)
{
this.refTagId = refTagId;

result = false;
resendNow = false;
delayProcessing = false;
}

public void reject(final AbstractRejectEncoder rejectEncoder)
{
this.rejectEncoder = rejectEncoder;

result = false;
resendNow = false;
delayProcessing = false;
}

AbstractRejectEncoder rejectEncoder()
Expand All @@ -58,11 +63,36 @@ AbstractRejectEncoder rejectEncoder()

boolean result()
{
return result;
return resendNow;
}

int refTagId()
{
return refTagId;
}

/**
* Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates
* that the request must not be processed nor rejected. It is the responsibility of the caller to call
* Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
* @return true if response to the request must not be done immediately
*/
public boolean shouldDelay()
{
return delayProcessing;
}

/**
* This method indicates that the request must not be processed nor rejected. It is the responsibility of
* the caller to call Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
*/
public void delay()
{
resendNow = false;
delayProcessing = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2097,50 +2097,99 @@ Action onResendRequest(
final ResendRequestResponse resendRequestResponse = this.resendRequestResponse;
if (!backpressuredResendRequestResponse)
{
// historic behavior
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse);

// also invoke the proxy
if (Pressure.isBackPressured(proxy.onResend(this, resendRequest,
correctedEndSeqNo, resendRequestResponse, messageBuffer, messageOffset, messageLength)))
{
return ABORT;
}
}

if (resendRequestResponse.result())
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
return executeResendRequest(
beginSeqNum, correctedEndSeqNo, oldLastReceivedMsgSeqNum, messageBuffer, messageOffset, messageLength
);
}
else if (!resendRequestResponse.shouldDelay())
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredOutboundValidResendRequest = false;
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
}
else
{
return CONTINUE;
}
}

private Action executeResendRequest(
final int beginSeqNum, final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
if (lastReceivedMsgSeqNum >= 0)
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
backpressuredOutboundValidResendRequest = false;
}
else

if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
if (lastReceivedMsgSeqNum >= 0)
{
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
}


/**
* Executes a resend request. Used to be done immediately when receiving such a request, but
* it is now possible to delay the execution, so this method must be called when ready.
*
* @param beginSeqNum begin sequence number found in received ResendRequest
* @param correctedEndSeqNo corrected end sequence number
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @return an Action: be sure to handle back pressure!
* @see SessionProxy#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
public Action executeResendRequest(
final int beginSeqNum, final int correctedEndSeqNo,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength
)
{
return executeResendRequest(beginSeqNum, correctedEndSeqNo, -1, messageBuffer, messageOffset, messageLength);
}

private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.fields.RejectReason;
import uk.co.real_logic.artio.messages.CancelOnDisconnectOption;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* A proxy that allows users to hook the sending of FIX session protocol messages through an external system. This can
Expand Down Expand Up @@ -116,4 +118,34 @@ long sendSequenceReset(
* @return true if asynchronous, false otherwise.
*/
boolean isAsync();

/**
* Equivalent to onResend() method in ResendRequestController, but with finer control. It receives the buffer
* containing the ResendRequest message, so a copy can be made in case we want to delay the processing of the
* Resend request.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @return a null or negative number if back pressured
* @see Session#executeResendRequest(int, int, AsciiBuffer, int, int)
* @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse)
*/
default long onResend(
Session session,
AbstractResendRequestDecoder resendRequest,
int correctedEndSeqNo,
ResendRequestResponse response,
AsciiBuffer messageBuffer,
int messageOffset,
int messageLength
)
{
return 1;
}
}
Loading
Loading