Skip to content

Commit

Permalink
finagle-core: Use configured deadline params
Browse files Browse the repository at this point in the history
Problem

We can set the `TimeoutFilter.PropagateDeadlines` and
`TimeoutFilter.PreferDeadlineOverTimeout` to configure behavior on the Client.
When the MethodBuilder interface is used, the `perRequestModule` does only look
at the `Default` values of these configurations, preventing custom configuration.

Solution

Pass the `TimeoutFilter.PropagateDeadlines` and
`TimeoutFilter.PreferDeadlineOverTimeout` as Params to the `perRequestModule` to
instantiate the `TimeoutFilter` with correct configuration.

Result

The MethodBuilder will pick up configured `TimeoutFilter.PropagateDeadlines`
and `TimeoutFilter.PreferDeadlineOverTimeout` parameters.

Signed-off-by: Jens Kat <jenskat@gmail.com>
  • Loading branch information
DieBauer committed Nov 1, 2023
1 parent 97abd88 commit 9c22777
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,34 @@ object DynamicTimeout {
* @see [[LatencyCompensation]]
*/
def perRequestModule[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module3[
TimeoutFilter.Param,
param.Timer,
LatencyCompensation.Compensation,
new Stack.ModuleParams[
ServiceFactory[Req, Rep]
] {
val role: Stack.Role = TimeoutFilter.role
val description: String =
"Apply a dynamic timeout-derived deadline to request"

def make(
defaultTimeout: TimeoutFilter.Param,
timer: param.Timer,
compensation: LatencyCompensation.Compensation,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
val parameters = Seq(
implicitly[Stack.Param[TimeoutFilter.Param]],
implicitly[Stack.Param[param.Timer]],
implicitly[Stack.Param[LatencyCompensation.Compensation]],
implicitly[Stack.Param[TimeoutFilter.PropagateDeadlines]],
implicitly[Stack.Param[TimeoutFilter.PreferDeadlineOverTimeout]]
)

override def make(params: Stack.Params, next: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] = {

val filter = new TimeoutFilter[Req, Rep](
timeoutFn(
PerRequestKey,
defaultTimeout.tunableTimeout,
params[TimeoutFilter.Param].tunableTimeout,
TimeoutFilter.Param.Default, // tunableTimeout() should always produce a value,
compensation.howlong // but we fall back on the default if not
params[LatencyCompensation.Compensation].howlong // but we fall back on the default if not
),
duration => new IndividualRequestTimeoutException(duration),
timer.timer
params[param.Timer].timer,
params[TimeoutFilter.PropagateDeadlines].enabled,
params[TimeoutFilter.PreferDeadlineOverTimeout].enabled
)
filter.andThen(next)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import com.twitter.finagle.server.StackServer
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.mux
import com.twitter.finagle._
import com.twitter.util.Await
import com.twitter.util.Future
import com.twitter.finagle.service.TimeoutFilter.PropagateDeadlines
import com.twitter.util.Closable.close
import com.twitter.util.{Await, Closable, Future}

import java.net.InetSocketAddress
import org.scalatest.funsuite.AnyFunSuite

import java.util.concurrent.atomic.AtomicBoolean

class MethodBuilderTest extends AnyFunSuite {

private def await[T](f: Future[T]): T = Await.result(f, 15.seconds)
Expand Down Expand Up @@ -153,4 +157,37 @@ class MethodBuilderTest extends AnyFunSuite {
mux.Request.empty,
mux.Response.empty
)

test("Methodbuilder client does not propagate Deadlines") {
val deadlinePresent = new AtomicBoolean(true)
val service = Service.mk { request: http.Request =>
deadlinePresent.set(request.headerMap.get("Finagle-Ctx-com.twitter.finagle.Deadline").isDefined)
Future.value(http.Response())
}

val server = Http.server
.serve("localhost:*", service)
val addr = server.boundAddress.asInstanceOf[InetSocketAddress]

val noPropagationClient = Http.client
.withLabel("backend-noprop")
.configured(PropagateDeadlines(false).mk())
.methodBuilder(s"${addr.getHostName}:${addr.getPort}")
.newService

val defaultClient = Http.client
.withLabel("backend")
.methodBuilder(s"${addr.getHostName}:${addr.getPort}")
.newService

await(noPropagationClient(http.Request("/")))
assert(!deadlinePresent.get())
await(defaultClient(http.Request("/")))
assert(deadlinePresent.get())

await(server.close())
await(noPropagationClient.close())
await(defaultClient.close())
}

}

0 comments on commit 9c22777

Please sign in to comment.