Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
refactor single job internals
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Aug 31, 2011
1 parent 30624f3 commit eea85eb
Showing 1 changed file with 25 additions and 22 deletions.
47 changes: 25 additions & 22 deletions src/main/scala/com/twitter/flockdb/jobs/single/Single.scala
Expand Up @@ -17,7 +17,7 @@
package com.twitter.flockdb.jobs.single

import com.twitter.logging.Logger
import com.twitter.util.Time
import com.twitter.util.{Time, Return, Throw}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.shards._
import com.twitter.conversions.time._
Expand Down Expand Up @@ -96,37 +96,40 @@ extends JsonJob {
val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward)
val uuid = uuidGenerator(position)

var currSuccesses: List[ShardId] = Nil
var currErrs: List[Throwable] = Nil

forward.optimistically(sourceId) { left =>
backward.optimistically(destinationId) { right =>
write(forward.write, backward.write, uuid, left max right max preferredState)
val state = left max right max preferredState
val forwardResults = writeToShard(forward.write, sourceId, destinationId, uuid, state)
val backwardResults = writeToShard(backward.write, destinationId, sourceId, uuid, state)

List(forwardResults, backwardResults) foreach {
_ foreach {
case Return(id) => currSuccesses = id :: currSuccesses
case Throw(e) => currErrs = e :: currErrs
}
}
}
}

// add successful writes here, since we are only successful if an optimistic lock exception is not raised.
successes = successes ++ currSuccesses

currErrs.headOption foreach { e => throw e }
}

def writeToShard(shard: NodeSet[Shard], sourceId: Long, destinationId: Long, uuid: Long, state: State) = {
try {
def writeToShard(shards: NodeSet[Shard], sourceId: Long, destinationId: Long, uuid: Long, state: State) = {
shards.all { (shardId, shard) =>
state match {
case State.Normal => shard.foreach { _.add(sourceId, destinationId, uuid, updatedAt) }
case State.Removed => shard.foreach { _.remove(sourceId, destinationId, uuid, updatedAt) }
case State.Archived => shard.foreach { _.archive(sourceId, destinationId, uuid, updatedAt) }
case State.Negative => shard.foreach { _.negate(sourceId, destinationId, uuid, updatedAt) }
case State.Normal => shard.add(sourceId, destinationId, uuid, updatedAt)
case State.Removed => shard.remove(sourceId, destinationId, uuid, updatedAt)
case State.Archived => shard.archive(sourceId, destinationId, uuid, updatedAt)
case State.Negative => shard.negate(sourceId, destinationId, uuid, updatedAt)
}

None
} catch {
case e => Some(e)
}
}

def write(forward: NodeSet[Shard], backward: NodeSet[Shard], uuid: Long, state: State) {
val forwardErr = writeToShard(forward, sourceId, destinationId, uuid, state)
val backwardErr = writeToShard(backward, destinationId, sourceId, uuid, state)

// just eat ShardBlackHoleExceptions for either way, but throw any other
List(forwardErr, backwardErr).flatMap(_.toList).foreach {
case e: ShardBlackHoleException => ()
case e => throw e
shardId
}
}
}

0 comments on commit eea85eb

Please sign in to comment.