Skip to content

Commit

Permalink
airframe-http: Various client fixes (#1142)
Browse files Browse the repository at this point in the history
- Use the default design builder in Finagle examples
- Resolve ReflectObjectFactory.isStatic early because sbt may unload classloader early and causes NPE for Thread.getContextClassLoader. This often happened when testing examples project.
- Fix HttpResposneCodec to properly read raw MsgPack
- Use custom codec properly in FinagleClient
- Throw HttpClientException with 200 code when the request succeeded but converting the response body into the target object failed
  • Loading branch information
xerial committed Jun 19, 2020
1 parent 3b8887c commit 3d78428
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,23 @@ class FinagleClient(address: ServerAddress, config: FinagleClientConfig)
response.asInstanceOf[Future[A]]
} else {
// Need a conversion
val codec = MessageCodec.of[A]
val codec = codecFactory.of[A]
response
.map { r =>
val msgpack = responseCodec.toMsgPack(r)
codec.unpack(msgpack)
try {
codec.unpack(msgpack)
} catch {
case NonFatal(e) =>
val msg = s"Failed to parse the response body ${r}: ${r.contentString}"
warn(msg)
throw new HttpClientException(
r,
HttpStatus.ofCode(r.statusCode),
msg,
e
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package wvlet.airframe.http.finagle

import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.util.Await
import wvlet.airframe.Design
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.http._
import wvlet.airspec.AirSpec
Expand Down Expand Up @@ -42,6 +43,11 @@ trait FinagleClientTestApi extends LogSupport {
User(id, "leo", getRequestId(request))
}

@Endpoint(method = HttpMethod.GET, path = "/user/:id/invalid")
def getInvalidUser(id: Int, request: Request): HttpMessage.Response = {
Http.response(HttpStatus.Ok_200).withJson("""{"id":"abc","user":"NA","requestId":"xxx"}""")
}

@Endpoint(method = HttpMethod.GET, path = "/user/info")
def getResource(id: Int, name: String, request: Request): User = {
User(id, name, getRequestId(request))
Expand Down Expand Up @@ -99,129 +105,140 @@ trait FinagleClientTestApi extends LogSupport {
/**
*/
class FinagleClientTest extends AirSpec {
val r = Router.add[FinagleClientTestApi]
val d = finagleDefaultDesign
.bind[FinagleServerConfig].toInstance(
FinagleServerConfig(name = "test-server", router = r)
)
.noLifeCycleLogging
private val r = Router.add[FinagleClientTestApi]

override protected def design: Design =
Finagle.server
.withName("test-server")
.withRouter(r)
.designWithSyncClient

def `create client`: Unit = {
def `convert HTTP responses into objects`(client: FinagleSyncClient): Unit = {
def addRequestId(request: Request): Request = {
request.headerMap.put("X-Request-Id", "10")
request
}
// Sending an implementation specific Request type
val ret = client.send(Request("/")).contentString
ret shouldBe "Ok"

// Using HTTP request wrappers
client.get[User]("/user/1") shouldBe User(1, "leo", "N/A")
client.getResource[UserRequest, User]("/user/info", UserRequest(2, "kai")) shouldBe User(2, "kai", "N/A")
client.getResource[UserRequest, User]("/user/info2", UserRequest(2, "kai")) shouldBe User(2, "kai", "N/A")
client.list[Seq[User]]("/user") shouldBe Seq(User(1, "leo", "N/A"))

client.post[User]("/user", User(2, "yui", "N/A")) shouldBe User(2, "yui", "N/A")
client.postOps[User, User]("/user", User(2, "yui", "N/A")) shouldBe User(2, "yui", "N/A")

client.put[User]("/user", User(10, "aina", "N/A")) shouldBe User(10, "aina", "N/A")
client.putOps[User, User]("/user", User(10, "aina", "N/A")) shouldBe User(10, "aina", "N/A")

client.patch[User]("/user", User(20, "joy", "N/A")) shouldBe User(20, "joy", "N/A")
client.patchOps[User, User]("/user", User(20, "joy", "N/A")) shouldBe User(20, "joy", "N/A")

client.delete[User]("/user/1") shouldBe User(1, "xxx", "N/A")
client.deleteOps[DeleteRequestBody, User]("/user/1", DeleteRequestBody(true)) shouldBe User(1, "xxx", "N/A")

// application/x-msgpack test
client.get[User]("/user/1", { r => r.accept = "application/x-msgpack"; r }) shouldBe User(1, "leo", "N/A")
client.post[User]("/user", User(2, "yui", "N/A"), { r => r.accept = "application/x-msgpack"; r }) shouldBe User(
2,
"yui",
"N/A"
)

d.build[FinagleServer] { server =>
withResource(FinagleClient.newSyncClient(server.localAddress)) { client =>
// Sending an implementation specific Request type
val ret = client.send(Request("/")).contentString
ret shouldBe "Ok"

// Using HTTP request wrappers
client.get[User]("/user/1") shouldBe User(1, "leo", "N/A")
client.getResource[UserRequest, User]("/user/info", UserRequest(2, "kai")) shouldBe User(2, "kai", "N/A")
client.getResource[UserRequest, User]("/user/info2", UserRequest(2, "kai")) shouldBe User(2, "kai", "N/A")
client.list[Seq[User]]("/user") shouldBe Seq(User(1, "leo", "N/A"))

client.post[User]("/user", User(2, "yui", "N/A")) shouldBe User(2, "yui", "N/A")
client.postOps[User, User]("/user", User(2, "yui", "N/A")) shouldBe User(2, "yui", "N/A")

client.put[User]("/user", User(10, "aina", "N/A")) shouldBe User(10, "aina", "N/A")
client.putOps[User, User]("/user", User(10, "aina", "N/A")) shouldBe User(10, "aina", "N/A")

client.patch[User]("/user", User(20, "joy", "N/A")) shouldBe User(20, "joy", "N/A")
client.patchOps[User, User]("/user", User(20, "joy", "N/A")) shouldBe User(20, "joy", "N/A")

client.delete[User]("/user/1") shouldBe User(1, "xxx", "N/A")
client.deleteOps[DeleteRequestBody, User]("/user/1", DeleteRequestBody(true)) shouldBe User(1, "xxx", "N/A")

// Get a response as is
client.get[Response]("/response").contentString shouldBe "raw response"

// Using a custom HTTP header
client.get[User]("/user/1", addRequestId) shouldBe User(1, "leo", "10")
client.getResource[UserRequest, User]("/user/info", UserRequest(2, "kai"), addRequestId) shouldBe User(
2,
"kai",
"10"
)
client.getResource[UserRequest, User]("/user/info2", UserRequest(2, "kai"), addRequestId) shouldBe User(
2,
"kai",
"10"
)

client.list[Seq[User]]("/user", addRequestId) shouldBe Seq(User(1, "leo", "10"))

client.post[User]("/user", User(2, "yui", "N/A"), addRequestId) shouldBe User(2, "yui", "10")
client.postOps[User, User]("/user", User(2, "yui", "N/A"), addRequestId) shouldBe User(2, "yui", "10")
client
.postRaw[User](
"/user",
User(2, "yui", "N/A"),
addRequestId
).contentString shouldBe """{"id":2,"name":"yui","requestId":"10"}"""

client.put[User]("/user", User(10, "aina", "N/A"), addRequestId) shouldBe User(10, "aina", "10")
client.putOps[User, User]("/user", User(10, "aina", "N/A"), addRequestId) shouldBe User(10, "aina", "10")
client
.putRaw[User](
"/user",
User(10, "aina", "N/A"),
addRequestId
).contentString shouldBe """{"id":10,"name":"aina","requestId":"10"}"""

client.patch[User]("/user", User(20, "joy", "N/A"), addRequestId) shouldBe User(20, "joy", "10")
client.patchOps[User, User]("/user", User(20, "joy", "N/A"), addRequestId) shouldBe User(20, "joy", "10")
client
.patchRaw[User](
"/user",
User(20, "joy", "N/A"),
addRequestId
).contentString shouldBe """{"id":20,"name":"joy","requestId":"10"}"""

client.delete[User]("/user/1", addRequestId) shouldBe User(1, "xxx", "10")
client.deleteOps[DeleteRequestBody, User]("/user/1", DeleteRequestBody(true), addRequestId) shouldBe User(
1,
"xxx",
"10"
)
client.deleteRaw("/user/1", addRequestId).contentString shouldBe """{"id":1,"name":"xxx","requestId":"10"}"""
}
}
// Get a response as is
client.get[Response]("/response").contentString shouldBe "raw response"

// Using a custom HTTP header
client.get[User]("/user/1", addRequestId) shouldBe User(1, "leo", "10")
client.getResource[UserRequest, User]("/user/info", UserRequest(2, "kai"), addRequestId) shouldBe User(
2,
"kai",
"10"
)
client.getResource[UserRequest, User]("/user/info2", UserRequest(2, "kai"), addRequestId) shouldBe User(
2,
"kai",
"10"
)

client.list[Seq[User]]("/user", addRequestId) shouldBe Seq(User(1, "leo", "10"))

client.post[User]("/user", User(2, "yui", "N/A"), addRequestId) shouldBe User(2, "yui", "10")
client.postOps[User, User]("/user", User(2, "yui", "N/A"), addRequestId) shouldBe User(2, "yui", "10")
client
.postRaw[User](
"/user",
User(2, "yui", "N/A"),
addRequestId
).contentString shouldBe """{"id":2,"name":"yui","requestId":"10"}"""

client.put[User]("/user", User(10, "aina", "N/A"), addRequestId) shouldBe User(10, "aina", "10")
client.putOps[User, User]("/user", User(10, "aina", "N/A"), addRequestId) shouldBe User(10, "aina", "10")
client
.putRaw[User](
"/user",
User(10, "aina", "N/A"),
addRequestId
).contentString shouldBe """{"id":10,"name":"aina","requestId":"10"}"""

client.patch[User]("/user", User(20, "joy", "N/A"), addRequestId) shouldBe User(20, "joy", "10")
client.patchOps[User, User]("/user", User(20, "joy", "N/A"), addRequestId) shouldBe User(20, "joy", "10")
client
.patchRaw[User](
"/user",
User(20, "joy", "N/A"),
addRequestId
).contentString shouldBe """{"id":20,"name":"joy","requestId":"10"}"""

client.delete[User]("/user/1", addRequestId) shouldBe User(1, "xxx", "10")
client.deleteOps[DeleteRequestBody, User]("/user/1", DeleteRequestBody(true), addRequestId) shouldBe User(
1,
"xxx",
"10"
)
client.deleteRaw("/user/1", addRequestId).contentString shouldBe """{"id":1,"name":"xxx","requestId":"10"}"""
}

def `fail request`: Unit = {
d.build[FinagleServer] { server =>
withResource(
Finagle.client
.withMaxRetry(3)
.withBackOff(initialIntervalMillis = 1)
.newSyncClient(server.localAddress)
) { client =>
warn("Starting http client failure tests")

{
// Test max retry failure
val ex = intercept[HttpClientMaxRetryException] {
val resp = client.get[String]("/busy")
}
warn(ex.getMessage)
ex.retryContext.retryCount shouldBe 3
ex.retryContext.maxRetry shouldBe 3
val cause = ex.retryContext.lastError.asInstanceOf[HttpClientException]
cause.status shouldBe HttpStatus.InternalServerError_500
def `fail request`(server: FinagleServer): Unit = {
withResource(
Finagle.client
.withMaxRetry(3)
.withBackOff(initialIntervalMillis = 1)
.newSyncClient(server.localAddress)
) { client =>
warn("Starting http client failure tests")

{
// Test max retry failure
val ex = intercept[HttpClientMaxRetryException] {
val resp = client.get[String]("/busy")
}
warn(ex.getMessage)
ex.retryContext.retryCount shouldBe 3
ex.retryContext.maxRetry shouldBe 3
val cause = ex.retryContext.lastError.asInstanceOf[HttpClientException]
cause.status shouldBe HttpStatus.InternalServerError_500
}

{
// Non retryable response
val cause = intercept[HttpClientException] {
client.get[String]("/forbidden")
}
warn(cause.getMessage)
cause.status shouldBe HttpStatus.Forbidden_403
}

{
// Non retryable response
val cause = intercept[HttpClientException] {
client.get[String]("/forbidden")
}
warn(cause.getMessage)
cause.status shouldBe HttpStatus.Forbidden_403
{
// Parse invalid json response
val e = intercept[HttpClientException] {
client.get[User]("/user/1/invalid")
}
e.status shouldBe HttpStatus.Ok_200
e.getCause.getClass shouldBe classOf[IllegalArgumentException]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class HttpResponseCodec[Resp: HttpResponseAdapter] extends MessageCodec[HttpResp
override def pack(p: Packer, v: HttpResponse[_]): Unit = {
v.contentType match {
case Some("application/x-msgpack") =>
// Raw msgpack response
val b = v.contentBytes
p.packArrayHeader(b.length)
p.writePayload(b)
case Some(x) if x.startsWith("application/json") =>
// JSON -> MsgPack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ object ReflectSurfaceFactory extends LogSupport {
}

private class ReflectObjectFactory extends ObjectFactory {
private lazy val isStatic = mirror.classSymbol(rawType).isStatic
private val isStatic = mirror.classSymbol(rawType).isStatic
private def outerInstance: Option[AnyRef] = {
if (isStatic) {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ package wvlet.airframe.examples.http

import com.twitter.finagle.http.Response
import wvlet.airframe.control.Control.withResource
import wvlet.airframe.http.finagle.{FinagleClient, FinagleServer}
import wvlet.airframe.http.{Endpoint, HttpMethod, Router, finagle}
import wvlet.airframe.http.finagle.{Finagle, FinagleClient, FinagleServer}
import wvlet.airframe.http.{Endpoint, HttpMethod, Router}
import wvlet.log.LogSupport

/**
Expand Down Expand Up @@ -63,7 +63,11 @@ object Http_02_ObjectMapping extends App with LogSupport {
}

val router = Router.add[MyApp]
val design = finagle.newFinagleServerDesign(name = "myapp", router = router)
val design =
Finagle.server
.withName("myapp")
.withRouter(router)
.design

design.build[FinagleServer] { server =>
withResource(FinagleClient.newSyncClient(server.localAddress)) { client =>
Expand All @@ -74,7 +78,7 @@ object Http_02_ObjectMapping extends App with LogSupport {

client.get[Response]("/v1/resource/resource_path")
}

// Add this code to keep running the server process
//server.waitServerTermination
}
}
Loading

0 comments on commit 3d78428

Please sign in to comment.