Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

close() and draining in PoolingService.

  • Loading branch information...
commit 5edac2a3978231e2cb427733d7a57c22624de0a7 1 parent 3cad08c
@mariusae mariusae authored
View
3  finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
@@ -19,6 +19,9 @@ class WriteException(e: Throwable) extends ChannelException {
override def toString = "%s: %s".format(super.toString, e.toString)
}
+// Service layer errors.
+class ServiceException extends Exception
+class ServiceClosedException extends ServiceException
// Subclass this for application exceptions
class ApplicationException extends Exception
View
6 finagle-core/src/main/scala/com/twitter/finagle/Service.scala
@@ -30,6 +30,12 @@ abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
* Release any external resources. Overriden in subclasses.
*/
def close() {}
+
+ /**
+ * Determines whether this service is available (can accept requests
+ * with a reasonable likelihood of success.
+ */
+ def isAvailable: Boolean = true
}
/**
View
84 finagle-core/src/main/scala/com/twitter/finagle/channel/PoolingService.scala
@@ -1,51 +1,85 @@
package com.twitter.finagle.channel
+import scala.annotation.tailrec
+
import java.util.concurrent.ConcurrentLinkedQueue
import org.jboss.netty.bootstrap.ClientBootstrap
-import com.twitter.util.{Promise, Throw}
+import com.twitter.util.{Promise, Throw, Future}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.{Ok, Error}
-import com.twitter.finagle.Service
+import com.twitter.finagle.{Service, WriteException, ServiceClosedException}
+
+// TODO: spec.
class PoolingService[Req, Rep](bootstrap: ClientBootstrap)
extends Service[Req, Rep]
{
+ private[this] var isOpen = true
private[this] val channelServices =
new ConcurrentLinkedQueue[ChannelService[Req, Rep]]
- private[this] def dequeue() = {
- var service = null: ChannelService[Req, Rep]
- do {
- service = channelServices.poll()
- } while ((service ne null) && !service.isAvailable)
-
- service
+ @tailrec private[this] def dequeue(): Option[ChannelService[Req, Rep]] = {
+ val service = channelServices.poll()
+ if (service eq null) {
+ None
+ } else if (!service.isAvailable) {
+ service.close()
+ dequeue()
+ } else {
+ Some(service)
+ }
}
- private[this] def enqueue(service: ChannelService[Req, Rep]) {
- channelServices offer service
+ private[this] def enqueue(service: ChannelService[Req, Rep]) = synchronized {
+ if (isOpen && service.isAvailable)
+ channelServices offer service
+ else
+ service.close()
}
- def apply(request: Req) = {
- val service = dequeue()
- if (service eq null) {
- val replyFuture = new Promise[Rep]
+ def apply(request: Req): Future[Rep] = {
+ val service = synchronized {
+ if (!isOpen)
+ return Future.exception(new ServiceClosedException)
+
+ dequeue()
+ }
- bootstrap.connect() {
- case Ok(channel) =>
- val service = new ChannelService[Req, Rep](channel)
- service(request) ensure { enqueue(service) } respond { replyFuture.update(_) }
+ service match {
+ case Some(service) =>
+ service(request) ensure { enqueue(service) }
- case Error(cause) =>
- replyFuture() = Throw(cause)
- }
+ case None =>
+ val replyFuture = new Promise[Rep]
- replyFuture
- } else {
- service(request) ensure { enqueue(service) }
+ bootstrap.connect() {
+ case Ok(channel) =>
+ val service = new ChannelService[Req, Rep](channel)
+ service(request) ensure { enqueue(service) } respond { replyFuture.update(_) }
+
+ case Error(cause) =>
+ // Any connection error is a write error.
+ replyFuture() = Throw(new WriteException(cause))
+ }
+
+ replyFuture
}
}
+
+ override def isAvailable = synchronized { isOpen }
+
+ override def close() = synchronized {
+ isOpen = false
+
+ // what to do about outstanding requests?
+ var service = null: ChannelService[Req, Rep]
+ do {
+ service = channelServices.poll()
+ if (service ne null)
+ service.close()
+ } while (service ne null)
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.