Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AJAX Request deduplication, take 2 #1328

Merged
merged 8 commits into from Oct 11, 2012
250 changes: 199 additions & 51 deletions web/webkit/src/main/scala/net/liftweb/http/LiftServlet.scala
Expand Up @@ -423,75 +423,223 @@ class LiftServlet extends Loggable {
} }
} }


/**
* Tracks the two aspects of an AJAX version: the sequence number,
* whose sole purpose is to identify requests that are retries for the
* same resource, and pending requests, which indicates how many
* requests are still queued for this particular page version on the
* client. The latter is used to expire result data for sequence
* numbers that are no longer needed.
*/
private case class AjaxVersionInfo(renderVersion:String, sequenceNumber:Long, pendingRequests:Int)
private object AjaxVersions {
def unapply(ajaxPathPart: String) : Option[AjaxVersionInfo] = {
val separator = ajaxPathPart.indexOf("-")
if (separator > -1 && ajaxPathPart.length > separator + 2)
Some(
AjaxVersionInfo(ajaxPathPart.substring(0, separator),
java.lang.Long.parseLong(ajaxPathPart.substring(separator + 1, ajaxPathPart.length - 1), 36),
Integer.parseInt(ajaxPathPart.substring(ajaxPathPart.length - 1), 36))
)
else
None
}
}
/**
* Extracts two versions from a given AJAX path:
* - The RenderVersion, which is used for GC purposes.
* - The requestVersions, which let us determine if this is
* a request we've already dealt with or are currently dealing
* with (so we don't rerun the associated handler). See
* handleVersionedAjax for more.
*
* The requestVersion is passed to the function that is passed in.
*/
private def extractVersions[T](path: List[String])(f: (Box[AjaxVersionInfo]) => T): T = {
path match {
case first :: AjaxVersions(versionInfo @ AjaxVersionInfo(renderVersion, _, _)) :: _ =>
RenderVersion.doWith(renderVersion)(f(Full(versionInfo)))
case _ => f(Empty)
}
}

/**
* Runs the actual AJAX processing. This includes handling __lift__GC,
* or running the parameters in the session. onComplete is run when the
* AJAX request has completed with a response that is meant for the
* user. In cases where the request is taking too long to respond,
* an LAFuture may be used to delay the real response (and thus the
* invocation of onComplete) while this function returns an empty
* response.
*/
private def runAjax(liftSession: LiftSession,
requestState: Req): Box[LiftResponse] = {
try {
requestState.param("__lift__GC") match {
case Full(_) =>
liftSession.updateFuncByOwner(RenderVersion.get, millis)
Full(JavaScriptResponse(js.JsCmds.Noop))

case _ =>
try {
val what = flatten(try {
liftSession.runParams(requestState)
} catch {
case ResponseShortcutException(_, Full(to), _) =>
import js.JsCmds._
List(RedirectTo(to))
})

val what2 = what.flatMap {
case js: JsCmd => List(js)
case jv: JValue => List(jv)
case n: NodeSeq => List(n)
case js: JsCommands => List(js)
case r: LiftResponse => List(r)
case s => Nil
}

val ret: LiftResponse = what2 match {
case (json: JsObj) :: Nil => JsonResponse(json)
case (jv: JValue) :: Nil => JsonResponse(jv)
case (js: JsCmd) :: xs => {
(JsCommands(S.noticesToJsCmd :: Nil) &
((js :: xs).flatMap {
case js: JsCmd => List(js)
case _ => Nil
}.reverse) &
S.jsToAppend).toResponse
}

case (n: Node) :: _ => XmlResponse(n)
case (ns: NodeSeq) :: _ => XmlResponse(Group(ns))
case (r: LiftResponse) :: _ => r
case _ => JsCommands(S.noticesToJsCmd :: JsCmds.Noop :: S.jsToAppend).toResponse
}

LiftRules.cometLogger.debug("AJAX Response: " + liftSession.uniqueId + " " + ret)

Full(ret)
} finally {
if (S.functionMap.size > 0) {
liftSession.updateFunctionMap(S.functionMap, RenderVersion.get, millis)
S.clearFunctionMap
}
}
}
} catch {
case foc: LiftFlowOfControlException => throw foc
case e => S.assertExceptionThrown() ; NamedPF.applyBox((Props.mode, requestState, e), LiftRules.exceptionHandler.toList);
}
}

