Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Prevent Jetty 7 to invoke a CometProcessor multiple time when resumin…

…g inside an AtmosphereHandler
  • Loading branch information...
commit 9d57c089f9cdbe0682e86556dc168adff077c3b3 1 parent a0b5cc9
@jfarcand authored
View
15 modules/cpr/src/main/java/org/atmosphere/container/Jetty7CometSupport.java
@@ -38,6 +38,8 @@
package org.atmosphere.container;
import org.atmosphere.cpr.AsynchronousProcessor;
+import org.atmosphere.cpr.AtmosphereHandler;
+import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.AtmosphereServlet;
import org.atmosphere.cpr.AtmosphereServlet.Action;
@@ -109,6 +111,19 @@ public Action service(HttpServletRequest req, HttpServletResponse res) throws IO
return action;
}
+ @Override
+ public Action resumed(HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+ logger.debug("(resumed) invoked:\n HttpServletRequest: {}\n HttpServletResponse: {}", request, response);
+ AtmosphereResourceImpl r =
+ (AtmosphereResourceImpl)request.getAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE);
+ AtmosphereHandler<HttpServletRequest, HttpServletResponse> atmosphereHandler =
+ (AtmosphereHandler<HttpServletRequest, HttpServletResponse>)
+ request.getAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER);
+ atmosphereHandler.onStateChange(r.getAtmosphereResourceEvent());
+ return new Action(Action.TYPE.RESUME);
+ }
+
/**
* {@inheritDoc}
*/
View
12 modules/cpr/src/main/java/org/atmosphere/container/JettyCometSupport.java
@@ -88,6 +88,18 @@ public Action service(HttpServletRequest req, HttpServletResponse response)
} else {
c.suspend(0);
}
+ } else if (action.type == Action.TYPE.RESUME) {
+ logger.debug("Resuming response: {}", response);
+
+ if (!resumed.remove(c)) {
+ c.reset();
+
+ if (req.getAttribute(AtmosphereServlet.RESUMED_ON_TIMEOUT) == null) {
+ timedout(req, response);
+ } else {
+ resumed(req, response);
+ }
+ }
}
} else {
logger.debug("Resuming response: {}", response);
View
8 modules/cpr/src/main/java/org/atmosphere/cpr/AsynchronousProcessor.java
@@ -190,7 +190,13 @@ Action action(HttpServletRequest req, HttpServletResponse res)
req.setAttribute(AtmosphereServlet.SUPPORT_SESSION, supportSession());
AtmosphereHandlerWrapper handlerWrapper = map(req);
- AtmosphereResourceImpl resource = new AtmosphereResourceImpl(config, handlerWrapper.broadcaster, req, res, this, handlerWrapper.atmosphereHandler);
+ AtmosphereResourceImpl resource = null;
+ if (req.getAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE) == null) {
+ resource = new AtmosphereResourceImpl(config, handlerWrapper.broadcaster, req, res, this, handlerWrapper.atmosphereHandler);
+ } else {
+ resource = (AtmosphereResourceImpl) req.getAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE);
+ }
+
handlerWrapper.broadcaster.getBroadcasterConfig().setAtmosphereConfig(config);
req.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, resource);
View
10 modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java
@@ -171,7 +171,9 @@ public void resume() {
} catch (Exception ex) {
logger.debug("Cannot resume an already resumed/cancelled request");
}
- cometSupport.action(this);
+ if (req.getAttribute(PRE_SUSPEND) == null) {
+ cometSupport.action(this);
+ }
} else {
logger.debug("Cannot resume an already resumed/cancelled request");
}
@@ -217,6 +219,12 @@ public void suspend(long timeout, boolean flushComment) {
"response longer than the session timeout. Increase the value of session-timeout in web.xml");
}
+ if(req.getAttribute(DefaultBroadcaster.CACHED) != null) {
+ // Do nothing because we have found cached message which was written already, and the handler resumed.
+ req.removeAttribute(DefaultBroadcaster.CACHED);
+ return;
+ }
+
if (!event.isResumedOnTimeout()) {
if (req.getHeaders("Connection") != null && req.getHeaders("Connection").hasMoreElements()) {
View
2  modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java
@@ -75,6 +75,7 @@
public class DefaultBroadcaster implements Broadcaster {
private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcaster.class);
+ public static final String CACHED = DefaultBroadcaster.class.getName() + ".messagesCached";
protected final ConcurrentLinkedQueue<AtmosphereResource<?, ?>> resources =
new ConcurrentLinkedQueue<AtmosphereResource<?, ?>>();
@@ -538,6 +539,7 @@ public void run() {
protected void checkCachedAndPush(final AtmosphereResource<?, ?> r, final AtmosphereResourceEvent e) {
retrieveTrackedBroadcast(r, e);
if (e.getMessage() instanceof List && !((List) e.getMessage()).isEmpty()) {
+ HttpServletRequest.class.cast(r.getRequest()).setAttribute(CACHED, "true");
broadcast(r, e);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.