Permalink
Browse files

AJAX version-based deduplication.

The meat of the deal. Based on the AJAX version appended to the request
GUID, we determine whether we've already seen this request. If so, we
wait for the original request to complete before returning the resulting
value.  If we already completed the request before, we return the same
answer without re-running the associated parameters. AJAX requests that
need to wait are put into continuations if available.
  • Loading branch information...
1 parent 0f7b081 commit ba0b3130949459fbcb3dc07d89e23106c046aa03 @Shadowfiend Shadowfiend committed Aug 18, 2012
@@ -416,48 +416,146 @@ class LiftServlet extends Loggable {
toReturn
}
- private def extractVersion[T](path: List[String])(f: => T): T = {
+ private object AjaxVersions {
+ def unapply(ajaxPathPart: String) : Option[(String,Int)] = {
+ val funcLength = Helpers.nextFuncName.length
+ if (ajaxPathPart.length > funcLength)
+ Some(
+ (ajaxPathPart.substring(0, funcLength),
+ ajaxPathPart.charAt(funcLength))
+ )
+ else
+ None
+ }
+ }
+ /**
+ * Extracts two versions from a given AJAX path:
+ * - The RenderVersion, which is used for GC purposes.
+ * - The requestVersion, which lets us determine if this is
+ * a request we've already dealt with or are currently dealing
+ * with (so we don't rerun the associated handler). See
+ * handleVersionedAjax for more.
+ *
+ * The requestVersion is passed to the function that is passed in.
+ */
+ private def extractVersions[T](path: List[String])(f: (Box[Int]) => T): T = {
path match {
- case first :: second :: _ => RenderVersion.doWith(second)(f)
- case _ => f
+ case first :: AjaxVersions(renderVersion, requestVersion) :: _ =>
+ RenderVersion.doWith(renderVersion)(f(Full(requestVersion)))
+ case _ => f(Empty)
}
}
- private def handleAjax(liftSession: LiftSession,
- requestState: Req): Box[LiftResponse] = {
- extractVersion(requestState.path.partPath) {
+ /**
+ * An actor that manages AJAX continuations from container (Jetty style).
+ */
+ class AjaxContinuationActor(request: Req, session: LiftSession,
+ onBreakout: Box[LiftResponse] => Unit) extends LiftActor {
+ private var response: Box[LiftResponse] = Empty
+ private var done = false
- LiftRules.cometLogger.debug("AJAX Request: " + liftSession.uniqueId + " " + requestState.params)
- tryo {
- LiftSession.onBeginServicing.foreach(_(liftSession, requestState))
- }
+ def messageHandler = {
+ case AjaxResponseComplete(completeResponse) =>
+ response = completeResponse
+ LAPinger.schedule(this, BreakOut(), 5 millis)
- val ret = try {
- requestState.param("__lift__GC") match {
- case Full(_) =>
- liftSession.updateFuncByOwner(RenderVersion.get, millis)
- Full(JavaScriptResponse(js.JsCmds.Noop))
+ case BreakOut() if ! done =>
+ done = true
+ session.exitComet(this)
+ onBreakout(response)
- case _ =>
- try {
- val what = flatten(try {
- liftSession.runParams(requestState)
- } catch {
- case ResponseShortcutException(_, Full(to), _) =>
- import js.JsCmds._
- List(RedirectTo(to))
- })
+ case _ =>
+ }
+
+ override def toString = "AJAX Continuation Actor"
+ }
+
+ private case class AjaxResponseComplete(response: Box[LiftResponse])
+
+ /**
+ * existingResponse is Empty if we have no response for this request
+ * yet. pendingActors is a list of actors who want to be notified when
+ * this response is received.
+ */
+ private case class AjaxRequestInfo(requestVersion:Int, existingResponse:Box[Box[LiftResponse]], pendingActors:List[LiftActor])
+
+ private lazy val ajaxPostTimeout: Long = LiftRules.ajaxPostTimeout * 1000L
+ private val currentAjaxRequests = scala.collection.mutable.Map[String,AjaxRequestInfo]()
- val what2 = what.flatMap {
+ /**
+ * Runs the actual AJAX processing. This includes handling __lift__GC,
+ * or running the parameters in the session. onComplete is run when the
+ * AJAX request has completed with a response that is meant for the
+ * user. In cases where the request is taking too long to respond,
+ * an LAFuture may be used to delay the real response (and thus the
+ * invocation of onComplete) while this function returns an empty
+ * response.
+ */
+ private def runAjax(requestState: Req,
+ liftSession: LiftSession,
+ onCompleteFn: Box[(LiftResponse)=>Unit]): Box[LiftResponse] = {
+ // We don't want to call the onComplete function if we're returning
+ // an error response, as if there is a comet timeout while
+ // processing. This is because the onComplete function is meant to
+ // indicate a successful completion, and will short-circuit any
+ // other AJAX requests with the same version with the same
+ // response. Error responses are specific to the AJAX request,
+ // rather than to the version of the AJAX request. Successful
+ // responses are for all AJAX requests with the same version.
+ var callCompleteFn = true
+
+ val ret = try {
+ requestState.param("__lift__GC") match {
+ case Full(_) =>
+ val renderVersion = RenderVersion.get
+ liftSession.updateFuncByOwner(renderVersion, millis)
+
+ // FIXME We need to clean up currentAjaxRequests entries when
+ // FIXME the page expires.
+
+ Full(JavaScriptResponse(js.JsCmds.Noop))
+
+ case _ =>
+ try {
+ val what = flatten(try {
+ liftSession.runParams(requestState)
+ } catch {
+ case ResponseShortcutException(_, Full(to), _) =>
+ import js.JsCmds._
+ List(RedirectTo(to))
+ })
+
+ def processResponse(response: List[Any]): LiftResponse = {
+ val what2 = response.flatMap {
case js: JsCmd => List(js)
case jv: JValue => List(jv)
case n: NodeSeq => List(n)
case js: JsCommands => List(js)
case r: LiftResponse => List(r)
+
+ // This strange case comes from ajax requests run in
+ // a comet context that don't complete in the
+ // cometTimeout time frame. The js is the result of the
+ // comet processing timeout handler, while the future
+ // gives us access to the eventual result of the ajax
+ // request.
+ case (js, future: LAFuture[List[Any]]) if onCompleteFn.isDefined =>
+ // Wait for the future on a separate thread.
+ Schedule.schedule(() => {
+ val result = flatten(future.get)
+ onCompleteFn.foreach(_(processResponse(result)))
+ }, 0 seconds)
+
+ // But this request is done for. Return the comet
+ // processing timeout result, but don't mark the
+ // request complete; that happens whenever we satisfy
+ // the future above.
+ callCompleteFn = false
+ List(js)
case s => Nil
}
- val ret: LiftResponse = what2 match {
+ what2 match {
case (json: JsObj) :: Nil => JsonResponse(json)
case (jv: JValue) :: Nil => JsonResponse(jv)
case (js: JsCmd) :: xs => {
@@ -474,21 +572,174 @@ class LiftServlet extends Loggable {
case (r: LiftResponse) :: _ => r
case _ => JsCommands(S.noticesToJsCmd :: JsCmds.Noop :: S.jsToAppend).toResponse
}
+ }
- LiftRules.cometLogger.debug("AJAX Response: " + liftSession.uniqueId + " " + ret)
+ val ret: LiftResponse = processResponse(what)
- Full(ret)
- } finally {
- if (S.functionMap.size > 0) {
- liftSession.updateFunctionMap(S.functionMap, RenderVersion.get, millis)
- S.clearFunctionMap
+ LiftRules.cometLogger.debug("AJAX Response: " + liftSession.uniqueId + " " + ret)
+
+ Full(ret)
+ } finally {
+ if (S.functionMap.size > 0) {
+ liftSession.updateFunctionMap(S.functionMap, RenderVersion.get, millis)
+ S.clearFunctionMap
+ }
+ }
+ }
+ } catch {
+ case foc: LiftFlowOfControlException => throw foc
+ case e => S.assertExceptionThrown() ; NamedPF.applyBox((Props.mode, requestState, e), LiftRules.exceptionHandler.toList);
+ }
+
+ for {
+ response <- ret if callCompleteFn
+ onComplete <- onCompleteFn
+ } {
+ onComplete(response)
+ }
+
+ ret
+ }
+
+ /**
+ * Handles the details of versioned AJAX requests. If the version is
+ * Empty, runs the request through a normal AJAX flow with no
+ * continuations or special handling. Repeated calls will cause
+ * repeated evaluation.
+ *
+ * If the version is Full, the request is tracked. Subsequent requests
+ * for the same version will suspend until a response is available,
+ * then they will return the response.
+ *
+ * cont is the AjaxContinuationActor for this request, and
+ * suspendRequest is the function that will suspend the request.
+ * These are present to support non-continuation containers; these
+ * will present a no-op to suspendRequest.
+ */
+ private def handleVersionedAjax(handlerVersion: Box[Int],
+ cont: AjaxContinuationActor,
+ suspendRequest: () => Any,
+ requestState: Req,
+ liftSession: LiftSession): Box[LiftResponse] = {
+ handlerVersion match {
+ case Full(handlerVersion) =>
+ val renderVersion = RenderVersion.get
+
+ val toReturn: Box[Box[LiftResponse]] =
@fmpwizard

fmpwizard Aug 20, 2012

Owner

Why do you use Box[Box[LiftResponse]] instead of just Box[LiftResponse] ?

Could we run into a situation where we would get a Full(Failure(x,y,z)) ? and then think that the response is still fine?

@Shadowfiend

Shadowfiend Aug 20, 2012

Owner

So, the ajax runner returns a Box[LiftResponse] as it did before these changes. An Empty or Failure at the top level of the Box[Box[]] means we haven't calculated a response and therefore it should be calculated. A Full() means the runAjax function calculated a response, and that is what the is. The response won't be fine, it will just already be precomputed as a Failure. Does that make sense?

@fmpwizard

fmpwizard Aug 20, 2012

Owner

It does, thanks for the clarification, would you mind adding it as a comment?

@Shadowfiend

Shadowfiend Aug 20, 2012

Owner

Sure. Yeah I inlined the relevant comment in the returns under the collect. But I think I'll centralize it up top.

+ currentAjaxRequests.synchronized {
+ currentAjaxRequests.get(renderVersion).collect {
+ case AjaxRequestInfo(storedVersion, _, pendingActors) if handlerVersion != storedVersion =>
+ // Break out of any actors for the stale version.
+ pendingActors.foreach(_ ! BreakOut())
+
+ // Evict the older version's info.
+ currentAjaxRequests +=
+ (renderVersion ->
+ AjaxRequestInfo(handlerVersion, Empty, cont :: Nil))
+
+ suspendRequest()
+
+ Empty // no response available, triggers the actual AJAX computation below
+
+ case AjaxRequestInfo(storedVersion, existingResponseBox @ Full(_), _) =>
+ existingResponseBox // return the Full response Box
+
+ case AjaxRequestInfo(storedVersion, _, pendingActors) =>
+ currentAjaxRequests +=
+ (renderVersion ->
+ AjaxRequestInfo(handlerVersion, Empty, cont :: pendingActors))
+
+ suspendRequest()
+
+ Full(Full(EmptyResponse))
+ } openOr {
+ currentAjaxRequests +=
+ (renderVersion ->
+ AjaxRequestInfo(handlerVersion, Empty, cont :: Nil))
+
+ suspendRequest()
+
+ Empty // no response available, triggers the actual AJAX computation below
+ }
+ }
+
+ toReturn or {
+ Full(runAjax(requestState, liftSession, Full((result: LiftResponse) => {
+ // When we get the response, synchronizedly check that the
+ // versions are still the same in the map, and, if so, update
+ // any waiting actors then clear the actor list and update the
+ // request info to include the response in case any other
+ // requests come in with this version.
+ currentAjaxRequests.synchronized {
+ currentAjaxRequests.get(renderVersion).collect {
+ case AjaxRequestInfo(storedVersion, _, pendingActors) if storedVersion == handlerVersion =>
+ pendingActors.foreach(_ ! AjaxResponseComplete(Full(result)))
+ currentAjaxRequests +=
+ (renderVersion ->
+ AjaxRequestInfo(handlerVersion, Full(Full(result)), Nil))
}
}
+ })))
+ } openOr Empty
+
+ case _ =>
+ runAjax(requestState, liftSession, Empty)
+ }
+ }
+
+ /**
+ * Kick off AJAX handling. Extracts relevant versions and handles the
+ * begin/end servicing requests, as well as generation of
+ * ContinuationActors and choosing between continuation and
+ * continuationless request handling.
+ */
+ private def handleAjax(liftSession: LiftSession,
+ requestState: Req): Box[LiftResponse] = {
+ extractVersions(requestState.path.partPath) { handlerVersion =>
+ LiftRules.cometLogger.debug("AJAX Request: " + liftSession.uniqueId + " " + requestState.params)
+ tryo {
+ LiftSession.onBeginServicing.foreach(_(liftSession, requestState))
+ }
+
+ def suspendingActor = {
+ new AjaxContinuationActor(requestState, liftSession,
+ response => {
+ requestState.request.resume((requestState, S.init(requestState, liftSession)
+ (response.map(LiftRules.performTransform) openOr EmptyResponse)))})
+ }
+
+ def waitingActorForFuture(future: LAFuture[Box[LiftResponse]]) = {
+ new AjaxContinuationActor(requestState, liftSession,
+ response => future.satisfy(response))
+ }
+
+ val possibleFuture =
+ if (requestState.request.suspendResumeSupport_?)
+ Empty
+ else
+ Full(new LAFuture[Box[LiftResponse]])
+ val (cont, suspendRequest) =
+ possibleFuture.map { f =>
+ (waitingActorForFuture(f), () => ())
+ } openOr {
+ (suspendingActor, () => requestState.request.suspend(ajaxPostTimeout + 500L))
}
- } catch {
- case foc: LiftFlowOfControlException => throw foc
- case e => S.assertExceptionThrown() ; NamedPF.applyBox((Props.mode, requestState, e), LiftRules.exceptionHandler.toList);
+
+ val ret: Box[LiftResponse] = try {
+ liftSession.enterComet(cont -> requestState)
+
+ val result: Box[LiftResponse] =
+ handleVersionedAjax(handlerVersion, cont, suspendRequest, requestState, liftSession)
+
+ possibleFuture.map(_.get(ajaxPostTimeout) match {
+ case Full(response) => response
+ case _ => Empty
+ }) openOr result
+ } finally {
+ if (! requestState.request.suspendResumeSupport_?)
+ liftSession.exitComet(cont)
}
+
tryo {
LiftSession.onEndServicing.foreach(_(liftSession, requestState, ret))
}
@@ -675,7 +675,6 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
* Executes the user's functions based on the query parameters
*/
def runParams(state: Req): List[Any] = {
-
val toRun = {
// get all the commands, sorted by owner,
(state.uploadedFiles.map(_.name) ::: state.paramNames).distinct.
@@ -708,13 +707,28 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
val f = toRun.filter(_.owner == w)
w match {
// if it's going to a CometActor, batch up the commands
- case Full(id) if asyncById.contains(id) => asyncById.get(id).toList.flatMap(a =>
- a.!?(a.cometProcessingTimeout, ActionMessageSet(f.map(i => buildFunc(i)), state)) match {
+ case Full(id) if asyncById.contains(id) => asyncById.get(id).toList.flatMap(a => {
+ val future =
+ a.!<(ActionMessageSet(f.map(i => buildFunc(i)), state))
+
+ def processResult(result: Any): List[Any] = result match {
case Full(li: List[_]) => li
case li: List[_] => li
- case Empty => Full(a.cometProcessingTimeoutHandler())
+ // We return the future so it can, from AJAX requests, be
+ // satisfied and update the pending ajax request map.
+ case Empty =>
+ val processingFuture = new LAFuture[Any]
+ // Wait for and process the future on a separate thread.
+ Schedule.schedule(() => {
+ processingFuture.satisfy(processResult(future.get))
+ }, 0 seconds)
+ List((a.cometProcessingTimeoutHandler, processingFuture))
case other => Nil
- })
+ }
+
+ processResult(future.get(a.cometProcessingTimeout))
+ })
+
case _ => f.map(i => buildFunc(i).apply())
}
}

0 comments on commit ba0b313

Please sign in to comment.