Permalink
Browse files

Polish

  • Loading branch information...
Phillip Webb
Phillip Webb committed Apr 3, 2012
1 parent 52925ac commit 7da1d08c7d05b06ae03507daca2577b1b846cc35
@@ -16,32 +16,32 @@
/**
* {@link TimeoutProtectionStrategy} that works by hot-swapping the original request with the subsequent poll request.
- * Requests that take longer than the {@link #setPollThreshold(long) poll threshold} to respond will be protected. The
- * poll threshold should therefore be set to a value slightly lower than the expected gateway timeout. NOTE: once the
- * poll threshold has been reached the original response will block until a poll occurs. This ensures that none of the
- * response data is lost but can cause a response to take slightly longer to respond than would otherwise be the case.
- * The {@link #setFailTimeout(long)} method should be used to the timeout that will protect against requests that never
- * receive a poll (for example due to network failure). The {@link #setPollTimeout(long)} method can be used to set the
+ * Requests that take longer than the {@link #setThreshold(long) threshold} to respond will be protected. The threshold
+ * should therefore be set to a value slightly lower than the expected gateway timeout. NOTE: once the threshold has
+ * been reached the original response will block until a poll occurs. This ensures that none of the response data is
+ * lost but can cause a request to take slightly longer to respond than would otherwise be the case. The
+ * {@link #setFailTimeout(long)} method should be used to the timeout that will protect against requests that never
+ * receive a poll (for example due to network failure). The {@link #setLongPollTime(long)} method can be used to set the
* long-poll time for the poll request. This value should obviously be less than the gateway timeout.
*
* @author Phillip Webb
*/
public class HotSwappingTimeoutProtectionStrategy implements TimeoutProtectionStrategy {
- private long pollTimeout = TimeUnit.SECONDS.toMillis(6);
+ private long threshold = TimeUnit.SECONDS.toMillis(14);
- private long pollThreshold = TimeUnit.SECONDS.toMillis(14);
+ private long longPollTime = TimeUnit.SECONDS.toMillis(6);
private long failTimeout = TimeUnit.SECONDS.toMillis(30);
private RequestCoordinators requestCoordinators = new RequestCoordinators();
- public HttpServletResponseMonitorFactory getMonitorFactory(final TimeoutProtectionHttpRequest request) {
+ public HttpServletResponseMonitorFactory handleRequest(final TimeoutProtectionHttpRequest request) {
final long startTime = System.currentTimeMillis();
return new HttpServletResponseMonitorFactory<HttpServletResponseMonitor>() {
public HttpServletResponseMonitor getMonitor() {
- if ((HotSwappingTimeoutProtectionStrategy.this.pollThreshold != 0)
- && (System.currentTimeMillis() - startTime < HotSwappingTimeoutProtectionStrategy.this.pollThreshold)) {
+ if ((HotSwappingTimeoutProtectionStrategy.this.threshold != 0)
+ && (System.currentTimeMillis() - startTime < HotSwappingTimeoutProtectionStrategy.this.threshold)) {
return null;
}
RequestCoordinator requestCoordinator = HotSwappingTimeoutProtectionStrategy.this.requestCoordinators
@@ -64,9 +64,9 @@ public HttpServletResponseMonitor getMonitor() {
};
}
- public void cleanup(TimeoutProtectionHttpRequest request, HttpServletResponseMonitorFactory monitorFactory) {
+ public void afterRequest(TimeoutProtectionHttpRequest request, HttpServletResponseMonitorFactory monitorFactory) {
RequestCoordinator requestCoordinator = this.requestCoordinators.get(request);
- requestCoordinator.cleanup();
+ requestCoordinator.finish();
synchronized (requestCoordinator) {
if (!requestCoordinator.isPollResponseConsumed()) {
this.requestCoordinators.delete(request);
@@ -80,13 +80,13 @@ public void handlePoll(TimeoutProtectionHttpRequest request, HttpServletResponse
requestCoordinator.setPollResponse(response);
}
try {
- requestCoordinator.awaitPollReponseConsumed(this.pollTimeout);
+ requestCoordinator.awaitPollReponseConsumed(this.longPollTime);
} catch (InterruptedException e) {
}
synchronized (requestCoordinator) {
if (requestCoordinator.isPollResponseConsumed()) {
try {
- requestCoordinator.awaitCleanup(this.failTimeout);
+ requestCoordinator.awaitFinish(this.failTimeout);
} catch (InterruptedException e) {
throw new IllegalStateException("Timeout waiting for cleanup");
} finally {
@@ -100,20 +100,32 @@ public void handlePoll(TimeoutProtectionHttpRequest request, HttpServletResponse
}
}
- public void setFailTimeout(long failTimeout) {
- this.failTimeout = failTimeout;
+ protected void setRequestCoordinators(RequestCoordinators requestCoordinators) {
+ this.requestCoordinators = requestCoordinators;
}
- public void setPollThreshold(long pollThreshold) {
- this.pollThreshold = pollThreshold;
+ /**
+ * Set the threshold that must be passed before timeout protection will be used
+ * @param threshold the threshold in milliseconds
+ */
+ public void setThreshold(long threshold) {
+ this.threshold = threshold;
}
- public void setPollTimeout(long pollTimeout) {
- this.pollTimeout = pollTimeout;
+ /**
+ * Set the maximum amount of time that a single long poll request can take.
+ * @param longPollTime the long poll time in milliseconds
+ */
+ public void setLongPollTime(long longPollTime) {
+ this.longPollTime = longPollTime;
}
- protected void setRequestCoordinators(RequestCoordinators requestCoordinators) {
- this.requestCoordinators = requestCoordinators;
+ /**
+ * Set the amount of time before a request is considered failed.
+ * @param failTimeout
+ */
+ public void setFailTimeout(long failTimeout) {
+ this.failTimeout = failTimeout;
}
/**
@@ -139,7 +151,7 @@ public synchronized void delete(TimeoutProtectionHttpRequest request) {
}
protected static enum CoordinatedEvent {
- POLL_RESPONSE, POLL_RESPONSE_CONSUMED, CLEANUP
+ POLL_RESPONSE, POLL_RESPONSE_CONSUMED, FINISH
};
/**
@@ -180,8 +192,8 @@ public HttpServletResponse consumePollResponse() {
return this.pollResponse;
}
- public void cleanup() {
- signal(CoordinatedEvent.CLEANUP);
+ public void finish() {
+ signal(CoordinatedEvent.FINISH);
}
public boolean isPollResponseConsumed() {
@@ -196,8 +208,8 @@ public void awaitPollReponseConsumed(long timeout) throws InterruptedException {
await(CoordinatedEvent.POLL_RESPONSE_CONSUMED, timeout);
}
- public void awaitCleanup(long timeout) throws InterruptedException {
- await(CoordinatedEvent.CLEANUP, timeout);
+ public void awaitFinish(long timeout) throws InterruptedException {
+ await(CoordinatedEvent.FINISH, timeout);
}
private void signal(CoordinatedEvent event) {
@@ -1,7 +1,150 @@
package org.springsource.pwebb.spike.cloudfoundry.timeout;
-public class ReplayingTimeoutProtectionStrategy {
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
- // FIXME implement TimeoutProtector that uses the ReplayableHttpServletResponseMonitor to record responses in memory
+import javax.servlet.http.HttpServletResponse;
+import org.springframework.http.HttpStatus;
+import org.springframework.util.Assert;
+import org.springsource.pwebb.spike.cloudfoundry.timeout.monitor.HttpServletResponseMonitor;
+import org.springsource.pwebb.spike.cloudfoundry.timeout.monitor.HttpServletResponseMonitorFactory;
+import org.springsource.pwebb.spike.cloudfoundry.timeout.monitor.ReplayableHttpServletResponseMonitor;
+import org.springsource.pwebb.spike.cloudfoundry.timeout.monitor.ReplayableHttpServletResponseMonitorFactory;
+
+/**
+ * {@link TimeoutProtectionStrategy} that works by recording the original request such that it can be replayed to a
+ * subsequent poll request. Requests that take longer than the {@link #setThreshold(long) threshold} to respond will be
+ * recorded. The threshold should therefore be set to a value slightly lower than the expected gateway timeout. The
+ * {@link #setFailTimeout(long)} method should be used to the timeout that will protect against requests that never
+ * receive a poll (for example due to network failure). The {@link #setLongPollTime(long)} method can be used to set the
+ * long-poll time for the poll request. This value should obviously be less than the gateway timeout.
+ * <p>
+ * This strategy consumes more memory than {@link HotSwappingTimeoutProtectionStrategy} but does not require that
+ * timeouts only occur.
+ *
+ * @author Phillip Webb
+ */
+public class ReplayingTimeoutProtectionStrategy implements TimeoutProtectionStrategy {
+
+ private long threshold = TimeUnit.SECONDS.toMillis(14);
+
+ private long longPollTime = TimeUnit.SECONDS.toMillis(6);
+
+ private long failTimeout = TimeUnit.SECONDS.toMillis(30);
+
+ private Map<String, MonitorFactory> completedRequests = new HashMap<String, MonitorFactory>();
+
+ public HttpServletResponseMonitorFactory handleRequest(final TimeoutProtectionHttpRequest request) {
+ return new MonitorFactory();
+ }
+
+ public void afterRequest(TimeoutProtectionHttpRequest request, HttpServletResponseMonitorFactory monitorFactory) {
+ afterRequest(request, (MonitorFactory) monitorFactory);
+ }
+
+ private void afterRequest(TimeoutProtectionHttpRequest request, MonitorFactory monitorFactory) {
+ if (monitorFactory.wasMonitored()) {
+ synchronized (this.completedRequests) {
+ purgeUnpolledRequests();
+ this.completedRequests.put(request.getUid(), monitorFactory);
+ this.completedRequests.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Cleanup any started monitors that may have never received a poll. This can happen if the client is closed after a
+ * timeout but before a poll.
+ */
+ private void purgeUnpolledRequests() {
+ Iterator<Map.Entry<String, MonitorFactory>> iterator = this.completedRequests.entrySet().iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getValue().isPurgable(this.threshold + this.failTimeout)) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public void handlePoll(TimeoutProtectionHttpRequest request, HttpServletResponse response) throws IOException {
+ String uid = request.getUid();
+ long startTime = System.currentTimeMillis();
+ do {
+ if (!this.completedRequests.containsKey(uid)) {
+ try {
+ this.completedRequests.wait(this.longPollTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ MonitorFactory completedRequest = this.completedRequests.remove(uid);
+ if (completedRequest != null) {
+ completedRequest.replay(response);
+ return;
+ }
+ } while (System.currentTimeMillis() - startTime < this.longPollTime);
+ response.setHeader(TimeoutProtectionHttpHeader.POLL, uid);
+ response.setStatus(HttpStatus.NO_CONTENT.value());
+ }
+
+ /**
+ * Set the threshold that must be passed before timeout protection will be used
+ * @param threshold the threshold in milliseconds
+ */
+ public void setThreshold(long threshold) {
+ this.threshold = threshold;
+ }
+
+ /**
+ * Set the maximum amount of time that a single long poll request can take.
+ * @param longPollTime the long poll time in milliseconds
+ */
+ public void setLongPollTime(long longPollTime) {
+ this.longPollTime = longPollTime;
+ }
+
+ /**
+ * Set the amount of time before a request is considered failed.
+ * @param failTimeout
+ */
+ public void setFailTimeout(long failTimeout) {
+ this.failTimeout = failTimeout;
+ }
+
+ /**
+ * The {@link HttpServletResponseMonitorFactory} used internally.
+ */
+ private class MonitorFactory implements HttpServletResponseMonitorFactory {
+
+ private long startTime;
+
+ private ReplayableHttpServletResponseMonitor monitor;
+
+ public MonitorFactory() {
+ this.startTime = System.currentTimeMillis();
+ }
+
+ public boolean wasMonitored() {
+ return this.monitor != null;
+ }
+
+ public void replay(HttpServletResponse response) throws IOException {
+ Assert.state(wasMonitored(), "Request was not monitored, no poll expected");
+ this.monitor.getReplayableResponse().replay(response);
+ }
+
+ public HttpServletResponseMonitor getMonitor() {
+ long pollThreshold = ReplayingTimeoutProtectionStrategy.this.threshold;
+ if ((pollThreshold == 0) || (System.currentTimeMillis() - this.startTime >= pollThreshold)) {
+ this.monitor = new ReplayableHttpServletResponseMonitorFactory().getMonitor();
+ }
+ return this.monitor;
+ }
+
+ public boolean isPurgable(long timeout) {
+ return (System.currentTimeMillis() - this.startTime > timeout);
+ }
+ }
}
@@ -56,13 +56,13 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
private void doFilter(TimeoutProtectionHttpRequest request, HttpServletResponse response, FilterChain chain)
throws IOException, ServletException {
- HttpServletResponseMonitorFactory monitor = this.strategy.getMonitorFactory(request);
+ HttpServletResponseMonitorFactory monitor = this.strategy.handleRequest(request);
try {
MonitoredHttpServletResponseWrapper monitoredHttpResponse = new MonitoredHttpServletResponseWrapper(
response, monitor);
chain.doFilter(request.getServletRequest(), monitoredHttpResponse);
} finally {
- this.strategy.cleanup(request, monitor);
+ this.strategy.afterRequest(request, monitor);
}
}
@@ -19,15 +19,15 @@
* @param request the request to protect
* @return a {@link HttpServletResponseMonitorFactory} that should be used with the filtered
*/
- HttpServletResponseMonitorFactory getMonitorFactory(TimeoutProtectionHttpRequest request);
+ HttpServletResponseMonitorFactory handleRequest(TimeoutProtectionHttpRequest request);
/**
* Called after the initial response has been written in order to perform any cleanup. This method will be called
* regardless of any exceptions.
* @param request the initial request
- * @param monitorFactory the monitor returned from {@link #getMonitorFactory(TimeoutProtectionHttpRequest)}
+ * @param monitorFactory the monitor returned from {@link #handleRequest(TimeoutProtectionHttpRequest)}
*/
- void cleanup(TimeoutProtectionHttpRequest request, HttpServletResponseMonitorFactory monitorFactory);
+ void afterRequest(TimeoutProtectionHttpRequest request, HttpServletResponseMonitorFactory monitorFactory);
/**
* Handle any poll requests from the client.
Oops, something went wrong.

0 comments on commit 7da1d08

Please sign in to comment.