Browse files

Added counting of avg window size and estimated processing time

  • Loading branch information...
1 parent c49bf5d commit abf198960f5deb96da6f9b503078bcde556fd909 @jjlauer jjlauer committed May 8, 2011
View
23 src/main/java/com/cloudhopper/smpp/PduAsyncResponse.java
@@ -20,7 +20,7 @@
/**
* A container to hold an asynchronous response and include information tracked
* internally by an SmppSession. For example, an instance of this class will
- * contain the original request, the response, and the processing time.
+ * contain the original request, the response, and a few timestamps.
*
* @author joelauer (twitter: @jjlauer or <a href="http://twitter.com/jjlauer" target=window>http://twitter.com/jjlauer</a>)
*/
@@ -39,7 +39,7 @@
public PduResponse getResponse();
/**
- * Gets the amount of time required to accept the request into the session's
+ * Gets the amount of time required to accept the request into the session
* send window (for a free slot to open up).
* @return The amount of time (in ms) to accept the request into the send window
*/
@@ -51,8 +51,27 @@
* went out on the wire till a response was received on the wire. Does not
* include any time required waiting for a slot in the window to become
* available.
+ * <br><br>
+ * NOTE: If the window size is > 1, this value can be somewhat misleading.
+ * The remote endpoint would process X number of requests ahead of this one
+ * that went out ahead of it in the window. This does represent the total
+ * response time, but doesn't mean the remote endpoint is this slow at processing
+ * one request. In cases of high load where the window is always full, the
+ * windowWaitTime actually represents how fast the remote endpoint is processing
+ * requests.
* @return The amount of time (in ms) to receive a response from remote endpoint
*/
public long getResponseTime();
+ /**
+ * Gets an estimate of the processing time required by the remote endpoint
+ * to process this request. The value is calculated with the following
+ * formula: "response time" divided by the "window size" at the time of the
+ * request.
+ * @return The amount of estimated time (in ms) to receive a response from
+ * the remote endpoint just for this request (as opposed to potentially
+ * this request and all requests ahead of it in the window).
+ */
+ public long getEstimatedProcessingTime();
+
}
View
27 src/main/java/com/cloudhopper/smpp/SmppSession.java
@@ -203,11 +203,11 @@
* Attempts to "unbind" the session, waiting up to a specified period of
* milliseconds for an unbind response from the remote endpoint. Regardless of whether
* a proper unbind response was received, the socket/channel is closed.
- * @param timeoutInMillis The number of milliseconds to wait until an unbind
+ * @param timeoutMillis The number of milliseconds to wait until an unbind
* response is received from the SMSC.
* @see #close()
*/
- public void unbind(long timeoutInMillis);
+ public void unbind(long timeoutMillis);
/**
* Destroy a session by ensuring the socket is closed and all
@@ -226,7 +226,7 @@
* takes to transmit the actual bytes on the socket, and for the remote
* endpoint to send a response back.
* @param request The request to send to the remote endpoint
- * @param timeoutInMillis The number of milliseconds to wait until a valid
+ * @param timeoutMillis The number of milliseconds to wait until a valid
* response is received.
* @return A valid response to the request
* @throws RecoverablePduException Thrown when a recoverable PDU error occurs.
@@ -243,7 +243,7 @@
* @throws InterruptedException The calling thread was interrupted while waiting
* to acquire a lock or write/read the bytes from the socket/channel.
*/
- public EnquireLinkResp enquireLink(EnquireLink request, long timeoutInMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
+ public EnquireLinkResp enquireLink(EnquireLink request, long timeoutMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
/**
* Synchronously sends a "submit" request to the remote endpoint and
@@ -252,7 +252,7 @@
* takes to transmit the actual bytes on the socket, and for the remote
* endpoint to send a response back.
* @param request The request to send to the remote endpoint
- * @param timeoutInMillis The number of milliseconds to wait until a valid
+ * @param timeoutMillis The number of milliseconds to wait until a valid
* response is received.
* @return A valid response to the request
* @throws RecoverablePduException Thrown when a recoverable PDU error occurs.
@@ -269,7 +269,7 @@
* @throws InterruptedException The calling thread was interrupted while waiting
* to acquire a lock or write/read the bytes from the socket/channel.
*/
- public SubmitSmResp submit(SubmitSm request, long timeoutInMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
+ public SubmitSmResp submit(SubmitSm request, long timeoutMillis) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
/**
* Main underlying method for sending a request PDU to the remote endpoint.
@@ -284,9 +284,14 @@
* the "fireExpectedPduResponseReceived" method on the session handler.
* Please note that its possible th response PDU really isn't
* the correct PDU we were waiting for, so the caller should verify it.
- * The best example is that a "Generic_Nack" could be returned.
+ * For example it is possible that a "Generic_Nack" could be returned by
+ * the remote endpoint in response to a PDU.
* @param requestPdu The request PDU to send
- * @param timeoutInMillis The length of time to wait for a response PDU
+ * @param timeoutMillis If synchronous is true, this represents the time to
+ * wait for a slot to open in the underlying window AND the time to wait
+ * for a response back from the remote endpoint. If synchronous is false,
+ * this only represents the time to wait for a slot to open in the
+ * underlying window.
* @param synchronous True if the calling thread plans on waiting for a
* response on the returned future. False if the calling thread plans
* on discarding the returned future and expects the response PDU to
@@ -307,7 +312,7 @@
* @throws InterruptedException The calling thread was interrupted while waiting
* to acquire a lock or write/read the bytes from the socket/channel.
*/
- public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
+ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest request, long timeoutMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException;
/**
* Main underlying method for sending a response PDU to the remote endpoint.
@@ -316,9 +321,9 @@
* @param response The response PDU to send
* @throws RecoverablePduException Thrown when a recoverable PDU error occurs.
* A recoverable PDU error includes the partially decoded PDU in order
- * to generate a negative acknowledgement (NACK) response.
+ * to generate a negative acknowledgment (NACK) response.
* @throws UnrecoverablePduException Thrown when an unrecoverable PDU error
- * occurs. This indicates a seriours error occurred and usually indicates
+ * occurs. This indicates a serious error occurred and usually indicates
* the session should be immediately terminated.
* @throws SmppChannelException Thrown when the underlying socket/channel was
* unable to write the request.
View
16 src/main/java/com/cloudhopper/smpp/impl/DefaultPduAsyncResponse.java
@@ -52,6 +52,15 @@ public long getWindowWaitTime() {
public long getResponseTime() {
return future.getAcceptToDoneTime();
}
+
+ @Override
+ public long getEstimatedProcessingTime() {
+ long responseTime = getResponseTime();
+ if (responseTime == 0 || future.getWindowSize() == 0) {
+ return 0;
+ }
+ return (responseTime / future.getWindowSize());
+ }
@Override
public String toString() {
@@ -60,13 +69,16 @@ public String toString() {
buf.append(HexUtil.toHexString(this.future.getKey()));
buf.append("] windowWaitTime [");
buf.append(getWindowWaitTime());
- buf.append("] responseTime [");
- buf.append(getWindowWaitTime());
+ buf.append(" ms] responseTime [");
+ buf.append(getResponseTime());
+ buf.append(" ms] estProcessingTime [");
+ buf.append(getEstimatedProcessingTime());
buf.append(" ms] reqType [");
buf.append(getRequest().getName());
buf.append("] respType [");
buf.append(getResponse().getName());
buf.append("]");
return buf.toString();
}
+
}
View
22 src/main/java/com/cloudhopper/smpp/impl/DefaultSmppSession.java
@@ -479,7 +479,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
@SuppressWarnings("unchecked")
@Override
- public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pdu, long timeoutInMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
+ public WindowFuture<Integer,PduRequest,PduResponse> sendRequestPdu(PduRequest pdu, long timeoutMillis, boolean synchronous) throws RecoverablePduException, UnrecoverablePduException, SmppTimeoutException, SmppChannelException, InterruptedException {
// assign the next PDU sequence # if its not yet assigned
if (!pdu.hasSequenceNumberAssigned()) {
pdu.setSequenceNumber(this.sequenceNumber.next());
@@ -490,7 +490,7 @@ protected PduResponse sendRequestAndGetResponse(PduRequest requestPdu, long time
WindowFuture<Integer,PduRequest,PduResponse> future = null;
try {
- future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutInMillis, configuration.getRequestExpiryTimeout(), synchronous);
+ future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous);
} catch (DuplicateKeyException e) {
throw new UnrecoverablePduException(e.getMessage(), e);
} catch (OfferTimeoutException e) {
@@ -576,7 +576,7 @@ public void firePduReceived(Pdu pdu) {
if (responsePdu != null) {
try {
long responseTime = System.currentTimeMillis() - startTime;
- this.countSendResponsePdu(responsePdu, responseTime);
+ this.countSendResponsePdu(responsePdu, responseTime, responseTime);
this.sendResponsePdu(responsePdu);
} catch (Exception e) {
@@ -593,7 +593,7 @@ public void firePduReceived(Pdu pdu) {
WindowFuture<Integer,PduRequest,PduResponse> future = this.sendWindow.complete(receivedPduSeqNum, responsePdu);
if (future != null) {
logger.trace("Found a future in the window for seqNum [{}]", receivedPduSeqNum);
- this.countReceiveResponsePdu(responsePdu, future.getOfferToAcceptTime(), future.getAcceptToDoneTime());
+ this.countReceiveResponsePdu(responsePdu, future.getOfferToAcceptTime(), future.getAcceptToDoneTime(), (future.getAcceptToDoneTime() / future.getWindowSize()));
// if this isn't null, we found a match to a request
int callerStateHint = future.getCallerStateHint();
@@ -613,7 +613,7 @@ public void firePduReceived(Pdu pdu) {
this.sessionHandler.fireUnexpectedPduResponseReceived(responsePdu);
}
} else {
- this.countReceiveResponsePdu(responsePdu, 0, 0);
+ this.countReceiveResponsePdu(responsePdu, 0, 0, 0);
// original request either expired OR was completely unexpected
this.sessionHandler.fireUnexpectedPduResponseReceived(responsePdu);
@@ -708,7 +708,7 @@ private void countSendRequestPdu(PduRequest pdu) {
}
}
- private void countSendResponsePdu(PduResponse pdu, long responseTime) {
+ private void countSendResponsePdu(PduResponse pdu, long responseTime, long estimatedProcessingTime) {
if (this.counters == null) {
return; // noop
}
@@ -718,21 +718,25 @@ private void countSendResponsePdu(PduResponse pdu, long responseTime) {
case SmppConstants.CMD_ID_SUBMIT_SM_RESP:
this.counters.getRxSubmitSM().incrementResponseAndGet();
this.counters.getRxSubmitSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxSubmitSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getRxSubmitSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_DELIVER_SM_RESP:
this.counters.getRxDeliverSM().incrementResponseAndGet();
this.counters.getRxDeliverSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxDeliverSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getRxDeliverSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_DATA_SM_RESP:
this.counters.getRxDataSM().incrementResponseAndGet();
this.counters.getRxDataSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxDataSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getRxDataSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_ENQUIRE_LINK_RESP:
this.counters.getRxEnquireLink().incrementResponseAndGet();
this.counters.getRxEnquireLink().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getRxEnquireLink().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getRxEnquireLink().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
}
@@ -785,7 +789,7 @@ private void countReceiveRequestPdu(PduRequest pdu) {
}
}
- private void countReceiveResponsePdu(PduResponse pdu, long waitTime, long responseTime) {
+ private void countReceiveResponsePdu(PduResponse pdu, long waitTime, long responseTime, long estimatedProcessingTime) {
if (this.counters == null) {
return; // noop
}
@@ -796,24 +800,28 @@ private void countReceiveResponsePdu(PduResponse pdu, long waitTime, long respon
this.counters.getTxSubmitSM().incrementResponseAndGet();
this.counters.getTxSubmitSM().addRequestWaitTimeAndGet(waitTime);
this.counters.getTxSubmitSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxSubmitSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getTxSubmitSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_DELIVER_SM_RESP:
this.counters.getTxDeliverSM().incrementResponseAndGet();
this.counters.getTxDeliverSM().addRequestWaitTimeAndGet(waitTime);
this.counters.getTxDeliverSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxDeliverSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getTxDeliverSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_DATA_SM_RESP:
this.counters.getTxDataSM().incrementResponseAndGet();
this.counters.getTxDataSM().addRequestWaitTimeAndGet(waitTime);
this.counters.getTxDataSM().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxDataSM().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getTxDataSM().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
case SmppConstants.CMD_ID_ENQUIRE_LINK_RESP:
this.counters.getTxEnquireLink().incrementResponseAndGet();
this.counters.getTxEnquireLink().addRequestWaitTimeAndGet(waitTime);
this.counters.getTxEnquireLink().addRequestResponseTimeAndGet(responseTime);
+ this.counters.getTxEnquireLink().addRequestEstimatedProcessingTimeAndGet(estimatedProcessingTime);
this.counters.getTxEnquireLink().getResponseCommandStatusCounter().incrementAndGet(pdu.getCommandStatus());
break;
}
View
23 src/main/java/com/cloudhopper/smpp/util/ConcurrentCommandCounter.java
@@ -27,6 +27,7 @@
private AtomicInteger requestExpired;
private AtomicLong requestWaitTime;
private AtomicLong requestResponseTime;
+ private AtomicLong requestEstimatedProcessingTime;
private AtomicInteger response;
private ConcurrentCommandStatusCounter responseCommandStatusCounter;
@@ -35,15 +36,17 @@ public ConcurrentCommandCounter() {
this.requestExpired = new AtomicInteger(0);
this.requestWaitTime = new AtomicLong(0);
this.requestResponseTime = new AtomicLong(0);
+ this.requestEstimatedProcessingTime = new AtomicLong(0);
this.response = new AtomicInteger(0);
this.responseCommandStatusCounter = new ConcurrentCommandStatusCounter();
}
- public ConcurrentCommandCounter(int request, int requestExpired, long requestWaitTime, long requestResponseTime, int response, final ConcurrentCommandStatusCounter responseCommandStatusCounter) {
+ public ConcurrentCommandCounter(int request, int requestExpired, long requestWaitTime, long requestResponseTime, long requestEstimatedProcessingTime, int response, final ConcurrentCommandStatusCounter responseCommandStatusCounter) {
this.request = new AtomicInteger(request);
this.requestExpired = new AtomicInteger(requestExpired);
this.requestWaitTime = new AtomicLong(requestWaitTime);
this.requestResponseTime = new AtomicLong(requestResponseTime);
+ this.requestEstimatedProcessingTime = new AtomicLong(requestEstimatedProcessingTime);
this.response = new AtomicInteger(response);
this.responseCommandStatusCounter = responseCommandStatusCounter.copy();
}
@@ -53,12 +56,13 @@ public void reset() {
this.requestExpired.set(0);
this.requestWaitTime.set(0);
this.requestResponseTime.set(0);
+ this.requestEstimatedProcessingTime.set(0);
this.response.set(0);
this.responseCommandStatusCounter.reset();
}
public ConcurrentCommandCounter createSnapshot() {
- return new ConcurrentCommandCounter(request.get(), requestExpired.get(), requestWaitTime.get(), requestResponseTime.get(), response.get(), responseCommandStatusCounter);
+ return new ConcurrentCommandCounter(request.get(), requestExpired.get(), requestWaitTime.get(), requestResponseTime.get(), requestEstimatedProcessingTime.get(), response.get(), responseCommandStatusCounter);
}
public int getRequest() {
@@ -92,6 +96,14 @@ public long getRequestResponseTime() {
public long addRequestResponseTimeAndGet(long responseTime) {
return this.requestResponseTime.addAndGet(responseTime);
}
+
+ public long getRequestEstimatedProcessingTime() {
+ return this.requestEstimatedProcessingTime.get();
+ }
+
+ public long addRequestEstimatedProcessingTimeAndGet(long estimatedProcessingTime) {
+ return this.requestEstimatedProcessingTime.addAndGet(estimatedProcessingTime);
+ }
public int getResponse() {
return this.response.get();
@@ -129,6 +141,13 @@ public String toString() {
}
to.append(DecimalUtil.toString(avgResponseTime, 1));
+ to.append("ms avgEstimatedProcessingTime=");
+ double avgEstimatedProcessingTime = 0;
+ if (getResponse() > 0) {
+ avgEstimatedProcessingTime = (double)getRequestEstimatedProcessingTime()/(double)getResponse();
+ }
+ to.append(DecimalUtil.toString(avgEstimatedProcessingTime, 1));
+
to.append("ms cmdStatus=[");
to.append(this.responseCommandStatusCounter.toString());
to.append("]]");
View
3 src/test/java/com/cloudhopper/smpp/demo/PerformanceClientMain.java
@@ -48,7 +48,7 @@
// total number of sessions (conns) to create
static public final int SESSION_COUNT = 10;
// size of window per session
- static public final int WINDOW_SIZE = 4;
+ static public final int WINDOW_SIZE = 50;
// total number of submit to send total across all sessions
static public final int SUBMIT_TO_SEND = 2000;
// total number of submit sent
@@ -148,6 +148,7 @@ public Thread newThread(Runnable r) {
logger.info("Performance client finished:");
logger.info(" Sessions: " + SESSION_COUNT);
+ logger.info(" Window Size: " + WINDOW_SIZE);
logger.info("Sessions Failed: " + sessionFailures);
logger.info(" Time: " + (stopTimeMillis - startTimeMillis) + " ms");
logger.info(" Target Submit: " + SUBMIT_TO_SEND);
View
6 src/test/java/com/cloudhopper/smpp/demo/ServerMain.java
@@ -70,13 +70,13 @@ public Thread newThread(Runnable r) {
// create a server configuration
SmppServerConfiguration configuration = new SmppServerConfiguration();
configuration.setPort(2776);
- configuration.setMaxConnectionSize(1);
+ configuration.setMaxConnectionSize(10);
configuration.setNonBlockingSocketsEnabled(true);
configuration.setDefaultRequestExpiryTimeout(30000);
- //configuration.setDefaultWindowMonitorInterval(15000);
+ configuration.setDefaultWindowMonitorInterval(15000);
configuration.setDefaultWindowSize(5);
configuration.setDefaultWindowWaitTimeout(configuration.getDefaultRequestExpiryTimeout());
- configuration.setDefaultSessionCountersEnabled(false);
+ configuration.setDefaultSessionCountersEnabled(true);
configuration.setJmxEnabled(true);
// create a server, start it up

0 comments on commit abf1989

Please sign in to comment.