This repository has been archived by the owner on May 22, 2019. It is now read-only.
/
RoutingNode.scala
134 lines (105 loc) · 4.08 KB
/
RoutingNode.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.twitter.gizzard.shards
import java.lang.reflect.UndeclaredThrowableException
import java.util.concurrent.{ExecutionException, TimeoutException}
import com.twitter.util.{Try, Return, Throw}
import com.twitter.logging.Logger
abstract class RoutingNodeFactory[T] {
def instantiate(shardInfo: ShardInfo, weight: Int, children: Seq[RoutingNode[T]]): RoutingNode[T]
def materialize(shardInfo: ShardInfo) {}
}
// Turn case class or other generic constructors into node factories.
class ConstructorRoutingNodeFactory[T](constructor: (ShardInfo, Int, Seq[RoutingNode[T]]) => RoutingNode[T])
extends RoutingNodeFactory[T] {
def instantiate(info: ShardInfo, weight: Int, children: Seq[RoutingNode[T]]) = constructor(info, weight, children)
}
protected[shards] object RoutingNode {
// XXX: use real behavior once ShardStatus lands
sealed trait Behavior
case object Allow extends Behavior
case object Deny extends Behavior
case object Ignore extends Behavior
case class Leaf[T](info: ShardInfo, readBehavior: Behavior, writeBehavior: Behavior, shard: T)
}
abstract class RoutingNode[T] {
import RoutingNode._
def shardType = shardInfo.className // XXX: replace with some other thing
def shardInfo: ShardInfo
def weight: Int
def children: Seq[RoutingNode[T]]
protected[shards] def collectedShards(readOnly: Boolean): Seq[Leaf[T]]
protected val log = Logger.get
protected val exceptionLog = Logger.get("exception")
def shardInfos: Seq[ShardInfo] = children flatMap { _.shardInfos }
protected def nodeSetFromCollected(readOnly: Boolean) = {
val m = collectedShards(readOnly) groupBy { l =>
if (readOnly) l.readBehavior else l.writeBehavior
}
val active = m.getOrElse(Allow, Nil) map { l => (l.info, l.shard) }
val blocked = m.getOrElse(Deny, Nil) map { _.info }
new NodeSet(shardInfo, active, blocked)
}
def read = nodeSetFromCollected(true)
def write = nodeSetFromCollected(false)
@deprecated("use read.all instead")
def readAllOperation[A](f: T => A): Seq[Either[Throwable,A]] = read.all(f) map { f => Try(f()) } map {
case Return(r) => Right(r)
case Throw(e) => Left(e)
}
@deprecated("use read.any instead")
def readOperation[A](f: T => A): A = read.tryAny { (id, shard) =>
Try(f(shard)) onFailure { e => logException(e, shard, id) }
} apply
@deprecated("use write.all instead")
def writeOperation[A](f: T => A): A = {
var rv: Option[A] = None
write foreach { s => rv = Some(f(s)) }
rv.getOrElse(throw new ShardBlackHoleException(shardInfo.id))
}
@deprecated("reimplement using read.iterator instead")
def rebuildableReadOperation[A](f: T => Option[A])(rebuild: (T, T) => Unit): Option[A] = {
val iter = read.iterator
var everSuccessful = false
var toRebuild: List[T] = Nil
while (iter.hasNext) {
val (id, shard) = iter.next
try {
val result = f(shard)
everSuccessful = true
if (result.isEmpty) {
toRebuild = shard :: toRebuild
} else {
toRebuild.foreach(rebuild(shard, _))
return result
}
} catch {
case e => logException(e, shard, id)
}
}
if (everSuccessful) {
None
} else {
throw new ShardOfflineException(shardInfo.id)
}
}
protected def logException(e: Throwable, shard: T, id: ShardId) {
val normalized = normalizeException(e, id)
exceptionLog.warning(e, "Error on %s: %s", id, e)
}
protected def normalizeException(ex: Throwable, shardId: ShardId): Throwable = ex match {
case e: ExecutionException => normalizeException(e.getCause, shardId)
// fondly known as JavaOutrageException
case e: UndeclaredThrowableException => normalizeException(e.getCause, shardId)
case e: TimeoutException => new ReplicatingShardTimeoutException(shardId, e)
case e => e
}
// equals overrides
override def equals(other: Any) = other match {
case n: RoutingNode[_] => {
(shardInfo == n.shardInfo) &&
(weight == n.weight) &&
(children == n.children)
}
case _ => false
}
override def hashCode() = children.hashCode
}