Permalink
Browse files

Some musings on how to refactor the overloaded concept of a 'shard' into nodes that deal with routing and nodes that perform db operations.
  • Loading branch information...
1 parent 5f7b97e commit 9aaa3e93922c64f641703ab3f48083fe0c6b28cc Benjy Weinberger committed Feb 16, 2011
@@ -0,0 +1,126 @@
+package com.twitter.gizzard.routing
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
+import scala.collection.mutable
+import com.twitter.gizzard.thrift.conversions.Sequences._
+import net.lag.logging.Logger
+
+
+/**
+ * A routing node that replicates operations across its child nodes.
+ *
+ * Single-answer reads fail over from one child to the next until one of them answers.
+ * Writes and "read all" operations fan out to every child: serially when `future` is None,
+ * or through the supplied Future (with its timeout) otherwise.
+ *
+ * NOTE(review): `loadBalancer` is accepted but never invoked in this class; every operation
+ * walks `children` in the order given. Confirm whether reads were meant to use
+ * `loadBalancer()` instead.
+ */
+class ReplicatingNode(val id: RoutingNodeId,
+                      val weight: Int,
+                      val children: Seq[RoutingNode],
+                      val loadBalancer: (() => Seq[RoutingNode]),
+                      val future: Option[Future]) extends RoutingNode {
+
+  /** Asks each child in turn and returns the first answer; see `failover`. */
+  def readOperation(op: ReadOperation): OperationResult = failover(_.readOperation(op), children)
+
+  /** Runs the read on every child and concatenates whatever each child reports; see `fanout`. */
+  def readAllOperation(op: ReadOperation): Seq[OperationResult] = fanout(_.readAllOperation(op), children)
+
+  /** Fails over across the children while threading the rebuild state; see `rebuildableFailover`. */
+  def rebuildableReadOperation(op: RebuildableReadOperation, state: RebuildState): RebuildState =
+    rebuildableFailover(_.rebuildableReadOperation(op, _), state, children)
+
+  /**
+   * Writes to every child and summarizes the outcome as a single result.
+   *
+   * @return the first ExceptionResult reported by any child if there is one, otherwise the
+   *         first result reported
+   * @throws NodeBlackHoleException if no child reported anything (all of them were
+   *         blackholed, or there are no children)
+   */
+  def writeOperation(op: WriteOperation): OperationResult = {
+    val results = fanout(_.writeOperation(op) :: Nil, children)
+
+    // A failure anywhere wins over any success. Using find/orElse avoids a non-local
+    // `return` from inside a closure and cannot hit a MatchError on other result types.
+    val firstFailure = results.find {
+      case _: ExceptionResult => true
+      case _ => false
+    }
+    firstFailure.orElse(results.firstOption).getOrElse(throw new NodeBlackHoleException(id))
+  }
+
+  lazy val log = Logger.get
+
+  /**
+   * Strips ExecutionException / UndeclaredThrowableException wrappers to reach the underlying
+   * failure. A wrapper that carries no cause is returned as-is, so the result is never null
+   * for a non-null argument.
+   */
+  protected def unwrapException(exception: Throwable): Throwable = {
+    exception match {
+      case e: ExecutionException if e.getCause != null => unwrapException(e.getCause)
+      // fondly known as JavaOutrageException
+      case e: UndeclaredThrowableException if e.getCause != null => unwrapException(e.getCause)
+      case e => e
+    }
+  }
+
+  /**
+   * Classifies a failure raised while calling `replica` and records it in `results`:
+   * blackholed replicas are skipped silently, timeouts become ReplicationTimeoutException,
+   * and everything else is recorded unchanged (after unwrapping).
+   */
+  private def recordFailure(results: mutable.ArrayBuffer[OperationResult],
+                            replica: RoutingNode,
+                            failure: Throwable): Unit = {
+    unwrapException(failure) match {
+      case _: NodeBlackHoleException => // nothing.
+      case e: TimeoutException => results += ExceptionResult(new ReplicationTimeoutException(replica.id, e))
+      case e => results += ExceptionResult(e)
+    }
+  }
+
+  /**
+   * Wraps the call to each replica in `future(...)`, then waits on each resulting task in
+   * turn for at most `future.timeout`, collecting results and recorded failures.
+   */
+  protected def fanoutFuture(method: RoutingNode => Seq[OperationResult], replicas: Seq[RoutingNode], future: Future) = {
+    val results = new mutable.ArrayBuffer[OperationResult]()
+
+    replicas.map { replica => (replica, future(method(replica))) }.foreach { case (replica, futureTask) =>
+      try {
+        results ++= futureTask.get(future.timeout.inMillis, TimeUnit.MILLISECONDS)
+      } catch {
+        case e: Exception => recordFailure(results, replica, e)
+      }
+    }
+
+    results
+  }
+
+  /**
+   * Calls each replica one after another on the calling thread. A failing replica is
+   * recorded (or skipped, if blackholed) and does not stop the remaining replicas.
+   */
+  protected def fanoutSerial(method: RoutingNode => Seq[OperationResult], replicas: Seq[RoutingNode]) = {
+    val results = new mutable.ArrayBuffer[OperationResult]()
+
+    replicas.foreach { replica =>
+      try {
+        results ++= method(replica)
+      } catch {
+        // Deliberately broad, as in the original: one replica must not abort the others.
+        case e: Throwable => recordFailure(results, replica, e)
+      }
+    }
+
+    results
+  }
+
+  /** Dispatches to the serial or the future-based fanout, depending on whether `future` is set. */
+  protected def fanout(method: RoutingNode => Seq[OperationResult], replicas: Seq[RoutingNode]): Seq[OperationResult] = {
+    future match {
+      case None => fanoutSerial(method, replicas)
+      case Some(f) => fanoutFuture(method, replicas, f)
+    }
+  }
+
+  /**
+   * Tries `method` on each replica in order and returns the first result obtained.
+   * A NodeRejectedOperationException moves on quietly; any other RoutingException is logged
+   * first. Exceptions outside the RoutingException hierarchy propagate to the caller.
+   *
+   * @throws NodeOfflineException once every replica has been tried without success
+   */
+  protected def failover(method: RoutingNode => OperationResult, replicas: Seq[RoutingNode]): OperationResult = {
+    replicas match {
+      case Seq() =>
+        throw new NodeOfflineException(id)
+      case Seq(replica, remainder @ _*) =>
+        try {
+          method(replica)
+        } catch {
+          case e: NodeRejectedOperationException =>
+            failover(method, remainder)
+          case e: RoutingException =>
+            log.warning(e, "Error on %s: %s", replica.id, e)
+            failover(method, remainder)
+        }
+    }
+  }
+
+  /**
+   * Like `failover`, but threads a RebuildState through the attempts. A replica that returns
+   * without a result is prepended to the rebuild list and the search continues with
+   * `everSuccessful` set; the first state that carries a result is returned as-is.
+   *
+   * @throws NodeOfflineException if the replicas are exhausted and `state.everSuccessful`
+   *         is false
+   */
+  protected def rebuildableFailover(method: (RoutingNode, RebuildState) => RebuildState,
+                                    state: RebuildState,
+                                    replicas: Seq[RoutingNode]): RebuildState = {
+    replicas match {
+      case Seq() =>
+        if (state.everSuccessful) {
+          state
+        } else {
+          throw new NodeOfflineException(id)
+        }
+      case Seq(replica, remainder @ _*) =>
+        try {
+          val newState = method(replica, state)
+          newState.result match {
+            case None =>
+              rebuildableFailover(method, new RebuildState(None, replica :: newState.toRebuild, true), remainder)
+            case Some(_) => newState
+          }
+        } catch {
+          case e: NodeRejectedOperationException =>
+            rebuildableFailover(method, state, remainder)
+          case e: RoutingException =>
+            // Name the replica that failed (as failover does), not this parent node.
+            log.warning(e, "Error on %s: %s", replica.id, e)
+            rebuildableFailover(method, state, remainder)
+        }
+    }
+  }
+}
@@ -0,0 +1,51 @@
+package com.twitter.gizzard.routing
+
+
+/**
+ * Root of the routing exception hierarchy. It is a concrete class, so it can also be
+ * thrown directly.
+ */
+class RoutingException(description: String, cause: Throwable)
+  extends Exception(description, cause) {
+
+  /** Creates a routing exception that has no underlying cause. */
+  def this(description: String) = this(description, null)
+}
+
+/**
+ * Routing exceptions for matter-of-course conditions (timeouts, rejections) rather than
+ * code errors, where a stack trace would add nothing. The message is the description
+ * followed by the node id; the "cause" argument is accepted but deliberately discarded,
+ * and no stack trace is filled in.
+ */
+class NormalRoutingException(description: String, nodeId: RoutingNodeId, cause: Throwable)
+  extends RoutingException(description + ": " + nodeId, null) {
+
+  /** Skips stack trace capture entirely. */
+  override def fillInStackTrace() = this
+
+  /** Convenience constructor for callers that have no cause to report. */
+  def this(description: String, nodeId: RoutingNodeId) = this(description, nodeId, null)
+}
+
+/**
+ * The node refused to perform the operation, possibly because it is blocked. Retrying
+ * against the same node is not expected to help.
+ *
+ * Often used to tell a ReplicatingNode to try another replica because this node is
+ * read-only, write-only, or blocked (offline); ReplicatingNode's failover paths move on to
+ * the next replica without logging when they catch it.
+ */
+class NodeRejectedOperationException(description: String, nodeId: RoutingNodeId)
+  extends NormalRoutingException(description, nodeId)
+
+/**
+ * The node cannot perform the operation because every candidate child node is unavailable.
+ * This is not a retryable error.
+ */
+class NodeOfflineException(nodeId: RoutingNodeId)
+  extends NormalRoutingException("All child nodes are down for node", nodeId)
+
+/**
+ * Thrown by a node that wants to be skipped for reads and writes. When every child of a
+ * node does this, the write is "thrown away" and the exception is passed up.
+ */
+class NodeBlackHoleException(nodeId: RoutingNodeId)
+  extends NormalRoutingException("Node is blackholed", nodeId)
+
+/**
+ * A replicating node gave up waiting for one of its replicas to answer a write request.
+ * This is a "future" timeout: it indicates that the replication future timeout is lower
+ * than the per-database write timeout, and it only occurs for parallel writes.
+ *
+ * The triggering exception `ex` is handed to NormalRoutingException, which discards it.
+ */
+class ReplicationTimeoutException(nodeId: RoutingNodeId, ex: Throwable) extends
+  NormalRoutingException("Timeout waiting for write to node", nodeId, ex)
@@ -0,0 +1,43 @@
+package com.twitter.gizzard.routing
+
+/** Marker for anything that can be routed through a RoutingNode; declares no members. */
+trait RoutableOperation
+
+/** Marker for read operations (see RoutingNode.readOperation / readAllOperation). */
+trait ReadOperation extends RoutableOperation
+
+/** Marker for write operations (see RoutingNode.writeOperation). */
+trait WriteOperation extends RoutableOperation
+
+/** Marker for reads that carry a RebuildState (see RoutingNode.rebuildableReadOperation). */
+trait RebuildableReadOperation extends RoutableOperation
+
+/** Common supertype of the outcomes a routed operation can report; declares no members. */
+trait OperationResult
+
+/** Outcome of an operation that failed; carries the failure as a value instead of throwing it. */
+case class ExceptionResult(ex: Throwable) extends OperationResult
+
+/** Outcome of an operation that produced an answer of type A. */
+case class AnswerResult[A](answer: A) extends OperationResult
+
+/** Identifier of a node in the routing tree, wrapping a plain string. */
+case class RoutingNodeId(id: String)
+
+/**
+ * State threaded through a rebuildable read as it moves from node to node.
+ *
+ * @param result         the answer found so far, if any
+ * @param toRebuild      nodes collected for rebuilding; ReplicatingNode prepends a replica
+ *                       here when that replica returns without a result
+ * @param everSuccessful whether some node has completed the read without throwing, even
+ *                       if it returned no result
+ */
+case class RebuildState(result: Option[OperationResult],
+                        toRebuild: List[RoutingNode],
+                        everSuccessful: Boolean)
+
+/**
+ * A node in the routing tree. Each node exposes its identity, weight and children, plus the
+ * four operations that can be routed through it.
+ */
+trait RoutingNode {
+  /** Identifier of this node. */
+  def id: RoutingNodeId
+
+  /** Weight of this node. */
+  def weight: Int
+
+  /** Nodes directly below this one in the tree. */
+  def children: Seq[RoutingNode]
+
+  /** Performs a read that yields a single result. */
+  def readOperation(op: ReadOperation): OperationResult
+
+  /** Performs a read that may yield several results. */
+  def readAllOperation(op: ReadOperation): Seq[OperationResult]
+
+  /** Performs a read while threading `state` through, returning the updated state. */
+  def rebuildableReadOperation(op: RebuildableReadOperation, state: RebuildState): RebuildState
+
+  /** Performs a write that yields a single result. */
+  def writeOperation(op: WriteOperation): OperationResult
+}
+
+
@@ -0,0 +1,17 @@
+package com.twitter.gizzard.routing
+
+/**
+ * The client-facing entry point to the gizzard routing tree. Adapts the client-facing API to
+ * the RoutingNode internal API (usually trivially) by delegating to `root`.
+ */
+class RoutingTree(val root: RoutingNode) {
+
+  /** Delegates a single-answer read to the root node. */
+  def readOperation(op: ReadOperation): OperationResult = root.readOperation(op)
+
+  /** Delegates a read-all to the root node. */
+  def readAllOperation(op: ReadOperation): Seq[OperationResult] = root.readAllOperation(op)
+
+  /**
+   * Runs a rebuildable read from a fresh state (no result, nothing to rebuild, never
+   * successful) and returns the result carried by the final state.
+   *
+   * NOTE(review): the final state's `toRebuild` list is not used here; confirm whether a
+   * rebuild step is still to be added.
+   *
+   * @throws NodeOfflineException if the final state carries no result
+   */
+  def rebuildableReadOperation(op: RebuildableReadOperation): OperationResult = {
+    // A val, not a def: the routed read must run exactly once however often this is referenced.
+    val finalState = root.rebuildableReadOperation(op, new RebuildState(None, Nil, false))
+    finalState.result.getOrElse(throw new NodeOfflineException(root.id))
+  }
+
+  /** Delegates a write to the root node. */
+  def writeOperation(op: WriteOperation): OperationResult = root.writeOperation(op)
+}
Oops, something went wrong. Retry.

0 comments on commit 9aaa3e9

Please sign in to comment.