// Retry requests will stop trying to wait for the original request to
// complete 500ms after the client's timeout. This is because, while
// we want the original thread to complete so that it can provide an
// answer for future retries, we don't want retries tying up resources
// when the client won't receive the response anyway.
private lazy val ajaxPostTimeout: Long = LiftRules.ajaxPostTimeout * 1000L + 500L
/**
* Kick off AJAX handling. Extracts relevant versions and handles the
* begin/end servicing requests. Then checks whether to wait on an
* existing request for this same version to complete or whether to
* do the actual processing.
*/
private def handleAjax(liftSession: LiftSession, private def handleAjax(liftSession: LiftSession,
requestState: Req): Box[LiftResponse] = { requestState: Req): Box[LiftResponse] = {
extractVersion(requestState.path.partPath) { extractVersions(requestState.path.partPath) { versionInfo =>

LiftRules.cometLogger.debug("AJAX Request: " + liftSession.uniqueId + " " + requestState.params) LiftRules.cometLogger.debug("AJAX Request: " + liftSession.uniqueId + " " + requestState.params)
tryo { tryo {
LiftSession.onBeginServicing.foreach(_(liftSession, requestState)) LiftSession.onBeginServicing.foreach(_(liftSession, requestState))
} }


val ret = try { // Here, a Left[LAFuture] indicates a future that needs to be
requestState.param("__lift__GC") match { // *satisfied*, meaning we will run the request processing.
case Full(_) => // A Right[LAFuture] indicates a future we need to *wait* on,
liftSession.updateFuncByOwner(RenderVersion.get, millis) // meaning we will return the result of whatever satisfies the
Full(JavaScriptResponse(js.JsCmds.Noop)) // future.

val nextAction:Either[LAFuture[Box[LiftResponse]], LAFuture[Box[LiftResponse]]] =
case _ => versionInfo match {
try { case Full(AjaxVersionInfo(_, handlerVersion, pendingRequests)) =>
val what = flatten(try { val renderVersion = RenderVersion.get
liftSession.runParams(requestState)
} catch { liftSession.withAjaxRequests { currentAjaxRequests =>
case ResponseShortcutException(_, Full(to), _) => // Create a new future, put it in the request list, and return
import js.JsCmds._ // the associated info with the future that needs to be
List(RedirectTo(to)) // satisfied by the current request handler.
}) def newRequestInfo = {

val info = AjaxRequestInfo(handlerVersion, new LAFuture[Box[LiftResponse]], millis)
val what2 = what.flatMap {
case js: JsCmd => List(js) val existing = currentAjaxRequests.getOrElseUpdate(renderVersion, Nil)
case jv: JValue => List(jv) currentAjaxRequests += (renderVersion -> (info :: existing))
case n: NodeSeq => List(n)
case js: JsCommands => List(js) info
case r: LiftResponse => List(r)
case s => Nil
} }


val ret: LiftResponse = what2 match { val infoList = currentAjaxRequests.get(renderVersion)
case (json: JsObj) :: Nil => JsonResponse(json) val (requestInfo, result) =
case (jv: JValue) :: Nil => JsonResponse(jv) infoList
case (js: JsCmd) :: xs => { .flatMap { entries =>
(JsCommands(S.noticesToJsCmd :: Nil) & entries
((js :: xs).flatMap { .find(_.requestVersion == handlerVersion)
case js: JsCmd => List(js) .map { entry =>
case _ => Nil (entry, Right(entry.responseFuture))
}.reverse) & }
S.jsToAppend).toResponse }
} .getOrElse {
val entry = newRequestInfo


case (n: Node) :: _ => XmlResponse(n) (entry, Left(entry.responseFuture))
case (ns: NodeSeq) :: _ => XmlResponse(Group(ns)) }
case (r: LiftResponse) :: _ => r
case _ => JsCommands(S.noticesToJsCmd :: JsCmds.Noop :: S.jsToAppend).toResponse
}


LiftRules.cometLogger.debug("AJAX Response: " + liftSession.uniqueId + " " + ret) // If there are no other pending requests, we can
// invalidate all the render version's AJAX entries except
// for the current one, as the client is no longer looking
// to retry any of them.
if (pendingRequests == 0) {
// Satisfy anyone waiting on futures for invalid
// requests with a failure.
for {
list <- infoList
entry <- list if entry.requestVersion != handlerVersion
} {
entry.responseFuture.satisfy(Failure("Request no longer pending."))
}


Full(ret) currentAjaxRequests += (renderVersion -> List(requestInfo))
} finally {
if (S.functionMap.size > 0) {
liftSession.updateFunctionMap(S.functionMap, RenderVersion.get, millis)
S.clearFunctionMap
} }

result
} }

case _ =>
// Create a future that processes the ajax response
// immediately. This runs if we don't have a handler
// version, which happens in cases like AJAX requests for
// Lift GC that don't go through the de-duping pipeline.
// Because we always return a Left here, the ajax processing
// always runs for this type of request.
Left(new LAFuture[Box[LiftResponse]])
} }
} catch {
case foc: LiftFlowOfControlException => throw foc val ret:Box[LiftResponse] =
case e => S.assertExceptionThrown() ; NamedPF.applyBox((Props.mode, requestState, e), LiftRules.exceptionHandler.toList); nextAction match {
} case Left(future) =>
val result = runAjax(liftSession, requestState)
future.satisfy(result)

result

case Right(future) =>
val ret = future.get(ajaxPostTimeout) openOr Failure("AJAX retry timeout.")

ret
}

tryo { tryo {
LiftSession.onEndServicing.foreach(_(liftSession, requestState, ret)) LiftSession.onEndServicing.foreach(_(liftSession, requestState, ret))
} }

ret ret
} }
} }
Expand Down
51 changes: 48 additions & 3 deletions web/webkit/src/main/scala/net/liftweb/http/LiftSession.scala
Expand Up @@ -499,6 +499,15 @@ private final case class PostPageFunctions(renderVersion: String,


} }


