Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

bug fixes and enhanchments to some java interfaces #2

Merged
merged 12 commits into from

2 participants

@stefanth

No description provided.

stefanth added some commits
@stefanth stefanth Fix failure message in StringValidationTest unit test (incorrect leng…
…th indicated)
09378ad
@stefanth stefanth Add methods to ClientSession interface so that it can be used as a us…
…eful SMPP session. This allows creating objects that behave like SMPP sessions without subclassing the heavy-weight AbstractSession or SMPPSession classes.
d43b2b6
@stefanth stefanth Raise ProcessRequestException in fireAcceptDeliverSm() if there is no…
… MessageReceiverListener set, otherwise the smpp server will receive an ok for a message that was not correctly delivered
d16eb67
@stefanth stefanth Use readFully() instead of read() for reading in DefaultPDUReader.rea…
…dPDU(). Under some network conditions, it is possible for the code to receive a partial package over the socket and erronously throw an IOException for the socket. The specs for read() don't guarantee that the full requested length is read in one call. Using readFully() fixes that problem.
8296063
@stefanth stefanth the method onStateChange() in an SessionStateListener interface alway…
…s receives a session, so the passed object can be a Session instead of a generic Object
0483393
@stefanth stefanth Retrieve state processor when task is run instead of when enqueued.
Change PDUProcessTask so that state processor is retrieved when task is actually executed, not when task is enqueued. This solves a race condition where the bind state changes while a task is enqueued
1255aee
@stefanth stefanth Fix race condition when session is being bound
When an smpp session is bound and there are messages waiting on the
smsc side, it is legal for the smsc to send a DeliverSM PDU immediately
after it sends a bind response. Because of the PduProcessorDegree,
there is a race condition were both the BindResp and the DeliverSM can
simultaniously be processed but the DeliverSM will be processed like
the session is still in the open state instead of the bound state. This
patch changes the size of the executorService pool to remove the race
condition. It starts with size 1 in open state and when the session is
bound, the pool will be enlarged to the PduProcessorDegree.
c61adc4
@stefanth stefanth Refactored EnquireLinkTimer thread into AbstractSession
There was duplicate code in SMPPSession and SMPPServerSession that should live in AbstractSession.
75593bd
@stefanth stefanth Wait for EnquireLink thread to exit when closing
After the smpp session has been closed with close() method, we shouldn't leave any threads running. Any extra threads should have exited before Session.close() exits, otherwise many webapp containers will complain during webapp shutdown or webapp restart
30167dd
@stefanth stefanth Give EnquireLinkSender thread a name 3b5be56
@stefanth stefanth PDUReaderWorker waits for termination of executor service 6e80cf7
@stefanth stefanth Give PDUReaderWorker thread a name 022ca7d
@uudashr
Owner

That is nice... but what is the purpose of waiting termination...

The idea was that the PDUReaderWorker started the thread pool so it should make sure that it closes it before exiting. However, there is a missing link in the code because nobody is currently waiting for the PDUReaderWorkder to exit when the session is closed. The close method of the session should wait for the PDUReaderWorker to exit before the close method exits. I still haven't implemented how the close() method should wait for that temination so I don't have a commit for that.

@uudashr uudashr merged commit 75a2c16 into uudashr:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 14, 2012
  1. @stefanth
  2. @stefanth

    Add methods to ClientSession interface so that it can be used as a us…

    stefanth authored
    …eful SMPP session. This allows creating objects that behave like SMPP sessions without subclassing the heavy-weight AbstractSession or SMPPSession classes.
  3. @stefanth

    Raise ProcessRequestException in fireAcceptDeliverSm() if there is no…

    stefanth authored
    … MessageReceiverListener set, otherwise the smpp server will receive an ok for a message that was not correctly delivered
  4. @stefanth

    Use readFully() instead of read() for reading in DefaultPDUReader.rea…

    stefanth authored
    …dPDU(). Under some network conditions, it is possible for the code to receive a partial package over the socket and erronously throw an IOException for the socket. The specs for read() don't guarantee that the full requested length is read in one call. Using readFully() fixes that problem.
