Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Initial work on new SmppFuture impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jjlauer committed May 2, 2011
1 parent 921f19f commit 7b598e3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 25 deletions.
29 changes: 29 additions & 0 deletions src/main/java/com/cloudhopper/smpp/SmppFuture.java
@@ -0,0 +1,29 @@
/*
* Copyright 2011 Twitter, Inc..
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudhopper.smpp;

import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.pdu.PduRequest;
import com.cloudhopper.smpp.pdu.PduResponse;

/**
* Defines the interface for any asynchronous or synchronous operation in SMPP.
*
* @author joelauer
*/
public interface SmppFuture extends WindowFuture<Integer,PduRequest,PduResponse> {

}
12 changes: 6 additions & 6 deletions src/main/java/com/cloudhopper/smpp/SmppSession.java
Expand Up @@ -14,8 +14,8 @@

package com.cloudhopper.smpp;

import com.cloudhopper.commons.util.windowing.RequestFuture;
import com.cloudhopper.commons.util.windowing.Window;
import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.pdu.EnquireLink;
Expand Down Expand Up @@ -268,19 +268,19 @@ public enum Type {
* is returned.
* @throws RecoverablePduException Thrown when a recoverable PDU error occurs.
* A recoverable PDU error includes the partially decoded PDU in order
* to generate a negative acknowledgement (NACK) response.
* to generate a negative acknowledgment (NACK) response.
* @throws UnrecoverablePduException Thrown when an unrecoverable PDU error
* occurs. This indicates a seriours error occurred and usually indicates
* occurs. This indicates a serious error occurred and usually indicates
* the session should be immediately terminated.
* @throws SmppTimeoutException A timeout occurred while waiting for a response
* from the remote endpoint. A timeout can either occur with an unresponse
* from the remote endpoint. A timeout can either occur with an unresponsive
* remote endpoint or the bytes were not written in time.
* @throws SmppChannelException Thrown when the underlying socket/channel was
* unable to write the request.
* @throws InterruptedException The calling thread was interrupted while waiting
* to acquire a lock or write/read the bytes from the socket/channel.
*/
public RequestFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;

/**
* Main underlying method for sending a response PDU to the remote endpoint.
Expand All @@ -299,4 +299,4 @@ public enum Type {
* to acquire a lock or write/read the bytes from the socket/channel.
*/
public void sendResponsePdu(PduResponse response) throws RecoverablePduException, UnrecoverablePduException, SmppChannelException, InterruptedException;
}
}
Expand Up @@ -15,7 +15,7 @@
package com.cloudhopper.smpp.impl;

