Skip to content
Browse files

Added both session and server counters; added performance testing cli…

…ent demo
  • Loading branch information...
1 parent 85c03da commit 8414d69f4ef75afb213ffce8c0ba12a5a103979d @jjlauer jjlauer committed May 5, 2011
View
3 Makefile
@@ -2,6 +2,9 @@
run-client:
mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.ClientMain"
+run-performance-client:
+ mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.PerformanceClientMain"
+
run-server:
mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.ServerMain"
View
1 ReleaseNotes.txt
@@ -4,6 +4,7 @@ http://www.cloudhopper.com/
SMPP Library
--------------------------------------------------------------------------------
4.0 - 2011-05-03
+ * Upgraded ch-commons-util dependency from 4.1 to 5.0
* WARN: There are some minor source and binary backwards compatability issues
that were unavoidable to add request expiry as a new feature.
* Automatic request expiration is no longer experimental. See the new
View
8 src/main/java/com/cloudhopper/smpp/SmppServer.java
@@ -23,10 +23,12 @@
*/
public interface SmppServer {
- void start();
+ public void start();
- void stop();
+ public void stop();
- ChannelGroup getChannels();
+ public ChannelGroup getChannels();
+
+ public SmppServerCounters getCounters();
}
View
13 src/main/java/com/cloudhopper/smpp/SmppServerConfiguration.java
@@ -43,6 +43,7 @@
private long defaultWindowWaitTimeout = SmppConstants.DEFAULT_WINDOW_WAIT_TIMEOUT;
private long defaultRequestExpiryTimeout = SmppConstants.DEFAULT_REQUEST_EXPIRY_TIMEOUT;
private long defaultWindowMonitorInterval = SmppConstants.DEFAULT_WINDOW_MONITOR_INTERVAL;
+ private boolean defaultSessionCountersEnabled = false;
public SmppServerConfiguration() {
this.name = "SmppServer";
@@ -198,6 +199,14 @@ public long getDefaultWindowWaitTimeout() {
public void setDefaultWindowWaitTimeout(long defaultWindowWaitTimeout) {
this.defaultWindowWaitTimeout = defaultWindowWaitTimeout;
- }
-
+ }
+
+ public boolean isDefaultSessionCountersEnabled() {
+ return defaultSessionCountersEnabled;
+ }
+
+ public void setDefaultSessionCountersEnabled(boolean defaultSessionCountersEnabled) {
+ this.defaultSessionCountersEnabled = defaultSessionCountersEnabled;
+ }
+
}
View
39 src/main/java/com/cloudhopper/smpp/SmppServerCounters.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp;
+
+/**
+ * Interface defining the counters that will be tracked for an SMPP server.
+ *
+ * @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
+ */
+public interface SmppServerCounters {
+
+ public void reset();
+
+ public int getChannelConnects();
+
+ public int getChannelDisconnects();
+
+ public int getBindTimeouts();
+
+ public int getBindRequested();
+
+ public int getSessionCreated();
+
+ public int getSessionDestroyed();
+
+}
View
14 src/main/java/com/cloudhopper/smpp/SmppSession.java
@@ -164,7 +164,7 @@
/**
* @deprecated
- * @see #getRequestWindow
+ * @see #getSendWindow()
*/
public Window<Integer,PduRequest,PduResponse> getRequestWindow();
@@ -176,6 +176,18 @@
* @return The request "window"
*/
public Window<Integer,PduRequest,PduResponse> getSendWindow();
+
+ /**
+ * Returns true if and only if this session has counters enabled.
+ * @return True if the session has counters
+ */
+ public boolean hasCounters();
+
+ /**
+ * Gets the counters this session is tracking.
+ * @return The session counters or null if counters are disabled.
+ */
+ public SmppSessionCounters getCounters();
/**
* Immediately close the session by closing the underlying socket/channel.
View
12 src/main/java/com/cloudhopper/smpp/SmppSessionConfiguration.java
@@ -14,6 +14,7 @@
package com.cloudhopper.smpp;
+import com.cloudhopper.smpp.impl.DefaultSmppSessionCounters;
import com.cloudhopper.smpp.type.SmppConnectionConfiguration;
import com.cloudhopper.smpp.type.LoggingOptions;
import com.cloudhopper.smpp.type.Address;
@@ -42,6 +43,7 @@
// if > 0, then activated
private long requestExpiryTimeout;
private long windowMonitorInterval;
+ private boolean countersEnabled;
public SmppSessionConfiguration() {
this(SmppBindType.TRANSCEIVER, null, null, null);
@@ -63,6 +65,7 @@ public SmppSessionConfiguration(SmppBindType type, String systemId, String passw
this.windowWaitTimeout = SmppConstants.DEFAULT_WINDOW_WAIT_TIMEOUT;
this.requestExpiryTimeout = SmppConstants.DEFAULT_REQUEST_EXPIRY_TIMEOUT;
this.windowMonitorInterval = SmppConstants.DEFAULT_WINDOW_MONITOR_INTERVAL;
+ this.countersEnabled = false;
}
public void setName(String value) {
@@ -188,4 +191,13 @@ public long getWindowMonitorInterval() {
public void setWindowMonitorInterval(long windowMonitorInterval) {
this.windowMonitorInterval = windowMonitorInterval;
}
+
+ public boolean isCountersEnabled() {
+ return countersEnabled;
+ }
+
+ public void setCountersEnabled(boolean countersEnabled) {
+ this.countersEnabled = countersEnabled;
+ }
+
}
View
45 src/main/java/com/cloudhopper/smpp/SmppSessionCounters.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp;
+
+import com.cloudhopper.smpp.util.ConcurrentCommandCounter;
+
+/**
+ * Interface defining the counters that can be optionally tracked for an SMPP session.
+ *
+ * @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
+ */
+public interface SmppSessionCounters {
+
+ public void reset();
+
+ public ConcurrentCommandCounter getRxDataSM();
+
+ public ConcurrentCommandCounter getRxDeliverSM();
+
+ public ConcurrentCommandCounter getRxEnquireLink();
+
+ public ConcurrentCommandCounter getRxSubmitSM();
+
+ public ConcurrentCommandCounter getTxDataSM();
+
+ public ConcurrentCommandCounter getTxDeliverSM();
+
+ public ConcurrentCommandCounter getTxEnquireLink();
+
+ public ConcurrentCommandCounter getTxSubmitSM();
+
+}
View
2 src/main/java/com/cloudhopper/smpp/channel/SmppServerConnector.java
@@ -51,6 +51,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
// always add it to our channel group
channels.add(channel);
+ this.server.getCounters().incrementChannelConnectsAndGet();
// create a default "unbound" thread name for the thread processing the channel
// this will create a name of "RemoteIPAddress.RemotePort"
@@ -78,6 +79,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// called every time a channel disconnects
channels.remove(e.getChannel());
+ this.server.getCounters().incrementChannelDisconnectsAndGet();
}
}
View
10 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppClient.java
@@ -46,7 +46,9 @@
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +63,7 @@
private ChannelGroup channels;
private SmppClientConnector clientConnector;
private ExecutorService executors;
- private NioClientSocketChannelFactory channelFactory;
+ private ClientSocketChannelFactory channelFactory;
private ClientBootstrap clientBootstrap;
private ScheduledExecutorService monitorExecutor;
@@ -122,7 +124,11 @@ public DefaultSmppClient(ExecutorService executors, int expectedSessions) {
public DefaultSmppClient(ExecutorService executors, int expectedSessions, ScheduledExecutorService monitorExecutor) {
this.channels = new DefaultChannelGroup();
this.executors = executors;
- this.channelFactory = new NioClientSocketChannelFactory(this.executors, this.executors, expectedSessions);
+
+
+// this.channelFactory = new NioClientSocketChannelFactory(this.executors, this.executors, expectedSessions);
+ this.channelFactory = new OioClientSocketChannelFactory(this.executors);
+
this.clientBootstrap = new ClientBootstrap(channelFactory);
// we use the same default pipeline for all new channels - no need for a factory
this.clientConnector = new SmppClientConnector(this.channels);
View
10 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppServer.java
@@ -72,6 +72,7 @@
private final AtomicLong sessionIdSequence;
// shared instance for monitor executors
private final ScheduledExecutorService monitorExecutor;
+ private DefaultSmppServerCounters counters;
/**
* Creates a new default SmppServer. Window monitoring and automatic
@@ -143,6 +144,7 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, SmppServer
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext());
this.sessionIdSequence = new AtomicLong(0);
this.monitorExecutor = monitorExecutor;
+ this.counters = new DefaultSmppServerCounters();
}
public PduTranscoder getTranscoder() {
@@ -157,6 +159,11 @@ public ChannelGroup getChannels() {
public SmppServerConfiguration getConfiguration() {
return this.configuration;
}
+
+ @Override
+ public DefaultSmppServerCounters getCounters() {
+ return this.counters;
+ }
public Timer getBindTimer() {
return this.bindTimer;
@@ -211,6 +218,7 @@ protected BaseBindResp createBindResponse(BaseBind bindRequest, int statusCode)
}
protected void bindRequested(Long sessionId, SmppSessionConfiguration config, BaseBind bindRequest) throws SmppProcessingException {
+ counters.incrementBindRequestedAndGet();
// delegate request upstream to server handler
this.serverHandler.sessionBindRequested(sessionId, config, bindRequest);
}
@@ -254,12 +262,14 @@ protected void createSession(Long sessionId, Channel channel, SmppSessionConfigu
}
// session created, now pass it upstream
+ counters.incrementSessionCreatedAndGet();
this.serverHandler.sessionCreated(sessionId, session, preparedBindResponse);
}
protected void destroySession(Long sessionId, DefaultSmppSession session) {
// session destroyed, now pass it upstream
+ counters.incrementSessionDestroyedAndGet();
serverHandler.sessionDestroyed(sessionId, session);
}
}
View
126 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppServerCounters.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp.impl;
+
+import com.cloudhopper.smpp.SmppServerCounters;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Default implementation of a SmppSessionCounters interface.
+ *
+ * @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
+ */
+public class DefaultSmppServerCounters implements SmppServerCounters {
+
+ private AtomicInteger channelConnects;
+ private AtomicInteger channelDisconnects;
+ private AtomicInteger bindTimeouts;
+ private AtomicInteger bindRequested;
+ private AtomicInteger sessionCreated;
+ private AtomicInteger sessionDestroyed;
+
+ public DefaultSmppServerCounters() {
+ this.channelConnects = new AtomicInteger(0);
+ this.channelDisconnects = new AtomicInteger(0);
+ this.bindTimeouts = new AtomicInteger(0);
+ this.bindRequested = new AtomicInteger(0);
+ this.sessionCreated = new AtomicInteger(0);
+ this.sessionDestroyed = new AtomicInteger(0);
+ }
+
+ @Override
+ public void reset() {
+ this.channelConnects.set(0);
+ this.channelDisconnects.set(0);
+ this.bindTimeouts.set(0);
+ this.bindRequested.set(0);
+ this.sessionCreated.set(0);
+ this.sessionDestroyed.set(0);
+ }
+
+ @Override
+ public int getChannelConnects() {
+ return this.channelConnects.get();
+ }
+
+ public int incrementChannelConnectsAndGet() {
+ return this.channelConnects.incrementAndGet();
+ }
+
+ @Override
+ public int getChannelDisconnects() {
+ return this.channelDisconnects.get();
+ }
+
+ public int incrementChannelDisconnectsAndGet() {
+ return this.channelDisconnects.incrementAndGet();
+ }
+
+ @Override
+ public int getBindTimeouts() {
+ return this.bindTimeouts.get();
+ }
+
+ public int incrementBindTimeoutsAndGet() {
+ return this.bindTimeouts.incrementAndGet();
+ }
+
+ @Override
+ public int getBindRequested() {
+ return this.bindRequested.get();
+ }
+
+ public int incrementBindRequestedAndGet() {
+ return this.bindRequested.incrementAndGet();
+ }
+
+ @Override
+ public int getSessionCreated() {
+ return this.sessionCreated.get();
+ }
+
+ public int incrementSessionCreatedAndGet() {
+ return this.sessionCreated.incrementAndGet();
+ }
+
+ @Override
+ public int getSessionDestroyed() {
+ return this.sessionDestroyed.get();
+ }
+
+ public int incrementSessionDestroyedAndGet() {
+ return this.sessionDestroyed.incrementAndGet();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder to = new StringBuilder();
+ to.append("[channelConnects=");
+ to.append(getChannelConnects());
+ to.append(" channelDisconnects=");
+ to.append(getChannelDisconnects());
+ to.append(" bindTimeouts=");
+ to.append(getBindTimeouts());
+ to.append(" bindRequested=");
+ to.append(getBindRequested());
+ to.append(" sessionCreated=");
+ to.append(getSessionCreated());
+ to.append(" sessionDestroyed=");
+ to.append(getSessionDestroyed());
+ to.append("]");
+ return to.toString();
+ }
+}
View
185 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
@@ -24,6 +24,7 @@
import com.cloudhopper.smpp.SmppServerSession;
import com.cloudhopper.smpp.type.SmppChannelException;
import com.cloudhopper.smpp.SmppSessionConfiguration;
+import com.cloudhopper.smpp.SmppSessionCounters;
import com.cloudhopper.smpp.SmppSessionHandler;
import com.cloudhopper.smpp.type.SmppTimeoutException;
import com.cloudhopper.smpp.pdu.BaseBind;
@@ -85,6 +86,7 @@
// pre-prepared BindResponse to send back once we're flagged as ready
private BaseBindResp preparedBindResponse;
private ScheduledExecutorService monitorExecutor;
+ private DefaultSmppSessionCounters counters;
/**
* Creates an SmppSession for a server-based session.
@@ -157,6 +159,9 @@ public DefaultSmppSession(Type localType, SmppSessionConfiguration configuration
this.server = null;
this.serverSessionId = null;
this.preparedBindResponse = null;
+ if (configuration.isCountersEnabled()) {
+ this.counters = new DefaultSmppSessionCounters();
+ }
}
@@ -264,6 +269,16 @@ protected PduTranscoder getTranscoder() {
public Window<Integer,PduRequest,PduResponse> getSendWindow() {
return this.sendWindow;
}
+
+ @Override
+ public boolean hasCounters() {
+ return (this.counters != null);
+ }
+
+ @Override
+ public SmppSessionCounters getCounters() {
+ return this.counters;
+ }
@Override
public void serverReady(SmppSessionHandler sessionHandler) {
@@ -378,6 +393,9 @@ public void close(long timeoutInMillis) {
public void shutdown() {
close();
this.sendWindow.freeExternalResources();
+ if (this.counters != null) {
+ this.counters.reset();
+ }
}
@Override
@@ -473,6 +491,8 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
// the write failed, make sure to throw an exception
throw new SmppChannelException(channelFuture.getCause().getMessage(), channelFuture.getCause());
}
+
+ this.countSendRequestPdu(pdu);
return future;
}
@@ -522,10 +542,18 @@ public void firePduReceived(Pdu pdu) {
if (pdu instanceof PduRequest) {
// process this request and allow the handler to return a result
PduRequest requestPdu = (PduRequest)pdu;
+
+ this.countReceiveRequestPdu(requestPdu);
+
+ long startTime = System.currentTimeMillis();
PduResponse responsePdu = this.sessionHandler.firePduRequestReceived(requestPdu);
+
// if the handler returned a non-null object, then we need to send it back on the channel
if (responsePdu != null) {
try {
+ long responseTime = System.currentTimeMillis() - startTime;
+ this.countSendResponsePdu(responsePdu, responseTime);
+
this.sendResponsePdu(responsePdu);
} catch (Exception e) {
logger.error("Unable to cleanly return response PDU: {}", e);
@@ -535,33 +563,41 @@ public void firePduReceived(Pdu pdu) {
// this is a response -- we need to check if its "expected" or "unexpected"
PduResponse responsePdu = (PduResponse)pdu;
int receivedPduSeqNum = pdu.getSequenceNumber();
-
+
try {
// see if a correlating request exists in the window
WindowFuture<Integer,PduRequest,PduResponse> future = this.sendWindow.complete(receivedPduSeqNum, responsePdu);
if (future != null) {
logger.trace("Found a future in the window for seqNum [{}]", receivedPduSeqNum);
+ this.countReceiveResponsePdu(responsePdu, future.getOfferToAcceptTime(), future.getAcceptToDoneTime());
+
// if this isn't null, we found a match to a request
int callerStateHint = future.getCallerStateHint();
- logger.trace("IsCallerWaiting? " + future.isCallerWaiting() + " callerStateHint=" + callerStateHint);
+ //logger.trace("IsCallerWaiting? " + future.isCallerWaiting() + " callerStateHint=" + callerStateHint);
if (callerStateHint == WindowFuture.CALLER_WAITING) {
- logger.trace("Going to just return for {}", future.getRequest());
+ logger.trace("Caller waiting for request: {}", future.getRequest());
// if a caller is waiting, nothing extra needs done as calling thread will handle the response
return;
} else if (callerStateHint == WindowFuture.CALLER_NOT_WAITING) {
- logger.trace("Going to fireExpectedPduResponseReceived for {}", future.getRequest());
+ logger.trace("Caller not waiting for request: {}", future.getRequest());
// this was an "expected" response - wrap it into an async response
this.sessionHandler.fireExpectedPduResponseReceived(new DefaultPduAsyncResponse(future));
return;
+ } else {
+ logger.trace("Caller timed out waiting for request: {}", future.getRequest());
+ // we send the request, but caller gave up on it awhile ago
+ this.sessionHandler.fireUnexpectedPduResponseReceived(responsePdu);
}
+ } else {
+ this.countReceiveResponsePdu(responsePdu, 0, 0);
+
+ // original request either expired OR was completely unexpected
+ this.sessionHandler.fireUnexpectedPduResponseReceived(responsePdu);
}
} catch (InterruptedException e) {
logger.warn("Interrupted while attempting to process response PDU and match it to a request via requesWindow: ", e);
// do nothing, continue processing
}
-
- // if we get here, this response was "unexpected"
- this.sessionHandler.fireUnexpectedPduResponseReceived(responsePdu);
}
}
@@ -621,7 +657,142 @@ public void fireChannelClosed() {
@Override
public void expired(WindowFuture<Integer, PduRequest, PduResponse> future) {
+ this.countSendRequestPduExpired(future.getRequest());
this.sessionHandler.firePduRequestExpired(future.getRequest());
}
+ private void countSendRequestPdu(PduRequest pdu) {
+ if (this.counters == null) {
+ return; // noop
+ }
+
+ if (pdu.isRequest()) {
+ switch (pdu.getCommandId()) {
+ case SmppConstants.CMD_ID_SUBMIT_SM:
+ this.counters.getTxSubmitSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_DELIVER_SM:
+ this.counters.getTxDeliverSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_DATA_SM:
+ this.counters.getTxDataSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_ENQUIRE_LINK:
+ this.counters.getTxEnquireLink().incrementRequestAndGet();
+ break;
+ }
+ }
+ }
+
+ private void countSendResponsePdu(PduResponse pdu, long responseTime) {
+ if (this.counters == null) {
+ return; // noop
+ }
+
+ if (pdu.isResponse()) {
+ switch (pdu.getCommandId()) {
+ case SmppConstants.CMD_ID_SUBMIT_SM_RESP:
+ this.counters.getRxSubmitSM().incrementResponseAndGet();
+ this.counters.getRxSubmitSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxSubmitSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_DELIVER_SM_RESP:
+ this.counters.getRxDeliverSM().incrementResponseAndGet();
+ this.counters.getRxDeliverSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxDeliverSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_DATA_SM_RESP:
+ this.counters.getRxDataSM().incrementResponseAndGet();
+ this.counters.getRxDataSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxDataSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_ENQUIRE_LINK_RESP:
+ this.counters.getRxEnquireLink().incrementResponseAndGet();
+ this.counters.getRxEnquireLink().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxEnquireLink().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ }
+ }
+ }
+
+ private void countSendRequestPduExpired(PduRequest pdu) {
+ if (this.counters == null) {
+ return; // noop
+ }
+
+ if (pdu.isRequest()) {
+ switch (pdu.getCommandId()) {
+ case SmppConstants.CMD_ID_SUBMIT_SM:
+ this.counters.getTxSubmitSM().incrementRequestExpiredAndGet();
+ break;
+ case SmppConstants.CMD_ID_DELIVER_SM:
+ this.counters.getTxDeliverSM().incrementRequestExpiredAndGet();
+ break;
+ case SmppConstants.CMD_ID_DATA_SM:
+ this.counters.getTxDataSM().incrementRequestExpiredAndGet();
+ break;
+ case SmppConstants.CMD_ID_ENQUIRE_LINK:
+ this.counters.getTxEnquireLink().incrementRequestExpiredAndGet();
+ break;
+ }
+ }
+ }
+
+ private void countReceiveRequestPdu(PduRequest pdu) {
+ if (this.counters == null) {
+ return; // noop
+ }
+
+ if (pdu.isRequest()) {
+ switch (pdu.getCommandId()) {
+ case SmppConstants.CMD_ID_SUBMIT_SM:
+ this.counters.getRxSubmitSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_DELIVER_SM:
+ this.counters.getRxDeliverSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_DATA_SM:
+ this.counters.getRxDataSM().incrementRequestAndGet();
+ break;
+ case SmppConstants.CMD_ID_ENQUIRE_LINK:
+ this.counters.getRxEnquireLink().incrementRequestAndGet();
+ break;
+ }
+ }
+ }
+
+ private void countReceiveResponsePdu(PduResponse pdu, long waitTime, long responseTime) {
+ if (this.counters == null) {
+ return; // noop
+ }
+
+ if (pdu.isResponse()) {
+ switch (pdu.getCommandId()) {
+ case SmppConstants.CMD_ID_SUBMIT_SM_RESP:
+ this.counters.getTxSubmitSM().incrementResponseAndGet();
+ this.counters.getTxSubmitSM().addRequestWaitTimeAndGet(waitTime);
+ this.counters.getTxSubmitSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxSubmitSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_DELIVER_SM_RESP:
+ this.counters.getTxDeliverSM().incrementResponseAndGet();
+ this.counters.getTxDeliverSM().addRequestWaitTimeAndGet(waitTime);
+ this.counters.getTxDeliverSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxDeliverSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_DATA_SM_RESP:
+ this.counters.getTxDataSM().incrementResponseAndGet();
+ this.counters.getTxDataSM().addRequestWaitTimeAndGet(waitTime);
+ this.counters.getTxDataSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxDataSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ case SmppConstants.CMD_ID_ENQUIRE_LINK_RESP:
+ this.counters.getTxEnquireLink().incrementResponseAndGet();
+ this.counters.getTxEnquireLink().addRequestWaitTimeAndGet(waitTime);
+ this.counters.getTxEnquireLink().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxEnquireLink().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
+ break;
+ }
+ }
+ }
}
View
100 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSessionCounters.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp.impl;
+
+import com.cloudhopper.smpp.SmppSessionCounters;
+import com.cloudhopper.smpp.util.ConcurrentCommandCounter;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Default implementation of a SmppServerCounters interface.
+ *
+ * @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
+ */
+public class DefaultSmppSessionCounters implements SmppSessionCounters {
+
+ private ConcurrentCommandCounter txSubmitSM;
+ private ConcurrentCommandCounter txDeliverSM;
+ private ConcurrentCommandCounter txEnquireLink;
+ private ConcurrentCommandCounter txDataSM;
+ private ConcurrentCommandCounter rxSubmitSM;
+ private ConcurrentCommandCounter rxDeliverSM;
+ private ConcurrentCommandCounter rxEnquireLink;
+ private ConcurrentCommandCounter rxDataSM;
+
+ public DefaultSmppSessionCounters() {
+ this.txSubmitSM = new ConcurrentCommandCounter();
+ this.txDeliverSM = new ConcurrentCommandCounter();
+ this.txEnquireLink = new ConcurrentCommandCounter();
+ this.txDataSM = new ConcurrentCommandCounter();
+ this.rxSubmitSM = new ConcurrentCommandCounter();
+ this.rxDeliverSM = new ConcurrentCommandCounter();
+ this.rxEnquireLink = new ConcurrentCommandCounter();
+ this.rxDataSM = new ConcurrentCommandCounter();
+ }
+
+ @Override
+ public void reset() {
+ this.txSubmitSM.reset();
+ this.txDeliverSM.reset();
+ this.txEnquireLink.reset();
+ this.txDataSM.reset();
+ this.rxSubmitSM.reset();
+ this.rxDeliverSM.reset();
+ this.rxEnquireLink.reset();
+ this.rxDataSM.reset();
+ }
+
+ @Override
+ public ConcurrentCommandCounter getRxDataSM() {
+ return rxDataSM;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getRxDeliverSM() {
+ return rxDeliverSM;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getRxEnquireLink() {
+ return rxEnquireLink;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getRxSubmitSM() {
+ return rxSubmitSM;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getTxDataSM() {
+ return txDataSM;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getTxDeliverSM() {
+ return txDeliverSM;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getTxEnquireLink() {
+ return txEnquireLink;
+ }
+
+ @Override
+ public ConcurrentCommandCounter getTxSubmitSM() {
+ return txSubmitSM;
+ }
+}
View
2 src/main/java/com/cloudhopper/smpp/impl/UnboundSmppSession.java
@@ -172,6 +172,7 @@ protected SmppSessionConfiguration createSessionConfiguration(BaseBind bindReque
sessionConfiguration.setWindowWaitTimeout(server.getConfiguration().getDefaultWindowWaitTimeout());
sessionConfiguration.setWindowMonitorInterval(server.getConfiguration().getDefaultWindowMonitorInterval());
sessionConfiguration.setRequestExpiryTimeout(server.getConfiguration().getDefaultRequestExpiryTimeout());
+ sessionConfiguration.setCountersEnabled(server.getConfiguration().isDefaultSessionCountersEnabled());
return sessionConfiguration;
}
@@ -206,6 +207,7 @@ public void run() {
logger.warn("Channel not bound within [{}] ms, closing connection [{}]", server.getConfiguration().getBindTimeout(), channelName);
channel.close();
this.cancel();
+ server.getCounters().incrementBindTimeoutsAndGet();
}
}
}
View
139 src/main/java/com/cloudhopper/smpp/util/ConcurrentCommandCounter.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp.util;
+
+import com.cloudhopper.commons.util.DecimalUtil;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * @author joelauer
+ */
+public class ConcurrentCommandCounter {
+
+ private AtomicInteger request;
+ private AtomicInteger requestExpired;
+ private AtomicLong requestWaitTime;
+ private AtomicLong requestResponseTime;
+ private AtomicInteger response;
+ private ConcurrentCommandStatusCounter responseCommandStatusCounter;
+
+ public ConcurrentCommandCounter() {
+ this.request = new AtomicInteger(0);
+ this.requestExpired = new AtomicInteger(0);
+ this.requestWaitTime = new AtomicLong(0);
+ this.requestResponseTime = new AtomicLong(0);
+ this.response = new AtomicInteger(0);
+ this.responseCommandStatusCounter = new ConcurrentCommandStatusCounter();
+ }
+
+ public ConcurrentCommandCounter(int request, int requestExpired, long requestWaitTime, long requestResponseTime, int response, final ConcurrentCommandStatusCounter responseCommandStatusCounter) {
+ this.request = new AtomicInteger(request);
+ this.requestExpired = new AtomicInteger(requestExpired);
+ this.requestWaitTime = new AtomicLong(requestWaitTime);
+ this.requestResponseTime = new AtomicLong(requestResponseTime);
+ this.response = new AtomicInteger(response);
+ this.responseCommandStatusCounter = responseCommandStatusCounter.copy();
+ }
+
+ public void reset() {
+ this.request.set(0);
+ this.requestExpired.set(0);
+ this.requestWaitTime.set(0);
+ this.requestResponseTime.set(0);
+ this.response.set(0);
+ this.responseCommandStatusCounter.reset();
+ }
+
+ public ConcurrentCommandCounter createSnapshot() {
+ return new ConcurrentCommandCounter(request.get(), requestExpired.get(), requestWaitTime.get(), requestResponseTime.get(), response.get(), responseCommandStatusCounter);
+ }
+
+ public int getRequest() {
+ return this.request.get();
+ }
+
+ public int incrementRequestAndGet() {
+ return this.request.incrementAndGet();
+ }
+
+ public int getRequestExpired() {
+ return this.requestExpired.get();
+ }
+
+ public int incrementRequestExpiredAndGet() {
+ return this.requestExpired.incrementAndGet();
+ }
+
+ public long getRequestWaitTime() {
+ return this.requestWaitTime.get();
+ }
+
+ public long addRequestWaitTimeAndGet(long waitTime) {
+ return this.requestWaitTime.addAndGet(waitTime);
+ }
+
+ public long getRequestResponseTime() {
+ return this.requestResponseTime.get();
+ }
+
+ public long addRequestResponseTimeAndGet(long responseTime) {
+ return this.requestResponseTime.addAndGet(responseTime);
+ }
+
+ public int getResponse() {
+ return this.response.get();
+ }
+
+ public int incrementResponseAndGet() {
+ return this.response.incrementAndGet();
+ }
+
+ public ConcurrentCommandStatusCounter getResponseCommandStatusCounter() {
+ return this.responseCommandStatusCounter;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder to = new StringBuilder();
+ to.append("[request=");
+ to.append(getRequest());
+ to.append(" expired=");
+ to.append(getRequestExpired());
+ to.append(" response=");
+ to.append(getResponse());
+
+ to.append(" avgWaitTime=");
+ double avgWaitTime = 0;
+ if (getResponse() > 0) {
+ avgWaitTime = (double)getRequestWaitTime()/(double)getResponse();
+ }
+ to.append(DecimalUtil.toString(avgWaitTime, 1));
+
+ to.append("ms avgResponseTime=");
+ double avgResponseTime = 0;
+ if (getResponse() > 0) {
+ avgResponseTime = (double)getRequestResponseTime()/(double)getResponse();
+ }
+ to.append(DecimalUtil.toString(avgResponseTime, 1));
+
+ to.append("ms cmdStatus=[");
+ to.append(this.responseCommandStatusCounter.toString());
+ to.append("]]");
+ return to.toString();
+ }
+}
View
97 src/main/java/com/cloudhopper/smpp/util/ConcurrentCommandStatusCounter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2011 Twitter, Inc..
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudhopper.smpp.util;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * @author joelauer
+ */
+public class ConcurrentCommandStatusCounter {
+
+ private ConcurrentHashMap<Integer,AtomicInteger> map;
+
+ public ConcurrentCommandStatusCounter() {
+ this.map = new ConcurrentHashMap<Integer,AtomicInteger>();
+ }
+
+ public void reset() {
+ this.map.clear();
+ }
+
+ public ConcurrentCommandStatusCounter copy() {
+ ConcurrentCommandStatusCounter copy = new ConcurrentCommandStatusCounter();
+ for (Map.Entry<Integer,AtomicInteger> entry : this.map.entrySet()) {
+ this.map.put(entry.getKey(), new AtomicInteger(entry.getValue().get()));
+ }
+ return copy;
+ }
+
+ public int get(int commandStatus) {
+ Integer key = new Integer(commandStatus);
+ AtomicInteger val = map.get(key);
+ if (val == null) {
+ return -1;
+ } else {
+ return val.get();
+ }
+ }
+
+ public int incrementAndGet(int commandStatus) {
+ Integer key = new Integer(commandStatus);
+ AtomicInteger val = map.get(key);
+ if (val == null) {
+ val = new AtomicInteger(0);
+ map.put(key, val);
+ }
+ return val.incrementAndGet();
+ }
+
+ public SortedMap<Integer,Integer> createSortedMapSnapshot() {
+ SortedMap<Integer,Integer> sortedMap = new TreeMap<Integer,Integer>();
+ for (Map.Entry<Integer,AtomicInteger> entry : this.map.entrySet()) {
+ sortedMap.put(entry.getKey(), new Integer(entry.getValue().get()));
+ }
+ return sortedMap;
+ }
+
+ @Override
+ public String toString() {
+ if (this.map.isEmpty()) {
+ return "";
+ }
+
+ SortedMap<Integer,Integer> sortedMap = createSortedMapSnapshot();
+
+ StringBuilder to = new StringBuilder();
+ for (Map.Entry<Integer,Integer> entry : sortedMap.entrySet()) {
+ if (to.length() != 0) {
+ to.append(" ");
+ }
+ to.append(entry.getKey());
+ to.append("=");
+ to.append(entry.getValue());
+ }
+
+ return to.toString();
+ }
+
+}
View
14 src/test/java/com/cloudhopper/smpp/demo/ClientMain.java
@@ -98,6 +98,7 @@ public Thread newThread(Runnable r) {
// to enable monitoring (request expiration)
config0.setRequestExpiryTimeout(30000);
config0.setWindowMonitorInterval(15000);
+ config0.setCountersEnabled(true);
//
// create session, enquire link, submit an sms, close session
@@ -160,7 +161,18 @@ public Thread newThread(Runnable r) {
}
if (session0 != null) {
- logger.info("Cleaning up session...");
+ logger.info("Cleaning up session... (final counters)");
+ if (session0.hasCounters()) {
+ logger.info("tx-enquireLink: {}", session0.getCounters().getTxEnquireLink());
+ logger.info("tx-submitSM: {}", session0.getCounters().getTxSubmitSM());
+ logger.info("tx-deliverSM: {}", session0.getCounters().getTxDeliverSM());
+ logger.info("tx-dataSM: {}", session0.getCounters().getTxDataSM());
+ logger.info("rx-enquireLink: {}", session0.getCounters().getRxEnquireLink());
+ logger.info("rx-submitSM: {}", session0.getCounters().getRxSubmitSM());
+ logger.info("rx-deliverSM: {}", session0.getCounters().getRxDeliverSM());
+ logger.info("rx-dataSM: {}", session0.getCounters().getRxDataSM());
+ }
+
session0.shutdown();
// alternatively, could call close(), get outstanding requests from
// the sendWindow (if we wanted to retry them later), then call shutdown()
View
281 src/test/java/com/cloudhopper/smpp/demo/PerformanceClientMain.java
@@ -0,0 +1,281 @@
+/**
+ * Copyright (C) 2011 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+ * file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.cloudhopper.smpp.demo;
+
+import com.cloudhopper.commons.charset.CharsetUtil;
+import com.cloudhopper.commons.util.DecimalUtil;
+import com.cloudhopper.smpp.PduAsyncResponse;
+import com.cloudhopper.smpp.SmppSessionConfiguration;
+import com.cloudhopper.smpp.SmppBindType;
+import com.cloudhopper.smpp.SmppSession;
+import com.cloudhopper.smpp.impl.DefaultSmppClient;
+import com.cloudhopper.smpp.impl.DefaultSmppSessionHandler;
+import com.cloudhopper.smpp.type.Address;
+import com.cloudhopper.smpp.pdu.SubmitSm;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
+ */
+public class PerformanceClientMain {
+ private static final Logger logger = LoggerFactory.getLogger(PerformanceClientMain.class);
+
+ //
+ // performance testing options (just for this sample)
+ //
+ // total number of sessions (conns) to create
+ static public final int SESSION_COUNT = 10;
+ // size of window per session
+ static public final int WINDOW_SIZE = 4;
+ // total number of submit to send total across all sessions
+ static public final int SUBMIT_TO_SEND = 2000;
+ // total number of submit sent
+ static public final AtomicInteger SUBMIT_SENT = new AtomicInteger(0);
+
+ static public void main(String[] args) throws Exception {
+ //
+ // setup 3 things required for any session we plan on creating
+ //
+
+ // for monitoring thread use, it's preferable to create your own instance
+ // of an executor with Executors.newCachedThreadPool() and cast it to ThreadPoolExecutor
+ // this permits exposing thinks like executor.getActiveCount() via JMX possible
+ // no point renaming the threads in a factory since underlying Netty
+ // framework does not easily allow you to customize your thread names
+ ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
+
+ // to enable automatic expiration of requests, a second scheduled executor
+ // is required which is what a monitor task will be executed with - this
+ // is probably a thread pool that can be shared with between all client bootstraps
+ ScheduledThreadPoolExecutor monitorExecutor = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ private AtomicInteger sequence = new AtomicInteger(0);
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("SmppClientSessionWindowMonitorPool-" + sequence.getAndIncrement());
+ return t;
+ }
+ });
+
+ // a single instance of a client bootstrap can technically be shared
+ // between any sessions that are created (a session can go to any different
+ // number of SMSCs) - each session created under
+ // a client bootstrap will use the executor and monitorExecutor set
+ // in its constructor - just be *very* careful with the "expectedSessions"
+ // value to make sure it matches the actual number of total concurrent
+ // open sessions you plan on handling - the underlying netty library
+ // used for NIO sockets essentially uses this value as the max number of
+ // threads it will ever use, despite the "max pool size", etc. set on
+ // the executor passed in here
+ DefaultSmppClient clientBootstrap = new DefaultSmppClient(Executors.newCachedThreadPool(), SESSION_COUNT, monitorExecutor);
+
+ // same configuration for each client runner
+ SmppSessionConfiguration config = new SmppSessionConfiguration();
+ config.setWindowSize(WINDOW_SIZE);
+ config.setName("Tester.Session.0");
+ config.setType(SmppBindType.TRANSCEIVER);
+ config.setHost("127.0.0.1");
+ config.setPort(2776);
+ config.setConnectTimeout(10000);
+ config.setSystemId("1234567890");
+ config.setPassword("password");
+ config.getLoggingOptions().setLogBytes(false);
+ // to enable monitoring (request expiration)
+ config.setRequestExpiryTimeout(30000);
+ config.setWindowMonitorInterval(15000);
+ config.setCountersEnabled(true);
+
+ // various latches used to signal when things are ready
+ CountDownLatch allSessionsBoundSignal = new CountDownLatch(SESSION_COUNT);
+ CountDownLatch startSendingSignal = new CountDownLatch(1);
+
+ // create all session runners and executors to run them
+ ThreadPoolExecutor taskExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
+ ClientSessionTask[] tasks = new ClientSessionTask[SESSION_COUNT];
+ for (int i = 0; i < SESSION_COUNT; i++) {
+ tasks[i] = new ClientSessionTask(allSessionsBoundSignal, startSendingSignal, clientBootstrap, config);
+ taskExecutor.submit(tasks[i]);
+ }
+
+ // wait for all sessions to bind
+ logger.info("Waiting up to 7 seconds for all sessions to bind...");
+ if (!allSessionsBoundSignal.await(7000, TimeUnit.MILLISECONDS)) {
+ throw new Exception("One or more sessions were unable to bind, cancelling test");
+ }
+
+ logger.info("Sending signal to start test...");
+ long startTimeMillis = System.currentTimeMillis();
+ startSendingSignal.countDown();
+
+ // wait for all tasks to finish
+ taskExecutor.shutdown();
+ taskExecutor.awaitTermination(3, TimeUnit.DAYS);
+ long stopTimeMillis = System.currentTimeMillis();
+
+ // did everything succeed?
+ int actualSubmitSent = 0;
+ int sessionFailures = 0;
+ for (int i = 0; i < SESSION_COUNT; i++) {
+ if (tasks[i].getCause() != null) {
+ sessionFailures++;
+ logger.error("Task #" + i + " failed with exception: " + tasks[i].getCause());
+ } else {
+ actualSubmitSent += tasks[i].getSubmitRequestSent();
+ }
+ }
+
+ logger.info("Performance client finished:");
+ logger.info(" Sessions: " + SESSION_COUNT);
+ logger.info("Sessions Failed: " + sessionFailures);
+ logger.info(" Time: " + (stopTimeMillis - startTimeMillis) + " ms");
+ logger.info(" Target Submit: " + SUBMIT_TO_SEND);
+ logger.info(" Actual Submit: " + actualSubmitSent);
+ double throughput = (double)actualSubmitSent/((double)(stopTimeMillis - startTimeMillis)/(double)1000);
+ logger.info(" Throughput: " + DecimalUtil.toString(throughput, 3) + " per sec");
+
+ for (int i = 0; i < SESSION_COUNT; i++) {
+ if (tasks[i].session != null && tasks[i].session.hasCounters()) {
+ logger.info(" Session " + i + ": submitSM {}", tasks[i].session.getCounters().getTxSubmitSM());
+ }
+ }
+
+ // this is required to not causing server to hang from non-daemon threads
+ // this also makes sure all open Channels are closed to I *think*
+ logger.info("Shutting down client bootstrap and executors...");
+ clientBootstrap.shutdown();
+ executor.shutdownNow();
+ monitorExecutor.shutdownNow();
+
+ logger.info("Done. Exiting");
+ }
+
+
+ public static class ClientSessionTask implements Runnable {
+
+ private SmppSession session;
+ private CountDownLatch allSessionsBoundSignal;
+ private CountDownLatch startSendingSignal;
+ private DefaultSmppClient clientBootstrap;
+ private SmppSessionConfiguration config;
+ private int submitRequestSent;
+ private int submitResponseReceived;
+ private AtomicBoolean sendingDone;
+ private Exception cause;
+
+ public ClientSessionTask(CountDownLatch allSessionsBoundSignal, CountDownLatch startSendingSignal, DefaultSmppClient clientBootstrap, SmppSessionConfiguration config) {
+ this.allSessionsBoundSignal = allSessionsBoundSignal;
+ this.startSendingSignal = startSendingSignal;
+ this.clientBootstrap = clientBootstrap;
+ this.config = config;
+ this.submitRequestSent = 0;
+ this.submitResponseReceived = 0;
+ this.sendingDone = new AtomicBoolean(false);
+ }
+
+ public Exception getCause() {
+ return this.cause;
+ }
+
+ public int getSubmitRequestSent() {
+ return this.submitRequestSent;
+ }
+
+ @Override
+ public void run() {
+ // a countdownlatch will be used to eventually wait for all responses
+ // to be received by this thread since we don't want to exit too early
+ CountDownLatch allSubmitResponseReceivedSignal = new CountDownLatch(1);
+
+ DefaultSmppSessionHandler sessionHandler = new ClientSmppSessionHandler(allSubmitResponseReceivedSignal);
+ String text160 = "\u20AC Lorem [ipsum] dolor sit amet, consectetur adipiscing elit. Proin feugiat, leo id commodo tincidunt, nibh diam ornare est, vitae accumsan risus lacus sed sem metus.";
+ byte[] textBytes = CharsetUtil.encode(text160, CharsetUtil.CHARSET_GSM);
+
+ try {
+ // create session a session by having the bootstrap connect a
+ // socket, send the bind request, and wait for a bind response
+ session = clientBootstrap.bind(config, sessionHandler);
+
+ // don't start sending until signalled
+ allSessionsBoundSignal.countDown();
+ startSendingSignal.await();
+
+ // all threads compete for processing
+ while (SUBMIT_SENT.getAndIncrement() < SUBMIT_TO_SEND) {
+ SubmitSm submit = new SubmitSm();
+ submit.setSourceAddress(new Address((byte)0x03, (byte)0x00, "40404"));
+ submit.setDestAddress(new Address((byte)0x01, (byte)0x01, "44555519205"));
+ submit.setShortMessage(textBytes);
+ // asynchronous send
+ this.submitRequestSent++;
+ sendingDone.set(true);
+ session.sendRequestPdu(submit, 30000, false);
+ }
+
+ // all threads have sent all submit, we do need to wait for
+ // an acknowledgement for all "inflight" though (synchronize
+ // against the window)
+ logger.info("before waiting sendWindow.size: {}", session.getSendWindow().getSize());
+
+ allSubmitResponseReceivedSignal.await();
+
+ logger.info("after waiting sendWindow.size: {}", session.getSendWindow().getSize());
+
+ session.unbind(5000);
+ } catch (Exception e) {
+ logger.error("", e);
+ this.cause = e;
+ }
+ }
+
+ class ClientSmppSessionHandler extends DefaultSmppSessionHandler {
+
+ private CountDownLatch allSubmitResponseReceivedSignal;
+
+ public ClientSmppSessionHandler(CountDownLatch allSubmitResponseReceivedSignal) {
+ super(logger);
+ this.allSubmitResponseReceivedSignal = allSubmitResponseReceivedSignal;
+ }
+
+ @Override
+ public void fireChannelUnexpectedlyClosed() {
+ // this is an error we didn't really expect for perf testing
+ // its best to at least countDown the latch so we're not waiting forever
+ logger.error("Unexpected close occurred...");
+ this.allSubmitResponseReceivedSignal.countDown();
+ }
+
+ @Override
+ public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) {
+ submitResponseReceived++;
+ // if the sending thread is finished, check if we're done
+ if (sendingDone.get()) {
+ if (submitResponseReceived >= submitRequestSent) {
+ this.allSubmitResponseReceivedSignal.countDown();
+ }
+ }
+ }
+ }
+ }
+}
View
35 src/test/java/com/cloudhopper/smpp/demo/ServerMain.java
@@ -17,6 +17,7 @@
import com.cloudhopper.smpp.SmppServerConfiguration;
import com.cloudhopper.smpp.SmppServerHandler;
import com.cloudhopper.smpp.SmppServerSession;
+import com.cloudhopper.smpp.SmppSession;
import com.cloudhopper.smpp.SmppSessionConfiguration;
import com.cloudhopper.smpp.impl.DefaultSmppServer;
import com.cloudhopper.smpp.impl.DefaultSmppSessionHandler;
@@ -25,6 +26,7 @@
import com.cloudhopper.smpp.pdu.PduRequest;
import com.cloudhopper.smpp.pdu.PduResponse;
import com.cloudhopper.smpp.type.SmppProcessingException;
+import java.lang.ref.WeakReference;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -69,10 +71,12 @@ public Thread newThread(Runnable r) {
SmppServerConfiguration configuration = new SmppServerConfiguration();
configuration.setPort(2776);
configuration.setMaxConnections(10);
+ configuration.setNonBlockingSocketsEnabled(true);
configuration.setDefaultRequestExpiryTimeout(30000);
configuration.setDefaultWindowMonitorInterval(15000);
configuration.setDefaultWindowSize(5);
configuration.setDefaultWindowWaitTimeout(configuration.getDefaultRequestExpiryTimeout());
+ configuration.setDefaultSessionCountersEnabled(true);
// create a server, start it up
DefaultSmppServer smppServer = new DefaultSmppServer(configuration, new DefaultSmppServerHandler(), executor, monitorExecutor);
@@ -87,13 +91,14 @@ public Thread newThread(Runnable r) {
logger.info("Stopping SMPP server...");
smppServer.stop();
logger.info("SMPP server stopped");
+
+ logger.info("Final server counters: {}", smppServer.getCounters());
}
public static class DefaultSmppServerHandler implements SmppServerHandler {
@Override
public void sessionBindRequested(Long sessionId, SmppSessionConfiguration sessionConfiguration, final BaseBind bindRequest) throws SmppProcessingException {
-
// test name change of sessions
// this name actually shows up as thread context....
sessionConfiguration.setName("Application.SMPP." + sessionConfiguration.getSystemId());
@@ -105,19 +110,45 @@ public void sessionBindRequested(Long sessionId, SmppSessionConfiguration sessio
public void sessionCreated(Long sessionId, SmppServerSession session, BaseBindResp preparedBindResponse) throws SmppProcessingException {
logger.info("Session created: {}", session);
// need to do something it now (flag we're ready)
- session.serverReady(new TestSmppSessionHandler());
+ session.serverReady(new TestSmppSessionHandler(session));
}
@Override
public void sessionDestroyed(Long sessionId, SmppServerSession session) {
logger.info("Session destroyed: {}", session);
+ // print out final stats
+ if (session.hasCounters()) {
+ logger.info(" final session rx-submitSM: {}", session.getCounters().getRxSubmitSM());
+ }
+
+ // make sure it's really shutdown
+ session.shutdown();
}
}
public static class TestSmppSessionHandler extends DefaultSmppSessionHandler {
+
+ private WeakReference<SmppSession> sessionRef;
+
+ public TestSmppSessionHandler(SmppSession session) {
+ this.sessionRef = new WeakReference<SmppSession>(session);
+ }
+
@Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
+ SmppSession session = sessionRef.get();
+
+ // mimic how long processing could take on a slower smsc
+ try {
+ //Thread.sleep(50);
+ } catch (Exception e) { }
+
+ if (session != null) {
+ //logger.debug("rx-enquireLink: {}", session.getCounters().getRxEnquireLink());
+ //logger.debug("rx-submitSM: {}", session.getCounters().getRxSubmitSM());
+ }
+
// ignore for now (already logged)
return pduRequest.createResponse();
}

0 comments on commit 8414d69

Please sign in to comment.
Something went wrong with that request. Please try again.