Skip to content
This repository
Browse code

Integrated new monitor for auto expiring requests in client

  • Loading branch information...
commit 04a96223475cb939cdf97d219986ed74ad8fafa8 1 parent 0126bde
Joe Lauer jjlauer authored
2  pom.xml
@@ -100,7 +100,7 @@
100 100 <netty.version>[3.2,)</netty.version>
101 101 <ch-commons-charset.version>[2.0,)</ch-commons-charset.version>
102 102 <ch-commons-gsm.version>[2.0,)</ch-commons-gsm.version>
103   - <ch-commons-util.version>3.3-SNAPSHOT</ch-commons-util.version>
  103 + <ch-commons-util.version>4.2-SNAPSHOT</ch-commons-util.version>
104 104 </properties>
105 105
106 106 </project>
29 src/main/java/com/cloudhopper/smpp/SmppFuture.java
... ... @@ -1,29 +0,0 @@
1   -/*
2   - * Copyright 2011 Twitter, Inc..
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package com.cloudhopper.smpp;
17   -
18   -import com.cloudhopper.commons.util.windowing.WindowFuture;
19   -import com.cloudhopper.smpp.pdu.PduRequest;
20   -import com.cloudhopper.smpp.pdu.PduResponse;
21   -
22   -/**
23   - * Interface representing either an asynchronous or synchronous operation in SMPP.
24   - *
25   - * @author joelauer
26   - */
27   -public interface SmppFuture extends WindowFuture<Integer,PduRequest,PduResponse> {
28   -
29   -}
8 src/main/java/com/cloudhopper/smpp/SmppSession.java
@@ -164,13 +164,19 @@
164 164 public long getBoundTime();
165 165
166 166 /**
  167 + * @deprecated
  168 + * @see #getRequestWindow
  169 + */
  170 + public Window<Integer,PduRequest,PduResponse> getRequestWindow();
  171 +
  172 + /**
167 173 * Gets the underlying request "window" for this session. A "window" represents
168 174 * a request sent to the remote endpoint, but has not received a response
169 175 * yet. Accessing this property is useful if unacknowledged requests need
170 176 * to be cleared out (most likely for a retry at a later time).
171 177 * @return The request "window"
172 178 */
173   - public Window<Integer,PduRequest,PduResponse> getRequestWindow();
  179 + public Window<Integer,PduRequest,PduResponse> getSendWindow();
