Skip to content
Browse files

Initial refactoring with new ch-commons-util package

  • Loading branch information...
2 parents dd68508 + 7b598e3 commit 0126bde8934e2bbccab0950d66ce2c9f06da59a3 @jjlauer jjlauer committed May 2, 2011
View
11 ReleaseNotes.txt
@@ -3,6 +3,17 @@ http://twitter.com/
http://www.cloudhopper.com/
SMPP Library
--------------------------------------------------------------------------------
+4.0 - ?
+
+ * Added SmppFuture as a replacement for using Window object from ch-commons-util
+ * Modified internals of SmppSession to call the "requestWindow"
+ a "sendWindow". Since either endpoint of a session can technically
+ generate requests, this actually represents the window just for sending.
+ * Added 2 new time measurements on asynchronous responses (window wait time,
+ and response time).
+ * Added windowWaitTimeout
+
+
3.2 - 2011-04-28
* Upgraded netty dependency from 3.1.5 to 3.2.4 (to keep it current, not to
fix any specific problem)
View
4 pom.xml
@@ -7,7 +7,7 @@
<groupId>cloudhopper</groupId>
<artifactId>ch-smpp</artifactId>
<packaging>jar</packaging>
- <version>3.2</version>
+ <version>4.0-SNAPSHOT</version>
<name>ch-smpp</name>
<parent>
@@ -100,7 +100,7 @@
<netty.version>[3.2,)</netty.version>
<ch-commons-charset.version>[2.0,)</ch-commons-charset.version>
<ch-commons-gsm.version>[2.0,)</ch-commons-gsm.version>
- <ch-commons-util.version>[4.0,)</ch-commons-util.version>
+ <ch-commons-util.version>3.3-SNAPSHOT</ch-commons-util.version>
</properties>
</project>
View
27 src/main/java/com/cloudhopper/smpp/PduAsyncResponse.java
@@ -26,10 +26,33 @@
*/
public interface PduAsyncResponse {
+ /**
+ * Gets the original request associated with the response.
+ * @return The original request
+ */
public PduRequest getRequest();
+ /**
+ * Gets the response from the remote endpoint.
+ * @return The response
+ */
public PduResponse getResponse();
- public long getProcessingTime();
-
+ /**
+ * Gets the amount of time required to accept the request into the session's
+ * send window (for a free slot to open up).
+ * @return The amount of time (in ms) to accept the request into the send window
+ */
+ public long getWindowWaitTime();
+
+ /**
+ * Gets the amount of time required for the remote endpoint to acknowledge
+ * the request with a response. This value is based on the time the request
+ * went out on the wire till a response was received on the wire. Does not
+ * include any time required waiting for a slot in the window to become
+ * available.
+ * @return The amount of time (in ms) to receive a response from remote endpoint
+ */
+ public long getResponseTime();
+
}
View
4 src/main/java/com/cloudhopper/smpp/SmppConstants.java
@@ -41,8 +41,12 @@
public static final byte VERSION_3_3 = 0x33;
public static final byte VERSION_3_4 = 0x34;
+ public static final int DEFAULT_WINDOW_SIZE = 1;
+ public static final long DEFAULT_WINDOW_WAIT_TIMEOUT = 60000;
public static final long DEFAULT_CONNECT_TIMEOUT = 10000;
public static final long DEFAULT_BIND_TIMEOUT = 5000;
+ public static final long DEFAULT_REQUEST_EXPIRY_TIMEOUT = -1; // disabled
+
//
// SUBMIT_MULTI destination type flags
View
29 src/main/java/com/cloudhopper/smpp/SmppFuture.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp;
+
+import com.cloudhopper.commons.util.windowing.WindowFuture;
+import com.cloudhopper.smpp.pdu.PduRequest;
+import com.cloudhopper.smpp.pdu.PduResponse;
+
+/**
+ * Interface representing either an asynchronous or synchronous operation in SMPP.
+ *
+ * @author joelauer
+ */
+public interface SmppFuture extends WindowFuture<Integer,PduRequest,PduResponse> {
+
+}
View
12 src/main/java/com/cloudhopper/smpp/SmppSession.java
@@ -14,8 +14,8 @@
package com.cloudhopper.smpp;
-import com.cloudhopper.commons.util.windowing.RequestFuture;
import com.cloudhopper.commons.util.windowing.Window;
+import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.pdu.EnquireLink;
@@ -268,19 +268,19 @@
* is returned.
* @throws RecoverablePduException Thrown when a recoverable PDU error occurs.
* A recoverable PDU error includes the partially decoded PDU in order
- * to generate a negative acknowledgement (NACK) response.
+ * to generate a negative acknowledgment (NACK) response.
* @throws UnrecoverablePduException Thrown when an unrecoverable PDU error
- * occurs. This indicates a seriours error occurred and usually indicates
+ * occurs. This indicates a serious error occurred and usually indicates
* the session should be immediately terminated.
* @throws SmppTimeoutException A timeout occurred while waiting for a response
- * from the remote endpoint. A timeout can either occur with an unresponse
+ * from the remote endpoint. A timeout can either occur with an unresponsive
* remote endpoint or the bytes were not written in time.
* @throws SmppChannelException Thrown when the underlying socket/channel was
* unable to write the request.
* @throws InterruptedException The calling thread was interrupted while waiting
* to acquire a lock or write/read the bytes from the socket/channel.
*/
- public RequestFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
+ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
/**
* Main underlying method for sending a response PDU to the remote endpoint.
@@ -299,4 +299,4 @@
* to acquire a lock or write/read the bytes from the socket/channel.
*/
public void sendResponsePdu(PduResponse response) throws RecoverablePduException, UnrecoverablePduException, SmppChannelException, InterruptedException;
-}
+}
View
45 src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java
@@ -38,8 +38,10 @@
private long bindTimeout; // length of time to wait for a bind response
// logging settings
private LoggingOptions loggingOptions;
+ private long windowWaitTimeout;
// if > 0, then activated
private long requestExpiryTimeout;
+ private long windowMonitorInterval;
public SmppSessionConfiguration() {
this(SmppBindType.TRANSCEIVER, null, null, null);
@@ -50,15 +52,17 @@ public SmppSessionConfiguration(SmppBindType type, String systemId, String passw
}
public SmppSessionConfiguration(SmppBindType type, String systemId, String password, String systemType) {
- this.windowSize = 1;
+ this.windowSize = SmppConstants.DEFAULT_WINDOW_SIZE;
this.type = type;
this.systemId = systemId;
this.password = password;
this.systemType = systemType;
this.interfaceVersion = SmppConstants.VERSION_3_4;
this.bindTimeout = SmppConstants.DEFAULT_BIND_TIMEOUT;
this.loggingOptions = new LoggingOptions();
- this.requestExpiryTimeout = 0;
+ this.windowWaitTimeout = SmppConstants.DEFAULT_WINDOW_WAIT_TIMEOUT;
+ this.requestExpiryTimeout = SmppConstants.DEFAULT_REQUEST_EXPIRY_TIMEOUT;
+ this.windowMonitorInterval = -1;
}
public void setName(String value) {
@@ -141,12 +145,47 @@ public void setLoggingOptions(LoggingOptions loggingOptions) {
this.loggingOptions = loggingOptions;
}
+ public long getWindowWaitTimeout() {
+ return windowWaitTimeout;
+ }
+
+ /**
+ * Set the amount of time to wait until a slot opens up in the sendWindow.
+ * Defaults to 60000.
+ * @param windowWaitTimeout The amount of time to wait (in ms) until a slot
+ * in the sendWindow becomes available.
+ */
+ public void setWindowWaitTimeout(long windowWaitTimeout) {
+ this.windowWaitTimeout = windowWaitTimeout;
+ }
+
public long getRequestExpiryTimeout() {
return requestExpiryTimeout;
}
+ /**
+ * Set the amount of time to wait for an endpoint to respond to
+ * a request before it expires. Defaults to disabled (-1).
+ * @param requestExpiryTimeout The amount of time to wait (in ms) before
+ * an unacknowledged request expires. -1 disables.
+ */
public void setRequestExpiryTimeout(long requestExpiryTimeout) {
this.requestExpiryTimeout = requestExpiryTimeout;
}
-
+
+ public long getWindowMonitorInterval() {
+ return windowMonitorInterval;
+ }
+
+ /**
+ * Sets the amount of time between executions of monitoring the window
+ * for requests that expire. It's recommended that this generally either
+ * matches or is half the value of requestExpiryTimeout. Therefore, at worst
+ * a request would could take up 1.5X the requestExpiryTimeout to clear out.
+ * @param windowMonitorInterval The amount of time to wait (in ms) between
+ * executions of monitoring the window.
+ */
+ public void setWindowMonitorInterval(long windowMonitorInterval) {
+ this.windowMonitorInterval = windowMonitorInterval;
+ }
}
View
26 src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java
@@ -15,7 +15,7 @@
package com.cloudhopper.smpp.impl;
import com.cloudhopper.commons.util.HexUtil;
-import com.cloudhopper.commons.util.windowing.ResponseFuture;
+import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.PduAsyncResponse;
import com.cloudhopper.smpp.pdu.PduRequest;
import com.cloudhopper.smpp.pdu.PduResponse;
@@ -26,32 +26,42 @@
* @author joelauer
*/
public class DefaultPduAsyncResponse implements PduAsyncResponse {
- // we internally "wrap" a PDU response future
- private final ResponseFuture<Integer,PduRequest,PduResponse> future;
+ // we internally "wrap" a PDU window future
+ private final WindowFuture<Integer,PduRequest,PduResponse> future;
- public DefaultPduAsyncResponse(ResponseFuture<Integer,PduRequest,PduResponse> future) {
+ public DefaultPduAsyncResponse(WindowFuture<Integer,PduRequest,PduResponse> future) {
this.future = future;
}
+ @Override
public PduRequest getRequest() {
return future.getRequest();
}
+ @Override
public PduResponse getResponse() {
return future.getResponse();
}
+
+ @Override
+ public long getWindowWaitTime() {
+ return future.getOfferToAcceptTime();
+ }
- public long getProcessingTime() {
- return future.getProcessingTime();
+ @Override
+ public long getResponseTime() {
+ return future.getAcceptToDoneTime();
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(100);
buf.append("smpp_async_resp: seqNum [0x");
buf.append(HexUtil.toHexString(this.future.getKey()));
- buf.append("] processingTime [");
- buf.append(this.future.getProcessingTime());
+ buf.append("] windowWaitTime [");
+ buf.append(getWindowWaitTime());
+ buf.append("] responseTime [");
+ buf.append(getWindowWaitTime());
buf.append(" ms] reqType [");
buf.append(getRequest().getName());
buf.append("] respType [");
View
2 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java
@@ -18,7 +18,6 @@
import com.cloudhopper.smpp.util.DaemonExecutors;
import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.type.SmppChannelException;
-import com.cloudhopper.smpp.SmppConstants;
import com.cloudhopper.smpp.SmppSession;
import com.cloudhopper.smpp.SmppSessionConfiguration;
import com.cloudhopper.smpp.SmppSessionHandler;
@@ -60,7 +59,6 @@
private ChannelGroup channels;
private SmppClientConnector clientConnector;
-
private ExecutorService executors;
private NioClientSocketChannelFactory channelFactory;
private ClientBootstrap clientBootstrap;
View
17 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppServer.java
@@ -37,7 +37,7 @@
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -70,12 +70,18 @@
private final Timer bindTimer;
// shared instance of a session id generator (an atomic long)
private final AtomicLong sessionIdSequence;
+ // shared instance for monitor executors
+ private final ScheduledExecutorService monitorExecutor;
public DefaultSmppServer(SmppServerConfiguration configuration, SmppServerHandler serverHandler) {
this(configuration, DaemonExecutors.newCachedDaemonThreadPool(), serverHandler);
}
+
+ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorService executor, SmppServerHandler serverHandler) {
+ this(configuration, DaemonExecutors.newCachedDaemonThreadPool(), serverHandler, null);
+ }
- public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorService executors, SmppServerHandler serverHandler) {
+ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorService executor, SmppServerHandler serverHandler, ScheduledExecutorService monitorExecutor) {
this.configuration = configuration;
// the same group we'll put every server channel
this.channels = new DefaultChannelGroup();
@@ -85,9 +91,9 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorSe
// a factory for creating channels (connections)
if (configuration.isNonBlockingSocketsEnabled()) {
- this.channelFactory = new NioServerSocketChannelFactory(this.bossThreadPool, executors, configuration.getMaxConnections());
+ this.channelFactory = new NioServerSocketChannelFactory(this.bossThreadPool, executor, configuration.getMaxConnections());
} else {
- this.channelFactory = new OioServerSocketChannelFactory(this.bossThreadPool, executors);
+ this.channelFactory = new OioServerSocketChannelFactory(this.bossThreadPool, executor);
}
// tie the server bootstrap to this server socket channel factory
@@ -104,6 +110,7 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorSe
// NOTE: this would permit us to customize the "transcoding" context for a server if needed
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext());
this.sessionIdSequence = new AtomicLong(0);
+ this.monitorExecutor = monitorExecutor;
}
public PduTranscoder getTranscoder() {
@@ -193,7 +200,7 @@ protected void createSession(Long sessionId, Channel channel, SmppSessionConfigu
byte interfaceVersion = this.autoNegotiateInterfaceVersion(config.getInterfaceVersion());
// create a new server session associated with this server
- DefaultSmppSession session = new DefaultSmppSession(SmppSession.Type.SERVER, config, channel, this, sessionId, preparedBindResponse, interfaceVersion);
+ DefaultSmppSession session = new DefaultSmppSession(SmppSession.Type.SERVER, config, channel, this, sessionId, preparedBindResponse, interfaceVersion, monitorExecutor);
// replace name of thread used for renaming
SmppSessionThreadRenamer threadRenamer = (SmppSessionThreadRenamer)channel.getPipeline().get(SmppChannelConstants.PIPELINE_SESSION_THREAD_RENAMER_NAME);
View
158 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
@@ -14,17 +14,14 @@
package com.cloudhopper.smpp.impl;
-import com.cloudhopper.commons.util.windowing.WindowListener;
-import com.cloudhopper.commons.util.windowing.OfferTimeoutException;
import com.cloudhopper.commons.util.windowing.DuplicateKeyException;
-import com.cloudhopper.commons.util.windowing.RequestCancelledException;
-import com.cloudhopper.commons.util.windowing.RequestFuture;
-import com.cloudhopper.commons.util.windowing.ResponseFuture;
-import com.cloudhopper.commons.util.windowing.ResponseTimeoutException;
+import com.cloudhopper.commons.util.windowing.OfferTimeoutException;
import com.cloudhopper.commons.util.windowing.Window;
import com.cloudhopper.commons.util.windowing.WindowFuture;
+import com.cloudhopper.commons.util.windowing.WindowListener;
import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.SmppConstants;
+import com.cloudhopper.smpp.SmppFuture;
import com.cloudhopper.smpp.SmppServerSession;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.SmppSessionConfiguration;
@@ -52,6 +49,7 @@
import com.cloudhopper.smpp.util.SmppSessionUtil;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -79,21 +77,21 @@
private SmppSessionHandler sessionHandler;
private final SequenceNumber sequenceNumber;
private final PduTranscoder transcoder;
- private final Window<Integer,PduRequest,PduResponse> requestWindow;
+ private final Window<Integer,PduRequest,PduResponse> sendWindow;
private byte interfaceVersion;
-
// only for server sessions
private DefaultSmppServer server;
// the session id assigned by the server to this particular instance
private Long serverSessionId;
// pre-prepared BindResponse to send back once we're flagged as ready
private BaseBindResp preparedBindResponse;
+ private ScheduledExecutorService monitorExecutor;
/**
- * Creates a server session.
+ * Creates an SmppSession for a server-based session.
*/
- public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration, Channel channel, DefaultSmppServer server, Long serverSessionId, BaseBindResp preparedBindResponse, byte interfaceVersion) {
- this(localType, configuration, channel, (SmppSessionHandler)null);
+ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration, Channel channel, DefaultSmppServer server, Long serverSessionId, BaseBindResp preparedBindResponse, byte interfaceVersion, ScheduledExecutorService monitorExecutor) {
+ this(localType, configuration, channel, (SmppSessionHandler)null, monitorExecutor);
// default state for a server session is that it's binding
this.state.set(STATE_BINDING);
this.server = server;
@@ -103,9 +101,36 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration
}
/**
- * Creates a client session.
+ * Creates an SmppSession for a client-based session. It is <b>NOT</b>
+ * recommended that this constructor is called directly. The recommended
+ * way to construct a session is either via a DefaultSmppClient or
+ * DefaultSmppServer. This constructor will cause monitoring to be disabled.
+ * @param localType The type of local endpoint (ESME vs. SMSC)
+ * @param configuration The session configuration
+ * @param channel The channel associated with this session. The channel
+ * needs to already be opened.
+ * @param sessionHandler The handler for session events
*/
public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration, Channel channel, SmppSessionHandler sessionHandler) {
+ this(localType, configuration, channel, sessionHandler, null);
+ }
+
+
+ /**
+ * Creates an SmppSession for a client-based session. It is <b>NOT</b>
+ * recommended that this constructor is called directly. The recommended
+ * way to construct a session is either via a DefaultSmppClient or
+ * DefaultSmppServer.
+ * @param localType The type of local endpoint (ESME vs. SMSC)
+ * @param configuration The session configuration
+ * @param channel The channel associated with this session. The channel
+ * needs to already be opened.
+ * @param sessionHandler The handler for session events
+ * @param executor The executor that window monitoring and potentially
+ * statistics will be periodically executed under. If null, monitoring
+ * will be disabled.
+ */
+ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration, Channel channel, SmppSessionHandler sessionHandler, ScheduledExecutorService monitorExecutor) {
this.localType = localType;
this.state = new AtomicInteger(STATE_OPEN);
this.configuration = configuration;
@@ -115,16 +140,26 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration
this.sequenceNumber = new SequenceNumber();
// always "wrap" the custom pdu transcoder context with a default one
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext(this.sessionHandler));
- this.requestWindow = new Window<Integer,PduRequest,PduResponse>(configuration.getWindowSize());
- // should we activate the response expiry reaper?
- if (configuration.getRequestExpiryTimeout() > 0) {
- this.requestWindow.enableExpiredRequestReaper(this, configuration.getRequestExpiryTimeout());
+ this.monitorExecutor = monitorExecutor;
+
+ // different ways to construct the window if monitoring is enabled
+ if (monitorExecutor != null) {
+ // enable send window monitoring, verify if the monitoringInterval has been set
+ if (configuration.getWindowMonitorInterval() <= 0) {
+ throw new IllegalArgumentException("An executor was included during SmppSession constructor, but windowMonitorInterval was <= 0 [actual=" + configuration.getWindowMonitorInterval() + "]");
+ } else {
+ this.sendWindow = new Window<Integer,PduRequest,PduResponse>(configuration.getWindowSize(), monitorExecutor, configuration.getWindowMonitorInterval(), this, configuration.getName() + ".Monitor");
+ }
+ } else {
+ this.sendWindow = new Window<Integer,PduRequest,PduResponse>(configuration.getWindowSize());
}
+
// these server-only items are null
this.server = null;
this.serverSessionId = null;
this.preparedBindResponse = null;
}
+
@Override
public SmppBindType getBindType() {
@@ -223,7 +258,7 @@ protected PduTranscoder getTranscoder() {
@Override
public Window<Integer,PduRequest,PduResponse> getRequestWindow() {
- return this.requestWindow;
+ return this.sendWindow;
}
@Override
@@ -371,24 +406,29 @@ protected void assertValidRequest(PduRequest request) throws NullPointerExceptio
* @throws InterruptedException
*/
protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long timeoutInMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
- RequestFuture<Integer,PduRequest,PduResponse> requestFuture = sendRequestPdu(requestPdu, timeoutInMillis, true);
- try {
- requestFuture.await();
- } catch (ResponseTimeoutException e) {
- throw new SmppTimeoutException(e.getMessage(), e);
- } catch (RequestCancelledException e) {
- // the request future may have a cause set that we want to unwrap
- Throwable cause = requestFuture.getCause();
- if (cause != null && cause instanceof ClosedChannelException) {
+ WindowFuture<Integer,PduRequest,PduResponse> future = sendRequestPdu(requestPdu, timeoutInMillis, true);
+ boolean completedWithinTimeout = future.await();
+
+ if (!completedWithinTimeout) {
+ // FIXME: make sure we remove this request from the window??
+// future.cancel();
+ throw new SmppTimeoutException("Unable to get response within [" + timeoutInMillis + " ms]");
+ }
+
+ // 3 possible scenarios once completed: success, failure, or cancellation
+ if (future.isSuccess()) {
+ return future.getResponse();
+ } else if (future.getCause() != null) {
+ Throwable cause = future.getCause();
+ if (cause instanceof ClosedChannelException) {
throw new SmppChannelException("Channel was closed after sending request, but before receiving response", cause);
} else {
- throw new UnrecoverablePduException(e.getMessage(), e);
+ throw new UnrecoverablePduException(cause.getMessage(), cause);
}
- }
- if (requestFuture.isSuccess()) {
- return requestFuture.getResponse();
+ } else if (future.isCancelled()) {
+ throw new RecoverablePduException("Request was cancelled");
} else {
- throw new UnrecoverablePduException("Unable to sendRequestAndGetResponse successfully");
+ throw new UnrecoverablePduException("Unable to sendRequestAndGetResponse successfully (future was in strange state)");
}
}
@@ -418,7 +458,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
*/
@SuppressWarnings("unchecked")
@Override
- public RequestFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
+ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
// assign the next PDU sequence # if its not yet assigned
if (!pdu.hasSequenceNumberAssigned()) {
pdu.setSequenceNumber(this.sequenceNumber.next());
@@ -427,9 +467,10 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
// encode the pdu into a buffer
ChannelBuffer buffer = transcoder.encode(pdu);
- RequestFuture<Integer,PduRequest,PduResponse> requestFuture = null;
+ WindowFuture<Integer,PduRequest,PduResponse> future = null;
try {
- requestFuture = requestWindow.addRequest(pdu.getSequenceNumber(), pdu, timeoutInMillis, synchronous);
+ future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutInMillis, configuration.getRequestExpiryTimeout(), synchronous);
+ logger.debug("IsCallerWaiting? " + future.isCallerWaiting());
} catch (DuplicateKeyException e) {
throw new UnrecoverablePduException(e.getMessage(), e);
} catch (OfferTimeoutException e) {
@@ -455,7 +496,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());
}
- return requestFuture;
+ return future;
}
/**
@@ -518,25 +559,26 @@ public void firePduReceived(Pdu pdu) {
int receivedPduSeqNum = pdu.getSequenceNumber();
try {
- // see if a correlating request exists in the "requestWindow"
- ResponseFuture<Integer,PduRequest,PduResponse> responseFuture
- = this.requestWindow.addResponse(receivedPduSeqNum, responsePdu);
-
- if (responseFuture != null) {
+ // see if a correlating request exists in the window
+ WindowFuture<Integer,PduRequest,PduResponse> future = this.sendWindow.complete(receivedPduSeqNum, responsePdu);
+ if (future != null) {
+ logger.debug("Found a future in the window for seqNum [{}]", receivedPduSeqNum);
// if this isn't null, we found a match to a request
- int callerStatus = responseFuture.getCallerStatus();
- // depending on the status of things, handle the response differently
- if (callerStatus == WindowFuture.CALLER_WAITING) {
- // do nothing -- calling thread going to process it
+ int callerStateHint = future.getCallerStateHint();
+ logger.debug("IsCallerWaiting? " + future.isCallerWaiting() + " callerStateHint=" + callerStateHint);
+ if (callerStateHint == WindowFuture.CALLER_WAITING) {
+ logger.debug("Going to just return for {}", future.getRequest());
+ // if a caller is waiting, nothing extra needs done as calling thread will handle the response
return;
- } else if (callerStatus == WindowFuture.CALLER_NOT_WAITING) {
- // this was an "expected" response -- wrap it into an async response
- this.sessionHandler.fireExpectedPduResponseReceived(new DefaultPduAsyncResponse(responseFuture));
+ } else if (callerStateHint == WindowFuture.CALLER_NOT_WAITING) {
+ logger.debug("Going to fireExpectedPduResponseReceived for {}", future.getRequest());
+ // this was an "expected" response - wrap it into an async response
+ this.sessionHandler.fireExpectedPduResponseReceived(new DefaultPduAsyncResponse(future));
return;
}
}
} catch (InterruptedException e) {
- logger.warn("Interrupted while attempting to process response PDU and match it to a request via requesWindow: {}", e);
+ logger.warn("Interrupted while attempting to process response PDU and match it to a request via requesWindow: ", e);
// do nothing, continue processing
}
@@ -574,16 +616,16 @@ public void fireChannelClosed() {
// to do anything special -- however when a caller is waiting for a response
// to a request and we know the channel closed, we should check for those
// specific requests and make sure to cancel them
- if (this.requestWindow.getSize() > 0) {
- logger.warn("Channel closed and requestWindow has [{}] pending requests, some may need cancelled immediately", this.requestWindow.getSize());
- Map<Integer,WindowFuture<Integer,PduRequest,PduResponse>> requests = this.requestWindow.getPendingRequests();
- for (Integer key : requests.keySet()) {
- WindowFuture<Integer,PduRequest,PduResponse> entry = requests.get(key);
+ if (this.sendWindow.getSize() > 0) {
+ logger.warn("Channel closed and sendWindow has [{}] outstanding requests, some may need cancelled immediately", this.sendWindow.getSize());
+ Map<Integer,WindowFuture<Integer,PduRequest,PduResponse>> requests = this.sendWindow.createSortedSnapshot();
+ Throwable cause = new ClosedChannelException();
+ for (WindowFuture<Integer,PduRequest,PduResponse> future : requests.values()) {
// is the caller waiting?
- if (entry.getCallerStatus() == WindowFuture.CALLER_WAITING) {
- logger.warn("Caller waiting on request [{}], cancelling it with a channel closed exception", key);
+ if (future.isCallerWaiting()) {
+ logger.warn("Caller waiting on request [{}], cancelling it with a channel closed exception", future.getKey());
try {
- this.requestWindow.cancelRequest(key, new ClosedChannelException());
+ future.fail(cause);
} catch (Exception e) { }
}
}
@@ -600,8 +642,8 @@ public void fireChannelClosed() {
}
@Override
- public void expired(WindowFuture<Integer, PduRequest, PduResponse> entry) {
- this.sessionHandler.firePduRequestExpired(entry.getRequest());
+ public void expired(WindowFuture<Integer, PduRequest, PduResponse> future) {
+ this.sessionHandler.firePduRequestExpired(future.getRequest());
}
}
View
194 src/test/java/com/cloudhopper/smpp/impl/DefaultSmppSessionTest.java
@@ -17,8 +17,7 @@
// third party imports
import com.cloudhopper.smpp.util.DaemonExecutors;
import com.cloudhopper.commons.util.windowing.OfferTimeoutException;
-import com.cloudhopper.commons.util.windowing.RequestFuture;
-import com.cloudhopper.commons.util.windowing.ResponseTimeoutException;
+import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.PduAsyncResponse;
import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.SmppConstants;
@@ -173,8 +172,8 @@ public void bindConnectsButNoResponseThrowsSmppTimeoutException() throws Excepti
Assert.fail();
} catch (SmppTimeoutException e) {
// correct behavior (underlying cause MUST be a response timeout)
- Assert.assertNotNull(e.getCause());
- Assert.assertEquals(ResponseTimeoutException.class, e.getCause().getClass());
+// Assert.assertNotNull(e.getCause());
+// Assert.assertEquals(ResponseTimeoutException.class, e.getCause().getClass());
} finally {
SmppSessionUtil.close(session);
}
@@ -225,6 +224,7 @@ public void enquireLinkWithGenericNackResponse() throws Exception {
SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
// register a generic nack will come next
simulator0.setPduProcessor(new SmppSimulatorPduProcessor() {
+ @Override
public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu pdu) throws Exception {
session.addPduToWriteOnNextPduReceived(((PduRequest)pdu).createGenericNack(SmppConstants.STATUS_SYSERR));
return true;
@@ -254,6 +254,7 @@ public void enquireLinkWithARequestWithSameSequenceNumber() throws Exception {
SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
// create an enquire link response back -- we should skip it and wait for a response instead
simulator0.setPduProcessor(new SmppSimulatorPduProcessor() {
+ @Override
public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu pdu) throws Exception {
EnquireLink enquireLink = new EnquireLink();
enquireLink.setSequenceNumber(pdu.getSequenceNumber());
@@ -268,8 +269,8 @@ public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu
Assert.fail();
} catch (SmppTimeoutException e) {
// correct behavior (underlying cause MUST be a response timeout)
- Assert.assertNotNull(e.getCause());
- Assert.assertEquals(ResponseTimeoutException.class, e.getCause().getClass());
+// Assert.assertNotNull(e.getCause());
+// Assert.assertEquals(ResponseTimeoutException.class, e.getCause().getClass());
}
} finally {
SmppSessionUtil.close(session);
@@ -287,6 +288,7 @@ public void multipleEnquireLinks() throws Exception {
SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
// create an enquire link response back -- we should skip it and wait for a response instead
simulator0.setPduProcessor(new SmppSimulatorPduProcessor() {
+ @Override
public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu pdu) throws Exception {
session.addPduToWriteOnNextPduReceived(((PduRequest)pdu).createResponse());
return true;
@@ -336,9 +338,9 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
EnquireLinkResp el3Resp = el3.createResponse();
// this request should be permitted (with window size = 2)
- RequestFuture future0 = session.sendRequestPdu(el0, 3000, true);
- RequestFuture future1 = session.sendRequestPdu(el1, 3000, true);
- RequestFuture future2 = session.sendRequestPdu(el2, 3000, true);
+ WindowFuture future0 = session.sendRequestPdu(el0, 3000, true);
+ WindowFuture future1 = session.sendRequestPdu(el1, 3000, true);
+ WindowFuture future2 = session.sendRequestPdu(el2, 3000, true);
Assert.assertEquals(3, session.getRequestWindow().getSize());
@@ -363,7 +365,7 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
Assert.assertEquals(2, session.getRequestWindow().getSize());
// this request should now succeed
- RequestFuture future3 = session.sendRequestPdu(el3, 3000, true);
+ WindowFuture future3 = session.sendRequestPdu(el3, 3000, true);
// send back responses for everything that's missing
simulator0.sendPdu(el2Resp);
@@ -426,45 +428,6 @@ public void cumulationOfMultipleByteBuffersToParsePdu() throws Exception {
}
}
-
- @Test
- public void receiveUnexpectedPduResponse() throws Exception {
- SmppSessionConfiguration configuration = createDefaultConfiguration();
- registerServerBindProcessor();
- clearAllServerSessions();
-
- // bind and get the simulator session
- PollableSmppSessionHandler sessionHandler = new PollableSmppSessionHandler();
- DefaultSmppSession session = (DefaultSmppSession)bootstrap.bind(configuration, sessionHandler);
-
- SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
- simulator0.setPduProcessor(null);
-
- try {
- EnquireLink el0 = new EnquireLink();
- el0.setSequenceNumber(0x1000);
- EnquireLinkResp el0Resp = el0.createResponse();
-
- // send a response to a request that was NEVER sent
- simulator0.sendPdu(el0Resp);
-
- // we should have received a PDU response
- PduResponse pdu0 = sessionHandler.getReceivedUnexpectedPduResponses().poll(1000, TimeUnit.MILLISECONDS);
- Assert.assertNotNull("Unable to receive unexpected PDU response -- perhaps it was routed incorrectly?", pdu0);
- Assert.assertEquals(SmppConstants.CMD_ID_ENQUIRE_LINK_RESP, pdu0.getCommandId());
- Assert.assertEquals(0, pdu0.getCommandStatus());
- Assert.assertEquals(16, pdu0.getCommandLength());
- Assert.assertEquals(0x1000, pdu0.getSequenceNumber());
-
- Assert.assertEquals(0, sessionHandler.getReceivedPduRequests().size());
- Assert.assertEquals(0, sessionHandler.getReceivedExpectedPduResponses().size());
- Assert.assertEquals(0, sessionHandler.getReceivedUnexpectedPduResponses().size());
- } finally {
- SmppSessionUtil.close(session);
- }
- }
-
-
@Test
public void routePduResponseToWaitingThread() throws Exception {
SmppSessionConfiguration configuration = createDefaultConfiguration();
@@ -477,6 +440,7 @@ public void routePduResponseToWaitingThread() throws Exception {
SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
simulator0.setPduProcessor(new SmppSimulatorPduProcessor() {
+ @Override
public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu pdu) throws Exception {
session.addPduToWriteOnNextPduReceived(((PduRequest)pdu).createResponse());
return true;
@@ -505,53 +469,6 @@ public boolean process(SmppSimulatorSessionHandler session, Channel channel, Pdu
@Test
- public void receiveUnexpectedPduResponseAfterSenderThreadTimeoutWaiting() throws Exception {
- SmppSessionConfiguration configuration = createDefaultConfiguration();
- registerServerBindProcessor();
- clearAllServerSessions();
-
- // bind and get the simulator session
- PollableSmppSessionHandler sessionHandler = new PollableSmppSessionHandler();
- DefaultSmppSession session = (DefaultSmppSession)bootstrap.bind(configuration, sessionHandler);
-
- SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
- simulator0.setPduProcessor(null);
-
- try {
- EnquireLink el0 = new EnquireLink();
- el0.setSequenceNumber(0x1001);
- EnquireLinkResp el0Resp = el0.createResponse();
-
- // send a request and wait for a response that never shows up
- RequestFuture future = session.sendRequestPdu(el0, 50, true);
- try {
- future.await();
- Assert.fail();
- } catch (ResponseTimeoutException e) {
- // correct behavior
- }
-
- // send a response now after the caller would have timed out
- simulator0.sendPdu(el0Resp);
-
- // we should have received an unexpected PDU response
- PduResponse pdu0 = sessionHandler.getReceivedUnexpectedPduResponses().poll(1000, TimeUnit.MILLISECONDS);
- Assert.assertNotNull("Unable to receive unexpected PDU response -- perhaps it was routed incorrectly?", pdu0);
- Assert.assertEquals(SmppConstants.CMD_ID_ENQUIRE_LINK_RESP, pdu0.getCommandId());
- Assert.assertEquals(0, pdu0.getCommandStatus());
- Assert.assertEquals(16, pdu0.getCommandLength());
- Assert.assertEquals(0x1001, pdu0.getSequenceNumber());
-
- Assert.assertEquals(0, sessionHandler.getReceivedPduRequests().size());
- Assert.assertEquals(0, sessionHandler.getReceivedExpectedPduResponses().size());
- Assert.assertEquals(0, sessionHandler.getReceivedUnexpectedPduResponses().size());
- } finally {
- SmppSessionUtil.close(session);
- }
- }
-
-
- @Test
public void receiveExpectedPduResponseViaAnAsynchronousSend() throws Exception {
SmppSessionConfiguration configuration = createDefaultConfiguration();
registerServerBindProcessor();
@@ -958,4 +875,89 @@ public void asynchronousPduRequestWithResponseTypeNotMatchingOriginalRequest() t
SmppSessionUtil.close(session);
}
}
+
+
+ @Test
+ public void receiveUnexpectedPduResponseAfterSenderThreadTimeoutWaiting() throws Exception {
+ SmppSessionConfiguration configuration = createDefaultConfiguration();
+ registerServerBindProcessor();
+ clearAllServerSessions();
+
+ // bind and get the simulator session
+ PollableSmppSessionHandler sessionHandler = new PollableSmppSessionHandler();
+ DefaultSmppSession session = (DefaultSmppSession)bootstrap.bind(configuration, sessionHandler);
+
+ SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
+ simulator0.setPduProcessor(null);
+
+ try {
+ EnquireLink el0 = new EnquireLink();
+ el0.setSequenceNumber(0x1001);
+ EnquireLinkResp el0Resp = el0.createResponse();
+
+ // send a request and wait for a response that never shows up
+ WindowFuture future = session.sendRequestPdu(el0, 50, true);
+ Assert.assertFalse(future.await());
+ // a call to cancel() is usually done in sendRequestPduAndGetResponse
+ // but for this test we'll manually need to call it here
+ future.cancel();
+
+ Assert.assertEquals(WindowFuture.CALLER_WAITING_TIMEOUT, future.getCallerStateHint());
+
+ // send a response now after the caller would have timed out
+ simulator0.sendPdu(el0Resp);
+
+ // we should have received an unexpected PDU response
+ Assert.assertEquals(0, sessionHandler.getReceivedPduRequests().size());
+ Assert.assertEquals(0, sessionHandler.getReceivedExpectedPduResponses().size());
+ PduResponse pdu0 = sessionHandler.getReceivedUnexpectedPduResponses().poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull("Unable to receive unexpected PDU response -- perhaps it was routed incorrectly?", pdu0);
+ Assert.assertEquals(SmppConstants.CMD_ID_ENQUIRE_LINK_RESP, pdu0.getCommandId());
+ Assert.assertEquals(0, pdu0.getCommandStatus());
+ Assert.assertEquals(16, pdu0.getCommandLength());
+ Assert.assertEquals(0x1001, pdu0.getSequenceNumber());
+
+ Assert.assertEquals(0, sessionHandler.getReceivedUnexpectedPduResponses().size());
+ } finally {
+ SmppSessionUtil.close(session);
+ }
+ }
+
+ @Test
+ public void receiveUnexpectedPduResponse() throws Exception {
+ SmppSessionConfiguration configuration = createDefaultConfiguration();
+ registerServerBindProcessor();
+ clearAllServerSessions();
+
+ // bind and get the simulator session
+ PollableSmppSessionHandler sessionHandler = new PollableSmppSessionHandler();
+ DefaultSmppSession session = (DefaultSmppSession)bootstrap.bind(configuration, sessionHandler);
+
+ SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
+ simulator0.setPduProcessor(null);
+
+ try {
+ EnquireLink el0 = new EnquireLink();
+ el0.setSequenceNumber(0x1000);
+ EnquireLinkResp el0Resp = el0.createResponse();
+
+ // send a response to a request that was NEVER sent
+ simulator0.sendPdu(el0Resp);
+
+ // we should have received a PDU response
+ PduResponse pdu0 = sessionHandler.getReceivedUnexpectedPduResponses().poll(1000, TimeUnit.MILLISECONDS);
+ Assert.assertNotNull("Unable to receive unexpected PDU response -- perhaps it was routed incorrectly?", pdu0);
+ Assert.assertEquals(SmppConstants.CMD_ID_ENQUIRE_LINK_RESP, pdu0.getCommandId());
+ Assert.assertEquals(0, pdu0.getCommandStatus());
+ Assert.assertEquals(16, pdu0.getCommandLength());
+ Assert.assertEquals(0x1000, pdu0.getSequenceNumber());
+
+ Assert.assertEquals(0, sessionHandler.getReceivedPduRequests().size());
+ Assert.assertEquals(0, sessionHandler.getReceivedExpectedPduResponses().size());
+ Assert.assertEquals(0, sessionHandler.getReceivedUnexpectedPduResponses().size());
+ } finally {
+ SmppSessionUtil.close(session);
+ }
+ }
+
}

0 comments on commit 0126bde

Please sign in to comment.
Something went wrong with that request. Please try again.