Skip to content

Commit

Permalink
airframe-http: Http client interface clean up (#2231)
Browse files Browse the repository at this point in the history
* Remove requestFilter from generated RPC clients
* Extract ChannelConfig from HttpClientConfig
* Fix async client generator
  • Loading branch information
xerial committed Jun 4, 2022
1 parent cc82251 commit 439fb5b
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 121 deletions.
Expand Up @@ -31,7 +31,6 @@ object RPCClientGenerator extends HttpClientGenerator {
s"""${header(src.destPackageName)}
|
|import wvlet.airframe.http._
|import wvlet.airframe.http.HttpMessage.Request
|import wvlet.airframe.http.client.{SyncClient, AsyncClient}
|import scala.concurrent.Future
|
Expand Down Expand Up @@ -73,7 +72,11 @@ object RPCClientGenerator extends HttpClientGenerator {
}

def syncClientClass: String =
s"""class RPCSyncClient(private val client:SyncClient) extends AutoCloseable {
s"""class RPCSyncClient(private val client:SyncClient) extends wvlet.airframe.http.client.ClientFactory[RPCSyncClient] with AutoCloseable {
| override protected def build(newConfig: HttpClientConfig): RPCSyncClient = {
| new RPCSyncClient(client.withConfig(_ => newConfig))
| }
| override protected def config: HttpClientConfig = client.config
| override def close(): Unit = { client.close() }
| def getClient: SyncClient = client
|
Expand All @@ -82,10 +85,13 @@ object RPCClientGenerator extends HttpClientGenerator {
|""".stripMargin

def asyncClientClass: String =
s"""class RPCAsyncClient(private val client:AsyncClient) extends AutoCloseable {
s"""class RPCAsyncClient(private val client:AsyncClient) extends wvlet.airframe.http.client.ClientFactory[RPCAsyncClient] with AutoCloseable {
| override protected def build(newConfig: HttpClientConfig): RPCAsyncClient = {
| new RPCAsyncClient(client.withConfig(_ => newConfig))
| }
| override protected def config: HttpClientConfig = client.config
| override def close(): Unit = { client.close() }
| def getClient: AsyncClient = client
|
|${indent(asyncClientBody)}
|}
|""".stripMargin
Expand All @@ -111,8 +117,7 @@ object RPCClientGenerator extends HttpClientGenerator {
def sendRequestArgs(m: ClientMethodDef): String = {
Seq(
s""""${m.path}"""",
m.clientCallParameters.mkString(", "),
"requestFilter"
m.clientCallParameters.mkString(", ")
).mkString(", ")
}

Expand All @@ -121,7 +126,7 @@ object RPCClientGenerator extends HttpClientGenerator {
.map { m =>
val inputArgs =
m.inputParameters
.map(x => s"${x.name}: ${x.surface.fullTypeName}") ++ Seq("requestFilter: Request => Request = identity")
.map(x => s"${x.name}: ${x.surface.fullTypeName}")

val returnType = if (isAsync) s"Future[${m.returnType.fullTypeName}]" else m.returnType.fullTypeName

Expand Down
Expand Up @@ -39,11 +39,11 @@ class JSClientChannel(serverAddress: ServerAddress, private[client] val config:
// nothing to do
}

override def send(request: HttpMessage.Request, requestConfig: HttpClientConfig): HttpMessage.Response = ???
override def send(request: HttpMessage.Request, channelConfig: ChannelConfig): HttpMessage.Response = ???

override def sendAsync(
request: HttpMessage.Request,
requestConfig: HttpClientConfig
channelConfig: ChannelConfig
): Future[HttpMessage.Response] = {

val xhr = new dom.XMLHttpRequest()
Expand Down
Expand Up @@ -65,30 +65,30 @@ class JavaClientChannel(serverAddress: ServerAddress, private[http] val config:
}
}

override def send(req: Request, requestConfig: HttpClientConfig): Response = {
override def send(req: Request, channelConfig: ChannelConfig): Response = {
// New Java's HttpRequest is immutable, so we can reuse the same request instance
val httpRequest = buildRequest(serverAddress, req, requestConfig)
val httpRequest = buildRequest(serverAddress, req, channelConfig)
val httpResponse: HttpResponse[InputStream] =
javaHttpClient.send(httpRequest, BodyHandlers.ofInputStream())

readResponse(httpResponse)
}

override def sendAsync(req: Request, requestConfig: HttpClientConfig): Future[Response] = {
Future.apply(send(req, requestConfig))
override def sendAsync(req: Request, channelConfig: ChannelConfig): Future[Response] = {
Future.apply(send(req, channelConfig))
}

private def buildRequest(
serverAddress: ServerAddress,
request: Request,
requestConfig: HttpClientConfig
channelConfig: ChannelConfig
): HttpRequest = {
val uri = s"${serverAddress.uri}${if (request.uri.startsWith("/")) request.uri
else s"/${request.uri}"}"

val requestBuilder = HttpRequest
.newBuilder(URI.create(uri))
.timeout(java.time.Duration.ofMillis(requestConfig.readTimeout.toMillis))
.timeout(java.time.Duration.ofMillis(channelConfig.readTimeout.toMillis))

// Set HTTP request headers
request.header.entries.foreach(h => requestBuilder.setHeader(h.key, h.value))
Expand Down
Expand Up @@ -28,55 +28,51 @@ import scala.jdk.CollectionConverters._
class URLConnectionChannel(serverAddress: ServerAddress, config: HttpClientConfig) extends HttpChannel {
override private[client] implicit def executionContext: ExecutionContext = config.newExecutionContext

override def send(request: Request, requestConfig: HttpClientConfig): Response = {
override def send(request: Request, channelConfig: ChannelConfig): Response = {
val url = s"${serverAddress.uri}${if (request.uri.startsWith("/")) request.uri
else s"/${request.uri}"}"

// Send the request with retry support. Setting the context request is necessary to properly show
// the request path upon errors
requestConfig.retryContext.runWithContext(request) {
val conn0: HttpURLConnection =
new java.net.URL(url).openConnection().asInstanceOf[HttpURLConnection]
conn0.setRequestMethod(request.method)
for (e <- request.header.entries) {
conn0.setRequestProperty(e.key, e.value)
}
conn0.setDoInput(true)
val conn0: HttpURLConnection =
new java.net.URL(url).openConnection().asInstanceOf[HttpURLConnection]
conn0.setRequestMethod(request.method)
for (e <- request.header.entries) {
conn0.setRequestProperty(e.key, e.value)
}
conn0.setDoInput(true)

def timeoutMillis(d: Duration): Int = {
if (d.isFinite) {
d.toMillis.toInt
} else {
0
}
def timeoutMillis(d: Duration): Int = {
if (d.isFinite) {
d.toMillis.toInt
} else {
0
}
}

conn0.setReadTimeout(timeoutMillis(requestConfig.readTimeout))
conn0.setConnectTimeout(timeoutMillis(requestConfig.connectTimeout))
conn0.setInstanceFollowRedirects(true)
conn0.setReadTimeout(timeoutMillis(channelConfig.readTimeout))
conn0.setConnectTimeout(timeoutMillis(channelConfig.connectTimeout))
conn0.setInstanceFollowRedirects(true)

val conn = conn0 // config.connectionFilter(conn0)
val content = request.contentBytes
if (content.nonEmpty) {
conn.setDoOutput(true)
Control.withResource(conn.getOutputStream()) { (out: OutputStream) =>
out.write(content)
out.flush()
}
val conn = conn0 // config.connectionFilter(conn0)
val content = request.contentBytes
if (content.nonEmpty) {
conn.setDoOutput(true)
Control.withResource(conn.getOutputStream()) { (out: OutputStream) =>
out.write(content)
out.flush()
}
}

try {
Control.withResource(conn.getInputStream()) { (in: InputStream) =>
readResponse(conn, in)
}
} catch {
case e: IOException if conn.getResponseCode != -1 =>
// When the request fails, but the server still returns meaningful responses
// (e.g., 404 NotFound throws FileNotFoundException)
Control.withResource(conn.getErrorStream()) { (err: InputStream) =>
readResponse(conn, err)
}
try {
Control.withResource(conn.getInputStream()) { (in: InputStream) =>
readResponse(conn, in)
}
} catch {
case e: IOException if conn.getResponseCode != -1 =>
// When the request fails, but the server still returns meaningful responses
// (e.g., 404 NotFound throws FileNotFoundException)
Control.withResource(conn.getErrorStream()) { (err: InputStream) =>
readResponse(conn, err)
}
}
}

Expand All @@ -102,8 +98,8 @@ class URLConnectionChannel(serverAddress: ServerAddress, config: HttpClientConfi
response.withContent(responseContentBytes)
}

override def sendAsync(req: Request, requestConfig: HttpClientConfig): Future[Response] = {
Future.apply(send(req, requestConfig))
override def sendAsync(req: Request, channelConfig: ChannelConfig): Future[Response] = {
Future.apply(send(req, channelConfig))
}

override def close(): Unit = {
Expand Down
Expand Up @@ -15,28 +15,29 @@ package wvlet.airframe.http
import wvlet.airframe.codec.MessageCodecFactory
import wvlet.airframe.control.CircuitBreaker
import wvlet.airframe.control.Retry.RetryContext
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.http.client.{
AsyncClient,
AsyncClientImpl,
ClientFilter,
HttpClientBackend,
SyncClient,
SyncClientImpl
}
import wvlet.airframe.http.HttpMessage.Request
import wvlet.airframe.http.client.{AsyncClient, ClientFilter, HttpClientBackend, SyncClient}
import wvlet.airframe.rx.{Rx, RxStream}

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}

object HttpClientConfig {
// Tell the IntelliJ that Compat object implements CompatAPI. This a workaround of IntelliJ, which cannot properly highlight cross-build project code:
// https://youtrack.jetbrains.com/issue/SCL-19567/Support-of-CrossType-Full-wanted
private def compat: CompatApi = wvlet.airframe.http.Compat
}

import HttpClientConfig._
import wvlet.airframe.http.HttpClientConfig._

/**
* Contains only http channel related configurations in HttpClientConfig
*/
trait ChannelConfig {
def connectTimeout: Duration
def readTimeout: Duration
}

/**
*/
Expand All @@ -63,7 +64,7 @@ case class HttpClientConfig(
// TODO: This execution context needs to reference a global one if we need to use it in Scala JVM
Rx.future(f)(compat.defaultExecutionContext)
}
) {
) extends ChannelConfig {
def newSyncClient(serverAddress: String): SyncClient =
backend.newSyncClient(ServerAddress(serverAddress), this)

Expand Down
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.http.client

import wvlet.airframe.control.CircuitBreaker
import wvlet.airframe.control.Retry.RetryContext
import wvlet.airframe.http.HttpClientConfig
import wvlet.airframe.http.HttpMessage.Request

import scala.concurrent.duration.Duration

/**
* Interface for customizing config for each requests
*
* @tparam ClientImpl
*/
trait ClientFactory[ClientImpl] {

protected def config: HttpClientConfig

/**
* Create a new client sharing the same underlying http client
* @param newConfig
* @return
*/
protected def build(newConfig: HttpClientConfig): ClientImpl

def withRequestFilter(requestFilter: Request => Request): ClientImpl = {
build(config.withRequestFilter(requestFilter))
}
def withClientFilter(filter: ClientFilter): ClientImpl = {
build(config.withClientFilter(filter))
}
def withRetryContext(filter: RetryContext => RetryContext): ClientImpl = {
build(config.withRetryContext(filter))
}
def withConfig(filter: HttpClientConfig => HttpClientConfig): ClientImpl = {
build(filter(config))
}
def withConnectTimeout(duration: Duration): ClientImpl = {
build(config.withConnectTimeout(duration))
}
def withReadTimeout(duration: Duration): ClientImpl = {
build(config.withReadTimeout(duration))
}
def withCircuitBreaker(filter: CircuitBreaker => CircuitBreaker): ClientImpl = {
build(config.withCircuitBreaker(filter))
}
}
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package wvlet.airframe.http.client

import wvlet.airframe.http.ChannelConfig
import wvlet.airframe.http.HttpMessage.{Request, Response}

import scala.concurrent.{ExecutionContext, Future}

/**
* A low-level interface for sending HTTP requests without managing retries or filters
*/
trait HttpChannel extends AutoCloseable {

/**
* Send the request without modification.
* @param req
* @param channelConfig
* @return
*/
def send(req: Request, channelConfig: ChannelConfig): Response
def sendAsync(req: Request, channelConfig: ChannelConfig): Future[Response]

private[client] implicit def executionContext: ExecutionContext
}

0 comments on commit 439fb5b

Please sign in to comment.