Permalink
Browse files

allow write-only shards to make progress with edge writes.

  • Loading branch information...
1 parent ab0b883 commit 8900674cb02503bd4e7853aab5c699888ccaeff9 @freels freels committed Oct 21, 2011
@@ -24,7 +24,7 @@ import com.twitter.conversions.time._
import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction}
import com.twitter.flockdb.conversions.Numeric._
import com.twitter.flockdb.shards.Shard
-import com.twitter.flockdb.shards.LockingRoutingNode._
+import com.twitter.flockdb.shards.LockingNodeSet._
class SingleJobParser(
@@ -92,8 +92,8 @@ extends JsonJob {
}
def apply() = {
- val forward = forwardingManager.findNode(sourceId, graphId, Direction.Forward)
- val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward)
+ val forward = forwardingManager.findNode(sourceId, graphId, Direction.Forward).write
+ val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward).write
val uuid = uuidGenerator(position)
var currSuccesses: List[ShardId] = Nil
@@ -102,8 +102,8 @@ extends JsonJob {
forward.optimistically(sourceId) { left =>
backward.optimistically(destinationId) { right =>
val state = left max right max preferredState
- val forwardResults = writeToShard(forward.write, sourceId, destinationId, uuid, state)
- val backwardResults = writeToShard(backward.write, destinationId, sourceId, uuid, state)
+ val forwardResults = writeToShard(forward, sourceId, destinationId, uuid, state)
+ val backwardResults = writeToShard(backward, destinationId, sourceId, uuid, state)
List(forwardResults, backwardResults) foreach {
_ foreach {
@@ -3,7 +3,7 @@ package shards
import com.twitter.util.{Time, Try, Return, Throw}
import com.twitter.logging.Logger
-import com.twitter.gizzard.shards.{ShardException, RoutingNode}
+import com.twitter.gizzard.shards.{ShardException, NodeSet}
class OptimisticLockException(message: String) extends ShardException(message)
@@ -113,10 +113,10 @@ trait OptimisticStateMonitor {
}
}
-object LockingRoutingNode {
- implicit def asLockingRoutingNode(n: RoutingNode[Shard]) = new LockingRoutingNode(n)
+object LockingNodeSet {
+ implicit def asLockingNodeSet(n: NodeSet[Shard]) = new LockingNodeSet(n)
}
-class LockingRoutingNode(node: RoutingNode[Shard]) extends OptimisticStateMonitor {
- def getMetadatas(id: Long) = node.read.all { _.getMetadataForWrite(id) }
+class LockingNodeSet(node: NodeSet[Shard]) extends OptimisticStateMonitor {
+ def getMetadatas(id: Long) = node.all { _.getMetadataForWrite(id) }
}
@@ -37,8 +37,7 @@ class BlackHoleLockingRegressionSpec extends IntegrationSpecification {
flock.nameServer.reload()
val rootQueryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection.withoutDatabase)
- //rootQueryEvaluator.execute("DROP DATABASE IF EXISTS " + config.databaseConnection.database)
- val queryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection)
+ val queryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection)
for (graph <- (1 until 10)) {
Seq("forward", "backward").foreach { direction =>
@@ -130,7 +129,7 @@ class BlackHoleLockingRegressionSpec extends IntegrationSpecification {
val scheduler = flock.jobScheduler(flockdb.Priority.High.id)
val errors = scheduler.errorQueue
- errors.size must eventually(be(10))
+ alicesFollowings.size must eventually(be(10))
}
}

0 comments on commit 8900674

Please sign in to comment.