Skip to content

Commit

Permalink
Separate execution context from scheduling tasks in the future
Browse files Browse the repository at this point in the history
  • Loading branch information
rjmac committed May 2, 2013
1 parent fe5a5f2 commit cd68cea
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 59 deletions.
@@ -0,0 +1,30 @@
package com.socrata.future

import java.util.{TimerTask, Timer}
import scala.concurrent.{Promise, Future, ExecutionContext}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

class ExecutionContextTimer(val timer: Timer) {
def in[T](duration: FiniteDuration)(f: => T)(implicit ctx: ExecutionContext): Future[T] = {
val promise = Promise[T]()
val task = new TimerTask {
def run() {
ctx.execute(new Runnable {
def run() {
promise.complete(Try(f))
}
})
}
}
timer.schedule(task, duration.toMillis)
promise.future
}
}

object ExecutionContextTimer {
def globalTimer = Implicits.globalTimer
object Implicits {
implicit lazy val globalTimer = new ExecutionContextTimer(new Timer(true))
}
}

This file was deleted.

Expand Up @@ -10,11 +10,11 @@ class EnrichedBoundRequestBuilder(b: AsyncHttpClient#BoundRequestBuilder) {
*
* @param consumer The entry point into the state machine managed by a [[com.socrata.http.NiceAsyncHandler]]
* @param executionContext A strategy for starting tasks asynchronously. */
def makeRequest[T](consumer: StatusConsumer[T])(implicit executionContext: ExecutionContext with Executor): Future[T] = {
def makeRequest[T](consumer: StatusConsumer[T])(implicit executionContext: ExecutionContext): Future[T] = {
WrappedFuture(b.execute(new NiceAsyncHandler(consumer)))
}

def makeRequest()(implicit executionContext: ExecutionContext with Executor): Future[Response] = {
def makeRequest()(implicit executionContext: ExecutionContext): Future[Response] = {
WrappedFuture(b.execute())
}
}
Expand Up @@ -8,7 +8,7 @@ import com.ning.http.client.ListenableFuture
import scala.concurrent.{Promise, Future, ExecutionContext}

private[http] object WrappedFuture {
def apply[A](underlying: ListenableFuture[A])(implicit executionContext: ExecutionContext with Executor): Future[A] = {
def apply[A](underlying: ListenableFuture[A])(implicit executionContext: ExecutionContext): Future[A] = {
val promise = Promise[A]()
underlying.addListener(new Runnable {
override def run() {
Expand All @@ -21,7 +21,15 @@ private[http] object WrappedFuture {
promise.failure(e)
}
}
}, executionContext)
}, executor)
promise.future
}

private def executor(implicit executionContext: ExecutionContext): Executor = executionContext match {
case e: Executor => e
case other =>
new Executor {
def execute(command: Runnable) { executionContext.execute(command) }
}
}
}
Expand Up @@ -2,13 +2,11 @@ package com.socrata.soda2.consumer.http

import scala.concurrent.ExecutionContext

import java.util.concurrent.Executor

import com.ning.http.client.AsyncHttpClient

import com.socrata.soda2.consumer.Consumer
import com.socrata.http.{Authorization, NoAuth}
import com.socrata.future.ScheduledExecutionContext
import com.socrata.future.ExecutionContextTimer

/** An implementation of [[com.socrata.soda2.consumer.Consumer]] which operates on a real HTTP server. */
class HttpConsumer(lowLevel: LowLevelHttp) extends Consumer(lowLevel) {
Expand All @@ -20,7 +18,7 @@ class HttpConsumer(lowLevel: LowLevelHttp) extends Consumer(lowLevel) {
* @param authorization The authorization strategy to use for making queries with this object.
* @param executionContext A strategy for starting asynchronous tasks.
*/
def this(client: AsyncHttpClient, host: String, port: Int = 443, authorization: Authorization = NoAuth)(implicit executionContext: ExecutionContext with ScheduledExecutionContext with Executor) =
def this(client: AsyncHttpClient, host: String, port: Int = 443, authorization: Authorization = NoAuth)(implicit executionContext: ExecutionContext, timer: ExecutionContextTimer) =
this(new LowLevelHttp(client, host, port, authorization))
}

Expand Down
Expand Up @@ -18,10 +18,10 @@ import com.socrata.http.{JsonEntityWriter, Authorization, Headers}
import com.socrata.http.implicits._
import com.socrata.iteratee.CharIteratee
import com.socrata.soda2.{Resource, Soda2Metadata}
import com.socrata.future.ScheduledExecutionContext
import com.socrata.future.ExecutionContextTimer

// should this be moved to soda2.http? See similar comment on LowLevel.
class LowLevelHttp(val client: AsyncHttpClient, val host: String, val port: Int, val authorization: Authorization)(implicit executionContext: ExecutionContext with ScheduledExecutionContext with Executor) extends LowLevel {
class LowLevelHttp(val client: AsyncHttpClient, val host: String, val port: Int, val authorization: Authorization)(implicit executionContext: ExecutionContext, timer: ExecutionContextTimer) extends LowLevel {
import LowLevelHttp._

def get[T](resource: Resource, getParameters: Map[String, Seq[String]], iteratee: (URI, Soda2Metadata) => CharIteratee[T]): Future[T] =
Expand Down Expand Up @@ -105,7 +105,7 @@ class LowLevelHttp(val client: AsyncHttpClient, val host: String, val port: Int,
case Left(newRequest) =>
log.debug("Got 202; retrying in {}s", newRequest.retryAfter)
progressCallback(newRequest.details)
executionContext.schedule(newRequest.retryAfter.seconds) {
timer.in(newRequest.retryAfter.seconds) {
onRetry(newRequest)
}.flatMap(identity)
}
Expand Down Expand Up @@ -181,7 +181,7 @@ class LowLevelHttp(val client: AsyncHttpClient, val host: String, val port: Int,
case Some(JNumber(n)) =>
log.debug("There are still {} pending geocodes; sleeping for 60s", n)
progressCallback(obj)
executionContext.schedule(60.seconds) {
timer.in(60.seconds) {
awaitNoPendingGeocodesFor(resource, progressCallback)
}
case _ =>
Expand Down
Expand Up @@ -2,16 +2,14 @@ package com.socrata.soda2.publisher.http

import scala.concurrent.ExecutionContext

import java.util.concurrent.Executor

import com.ning.http.client.AsyncHttpClient

import com.socrata.soda2.consumer.http.{LowLevelHttp, HttpConsumer}
import com.socrata.soda2.publisher.Publisher
import com.socrata.http.{NoAuth, Authorization}
import com.socrata.future.ScheduledExecutionContext
import com.socrata.future.ExecutionContextTimer

class HttpPublisher(lowLevel: LowLevelHttp) extends HttpConsumer(lowLevel) with Publisher {
def this(client: AsyncHttpClient, host: String, port: Int = 443, authorization: Authorization = NoAuth)(implicit executionContext: ExecutionContext with ScheduledExecutionContext with Executor) =
def this(client: AsyncHttpClient, host: String, port: Int = 443, authorization: Authorization = NoAuth)(implicit executionContext: ExecutionContext, timer: ExecutionContextTimer) =
this(new LowLevelHttp(client, host, port, authorization))
}
@@ -1,5 +1,6 @@
package com.socrata.soda2.consumer.sample

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.Await

Expand All @@ -10,17 +11,15 @@ import com.ning.http.client.{AsyncHttpClientConfig, AsyncHttpClient}

import com.socrata.soda2.consumer.http.HttpConsumer
import com.socrata.soda2.values.SodaString
import com.socrata.future.WrappedScheduledExecutionContext
import com.socrata.future.ExecutionContextTimer.Implicits._

object SimpleQuery {
def main(args: Array[String]) {
val clientConfig = new AsyncHttpClientConfig.Builder().
setSSLContext(SSLContext.getDefault). // Without this, ALL SSL certificates are treated as valid
build()
val client = new AsyncHttpClient(clientConfig)
val executor = Executors.newScheduledThreadPool(0)
try {
implicit val executionContext = new WrappedScheduledExecutionContext(executor)
val service = new HttpConsumer(client, "explore.data.gov")

// "select distinct(firstname) where lastname = 'clinton'" but
Expand All @@ -37,7 +36,6 @@ object SimpleQuery {
println("Done.")
} finally {
client.close()
executor.shutdown()
}
}
}
@@ -1,5 +1,6 @@
package com.socrata.soda2.publisher.sample

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

Expand All @@ -12,17 +13,15 @@ import com.socrata.soda2.publisher.http.HttpPublisher
import com.socrata.http.BasicAuth
import com.socrata.soda2.Resource
import com.socrata.soda2.values.{SodaNumber, SodaString}
import com.socrata.future.WrappedScheduledExecutionContext
import com.socrata.future.ExecutionContextTimer.Implicits._

object SimpleUploadSample {
def main(args: Array[String]) {
val clientConfig = new AsyncHttpClientConfig.Builder().
setSSLContext(SSLContext.getDefault). // Without this, ALL SSL certificates are treated as valid
build()
val client = new AsyncHttpClient(clientConfig)
val executor = Executors.newScheduledThreadPool(0)
try {
implicit val executionContext = new WrappedScheduledExecutionContext(executor)
// to run this example, you need a Socrata account from
// opendata.socrata.com (or any other Socrata-powered data
// site), a dataset on opendata, and an app token on opendata.
Expand Down

0 comments on commit cd68cea

Please sign in to comment.