Skip to content

Commit

Permalink
Add request executor which can handle multiple urls
Browse files Browse the repository at this point in the history
  • Loading branch information
takezoe committed Jan 16, 2018
1 parent 86e2e85 commit ab19389
Showing 1 changed file with 101 additions and 57 deletions.
158 changes: 101 additions & 57 deletions src/main/scala/com/github/takezoe/resty/HttpClientSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,9 @@ object HttpClientSupport {

trait RequestExecutor {

def url: String

def execute[T](httpClient: OkHttpClient, builder: Request.Builder,
configurer: Request.Builder => Unit, clazz: Class[_]): Either[ErrorModel, T]

def executeAsync[T](httpClient: OkHttpClient, builder: Request.Builder,
configurer: Request.Builder => Unit, clazz: Class[_]): Future[Either[ErrorModel, T]]
def execute[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Either[ErrorModel, T]

def executeAsync[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Future[Either[ErrorModel, T]]

protected def handleResponse[T](request: Request, response: Response, clazz: Class[_]): Either[ErrorModel, T] = {
try {
Expand Down Expand Up @@ -127,12 +122,16 @@ class SimpleRequestExecutor(val url: String, config: HttpExecutorConfig) extends
private val disabledTime = new AtomicReference[Option[Long]](None)
private val failureCount = new AtomicInteger(0)

def execute[T](httpClient: OkHttpClient, builder: Request.Builder,
configurer: Request.Builder => Unit, clazz: Class[_]): Either[ErrorModel, T] = {
def isAvailable: Boolean = {
(config.maxFailure <= 0 || failureCount.get() <= config.maxFailure)
}

def execute[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Either[ErrorModel, T] = {
try {
checkWhetherEnabled()

configurer(builder)
val builder = new Request.Builder()
configurer(url, builder)

val request = builder.build()
val response = withRetry(httpClient, request, config)
Expand All @@ -145,7 +144,7 @@ class SimpleRequestExecutor(val url: String, config: HttpExecutorConfig) extends

} catch {
case e: Exception => {
if(config.maxFailure > 0 && failureCount.incrementAndGet() >= config.maxFailure){
if(config.maxFailure > 0 && failureCount.incrementAndGet() > config.maxFailure){
disabledTime.set(Some(System.currentTimeMillis))
}
Left(ErrorModel(Seq(e.toString)))
Expand Down Expand Up @@ -183,12 +182,12 @@ class SimpleRequestExecutor(val url: String, config: HttpExecutorConfig) extends
???
}

def executeAsync[T](httpClient: OkHttpClient, builder: Request.Builder,
configurer: Request.Builder => Unit, clazz: Class[_]): Future[Either[ErrorModel, T]] = {
def executeAsync[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Future[Either[ErrorModel, T]] = {
try {
checkWhetherEnabled()

configurer(builder)
val builder = new Request.Builder()
configurer(url, builder)

val request = builder.build()
val promise = Promise[Either[ErrorModel, T]]()
Expand All @@ -201,7 +200,7 @@ class SimpleRequestExecutor(val url: String, config: HttpExecutorConfig) extends
Thread.sleep(config.retryInterval) // TODO Don't brock a thread here!
httpClient.newCall(request)
} else {
if(config.maxFailure > 0 && failureCount.incrementAndGet() >= config.maxFailure){
if(config.maxFailure > 0 && failureCount.incrementAndGet() > config.maxFailure){
disabledTime.set(Some(System.currentTimeMillis))
}
promise.failure(e)
Expand All @@ -224,6 +223,35 @@ class SimpleRequestExecutor(val url: String, config: HttpExecutorConfig) extends
}
}

class RandomRequestExecutor(val urls: Seq[String], config: HttpExecutorConfig) extends RequestExecutor {

private val executors = urls.map(url => new SimpleRequestExecutor(url, config))

private def nextExecutor: Option[RequestExecutor] = {
val availableExecutors = executors.filter((_.isAvailable))
if(availableExecutors.isEmpty){
None
} else {
Some(availableExecutors((scala.math.random * availableExecutors.length).toInt))
}
}

override def execute[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Either[ErrorModel, T] = {
nextExecutor match {
case Some(executor) => executor.execute(httpClient, configurer, clazz)
case None => Left(ErrorModel(Seq("No available url!")))
}
}

override def executeAsync[T](httpClient: OkHttpClient, configurer: (String, Request.Builder) => Unit, clazz: Class[_]): Future[Either[ErrorModel, T]] = {
nextExecutor match {
case Some(executor) => executor.executeAsync(httpClient, configurer, clazz)
case None => Future.successful(Left(ErrorModel(Seq("No available url!"))))
}
}

}

/**
* Configuration of behavior of HttpClient.
*
Expand All @@ -244,87 +272,103 @@ trait HttpClientSupport {
protected def httpClient = HttpClientSupport.httpClient

def httpGet[T](executor: RequestExecutor, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val builder = new Request.Builder().url(executor.url).get()
executor.execute(httpClient, builder, configurer, c.runtimeClass)
executor.execute(httpClient, (url, builder) => {
builder.url(url).get()
configurer(builder)
}, c.runtimeClass)
}

def httpGetAsync[T](executor: RequestExecutor, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val builder = new Request.Builder().url(executor.url).get()
executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
executor.executeAsync(httpClient, (url, builder) => {
builder.url(url).get()
configurer(builder)
}, c.runtimeClass)
}

def httpPost[T](executor: RequestExecutor, params: Map[String, String], configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }
executor.execute(httpClient, (url, builder) => {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }

builder.url(url).post(formBuilder.build())

val builder = new Request.Builder().url(executor.url).post(formBuilder.build())
executor.execute(httpClient, builder, configurer, c.runtimeClass)
configurer(builder)
}, c.runtimeClass)
}

def httpPostAsync[T](executor: RequestExecutor, params: Map[String, String], configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }
executor.executeAsync(httpClient, (url, builder) => {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }

builder.url(url).post(formBuilder.build())

val builder = new Request.Builder().url(executor.url).post(formBuilder.build())
executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
configurer(builder)
}, c.runtimeClass)
}

def httpPostJson[T](executor: RequestExecutor, doc: AnyRef, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val builder = new Request.Builder().url(executor.url)
.post(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))

executor.execute(httpClient, builder, configurer, c.runtimeClass)
executor.execute(httpClient, (url, builder) => {
builder.url(url).post(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))
configurer(builder)
}, c.runtimeClass)
}

def httpPostJsonAsync[T](executor: RequestExecutor, doc: AnyRef, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val builder = new Request.Builder().url(executor.url)
.post(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))

executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
executor.executeAsync(httpClient, (url, builder) => {
builder.url(url).post(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))
configurer(builder)
}, c.runtimeClass)
}

def httpPut[T](executor: RequestExecutor, params: Map[String, String], configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }
executor.execute(httpClient, (url, builder) => {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }

val builder = new Request.Builder().url(executor.url).put(formBuilder.build())
executor.execute(httpClient, builder, configurer, c.runtimeClass)
builder.url(url).put(formBuilder.build())

configurer(builder)
}, c.runtimeClass)
}

def httpPutAsync[T](executor: RequestExecutor, params: Map[String, String], configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }
executor.executeAsync(httpClient, (url, builder) => {
val formBuilder = new FormBody.Builder()
params.foreach { case (key, value) => formBuilder.add(key, value) }

val builder = new Request.Builder().url(executor.url).put(formBuilder.build())
builder.url(url).put(formBuilder.build())

executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
configurer(builder)
}, c.runtimeClass)
}

def httpPutJson[T](executor: RequestExecutor, doc: AnyRef, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val builder = new Request.Builder().url(executor.url)
.put(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))

executor.execute(httpClient, builder, configurer, c.runtimeClass)
executor.execute(httpClient, (url, builder) => {
builder.url(url).put(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))
configurer(builder)
}, c.runtimeClass)
}

def httpPutJsonAsync[T](executor: RequestExecutor, doc: AnyRef, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val builder = new Request.Builder().url(executor.url)
.put(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))

executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
executor.executeAsync(httpClient, (url, builder) => {
builder.url(url).put(RequestBody.create(HttpClientSupport.ContentType_JSON, JsonUtils.serialize(doc)))
configurer(builder)
}, c.runtimeClass)
}

def httpDelete[T](executor: RequestExecutor, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Either[ErrorModel, T] = {
val builder = new Request.Builder().url(executor.url).delete()

executor.execute(httpClient, builder, configurer, c.runtimeClass)
executor.execute(httpClient, (url, builder) => {
builder.url(url).delete()
configurer(builder)
}, c.runtimeClass)
}

def httpDeleteAsync[T](executor: RequestExecutor, configurer: Request.Builder => Unit = (builder) => ())(implicit c: ClassTag[T]): Future[Either[ErrorModel, T]] = {
val builder = new Request.Builder().url(executor.url).delete()

executor.executeAsync(httpClient, builder, configurer, c.runtimeClass)
executor.executeAsync(httpClient, (url, builder) => {
builder.url(url).delete()
configurer(builder)
}, c.runtimeClass)
}

}
Expand Down

0 comments on commit ab19389

Please sign in to comment.