Commits on Feb 15, 2012
  1. @stefanth

    the method onStateChange() in an SessionStateListener interface alway…

    stefanth authored
    …s receives a session, so the passed object can be a Session instead of a generic Object
  2. @stefanth

    Retrieve state processor when task is run instead of when enqueued.

    stefanth authored
    Change PDUProcessTask so that state processor is retrieved when task is actually executed, not when task is enqueued. This solves a race condition where the bind state changes while a task is enqueued
  3. @stefanth

    Fix race condition when session is being bound

    stefanth authored
    When an smpp session is bound and there are messages waiting on the
    smsc side, it is legal for the smsc to send a DeliverSM PDU immediately
    after it sends a bind response. Because of the PduProcessorDegree,
    there is a race condition were both the BindResp and the DeliverSM can
    simultaniously be processed but the DeliverSM will be processed like
    the session is still in the open state instead of the bound state. This
    patch changes the size of the executorService pool to remove the race
    condition. It starts with size 1 in open state and when the session is
    bound, the pool will be enlarged to the PduProcessorDegree.
Commits on Feb 16, 2012
  1. @stefanth

    Refactored EnquireLinkTimer thread into AbstractSession

    stefanth authored
    There was duplicate code in SMPPSession and SMPPServerSession that should live in AbstractSession.
  2. @stefanth

    Wait for EnquireLink thread to exit when closing

    stefanth authored
    After the smpp session has been closed with close() method, we shouldn't leave any threads running. Any extra threads should have exited before Session.close() exits, otherwise many webapp containers will complain during webapp shutdown or webapp restart
  3. @stefanth
  4. @stefanth
  5. @stefanth
