diff --git a/src/main/java/com/cloudhopper/smpp/SmppFuture.java b/src/main/java/com/cloudhopper/smpp/SmppFuture.java new file mode 100644 index 00000000..cec53535 --- /dev/null +++ b/src/main/java/com/cloudhopper/smpp/SmppFuture.java @@ -0,0 +1,29 @@ +/* + * Copyright 2011 Twitter, Inc.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudhopper.smpp; + +import com.cloudhopper.commons.util.windowing.WindowFuture; +import com.cloudhopper.smpp.pdu.PduRequest; +import com.cloudhopper.smpp.pdu.PduResponse; + +/** + * Defines the interface for any asynchronous or synchronous operation in SMPP. + * + * @author joelauer + */ +public interface SmppFuture extends WindowFuture { + +} diff --git a/src/main/java/com/cloudhopper/smpp/SmppSession.java b/src/main/java/com/cloudhopper/smpp/SmppSession.java index 882da7f6..7e939dd9 100644 --- a/src/main/java/com/cloudhopper/smpp/SmppSession.java +++ b/src/main/java/com/cloudhopper/smpp/SmppSession.java @@ -14,8 +14,8 @@ package com.cloudhopper.smpp; -import com.cloudhopper.commons.util.windowing.RequestFuture; import com.cloudhopper.commons.util.windowing.Window; +import com.cloudhopper.commons.util.windowing.WindowFuture; import com.cloudhopper.smpp.type.SmppChannelException; import com.cloudhopper.smpp.type.SmppTimeoutException; import com.cloudhopper.smpp.pdu.EnquireLink; @@ -268,19 +268,19 @@ public enum Type { * is returned. * @throws RecoverablePduException Thrown when a recoverable PDU error occurs. * A recoverable PDU error includes the partially decoded PDU in order - * to generate a negative acknowledgement (NACK) response. + * to generate a negative acknowledgment (NACK) response. * @throws UnrecoverablePduException Thrown when an unrecoverable PDU error - * occurs. This indicates a seriours error occurred and usually indicates + * occurs. This indicates a serious error occurred and usually indicates * the session should be immediately terminated. * @throws SmppTimeoutException A timeout occurred while waiting for a response - * from the remote endpoint. A timeout can either occur with an unresponse + * from the remote endpoint. A timeout can either occur with an unresponsive * remote endpoint or the bytes were not written in time. * @throws SmppChannelException Thrown when the underlying socket/channel was * unable to write the request. * @throws InterruptedException The calling thread was interrupted while waiting * to acquire a lock or write/read the bytes from the socket/channel. */ - public RequestFuture sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException; + public WindowFuture sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException; /** * Main underlying method for sending a response PDU to the remote endpoint. @@ -299,4 +299,4 @@ public enum Type { * to acquire a lock or write/read the bytes from the socket/channel. */ public void sendResponsePdu(PduResponse response) throws RecoverablePduException, UnrecoverablePduException, SmppChannelException, InterruptedException; -} +} \ No newline at end of file diff --git a/src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java b/src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java index 47e1a0a9..caf291de 100644 --- a/src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java +++ b/src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java @@ -15,7 +15,7 @@ package com.cloudhopper.smpp.impl; import com.cloudhopper.commons.util.HexUtil; -import com.cloudhopper.commons.util.windowing.ResponseFuture; +import com.cloudhopper.commons.util.windowing.WindowFuture; import com.cloudhopper.smpp.PduAsyncResponse; import com.cloudhopper.smpp.pdu.PduRequest; import com.cloudhopper.smpp.pdu.PduResponse; @@ -26,21 +26,24 @@ * @author joelauer */ public class DefaultPduAsyncResponse implements PduAsyncResponse { - // we internally "wrap" a PDU response future - private final ResponseFuture future; + // we internally "wrap" a PDU window future + private final WindowFuture future; - public DefaultPduAsyncResponse(ResponseFuture future) { + public DefaultPduAsyncResponse(WindowFuture future) { this.future = future; } + @Override public PduRequest getRequest() { return future.getRequest(); } + @Override public PduResponse getResponse() { return future.getResponse(); } + @Override public long getProcessingTime() { return future.getProcessingTime(); } diff --git a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java index dd54c46e..b2188fb0 100644 --- a/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java +++ b/src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java @@ -14,17 +14,12 @@ package com.cloudhopper.smpp.impl; -import com.cloudhopper.commons.util.windowing.ExpiredRequestListener; -import com.cloudhopper.commons.util.windowing.MaxWindowSizeTimeoutException; -import com.cloudhopper.commons.util.windowing.RequestAlreadyExistsException; -import com.cloudhopper.commons.util.windowing.RequestCanceledException; -import com.cloudhopper.commons.util.windowing.RequestFuture; -import com.cloudhopper.commons.util.windowing.ResponseFuture; -import com.cloudhopper.commons.util.windowing.ResponseTimeoutException; import com.cloudhopper.commons.util.windowing.Window; -import com.cloudhopper.commons.util.windowing.WindowEntry; +import com.cloudhopper.commons.util.windowing.WindowFuture; +import com.cloudhopper.commons.util.windowing.WindowListener; import com.cloudhopper.smpp.SmppBindType; import com.cloudhopper.smpp.SmppConstants; +import com.cloudhopper.smpp.SmppFuture; import com.cloudhopper.smpp.SmppServerSession; import com.cloudhopper.smpp.type.SmppChannelException; import com.cloudhopper.smpp.SmppSessionConfiguration; @@ -65,7 +60,7 @@ * * @author joelauer */ -public class DefaultSmppSession implements SmppServerSession, SmppSessionChannelListener, ExpiredRequestListener { +public class DefaultSmppSession implements SmppServerSession, SmppSessionChannelListener, WindowListener { private static final Logger logger = LoggerFactory.getLogger(DefaultSmppSession.class); // are we an "esme" or "smsc" session type? @@ -116,10 +111,12 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration // always "wrap" the custom pdu transcoder context with a default one this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext(this.sessionHandler)); this.requestWindow = new Window(configuration.getWindowSize()); + // should we activate the response expiry reaper? if (configuration.getRequestExpiryTimeout() > 0) { this.requestWindow.enableExpiredRequestReaper(this, configuration.getRequestExpiryTimeout()); } + // these server-only items are null this.server = null; this.serverSessionId = null; @@ -371,7 +368,7 @@ protected void assertValidRequest(PduRequest request) throws NullPointerExceptio * @throws InterruptedException */ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long timeoutInMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException { - RequestFuture requestFuture = sendRequestPdu(requestPdu, timeoutInMillis, true); + WindowFuture requestFuture = sendRequestPdu(requestPdu, timeoutInMillis, true); try { requestFuture.await(); } catch (ResponseTimeoutException e) { @@ -418,7 +415,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time */ @SuppressWarnings("unchecked") @Override - public RequestFuture sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException { + public SmppFuture sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException { // assign the next PDU sequence # if its not yet assigned if (!pdu.hasSequenceNumberAssigned()) { pdu.setSequenceNumber(this.sequenceNumber.next()); @@ -427,9 +424,9 @@ public RequestFuture sendRequestPdu(PduRequest p // encode the pdu into a buffer ChannelBuffer buffer = transcoder.encode(pdu); - RequestFuture requestFuture = null; + SmppFuture future = null; try { - requestFuture = requestWindow.addRequest(pdu.getSequenceNumber(), pdu, timeoutInMillis, synchronous); + future = requestWindow.offer(pdu.getSequenceNumber(), pdu, timeoutInMillis, synchronous); } catch (RequestAlreadyExistsException e) { throw new UnrecoverablePduException(e.getMessage(), e); } catch (MaxWindowSizeTimeoutException e) { @@ -600,8 +597,8 @@ public void fireChannelClosed() { } @Override - public void requestExpired(WindowEntry entry) { - this.sessionHandler.firePduRequestExpired(entry.getRequest()); + public void expired(WindowFuture future) { + this.sessionHandler.firePduRequestExpired(future.getRequest()); } }