/**
* The responseFuture will be satisfied by the original request handling
* thread when the response has been calculated. Retries will wait for the
* future to be satisfied in order to return the proper response.
*/
private[http] final case class AjaxRequestInfo(requestVersion: Long,
responseFuture: LAFuture[Box[LiftResponse]],
lastSeen: Long)

/** /**
* The LiftSession class containg the session state information * The LiftSession class containg the session state information
*/ */
Expand Down Expand Up @@ -557,6 +566,20 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
*/ */
private var postPageFunctions: Map[String, PostPageFunctions] = Map() private var postPageFunctions: Map[String, PostPageFunctions] = Map()


/**
* A list of AJAX requests that may or may not be pending for this
* session. There is an entry for every AJAX request we don't *know*
* has completed successfully or been discarded by the client.
*
* See LiftServlet.handleAjax for how we determine we no longer need
* to hold a reference to an AJAX request.
*/
private var ajaxRequests = scala.collection.mutable.Map[String,List[AjaxRequestInfo]]()

private[http] def withAjaxRequests[T](fn: (scala.collection.mutable.Map[String, List[AjaxRequestInfo]]) => T) = {
ajaxRequests.synchronized { fn(ajaxRequests) }
}

/** /**
* The synchronization lock for the postPageFunctions * The synchronization lock for the postPageFunctions
*/ */
Expand Down Expand Up @@ -709,10 +732,8 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
w match { w match {
// if it's going to a CometActor, batch up the commands // if it's going to a CometActor, batch up the commands
case Full(id) if asyncById.contains(id) => asyncById.get(id).toList.flatMap(a => case Full(id) if asyncById.contains(id) => asyncById.get(id).toList.flatMap(a =>
a.!?(a.cometProcessingTimeout, ActionMessageSet(f.map(i => buildFunc(i)), state)) match { a.!?(ActionMessageSet(f.map(i => buildFunc(i)), state)) match {
case Full(li: List[_]) => li
case li: List[_] => li case li: List[_] => li
case Empty => Full(a.cometProcessingTimeoutHandler())
case other => Nil case other => Nil
}) })
case _ => f.map(i => buildFunc(i).apply()) case _ => f.map(i => buildFunc(i).apply())
Expand Down Expand Up @@ -836,6 +857,22 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
} }
} }


withAjaxRequests { currentAjaxRequests =>
for {
(version, requestInfos) <- currentAjaxRequests
} {
val remaining =
requestInfos.filter { info =>
(now - info.lastSeen) <= LiftRules.unusedFunctionsLifeTime
}

if (remaining.length > 0)
currentAjaxRequests += (version -> remaining)
else
currentAjaxRequests -= version
}
}

synchronized { synchronized {
messageCallback.foreach { messageCallback.foreach {
case (k, f) => case (k, f) =>
Expand Down Expand Up @@ -963,6 +1000,14 @@ class LiftSession(private[http] val _contextPath: String, val uniqueId: String,
} postPageFunctions += (ownerName -> funcInfo.updateLastSeen) } postPageFunctions += (ownerName -> funcInfo.updateLastSeen)
} }


withAjaxRequests { currentAjaxRequests =>
currentAjaxRequests.get(ownerName).foreach { requestInfos =>
val updated = requestInfos.map(_.copy(lastSeen = time))

currentAjaxRequests += (ownerName -> updated)
}
}

synchronized { synchronized {
(0 /: messageCallback)((l, v) => l + (v._2.owner match { (0 /: messageCallback)((l, v) => l + (v._2.owner match {
case Full(owner) if (owner == ownerName) => case Full(owner) if (owner == ownerName) =>
Expand Down