This page is out of date. Refresh to see the latest.
View
2  jsmpp-examples/src/main/java/org/jsmpp/examples/StressServer.java
@@ -129,7 +129,7 @@ public void onAcceptReplaceSm(ReplaceSm replaceSm, SMPPServerSession source)
private class SessionStateListenerImpl implements SessionStateListener {
public void onStateChange(SessionState newState, SessionState oldState,
- Object source) {
+ Session source) {
SMPPServerSession session = (SMPPServerSession)source;
logger.info("New state of " + session.getSessionId() + " is " + newState);
}
View
3  jsmpp-examples/src/main/java/org/jsmpp/examples/SubmitMultipartMultilangualExample.java
@@ -36,6 +36,7 @@
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
+import org.jsmpp.session.Session;
import org.jsmpp.session.SessionStateListener;
/**
@@ -51,7 +52,7 @@
private static final int MAX_SINGLE_MSG_SEGMENT_SIZE_7BIT = 160;
private class SessionStateListenerImpl implements SessionStateListener {
- public void onStateChange(SessionState newState, SessionState oldState, Object source) {
+ public void onStateChange(SessionState newState, SessionState oldState, Session source) {
System.out.println("Session state changed from " + oldState + " to " + newState);
}
}
View
3  jsmpp-examples/src/main/java/org/jsmpp/examples/gateway/AutoReconnectGateway.java
@@ -31,6 +31,7 @@
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
+import org.jsmpp.session.Session;
import org.jsmpp.session.SessionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +161,7 @@ public void run() {
*/
private class SessionStateListenerImpl implements SessionStateListener {
public void onStateChange(SessionState newState, SessionState oldState,
- Object source) {
+ Session source) {
if (newState.equals(SessionState.CLOSED)) {
logger.info("Session closed");
reconnectAfter(reconnectInterval);
View
14 jsmpp/src/main/java/org/jsmpp/DefaultPDUReader.java
@@ -16,7 +16,6 @@
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import org.jsmpp.bean.Command;
import org.jsmpp.util.OctetUtil;
@@ -57,7 +56,7 @@ public Command readPDUHeader(DataInputStream in)
/* (non-Javadoc)
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream, org.jsmpp.bean.Command)
*/
- public byte[] readPDU(InputStream in, Command pduHeader) throws IOException {
+ public byte[] readPDU(DataInputStream in, Command pduHeader) throws IOException {
return readPDU(in, pduHeader.getCommandLength(), pduHeader
.getCommandId(), pduHeader.getCommandStatus(), pduHeader
.getSequenceNumber());
@@ -66,7 +65,7 @@ public Command readPDUHeader(DataInputStream in)
/* (non-Javadoc)
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream, int, int, int, int)
*/
- public byte[] readPDU(InputStream in, int commandLength, int commandId,
+ public byte[] readPDU(DataInputStream in, int commandLength, int commandId,
int commandStatus, int sequenceNumber) throws IOException {
byte[] b = new byte[commandLength];
@@ -76,15 +75,8 @@ public Command readPDUHeader(DataInputStream in)
System.arraycopy(OctetUtil.intToBytes(sequenceNumber), 0, b, 12, 4);
if (commandLength > 16) {
- int len = commandLength - 16;
- int totalReaded = -1;
synchronized (in) {
- totalReaded = in.read(b, 16, commandLength - 16);
- }
- if (totalReaded != len) {
- throw new IOException(
- "Unexpected length of byte readed. Expecting " + len
- + " but only read " + totalReaded);
+ in.readFully(b, 16, commandLength - 16);
}
}
return b;
View
4 jsmpp/src/main/java/org/jsmpp/PDUReader.java
@@ -52,7 +52,7 @@ public Command readPDUHeader(DataInputStream in)
* @return the complete byte of smpp command.
* @throws IOException if an I/O error occurs.
*/
- public byte[] readPDU(InputStream in, Command pduHeader) throws IOException;
+ public byte[] readPDU(DataInputStream in, Command pduHeader) throws IOException;
/**
* Read all smpp pdu (excluding the command header) with specified pdu
@@ -66,7 +66,7 @@ public Command readPDUHeader(DataInputStream in)
* @return the complete byte of smpp command.
* @throws IOException if an I/O error occurs.
*/
- public byte[] readPDU(InputStream in, int commandLength, int commandId,
+ public byte[] readPDU(DataInputStream in, int commandLength, int commandId,
int commandStatus, int sequenceNumber) throws IOException;
}
View
4 jsmpp/src/main/java/org/jsmpp/SynchronizedPDUReader.java
@@ -56,7 +56,7 @@ public SynchronizedPDUReader(PDUReader pduReader) {
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream,
* org.jsmpp.bean.Command)
*/
- public byte[] readPDU(InputStream in, Command pduHeader) throws IOException {
+ public byte[] readPDU(DataInputStream in, Command pduHeader) throws IOException {
synchronized (in) {
return pduReader.readPDU(in, pduHeader);
}
@@ -68,7 +68,7 @@ public SynchronizedPDUReader(PDUReader pduReader) {
* @see org.jsmpp.PDUReader#readPDU(java.io.InputStream, int, int,
* int, int)
*/
- public byte[] readPDU(InputStream in, int commandLength, int commandId,
+ public byte[] readPDU(DataInputStream in, int commandLength, int commandId,
int commandStatus, int sequenceNumber) throws IOException {
synchronized (in) {
return pduReader.readPDU(in, commandLength, commandId,
View
64 jsmpp/src/main/java/org/jsmpp/session/AbstractSession.java
@@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.jsmpp.InvalidResponseException;
import org.jsmpp.PDUException;
@@ -60,6 +61,8 @@
private int enquireLinkTimer = 5000;
private long transactionTimer = 2000;
+ protected EnquireLinkSender enquireLinkSender;
+
public AbstractSession(PDUSender pduSender) {
this.pduSender = pduSender;
}
@@ -111,6 +114,10 @@ public SessionState getSessionState() {
return sessionContext().getSessionState();
}
+ protected synchronized boolean isReadPdu() {
+ return getSessionState().isBound() || getSessionState().equals(SessionState.OPEN);
+ }
+
public void addSessionStateListener(SessionStateListener l) {
if (l != null) {
sessionContext().addSessionStateListener(l);
@@ -201,6 +208,12 @@ public void close() {
} catch (IOException e) {
}
}
+
+ try {
+ enquireLinkSender.join();
+ } catch (InterruptedException e) {
+ logger.warn("interrupted while waiting for enquireLinkSender thread to exit");
+ }
}
/**
@@ -376,5 +389,56 @@ protected void ensureTransmittable(String activityName, boolean only) throws IOE
}
}
+ protected class EnquireLinkSender extends Thread {
+ private final AtomicBoolean sendingEnquireLink = new AtomicBoolean(false);
+
+ public EnquireLinkSender()
+ {
+ super("EnquireLinkSender: " + AbstractSession.this);
+ }
+
+ @Override
+ public void run() {
+ logger.info("Starting EnquireLinkSender");
+ while (isReadPdu()) {
+ while (!sendingEnquireLink.compareAndSet(true, false) && isReadPdu()) {
+ synchronized (sendingEnquireLink) {
+ try {
+ sendingEnquireLink.wait(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ if (!isReadPdu()) {
+ break;
+ }
+ try {
+ sendEnquireLink();
+ } catch (ResponseTimeoutException e) {
+ close();
+ } catch (InvalidResponseException e) {
+ // lets unbind gracefully
+ unbindAndClose();
+ } catch (IOException e) {
+ close();
+ }
+ }
+ logger.info("EnquireLinkSender stop");
+ }
+
+ /**
+ * This method will send enquire link asynchronously.
+ */
+ public void enquireLink() {
+ if (sendingEnquireLink.compareAndSet(false, true)) {
+ logger.debug("Sending enquire link notify");
+ synchronized (sendingEnquireLink) {
+ sendingEnquireLink.notify();
+ }
+ } else {
+ logger.debug("Not sending enquire link notify");
+ }
+ }
+ }
}
View
2  jsmpp/src/main/java/org/jsmpp/session/AbstractSessionContext.java
@@ -77,7 +77,7 @@ public void removeSessionStateListener(SessionStateListener l) {
}
protected void fireStateChanged(SessionState newState,
- SessionState oldState, Object source) {
+ SessionState oldState, Session source) {
synchronized (sessionStateListeners) {
for (SessionStateListener l : sessionStateListeners) {
l.onStateChange(newState, oldState, source);
View
82 jsmpp/src/main/java/org/jsmpp/session/ClientSession.java
@@ -19,6 +19,7 @@
import org.jsmpp.InvalidResponseException;
import org.jsmpp.PDUException;
import org.jsmpp.bean.Address;
+import org.jsmpp.bean.BindType;
import org.jsmpp.bean.DataCoding;
import org.jsmpp.bean.ESMClass;
import org.jsmpp.bean.NumberingPlanIndicator;
@@ -201,4 +202,85 @@ void replaceShortMessage(String messageId, TypeOfNumber sourceAddrTon,
RegisteredDelivery registeredDelivery, byte smDefaultMsgId,
byte[] shortMessage) throws PDUException, ResponseTimeoutException,
InvalidResponseException, NegativeResponseException, IOException;
+
+ /**
+ * Open connection and bind immediately. The default
+ * timeout is 1 minutes.
+ *
+ * @param host is the SMSC host address.
+ * @param port is the SMSC listen port.
+ * @param bindType is the bind type.
+ * @param systemId is the system id.
+ * @param password is the password.
+ * @param systemType is the system type.
+ * @param addrTon is the address TON.
+ * @param addrNpi is the address NPI.
+ * @param addressRange is the address range.
+ * @throws IOException if there is an IO error found.
+ */
+ void connectAndBind(String host, int port, BindType bindType,
+ String systemId, String password, String systemType,
+ TypeOfNumber addrTon, NumberingPlanIndicator addrNpi,
+ String addressRange) throws IOException;
+
+ /**
+ * Open connection and bind immediately with specified timeout. The default
+ * timeout is 1 minutes.
+ *
+ * @param host is the SMSC host address.
+ * @param port is the SMSC listen port.
+ * @param bindType is the bind type.
+ * @param systemId is the system id.
+ * @param password is the password.
+ * @param systemType is the system type.
+ * @param addrTon is the address TON.
+ * @param addrNpi is the address NPI.
+ * @param addressRange is the address range.
+ * @param timeout is the timeout.
+ * @throws IOException if there is an IO error found.
+ */
+ public void connectAndBind(String host, int port, BindType bindType,
+ String systemId, String password, String systemType,
+ TypeOfNumber addrTon, NumberingPlanIndicator addrNpi,
+ String addressRange, long timeout) throws IOException;
+
+ /**
+ * Open connection and bind immediately.
+ *
+ * @param host is the SMSC host address.
+ * @param port is the SMSC listen port.
+ * @param bindParam is the bind parameters.
+ * @return the SMSC system id.
+ * @throws IOException if there is an IO error found.
+ */
+ public String connectAndBind(String host, int port,
+ BindParameter bindParam)
+ throws IOException;
+
+ /**
+ * Open connection and bind immediately.
+ *
+ * @param host is the SMSC host address.
+ * @param port is the SMSC listen port.
+ * @param bindParam is the bind parameters.
+ * @param timeout is the timeout.
+ * @return the SMSC system id.
+ * @throws IOException if there is an IO error found.
+ */
+ public String connectAndBind(String host, int port,
+ BindParameter bindParam, long timeout)
+ throws IOException;
+
+ /**
+ * Get the current message receiver listener that is currently registered for this smpp session.
+ * @return The current message receiver listener
+ */
+ public MessageReceiverListener getMessageReceiverListener();
+
+ /**
+ * Sets a message receiver listener for this smpp session.
+ * @param messageReceiverListener is the new listener
+ */
+ public void setMessageReceiverListener(
+ MessageReceiverListener messageReceiverListener);
}
View
39 jsmpp/src/main/java/org/jsmpp/session/PDUProcessTask.java
@@ -18,7 +18,6 @@
import org.jsmpp.SMPPConstant;
import org.jsmpp.bean.Command;
-import org.jsmpp.session.state.SMPPSessionState;
/**
* @author uudashr
@@ -27,17 +26,17 @@
public class PDUProcessTask implements Runnable {
private final Command pduHeader;
private final byte[] pdu;
- private final SMPPSessionState stateProcessor;
+ private final SMPPSessionContext sessionContext;
private final ResponseHandler responseHandler;
private final ActivityNotifier activityNotifier;
private final Runnable onIOExceptionTask;
public PDUProcessTask(Command pduHeader, byte[] pdu,
- SMPPSessionState stateProcessor, ResponseHandler responseHandler,
+ SMPPSessionContext sessionContext, ResponseHandler responseHandler,
ActivityNotifier activityNotifier, Runnable onIOExceptionTask) {
this.pduHeader = pduHeader;
this.pdu = pdu;
- this.stateProcessor = stateProcessor;
+ this.sessionContext = sessionContext;
this.responseHandler = responseHandler;
this.activityNotifier = activityNotifier;
this.onIOExceptionTask = onIOExceptionTask;
@@ -50,66 +49,66 @@ public void run() {
case SMPPConstant.CID_BIND_TRANSMITTER_RESP:
case SMPPConstant.CID_BIND_TRANSCEIVER_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processBindResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processBindResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_GENERIC_NACK:
activityNotifier.notifyActivity();
- stateProcessor.processGenericNack(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processGenericNack(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_ENQUIRE_LINK:
activityNotifier.notifyActivity();
- stateProcessor.processEnquireLink(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processEnquireLink(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_ENQUIRE_LINK_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processEnquireLinkResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processEnquireLinkResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_SUBMIT_SM_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processSubmitSmResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processSubmitSmResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_SUBMIT_MULTI_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processSubmitMultiResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processSubmitMultiResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_QUERY_SM_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processQuerySmResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processQuerySmResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_DELIVER_SM:
activityNotifier.notifyActivity();
- stateProcessor.processDeliverSm(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processDeliverSm(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_DATA_SM:
activityNotifier.notifyActivity();
- stateProcessor.processDataSm(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processDataSm(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_DATA_SM_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processDataSmResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processDataSmResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_CANCEL_SM_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processCancelSmResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processCancelSmResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_REPLACE_SM_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processReplaceSmResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processReplaceSmResp(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_ALERT_NOTIFICATION:
activityNotifier.notifyActivity();
- stateProcessor.processAlertNotification(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processAlertNotification(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_UNBIND:
activityNotifier.notifyActivity();
- stateProcessor.processUnbind(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processUnbind(pduHeader, pdu, responseHandler);
break;
case SMPPConstant.CID_UNBIND_RESP:
activityNotifier.notifyActivity();
- stateProcessor.processUnbindResp(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processUnbindResp(pduHeader, pdu, responseHandler);
break;
default:
- stateProcessor.processUnknownCid(pduHeader, pdu, responseHandler);
+ sessionContext.getStateProcessor().processUnknownCid(pduHeader, pdu, responseHandler);
}
} catch (IOException e) {
onIOExceptionTask.run();
View
52 jsmpp/src/main/java/org/jsmpp/session/SMPPServerSession.java
@@ -22,7 +22,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.jsmpp.DefaultPDUReader;
import org.jsmpp.DefaultPDUSender;
@@ -79,7 +78,6 @@
private ServerMessageReceiverListener messageReceiverListener;
private ServerResponseDeliveryListener responseDeliveryListener;
- private final EnquireLinkSender enquireLinkSender;
private BindRequestReceiver bindRequestReceiver = new BindRequestReceiver(responseHandler);
public SMPPServerSession(Connection conn,
@@ -148,11 +146,6 @@ public BindRequest waitForBind(long timeout) throws IllegalStateException,
}
}
- private synchronized boolean isReadPdu() {
- SessionState sessionState = sessionContext.getSessionState();
- return sessionState.isBound() || sessionState.equals(SessionState.OPEN);
- }
-
public void deliverShortMessage(String serviceType,
TypeOfNumber sourceAddrTon, NumberingPlanIndicator sourceAddrNpi,
String sourceAddr, TypeOfNumber destAddrTon,
@@ -520,53 +513,10 @@ private void notifyNoActivity() {
}
}
- private class EnquireLinkSender extends Thread {
- private final AtomicBoolean sendingEnquireLink = new AtomicBoolean(false);
-
- @Override
- public void run() {
- logger.info("Starting EnquireLinkSender");
- while (isReadPdu()) {
- while (!sendingEnquireLink.compareAndSet(true, false) && isReadPdu()) {
- synchronized (sendingEnquireLink) {
- try {
- sendingEnquireLink.wait(500);
- } catch (InterruptedException e) {
- }
- }
- }
- if (!isReadPdu()) {
- break;
- }
- try {
- sendEnquireLink();
- } catch (ResponseTimeoutException e) {
- close();
- } catch (InvalidResponseException e) {
- // lets unbind gracefully
- unbindAndClose();
- } catch (IOException e) {
- close();
- }
- }
- logger.info("EnquireLinkSender stop");
- }
-
- /**
- * This method will send enquire link asynchronously.
- */
- public void enquireLink() {
- if (sendingEnquireLink.compareAndSet(false, true)) {
- synchronized (sendingEnquireLink) {
- sendingEnquireLink.notify();
- }
- }
- }
- }
private class BoundStateListener implements SessionStateListener {
public void onStateChange(SessionState newState, SessionState oldState,
- Object source) {
+ Session source) {
if (newState.isBound()) {
enquireLinkSender.start();
}
View
84 jsmpp/src/main/java/org/jsmpp/session/SMPPSession.java
@@ -20,7 +20,8 @@
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.jsmpp.DefaultPDUReader;
import org.jsmpp.DefaultPDUSender;
@@ -101,11 +102,11 @@
private DataInputStream in;
private OutputStream out;
+ private PDUReaderWorker pduReaderWorker;
private final ResponseHandler responseHandler = new ResponseHandlerImpl();
private MessageReceiverListener messageReceiverListener;
private BoundSessionStateListener sessionStateListener = new BoundSessionStateListener();
private SMPPSessionContext sessionContext = new SMPPSessionContext(this, sessionStateListener);
- private EnquireLinkSender enquireLinkSender;
/**
* Default constructor of {@link SMPPSession}. The next action might be
@@ -227,7 +228,8 @@ public String connectAndBind(String host, int port,
in = new DataInputStream(conn.getInputStream());
out = conn.getOutputStream();
- new PDUReaderWorker().start();
+ pduReaderWorker = new PDUReaderWorker();
+ pduReaderWorker.start();
String smscSystemId = sendBind(bindParam.getBindType(), bindParam.getSystemId(), bindParam.getPassword(), bindParam.getSystemType(),
InterfaceVersion.IF_34, bindParam.getAddrTon(), bindParam.getAddrNpi(), bindParam.getAddressRange(), timeout);
sessionContext.bound(bindParam.getBindType());
@@ -443,10 +445,6 @@ protected GenericMessageReceiverListener messageReceiverListener() {
return messageReceiverListener;
}
- private synchronized boolean isReadPdu() {
- return sessionContext.getSessionState().isBound() || sessionContext.getSessionState().equals(SessionState.OPEN);
- }
-
@Override
protected void finalize() throws Throwable {
close();
@@ -457,6 +455,7 @@ private void fireAcceptDeliverSm(DeliverSm deliverSm) throws ProcessRequestExcep
messageReceiverListener.onAcceptDeliverSm(deliverSm);
} else {
logger.warn("Receive deliver_sm but MessageReceiverListener is null. Short message = " + new String(deliverSm.getShortMessage()));
+ throw new ProcessRequestException("No message receiver listener registered", SMPPConstant.STAT_ESME_RX_T_APPN);
}
}
@@ -536,8 +535,13 @@ public void sendUnbindResp(int sequenceNumber) throws IOException {
*
*/
private class PDUReaderWorker extends Thread {
- private ExecutorService executorService = Executors.newFixedThreadPool(getPduProcessorDegree());
+ // start with serial execution of pdu processing, when the session is bound the pool will be enlarge up to the PduProcessorDegree
+ private ExecutorService executorService = Executors.newFixedThreadPool(1);
+ public PDUReaderWorker() {
+ super("PDUReaderWorker: " + SMPPSession.this);
+ }
+
private Runnable onIOExceptionTask = new Runnable() {
public void run() {
close();
@@ -546,12 +550,17 @@ public void run() {
@Override
public void run() {
- logger.info("Starting PDUReaderWorker with processor degree:{} ...", getPduProcessorDegree());
+ logger.info("Starting PDUReaderWorker");
while (isReadPdu()) {
readPDU();
}
close();
executorService.shutdown();
+ try {
+ executorService.awaitTermination(getTransactionTimer(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("interrupted while waiting for executor service pool to finish");
+ }
logger.info("PDUReaderWorker stop");
}
@@ -569,7 +578,7 @@ private void readPDU() {
* process it concurrently.
*/
PDUProcessTask task = new PDUProcessTask(pduHeader, pdu,
- sessionContext.getStateProcessor(), responseHandler,
+ sessionContext, responseHandler,
sessionContext, onIOExceptionTask);
executorService.execute(task);
@@ -598,54 +607,7 @@ private void notifyNoActivity() {
}
}
}
-
-
- private class EnquireLinkSender extends Thread {
- private final AtomicBoolean sendingEnquireLink = new AtomicBoolean(false);
-
- @Override
- public void run() {
- logger.info("Starting EnquireLinkSender");
- while (isReadPdu()) {
- while (!sendingEnquireLink.compareAndSet(true, false) && isReadPdu()) {
- synchronized (sendingEnquireLink) {
- try {
- sendingEnquireLink.wait(500);
- } catch (InterruptedException e) {
- }
- }
- }
- if (!isReadPdu()) {
- break;
- }
- try {
- sendEnquireLink();
- } catch (ResponseTimeoutException e) {
- close();
- } catch (InvalidResponseException e) {
- // lets unbind gracefully
- unbindAndClose();
- } catch (IOException e) {
- close();
- }
- }
- logger.info("EnquireLinkSender stop");
- }
-
- /**
- * This method will send enquire link asynchronously.
- */
- public void enquireLink() {
- if (sendingEnquireLink.compareAndSet(false, true)) {
- logger.debug("Sending enquire link notify");
- synchronized (sendingEnquireLink) {
- sendingEnquireLink.notify();
- }
- } else {
- logger.debug("Not sending enquire link notify");
- }
- }
- }
+
/**
* Session state listener for internal class use.
@@ -655,7 +617,7 @@ public void enquireLink() {
*/
private class BoundSessionStateListener implements SessionStateListener {
public void onStateChange(SessionState newState, SessionState oldState,
- Object source) {
+ Session source) {
/*
* We need to set SO_TIMEOUT to sessionTimer so when timeout occur,
* a SocketTimeoutException will be raised. When Exception raised we
@@ -667,6 +629,10 @@ public void onStateChange(SessionState newState, SessionState oldState,
} catch (IOException e) {
logger.error("Failed setting so_timeout for session timer", e);
}
+
+ logger.info("Changing processor degree to {}", getPduProcessorDegree());
+ ((ThreadPoolExecutor)pduReaderWorker.executorService).setCorePoolSize(getPduProcessorDegree());
+ ((ThreadPoolExecutor)pduReaderWorker.executorService).setMaximumPoolSize(getPduProcessorDegree());
}
}
}
View
2  jsmpp/src/main/java/org/jsmpp/session/SessionStateListener.java
@@ -34,5 +34,5 @@
* @param source is source of changed state.
*/
public void onStateChange(SessionState newState, SessionState oldState,
- Object source);
+ Session source);
}
View
2  jsmpp/src/test/java/org/jsmpp/bean/StringValidationTest.java
@@ -63,7 +63,7 @@ public void testValidation() {
try {
StringValidator.validateString("smsgwsmsgwsmsgwee", StringParameter.SYSTEM_ID);
- fail("Should be fail inserting 16 char of string");
+ fail("Should be fail inserting 17 char of string");
} catch (PDUStringException e) {
}
}
Something went wrong with that request. Please try again.