174 180
175 181 /**
176 182 * Immediately close the session by closing the underlying socket/channel.
9 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java
@@ -40,6 +40,7 @@
40 40 import com.cloudhopper.smpp.type.UnrecoverablePduException;
41 41 import java.net.InetSocketAddress;
42 42 import java.util.concurrent.ExecutorService;
  43 +import java.util.concurrent.ScheduledExecutorService;
43 44 import org.jboss.netty.bootstrap.ClientBootstrap;
44 45 import org.jboss.netty.channel.Channel;
45 46 import org.jboss.netty.channel.ChannelFuture;
@@ -62,6 +63,7 @@
62 63 private ExecutorService executors;
63 64 private NioClientSocketChannelFactory channelFactory;
64 65 private ClientBootstrap clientBootstrap;
  66 + private ScheduledExecutorService monitorExecutor;
65 67
66 68 public DefaultSmppClient() {
67 69 this(DaemonExecutors.newCachedDaemonThreadPool());
@@ -72,6 +74,10 @@ public DefaultSmppClient(ExecutorService executors) {
72 74 }
73 75
74 76 public DefaultSmppClient(ExecutorService executors, int expectedSessions) {
  77 + this(executors, expectedSessions, null);
  78 + }
  79 +
  80 + public DefaultSmppClient(ExecutorService executors, int expectedSessions, ScheduledExecutorService monitorExecutor) {
75 81 this.channels = new DefaultChannelGroup();
76 82 this.executors = executors;
77 83 this.channelFactory = new NioClientSocketChannelFactory(this.executors, this.executors, expectedSessions);
@@ -79,6 +85,7 @@ public DefaultSmppClient(ExecutorService executors, int expectedSessions) {
79 85 // we use the same default pipeline for all new channels - no need for a factory
80 86 this.clientConnector = new SmppClientConnector(this.channels);
81 87 this.clientBootstrap.getPipeline().addLast(SmppChannelConstants.PIPELINE_CLIENT_CONNECTOR_NAME, this.clientConnector);
  88 + this.monitorExecutor = monitorExecutor;
82 89 }
83 90
84 91 @Override
@@ -177,7 +184,7 @@ protected DefaultSmppSession doOpen(SmppSessionConfiguration config, SmppSession
177 184 }
178 185
179 186 protected DefaultSmppSession createSession(Channel channel, SmppSessionConfiguration config, SmppSessionHandler sessionHandler) throws SmppTimeoutException, SmppChannelException, InterruptedException {
180   - DefaultSmppSession session = new DefaultSmppSession(SmppSession.Type.CLIENT, config, channel, sessionHandler);
  187 + DefaultSmppSession session = new DefaultSmppSession(SmppSession.Type.CLIENT, config, channel, sessionHandler, monitorExecutor);
181 188
182 189 // add the thread renamer portion to the pipeline
183 190 if (config.getName() != null) {
14 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
@@ -21,7 +21,6 @@
21 21 import com.cloudhopper.commons.util.windowing.WindowListener;
22 22 import com.cloudhopper.smpp.SmppBindType;
23 23 import com.cloudhopper.smpp.SmppConstants;
24   -import com.cloudhopper.smpp.SmppFuture;
25 24 import com.cloudhopper.smpp.SmppServerSession;
26 25 import com.cloudhopper.smpp.type.SmppChannelException;
27 26 import com.cloudhopper.smpp.SmppSessionConfiguration;
@@ -255,9 +254,14 @@ public SequenceNumber getSequenceNumber() {
255 254 protected PduTranscoder getTranscoder() {
256 255 return this.transcoder;
257 256 }
258   -
  257 +
259 258 @Override
260 259 public Window<Integer,PduRequest,PduResponse> getRequestWindow() {
  260 + return getSendWindow();
  261 + }
  262 +
  263 + @Override
  264 + public Window<Integer,PduRequest,PduResponse> getSendWindow() {
261 265 return this.sendWindow;
262 266 }
263 267
@@ -410,8 +414,9 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
410 414 boolean completedWithinTimeout = future.await();
411 415
412 416 if (!completedWithinTimeout) {
413   - // FIXME: make sure we remove this request from the window??
414   -// future.cancel();
  417 + // since this is a "synchronous" request and it timed out, we don't
  418 + // want it eating up valuable window space - cancel it before returning exception
  419 + future.cancel();
415 420 throw new SmppTimeoutException("Unable to get response within [" + timeoutInMillis + " ms]");
416 421 }
417 422
@@ -471,6 +476,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
471 476 try {
472 477 future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutInMillis, configuration.getRequestExpiryTimeout(), synchronous);
473 478 logger.debug("IsCallerWaiting? " + future.isCallerWaiting());
  479 + logger.debug("Expire Timestamp: " + future.getExpireTimestamp());
474 480 } catch (DuplicateKeyException e) {
475 481 throw new UnrecoverablePduException(e.getMessage(), e);
476 482 } catch (OfferTimeoutException e) {
9 src/test/java/com/cloudhopper/smpp/demo/ClientMain.java
@@ -22,7 +22,6 @@
22 22 import com.cloudhopper.smpp.impl.DefaultSmppClient;
23 23 import com.cloudhopper.smpp.impl.DefaultSmppSessionHandler;
24 24 import com.cloudhopper.smpp.pdu.DeliverSm;
25   -import com.cloudhopper.smpp.pdu.SubmitSmResp;
26 25 import com.cloudhopper.smpp.type.Address;
27 26 import com.cloudhopper.smpp.pdu.EnquireLink;
28 27 import com.cloudhopper.smpp.pdu.PduRequest;
@@ -44,7 +43,9 @@ static public void main(String[] args) throws Exception {
44 43 // THIS VERSION USES "DAEMON" threads by default
45 44 // SmppSessionBootstrap bootstrap = new SmppSessionBootstrap();
46 45 // THIS VERSION DOESN'T - WILL HANG JVM UNTIL CLOSED
47   - DefaultSmppClient bootstrap = new DefaultSmppClient(Executors.newCachedThreadPool());
  46 + //DefaultSmppClient bootstrap = new DefaultSmppClient(Executors.newCachedThreadPool());
  47 + // include monitoring (automatic expiration) in this version
  48 + DefaultSmppClient bootstrap = new DefaultSmppClient(Executors.newCachedThreadPool(), 1, Executors.newScheduledThreadPool(1));
48 49
49 50 DefaultSmppSessionHandler sessionHandler = new ClientSmppSessionHandler();
50 51
@@ -58,6 +59,10 @@ static public void main(String[] args) throws Exception {
58 59 config0.setSystemId("1234567890");
59 60 config0.setPassword("password");
60 61 config0.getLoggingOptions().setLogBytes(true);
  62 +
  63 + // to enable monitoring
  64 + config0.setRequestExpiryTimeout(2000);
  65 + config0.setWindowMonitorInterval(1000);
61 66
62 67 SmppSession session0 = null;
63 68
56 src/test/java/com/cloudhopper/smpp/impl/DefaultSmppSessionTest.java
@@ -342,7 +342,7 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
342 342 WindowFuture future1 = session.sendRequestPdu(el1, 3000, true);
343 343 WindowFuture future2 = session.sendRequestPdu(el2, 3000, true);
344 344
345   - Assert.assertEquals(3, session.getRequestWindow().getSize());
  345 + Assert.assertEquals(3, session.getSendWindow().getSize());
346 346
347 347 try {
348 348 // window size of 3 is now filled up, this one should timeout
@@ -353,7 +353,7 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
353 353 Assert.assertEquals(OfferTimeoutException.class, e.getCause().getClass());
354 354 }
355 355
356   - Assert.assertEquals(3, session.getRequestWindow().getSize());
  356 + Assert.assertEquals(3, session.getSendWindow().getSize());
357 357
358 358 // now the smsc will send a response back to the second request
359 359 simulator0.sendPdu(el1Resp);
@@ -362,7 +362,7 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
362 362 future1.await();
363 363
364 364 // there should be 1 slot free now in the window
365   - Assert.assertEquals(2, session.getRequestWindow().getSize());
  365 + Assert.assertEquals(2, session.getSendWindow().getSize());
366 366
367 367 // this request should now succeed
368 368 WindowFuture future3 = session.sendRequestPdu(el3, 3000, true);
@@ -378,7 +378,7 @@ public void windowSizeBlocksAsyncRequest() throws Exception {
378 378 future2.await();
379 379 future3.await();
380 380
381   - Assert.assertEquals(0, session.getRequestWindow().getSize());
  381 + Assert.assertEquals(0, session.getSendWindow().getSize());
382 382 } finally {
383 383 SmppSessionUtil.close(session);
384 384 }
@@ -959,5 +959,53 @@ public void receiveUnexpectedPduResponse() throws Exception {
959 959 SmppSessionUtil.close(session);
960 960 }
961 961 }
  962 +
  963 +
  964 + @Test
  965 + public void synchronousSendButNeverGetResponse() throws Exception {
  966 + SmppSessionConfiguration configuration = createDefaultConfiguration();
  967 + registerServerBindProcessor();
  968 + clearAllServerSessions();
  969 +
  970 + // bind and get the simulator session
  971 + PollableSmppSessionHandler sessionHandler = new PollableSmppSessionHandler();
  972 + DefaultSmppSession session = (DefaultSmppSession)bootstrap.bind(configuration, sessionHandler);
  973 +
  974 + SmppSimulatorSessionHandler simulator0 = server.pollNextSession(1000);
  975 + simulator0.setPduProcessor(null);
  976 +
  977 + try {
  978 + try {
  979 + session.enquireLink(new EnquireLink(), 100);
  980 + // request should timeout
  981 + Assert.fail();
  982 + } catch (SmppTimeoutException e) {
  983 + // correct behavior
  984 + }
  985 +
  986 + // with a "synchronous" type of send, after a timeout, the request
  987 + // should have been cancelled
  988 + Assert.assertEquals(0, session.getSendWindow().getSize());
  989 +
  990 + /**
  991 + // send a response to a request that was NEVER sent
  992 + simulator0.sendPdu(el0Resp);
  993 +
  994 + // we should have received a PDU response
  995 + PduResponse pdu0 = sessionHandler.getReceivedUnexpectedPduResponses().poll(1000, TimeUnit.MILLISECONDS);
  996 + Assert.assertNotNull("Unable to receive unexpected PDU response -- perhaps it was routed incorrectly?", pdu0);
  997 + Assert.assertEquals(SmppConstants.CMD_ID_ENQUIRE_LINK_RESP, pdu0.getCommandId());
  998 + Assert.assertEquals(0, pdu0.getCommandStatus());
  999 + Assert.assertEquals(16, pdu0.getCommandLength());
  1000 + Assert.assertEquals(0x1000, pdu0.getSequenceNumber());
  1001 +
  1002 + Assert.assertEquals(0, sessionHandler.getReceivedPduRequests().size());
  1003 + Assert.assertEquals(0, sessionHandler.getReceivedExpectedPduResponses().size());
  1004 + Assert.assertEquals(0, sessionHandler.getReceivedUnexpectedPduResponses().size());
  1005 + */
  1006 + } finally {
  1007 + SmppSessionUtil.close(session);
  1008 + }
  1009 + }
962 1010
963 1011 }

0 comments on commit 04a9622

Please sign in to comment.
Something went wrong with that request. Please try again.