Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixed peformance issue with a relatively high number of SmppServer se…

…ssions
  • Loading branch information...
commit 921f19f1103d29f668ec5d206e3804d03efcdb2e 1 parent ce1b815
@jjlauer jjlauer authored
View
3  Makefile
@@ -5,6 +5,9 @@ run-client:
run-server:
mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.ServerMain"
+run-slow-server:
+ mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.SlowServerMain"
+
run-simulator:
mvn -e test-compile exec:java -Dexec.classpathScope="test" -Dexec.mainClass="com.cloudhopper.smpp.demo.SimulatorMain"
View
20 ReleaseNotes.txt
@@ -8,7 +8,25 @@ SMPP Library
fix any specific problem)
* Upgraded joda-time dependency from 1.6 to 1.6.2 (to keep it current)
* Upgraded ch-commons-util dependency from 4.0 to 4.1
-
+ * Fixed a significant performance issue with the SmppServer having a total
+ session count greater than the number of processors on the server. Main
+ issue was with Netty's default "workerCount" value in its constructor.
+ The maximum number of expected concurrent sessions is recommended to be set
+ in the SmppServerConfiguration object. The new default is 100 max sessions.
+ * Added several other configuration options for an SmppServer:
+ maxConnections - max number of concurrent connections/sessions (default 100)
+ reuseAddress - whether to reuse the server socket (default is true)
+ nonBlockingSocketsEnabled - whether to use NIO (non blocking) or OIO
+ (old blocking) type server sockets. (default is true)
+ * Added an example of a "SlowSmppServer" to test what happens if an SMSC is
+ slow acknowledging submits.
+ * Added support for passing up a SmppChannelException on blocking calls to
+ bind, submit, etc. if the underlying channel is closed during a block.
+ The implementation requires checking the RequestWindow if any callers are
+ waiting and immediately cancelling those requests and setting the "cause" to
+ a ClosedChannelException throwable.
+ * Added a warning to be output to the logger if the number of SMPP server
+ sessions exceeds the maxConnections set in the configuration object.
3.1 - 2011-03-14
* Removed debug logging from DeliveryReceipt parser.
View
62 src/main/java/com/cloudhopper/smpp/SmppServerConfiguration.java
@@ -32,6 +32,12 @@
private boolean autoNegotiateInterfaceVersion;
// smpp version the server supports
private byte interfaceVersion;
+ // max number of connections/sessions this server will expect to handle
+ // this number corrosponds to the number of worker threads handling reading
+ // data from sockets and the thread things will be processed under
+ private int maxConnections;
+ private boolean nonBlockingSocketsEnabled;
+ private boolean reuseAddress;
public SmppServerConfiguration() {
this.name = "SmppServer";
@@ -40,8 +46,54 @@ public SmppServerConfiguration() {
this.systemId = "cloudhopper";
this.autoNegotiateInterfaceVersion = true;
this.interfaceVersion = SmppConstants.VERSION_3_4;
+ this.maxConnections = 100;
+ this.nonBlockingSocketsEnabled = true;
+ this.reuseAddress = true;
}
+ public boolean isReuseAddress() {
+ return reuseAddress;
+ }
+
+ public void setReuseAddress(boolean reuseAddress) {
+ this.reuseAddress = reuseAddress;
+ }
+
+ public boolean isNonBlockingSocketsEnabled() {
+ return nonBlockingSocketsEnabled;
+ }
+
+ /**
+ * Sets if non-blocking (NIO) is used for this server. If true, then the
+ * NIO server socket is used for Netty, otherwise the normal blocking
+ * server socket will be used.
+ * @param nonBlockingSocketsEnabled True if enabled, otherwise false
+ */
+ public void setNonBlockingSocketsEnabled(boolean nonBlockingSocketsEnabled) {
+ this.nonBlockingSocketsEnabled = nonBlockingSocketsEnabled;
+ }
+
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+
+ /**
+ * Set the maximum number of connections this server is configured to
+ * handle.
+ * @param maxConnections Max number of connections. Must be >= 1
+ */
+ public void setMaxConnections(int maxConnections) {
+ if (this.maxConnections < 1) {
+ throw new IllegalArgumentException("MaxConnections must be >= 1");
+ }
+ this.maxConnections = maxConnections;
+ }
+
+ /**
+ * Set the name of the server that is used for some logging and naming of
+ * threads. The default is "SmppServer".
+ * @param value The name of this server
+ */
public void setName(String value) {
this.name = value;
}
@@ -58,6 +110,12 @@ public void setPort(int port) {
this.port = port;
}
+ /**
+ * Set the amount of time to allow a connection to finish binding into the
+ * server before the server automatically closes the connection.
+ * @param value The number of milliseconds to wait for a bind to be established
+ * before a socket will be closed.
+ */
public void setBindTimeout(long value) {
this.bindTimeout = value;
}
@@ -66,6 +124,10 @@ public long getBindTimeout() {
return this.bindTimeout;
}
+ /**
+ * Set the system id that will be returned in a bind response.
+ * @param value The system id to return in a bind response
+ */
public void setSystemId(String value) {
this.systemId = value;
}
View
22 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppServer.java
@@ -45,6 +45,7 @@
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,12 +62,10 @@
private final SmppServerConfiguration configuration;
private final SmppServerHandler serverHandler;
private final PduTranscoder transcoder;
-
private ExecutorService bossThreadPool;
private ChannelFactory channelFactory;
private ServerBootstrap serverBootstrap;
private Channel serverChannel;
-
// shared instance of a timer background thread to close unbound channels
private final Timer bindTimer;
// shared instance of a session id generator (an atomic long)
@@ -83,10 +82,20 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorSe
this.serverHandler = serverHandler;
// we'll put the "boss" worker for a server in its own pool
this.bossThreadPool = Executors.newCachedThreadPool();
+
// a factory for creating channels (connections)
- this.channelFactory = new NioServerSocketChannelFactory(this.bossThreadPool, executors);
+ if (configuration.isNonBlockingSocketsEnabled()) {
+ this.channelFactory = new NioServerSocketChannelFactory(this.bossThreadPool, executors, configuration.getMaxConnections());
+ } else {
+ this.channelFactory = new OioServerSocketChannelFactory(this.bossThreadPool, executors);
+ }
+
// tie the server bootstrap to this server socket channel factory
this.serverBootstrap = new ServerBootstrap(this.channelFactory);
+
+ // set options for the server socket that are useful
+ this.serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+
// we use the same default pipeline for all new channels - no need for a factory
this.serverConnector = new SmppServerConnector(channels, this);
this.serverBootstrap.getPipeline().addLast(SmppChannelConstants.PIPELINE_SERVER_CONNECTOR_NAME, this.serverConnector);
@@ -94,7 +103,7 @@ public DefaultSmppServer(final SmppServerConfiguration configuration, ExecutorSe
this.bindTimer = new Timer(configuration.getName() + "-BindTimer0", true);
// NOTE: this would permit us to customize the "transcoding" context for a server if needed
this.transcoder = new DefaultPduTranscoder(new DefaultPduTranscoderContext());
- this.sessionIdSequence = new AtomicLong(0);
+ this.sessionIdSequence = new AtomicLong(0);
}
public PduTranscoder getTranscoder() {
@@ -200,6 +209,11 @@ protected void createSession(Long sessionId, Channel channel, SmppSessionConfigu
channel.getPipeline().remove(SmppChannelConstants.PIPELINE_SESSION_WRAPPER_NAME);
channel.getPipeline().addLast(SmppChannelConstants.PIPELINE_SESSION_WRAPPER_NAME, new SmppSessionWrapper(session));
+ // check if the # of channels exceeds maxConnections
+ if (this.channels.size() > this.configuration.getMaxConnections()) {
+ logger.warn("The number of connections [{}] exceeds the configured maxConnections of [{}]", this.channels.size(), this.configuration.getMaxConnections());
+ }
+
// session created, now pass it upstream
this.serverHandler.sessionCreated(sessionId, session, preparedBindResponse);
}
View
31 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
@@ -380,7 +380,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
// the request future may have a cause set that we want to unwrap
Throwable cause = requestFuture.getCause();
if (cause != null && cause instanceof ClosedChannelException) {
- throw new SmppChannelException("Channel was closed during bind", cause);
+ throw new SmppChannelException("Channel was closed after sending request, but before receiving response", cause);
} else {
throw new UnrecoverablePduException(e.getMessage(), e);
}
@@ -571,21 +571,20 @@ public void fireChannelClosed() {
}
// most of the time when a channel is closed, we don't necessarily want
- // to do anything special. however, when we're in the middle of a "BIND"
- // request and this happens, we need to break anything waiting!
- if (this.state.get() == STATE_BINDING) {
- if (this.requestWindow.getPendingSize() > 0) {
- logger.warn("In process of bind(), but channel closed and requestWindow has pending requests, probably is a BindRequest that needs cancelled");
- Map<Integer,WindowEntry<Integer,PduRequest,PduResponse>> requests = this.requestWindow.getPendingRequests();
- for (Integer key : requests.keySet()) {
- WindowEntry<Integer,PduRequest,PduResponse> entry = requests.get(key);
- if (entry.getRequest() instanceof BaseBind) {
- logger.warn("Found a BaseBind request in requestWindow, cancelling it");
- try {
- this.requestWindow.cancelRequest(key, new ClosedChannelException());
- } catch (Exception e) { }
- return;
- }
+ // to do anything special -- however when a caller is waiting for a response
+ // to a request and we know the channel closed, we should check for those
+ // specific requests and make sure to cancel them
+ if (this.requestWindow.getPendingSize() > 0) {
+ logger.warn("Channel closed and requestWindow has [{}] pending requests, some may need cancelled immediately", this.requestWindow.getPendingSize());
+ Map<Integer,WindowEntry<Integer,PduRequest,PduResponse>> requests = this.requestWindow.getPendingRequests();
+ for (Integer key : requests.keySet()) {
+ WindowEntry<Integer,PduRequest,PduResponse> entry = requests.get(key);
+ // is the caller waiting?
+ if (entry.getCallerStatus() == WindowEntry.CALLER_WAITING) {
+ logger.warn("Caller waiting on request [{}], cancelling it with a channel closed exception", key);
+ try {
+ this.requestWindow.cancelRequest(key, new ClosedChannelException());
+ } catch (Exception e) { }
}
}
}
View
1  src/main/java/com/cloudhopper/smpp/util/DaemonExecutors.java
@@ -31,6 +31,7 @@
*/
static public ExecutorService newCachedDaemonThreadPool() {
return Executors.newCachedThreadPool(new ThreadFactory() {
+ @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
View
5 src/test/java/com/cloudhopper/smpp/demo/ClientMain.java
@@ -40,7 +40,6 @@
private static final Logger logger = LoggerFactory.getLogger(ClientMain.class);
static public void main(String[] args) throws Exception {
-
// a bootstrap can be shared (which will reused threads)
// THIS VERSION USES "DAEMON" threads by default
// SmppSessionBootstrap bootstrap = new SmppSessionBootstrap();
@@ -100,10 +99,10 @@ static public void main(String[] args) throws Exception {
submit0.setShortMessage(CharsetUtil.encode("Hello World" + i, CharsetUtil.CHARSET_GSM));
// SYNCHRONOUS TYPE...
- SubmitSmResp submitResp = session0.submit(submit0, 10000);
+ //SubmitSmResp submitResp = session0.submit(submit0, 10000);
// WINDOWED TYPE
- //session0.sendRequestPdu(submit0, 10000, false);
+ session0.sendRequestPdu(submit0, 10000, false);
}
long stopTime = System.currentTimeMillis();
View
4 src/test/java/com/cloudhopper/smpp/demo/ServerMain.java
@@ -36,10 +36,6 @@
private static final Logger logger = LoggerFactory.getLogger(ServerMain.class);
static public void main(String[] args) throws Exception {
-
- //ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
-
-
SmppServerConfiguration configuration = new SmppServerConfiguration();
configuration.setPort(2776);
View
93 src/test/java/com/cloudhopper/smpp/demo/SlowServerMain.java
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2011 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+ * file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed
+ * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package com.cloudhopper.smpp.demo;
+
+import com.cloudhopper.smpp.SmppServerConfiguration;
+import com.cloudhopper.smpp.SmppServerHandler;
+import com.cloudhopper.smpp.SmppServerSession;
+import com.cloudhopper.smpp.SmppSessionConfiguration;
+import com.cloudhopper.smpp.impl.DefaultSmppServer;
+import com.cloudhopper.smpp.impl.DefaultSmppSessionHandler;
+import com.cloudhopper.smpp.pdu.BaseBind;
+import com.cloudhopper.smpp.pdu.BaseBindResp;
+import com.cloudhopper.smpp.pdu.PduRequest;
+import com.cloudhopper.smpp.pdu.PduResponse;
+import com.cloudhopper.smpp.type.SmppProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author joelauer
+ */
+public class SlowServerMain {
+ private static final Logger logger = LoggerFactory.getLogger(SlowServerMain.class);
+
+ private static final long DELAY_BEFORE_RESPONSE = 3000;
+
+ static public void main(String[] args) throws Exception {
+ SmppServerConfiguration configuration = new SmppServerConfiguration();
+ configuration.setPort(2776);
+ configuration.setMaxConnections(10);
+ configuration.setNonBlockingSocketsEnabled(false);
+
+ DefaultSmppServer smppServer = new DefaultSmppServer(configuration, new DefaultSmppServerHandler());
+
+ logger.info("About to start SMPP server");
+ smppServer.start();
+ logger.info("SMPP server started");
+
+ System.out.println("Press any key to stop server");
+ System.in.read();
+
+ logger.info("SMPP server stopping");
+ smppServer.stop();
+ logger.info("SMPP server stopped");
+ }
+
+ public static class DefaultSmppServerHandler implements SmppServerHandler {
+ @Override
+ public void sessionBindRequested(Long sessionId, SmppSessionConfiguration sessionConfiguration, final BaseBind bindRequest) throws SmppProcessingException {
+ // this name actually shows up as thread context....
+ sessionConfiguration.setName("Application.SMPP." + sessionId);
+ }
+
+ @Override
+ public void sessionCreated(Long sessionId, SmppServerSession session, BaseBindResp preparedBindResponse) throws SmppProcessingException {
+ logger.info("Session created: {}", session);
+ // need to do something it now (flag we're ready)
+ session.serverReady(new SlowSmppSessionHandler());
+ }
+
+ @Override
+ public void sessionDestroyed(Long sessionId, SmppServerSession session) {
+ logger.info("Session destroyed: {}", session);
+ }
+
+ }
+
+ public static class SlowSmppSessionHandler extends DefaultSmppSessionHandler {
+ @Override
+ public PduResponse firePduRequestReceived(PduRequest pduRequest) {
+ try {
+ Thread.sleep(DELAY_BEFORE_RESPONSE);
+ } catch (Exception e) { }
+
+ // ignore for now (already logged)
+ return pduRequest.createResponse();
+ }
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.