import com.cloudhopper.commons.util.HexUtil;
import com.cloudhopper.commons.util.windowing.ResponseFuture;
import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.smpp.PduAsyncResponse;
import com.cloudhopper.smpp.pdu.PduRequest;
import com.cloudhopper.smpp.pdu.PduResponse;
Expand All @@ -26,21 +26,24 @@
* @author joelauer
*/
public class DefaultPduAsyncResponse implements PduAsyncResponse {
// we internally "wrap" a PDU response future
private final ResponseFuture<Integer,PduRequest,PduResponse> future;
// we internally "wrap" a PDU window future
private final WindowFuture<Integer,PduRequest,PduResponse> future;

public DefaultPduAsyncResponse(ResponseFuture<Integer,PduRequest,PduResponse> future) {
public DefaultPduAsyncResponse(WindowFuture<Integer,PduRequest,PduResponse> future) {
this.future = future;
}

@Override
public PduRequest getRequest() {
return future.getRequest();
}

@Override
public PduResponse getResponse() {
return future.getResponse();
}

@Override
public long getProcessingTime() {
return future.getProcessingTime();
}
Expand Down
27 changes: 12 additions & 15 deletions src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
Expand Up @@ -14,17 +14,12 @@

package com.cloudhopper.smpp.impl;

import com.cloudhopper.commons.util.windowing.ExpiredRequestListener;
import com.cloudhopper.commons.util.windowing.MaxWindowSizeTimeoutException;
import com.cloudhopper.commons.util.windowing.RequestAlreadyExistsException;
import com.cloudhopper.commons.util.windowing.RequestCanceledException;
import com.cloudhopper.commons.util.windowing.RequestFuture;
import com.cloudhopper.commons.util.windowing.ResponseFuture;
import com.cloudhopper.commons.util.windowing.ResponseTimeoutException;
import com.cloudhopper.commons.util.windowing.Window;
import com.cloudhopper.commons.util.windowing.WindowEntry;
import com.cloudhopper.commons.util.windowing.WindowFuture;
import com.cloudhopper.commons.util.windowing.WindowListener;
import com.cloudhopper.smpp.SmppBindType;
import com.cloudhopper.smpp.SmppConstants;
import com.cloudhopper.smpp.SmppFuture;
import com.cloudhopper.smpp.SmppServerSession;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.SmppSessionConfiguration;
Expand Down Expand Up @@ -65,7 +60,7 @@
*
* @author joelauer
*/
public class DefaultSmppSession implements SmppServerSession, SmppSessionChannelListener, ExpiredRequestListener<Integer,PduRequest,PduResponse> {
public class DefaultSmppSession implements SmppServerSession, SmppSessionChannelListener, WindowListener<Integer,PduRequest,PduResponse> {
private static final Logger logger = LoggerFactory.getLogger(DefaultSmppSession.class);

// are we an "esme" or "smsc" session type?
Expand Down Expand Up @@ -116,10 +111,12 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration
// always "wrap" the custom pdu transcoder context with a default one
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext(this.sessionHandler));
this.requestWindow = new Window<Integer,PduRequest,PduResponse>(configuration.getWindowSize());

// should we activate the response expiry reaper?
if (configuration.getRequestExpiryTimeout() > 0) {
this.requestWindow.enableExpiredRequestReaper(this, configuration.getRequestExpiryTimeout());
}

// these server-only items are null
this.server = null;
this.serverSessionId = null;
Expand Down Expand Up @@ -371,7 +368,7 @@ protected void assertValidRequest(PduRequest request) throws NullPointerExceptio
* @throws InterruptedException
*/
protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long timeoutInMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
RequestFuture<Integer,PduRequest,PduResponse> requestFuture = sendRequestPdu(requestPdu, timeoutInMillis, true);
WindowFuture<Integer,PduRequest,PduResponse> requestFuture = sendRequestPdu(requestPdu, timeoutInMillis, true);
try {
requestFuture.await();
} catch (ResponseTimeoutException e) {
Expand Down Expand Up @@ -418,7 +415,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
*/
@SuppressWarnings("unchecked")
@Override
public RequestFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
public SmppFuture sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
// assign the next PDU sequence # if its not yet assigned
if (!pdu.hasSequenceNumberAssigned()) {
pdu.setSequenceNumber(this.sequenceNumber.next());
Expand All @@ -427,9 +424,9 @@ public RequestFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest p
// encode the pdu into a buffer
ChannelBuffer buffer = transcoder.encode(pdu);

RequestFuture<Integer,PduRequest,PduResponse> requestFuture = null;
SmppFuture future = null;
try {
requestFuture = requestWindow.addRequest(pdu.getSequenceNumber(), pdu, timeoutInMillis, synchronous);
future = requestWindow.offer(pdu.getSequenceNumber(), pdu, timeoutInMillis, synchronous);
} catch (RequestAlreadyExistsException e) {
throw new UnrecoverablePduException(e.getMessage(), e);
} catch (MaxWindowSizeTimeoutException e) {
Expand Down Expand Up @@ -600,8 +597,8 @@ public void fireChannelClosed() {
}

@Override
public void requestExpired(WindowEntry<Integer, PduRequest, PduResponse> entry) {
this.sessionHandler.firePduRequestExpired(entry.getRequest());
public void expired(WindowFuture<Integer, PduRequest, PduResponse> future) {
this.sessionHandler.firePduRequestExpired(future.getRequest());
}

}

0 comments on commit 7b598e3

Please sign in to comment.