Skip to content

Commit 15bb04a

Browse files
tysonnorrismarkusthoemmes
authored andcommitted
Introduce a ContainerClient interface and an akka based implementation. (apache#3812)
HttpUtils (http client for invoker -> action container) uses org.apache.http client that is synchronous and poor performing for concurrent requests. I ran into problems using it with concurrent activation support. Instead of trying to force that client to work, this is work towards replacing it (or re-replacing it) with akka http based client.
1 parent 8f8863c commit 15bb04a

File tree

20 files changed

+660
-108
lines changed

20 files changed

+660
-108
lines changed

common/scala/src/main/scala/whisk/common/Logging.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ object LoggingMarkers {
250250
private val activation = "activation"
251251
private val kafka = "kafka"
252252
private val loadbalancer = "loadbalancer"
253+
private val containerClient = "containerClient"
253254

254255
/*
255256
* Controller related markers
@@ -297,6 +298,8 @@ object LoggingMarkers {
297298
def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd))
298299
def INVOKER_CONTAINER_START(containerState: String) =
299300
LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState))
301+
val CONTAINER_CLIENT_RETRIES =
302+
LogMarkerToken(containerClient, "retries", count)
300303

301304
// Kafka related markers
302305
def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package whisk.core.containerpool
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
22+
import akka.http.scaladsl.marshalling.Marshal
23+
import akka.http.scaladsl.model.HttpMethods
24+
import akka.http.scaladsl.model.HttpRequest
25+
import akka.http.scaladsl.model.HttpResponse
26+
import akka.http.scaladsl.model.MediaTypes
27+
import akka.http.scaladsl.model.MessageEntity
28+
import akka.http.scaladsl.model.StatusCodes
29+
import akka.http.scaladsl.model.headers.Accept
30+
import akka.http.scaladsl.model.headers.Connection
31+
import akka.http.scaladsl.unmarshalling.Unmarshal
32+
import akka.stream.StreamTcpException
33+
import akka.stream.scaladsl.Sink
34+
import akka.stream.scaladsl.Source
35+
import akka.util.ByteString
36+
import scala.concurrent.Await
37+
import scala.concurrent.ExecutionContext
38+
import scala.concurrent.Future
39+
import scala.concurrent.TimeoutException
40+
import scala.concurrent.duration._
41+
import scala.util.Try
42+
import scala.util.control.NonFatal
43+
import spray.json._
44+
import whisk.common.Logging
45+
import whisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
46+
import whisk.common.MetricEmitter
47+
import whisk.common.TransactionId
48+
import whisk.core.entity.ActivationResponse.ContainerHttpError
49+
import whisk.core.entity.ActivationResponse._
50+
import whisk.core.entity.ByteSize
51+
import whisk.core.entity.size.SizeLong
52+
import whisk.http.PoolingRestClient
53+
54+
/**
55+
* This HTTP client is used only in the invoker to communicate with the action container.
56+
* It allows to POST a JSON object and receive JSON object back; that is the
57+
* content type and the accept headers are both 'application/json.
58+
* This implementation uses the akka http host-level client API.
59+
*
60+
* @param hostname the host name
61+
* @param port the port
62+
* @param timeout the timeout in msecs to wait for a response
63+
* @param maxResponse the maximum size in bytes the connection will accept
64+
* @param queueSize once all connections are used, how big of queue to allow for additional requests
65+
* @param retryInterval duration between retries for TCP connection errors
66+
*/
67+
protected class AkkaContainerClient(
68+
hostname: String,
69+
port: Int,
70+
timeout: FiniteDuration,
71+
maxResponse: ByteSize,
72+
queueSize: Int,
73+
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
74+
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
75+
with ContainerClient {
76+
77+
def close() = Await.result(shutdown(), 30.seconds)
78+
79+
/**
80+
* Posts to hostname/endpoint the given JSON object.
81+
* Waits up to timeout before aborting on a good connection.
82+
* If the endpoint is not ready, retry up to timeout.
83+
* Every retry reduces the available timeout so that this method should not
84+
* wait longer than the total timeout (within a small slack allowance).
85+
*
86+
* @param endpoint the path the api call relative to hostname
87+
* @param body the JSON value to post (this is usually a JSON objecT)
88+
* @param retry whether or not to retry on connection failure
89+
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
90+
*/
91+
def post(endpoint: String, body: JsValue, retry: Boolean)(
92+
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
93+
94+
//create the request
95+
val req = Marshal(body).to[MessageEntity].map { b =>
96+
//DO NOT reuse the connection
97+
//For details on Connection: Close handling, see:
98+
// - https://doc.akka.io/docs/akka-http/current/common/http-model.html#http-headers
99+
// - http://github.com/akka/akka-http/tree/v10.1.3/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala#L470-L571
100+
HttpRequest(HttpMethods.POST, endpoint, entity = b)
101+
.withHeaders(Connection("close"), Accept(MediaTypes.`application/json`))
102+
}
103+
104+
retryingRequest(req, timeout, retry)
105+
.flatMap {
106+
case (response, retries) => {
107+
if (retries > 0) {
108+
logging.debug(this, s"completed request to $endpoint after $retries retries")
109+
MetricEmitter.emitHistogramMetric(CONTAINER_CLIENT_RETRIES, retries)
110+
}
111+
112+
response.entity.contentLengthOption match {
113+
case Some(contentLength) if response.status != StatusCodes.NoContent =>
114+
if (contentLength <= maxResponse.toBytes) {
115+
Unmarshal(response.entity.withSizeLimit(maxResponse.toBytes)).to[String].map { o =>
116+
Right(ContainerResponse(response.status.intValue, o, None))
117+
}
118+
} else {
119+
truncated(response.entity.dataBytes).map { s =>
120+
Right(ContainerResponse(response.status.intValue, s, Some(contentLength.B, maxResponse)))
121+
}
122+
}
123+
case _ =>
124+
//handle missing Content-Length as NoResponseReceived
125+
//also handle 204 as NoResponseReceived, for parity with ApacheBlockingContainerClient client
126+
response.discardEntityBytes().future.map(_ => Left(NoResponseReceived()))
127+
}
128+
}
129+
}
130+
.recover {
131+
case t: TimeoutException => Left(Timeout(t))
132+
case NonFatal(t) => Left(ConnectionError(t))
133+
}
134+
}
135+
//returns a Future HttpResponse -> Int (where Int is the retryCount)
136+
private def retryingRequest(req: Future[HttpRequest],
137+
timeout: FiniteDuration,
138+
retry: Boolean,
139+
retryCount: Int = 0): Future[(HttpResponse, Int)] = {
140+
request(req)
141+
.map((_, retryCount))
142+
.recoverWith {
143+
case t: StreamTcpException if retry =>
144+
val newTimeout = timeout - retryInterval
145+
if (newTimeout > Duration.Zero) {
146+
akka.pattern.after(retryInterval, as.scheduler)(retryingRequest(req, newTimeout, retry, retryCount + 1))
147+
} else {
148+
logging.warn(
149+
this,
150+
s"POST failed after $retryCount retries with $t - no more retries because timeout exceeded.")
151+
Future.failed(new TimeoutException(t.getMessage))
152+
}
153+
}
154+
}
155+
156+
private def truncated(responseBytes: Source[ByteString, _],
157+
previouslyCaptured: ByteString = ByteString.empty): Future[String] = {
158+
responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap {
159+
case (Nil, tail) =>
160+
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
161+
tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String)
162+
case (Seq(prefix), tail) =>
163+
val truncatedResponse = previouslyCaptured ++ prefix
164+
if (truncatedResponse.size < maxResponse.toBytes) {
165+
truncated(tail, truncatedResponse)
166+
} else {
167+
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
168+
//captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length
169+
tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
170+
}
171+
}
172+
}
173+
}
174+
175+
object AkkaContainerClient {
176+
177+
/** A helper method to post one single request to a connection. Used for container tests. */
178+
def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
179+
implicit logging: Logging,
180+
as: ActorSystem,
181+
ec: ExecutionContext,
182+
tid: TransactionId): (Int, Option[JsObject]) = {
183+
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
184+
val response = executeRequest(connection, endPoint, content)
185+
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
186+
connection.close()
187+
result
188+
}
189+
190+
/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
191+
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)(
192+
implicit logging: Logging,
193+
tid: TransactionId,
194+
as: ActorSystem,
195+
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
196+
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
197+
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
198+
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
199+
connection.close()
200+
results
201+
}
202+
203+
private def executeRequest(connection: AkkaContainerClient, endpoint: String, content: JsValue)(
204+
implicit logging: Logging,
205+
as: ActorSystem,
206+
ec: ExecutionContext,
207+
tid: TransactionId): Future[(Int, Option[JsObject])] = {
208+
209+
val res = connection
210+
.post(endpoint, content, true)
211+
.map({
212+
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
213+
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
214+
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
215+
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
216+
throw new java.util.concurrent.TimeoutException()
217+
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
218+
})
219+
220+
res
221+
}
222+
}

