This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

allow write-only shards to make progress with edge writes.
freels committed Oct 21, 2011
1 parent ab0b883 commit 8900674
Showing 3 changed files with 12 additions and 13 deletions.
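
In outline (a minimal, self-contained sketch with stand-in types, not gizzard's or FlockDB's real API): the edge job used to hold a routing node and read lock metadata through its read set, which a write-only shard cannot serve; it now resolves the write set once and routes both the optimistic-lock reads and the edge writes through that same set.

// Sketch only: Shard, NodeSet, and RoutingNode are hypothetical stand-ins.
object WriteOnlySketch {
  trait Shard {
    def getMetadataForWrite(id: Long): Int            // answerable even on a write-only shard
    def writeEdge(source: Long, dest: Long, state: Int): Unit
  }

  class NodeSet[T](members: Seq[T]) {
    def all[R](f: T => R): Seq[R] = members map f     // fan out to every replica in the set
  }

  class RoutingNode[T](members: Seq[T]) {
    def write: NodeSet[T] = new NodeSet(members)      // the write set includes write-only shards
  }

  // Resolve .write up front; the lock read and the write share one set.
  def applyEdge(node: RoutingNode[Shard], source: Long, dest: Long, preferred: Int): Unit = {
    val writeSet = node.write
    val observed = writeSet.all(_.getMetadataForWrite(source))
    val winning  = (observed :+ preferred).max
    writeSet.all(_.writeEdge(source, dest, winning))
  }
}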
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/jobs/single/Single.scala
@@ -24,7 +24,7 @@ import com.twitter.conversions.time._
 import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction}
 import com.twitter.flockdb.conversions.Numeric._
 import com.twitter.flockdb.shards.Shard
-import com.twitter.flockdb.shards.LockingRoutingNode._
+import com.twitter.flockdb.shards.LockingNodeSet._
 
 
 class SingleJobParser(
@@ -92,8 +92,8 @@ extends JsonJob {
   }
 
   def apply() = {
-    val forward = forwardingManager.findNode(sourceId, graphId, Direction.Forward)
-    val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward)
+    val forward = forwardingManager.findNode(sourceId, graphId, Direction.Forward).write
+    val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward).write
     val uuid = uuidGenerator(position)
 
     var currSuccesses: List[ShardId] = Nil
@@ -102,8 +102,8 @@
    forward.optimistically(sourceId) { left =>
      backward.optimistically(destinationId) { right =>
        val state = left max right max preferredState
-       val forwardResults = writeToShard(forward.write, sourceId, destinationId, uuid, state)
-       val backwardResults = writeToShard(backward.write, destinationId, sourceId, uuid, state)
+       val forwardResults = writeToShard(forward, sourceId, destinationId, uuid, state)
+       val backwardResults = writeToShard(backward, destinationId, sourceId, uuid, state)
 
        List(forwardResults, backwardResults) foreach {
          _ foreach {
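
For context on the hunk above: `left max right max preferredState` merges the states each side's optimistic read observed with the state the job wants to write, and the highest-precedence state wins. A toy version of that merge, with a hypothetical two-state ordering (FlockDB's actual State values and precedence differ):

// Hypothetical stand-in for com.twitter.flockdb.State and its ordering.
sealed abstract class State(val precedence: Int) {
  def max(that: State): State = if (precedence >= that.precedence) this else that
}
case object Normal  extends State(0)
case object Removed extends State(1)

// If either optimistic read saw Removed, the write resolves to Removed.
val winning = Normal max Removed max Normal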
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/flockdb/shards/Optimism.scala
@@ -3,7 +3,7 @@ package shards
 
 import com.twitter.util.{Time, Try, Return, Throw}
 import com.twitter.logging.Logger
-import com.twitter.gizzard.shards.{ShardException, RoutingNode}
+import com.twitter.gizzard.shards.{ShardException, NodeSet}
 
 class OptimisticLockException(message: String) extends ShardException(message)
 
@@ -113,10 +113,10 @@ trait OptimisticStateMonitor {
   }
 }
 
-object LockingRoutingNode {
-  implicit def asLockingRoutingNode(n: RoutingNode[Shard]) = new LockingRoutingNode(n)
+object LockingNodeSet {
+  implicit def asLockingNodeSet(n: NodeSet[Shard]) = new LockingNodeSet(n)
 }
 
-class LockingRoutingNode(node: RoutingNode[Shard]) extends OptimisticStateMonitor {
-  def getMetadatas(id: Long) = node.read.all { _.getMetadataForWrite(id) }
+class LockingNodeSet(node: NodeSet[Shard]) extends OptimisticStateMonitor {
+  def getMetadatas(id: Long) = node.all { _.getMetadataForWrite(id) }
 }
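
The object/class pair above is the standard Scala 2 implicit-enrichment pattern: `import com.twitter.flockdb.shards.LockingNodeSet._` (as Single.scala now does) brings the conversion into scope, so any NodeSet[Shard] picks up the OptimisticStateMonitor operations. A stripped-down sketch of the mechanism, with simplified stand-in types (not gizzard's API):

import scala.language.implicitConversions

object EnrichmentSketch {
  case class Metadata(state: Int)
  trait Shard { def getMetadataForWrite(id: Long): Metadata }

  class NodeSet[T](members: Seq[T]) {
    def all[R](f: T => R): Seq[R] = members map f
  }

  class LockingNodeSet(node: NodeSet[Shard]) {
    def getMetadatas(id: Long): Seq[Metadata] = node.all { _.getMetadataForWrite(id) }
  }

  implicit def asLockingNodeSet(n: NodeSet[Shard]): LockingNodeSet = new LockingNodeSet(n)

  // With the implicit in scope, a plain NodeSet[Shard] gains getMetadatas:
  def demo(nodes: NodeSet[Shard]): Seq[Metadata] = nodes.getMetadatas(42L)
}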
5 changes: 2 additions & 3 deletions BlackHoleLockingRegressionSpec.scala
@@ -37,8 +37,7 @@ class BlackHoleLockingRegressionSpec extends IntegrationSpecification {
    flock.nameServer.reload()
 
    val rootQueryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection.withoutDatabase)
-   //rootQueryEvaluator.execute("DROP DATABASE IF EXISTS " + config.databaseConnection.database)
-   val queryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection)
+   val queryEvaluator = config.edgesQueryEvaluator()(config.databaseConnection)
 
    for (graph <- (1 until 10)) {
      Seq("forward", "backward").foreach { direction =>
@@ -130,7 +129,7 @@ class BlackHoleLockingRegressionSpec extends IntegrationSpecification {
 
      val scheduler = flock.jobScheduler(flockdb.Priority.High.id)
      val errors = scheduler.errorQueue
-     errors.size must eventually(be(10))
+     alicesFollowings.size must eventually(be(10))
    }
  }
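
The flipped assertion is the point of the regression test: with a black-hole (write-only) replica in the forwarding table, the ten edge writes previously ended up on the error queue; after this commit they complete, so the followings count reaches ten instead. `eventually` is the specs matcher that retries an expectation until it holds or times out; a rough stand-alone equivalent (the retry counts and the `alicesFollowings` accessor are made up):

// Rough stand-in for the specs `eventually` matcher: retry a predicate
// until it holds or the attempts run out.
def eventuallyHolds(attempts: Int = 40, pauseMs: Long = 250)(pred: => Boolean): Boolean = {
  var remaining = attempts
  while (remaining > 0) {
    if (pred) return true
    Thread.sleep(pauseMs)
    remaining -= 1
  }
  false
}

// Hypothetical usage mirroring the new assertion:
// assert(eventuallyHolds() { alicesFollowings.size == 10 })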