common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala renamed to common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,15 @@ import whisk.core.entity.ByteSize
3838
import whisk.core.entity.size.SizeLong
3939

4040
import scala.annotation.tailrec
41+
import scala.concurrent._
4142
import scala.concurrent.{Await, ExecutionContext, Future}
4243
import scala.concurrent.duration._
4344
import scala.util.{Failure, Success, Try}
4445
import scala.util.control.NoStackTrace
4546

47+
// Used internally to wrap all exceptions for which the request can be retried
48+
protected[containerpool] case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
49+
4650
/**
4751
* This HTTP client is used only in the invoker to communicate with the action container.
4852
* It allows to POST a JSON object and receive JSON object back; that is the
@@ -56,13 +60,16 @@ import scala.util.control.NoStackTrace
5660
* @param maxResponse the maximum size in bytes the connection will accept
5761
* @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1)
5862
*/
59-
protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize, maxConcurrent: Int = 1)(
60-
implicit logging: Logging) {
63+
protected class ApacheBlockingContainerClient(hostname: String,
64+
timeout: FiniteDuration,
65+
maxResponse: ByteSize,
66+
maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext)
67+
extends ContainerClient {
6168

6269
/**
6370
* Closes the HttpClient and all resources allocated by it.
6471
*
65-
* This will close the HttpClient that is generated for this instance of HttpUtils. That will also cause the
72+
* This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the
6673
* ConnectionManager to be closed alongside.
6774
*/
6875
def close(): Unit = HttpClientUtils.closeQuietly(connection)
@@ -80,20 +87,21 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
8087
* @return Left(Error Message) or Right(Status Code, Response as UTF-8 String)
8188
*/
8289
def post(endpoint: String, body: JsValue, retry: Boolean)(
83-
implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
90+
implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = {
8491
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
8592
entity.setContentType("application/json")
8693

8794
val request = new HttpPost(baseUri.setPath(endpoint).build)
8895
request.addHeader(HttpHeaders.ACCEPT, "application/json")
8996
request.setEntity(entity)
9097

91-
execute(request, timeout, maxConcurrent, retry)
98+
Future {
99+
blocking {
100+
execute(request, timeout, maxConcurrent, retry)
101+
}
102+
}
92103
}
93104

94-
// Used internally to wrap all exceptions for which the request can be retried
95-
private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace
96-
97105
// Annotation will make the compiler complain if no tail recursion is possible
98106
@tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)(
99107
implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
@@ -191,39 +199,47 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse
191199
.build
192200
}
193201

194-
object HttpUtils {
202+
object ApacheBlockingContainerClient {
195203

196204
/** A helper method to post one single request to a connection. Used for container tests. */
197-
def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging,
198-
tid: TransactionId): (Int, Option[JsObject]) = {
199-
val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
205+
def post(host: String, port: Int, endPoint: String, content: JsValue)(
206+
implicit logging: Logging,
207+
tid: TransactionId,
208+
ec: ExecutionContext): (Int, Option[JsObject]) = {
209+
val timeout = 90.seconds
210+
val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB)
200211
val response = executeRequest(connection, endPoint, content)
212+
val result = Await.result(response, timeout)
201213
connection.close()
202-
response
214+
result
203215
}
204216

205217
/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
206218
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: Duration)(
207219
implicit logging: Logging,
208220
tid: TransactionId,
209221
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
210-
val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB, contents.size)
211-
val futureResults = contents.map(content => Future { executeRequest(connection, endPoint, content) })
222+
val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, contents.size)
223+
val futureResults = contents.map { content =>
224+
executeRequest(connection, endPoint, content)
225+
}
212226
val results = Await.result(Future.sequence(futureResults), timeout)
213227
connection.close()
214228
results
215229
}
216230

217-
private def executeRequest(connection: HttpUtils, endpoint: String, content: JsValue)(
231+
private def executeRequest(connection: ApacheBlockingContainerClient, endpoint: String, content: JsValue)(
218232
implicit logging: Logging,
219-
tid: TransactionId): (Int, Option[JsObject]) = {
220-
connection.post(endpoint, content, retry = true) match {
233+
tid: TransactionId,
234+
ec: ExecutionContext): Future[(Int, Option[JsObject])] = {
235+
connection.post(endpoint, content, retry = true) map {
221236
case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption)
222237
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
223238
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
224239
case Left(ConnectionError(_: java.net.SocketTimeoutException)) =>
225240
throw new java.util.concurrent.TimeoutException()
226241
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
227242
}
243+
228244
}
229245
}

0 commit comments

Comments
 (0)