Permalink
Browse files

make exceptions go to logs, ostrich, thrift appropriately

  • Loading branch information...
1 parent 9bc8391 commit f70aa0832f11f10b85c67df495630719eed2f00e @fizx fizx committed Oct 18, 2010
Showing with 58 additions and 24 deletions.
  1. +58 −24 src/main/scala/com/twitter/flockdb/EdgesService.scala
@@ -24,15 +24,18 @@ import com.twitter.gizzard.shards.ShardBlackHoleException
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.{Cursor, ResultWindow}
import operations.{ExecuteOperations, SelectOperation}
+import com.twitter.ostrich.Stats
import queries._
import thrift.FlockException
-
+import net.lag.logging.Logger
class EdgesService(val nameServer: NameServer[shards.Shard],
val forwardingManager: ForwardingManager,
val copyFactory: CopyFactory[shards.Shard],
val schedule: PrioritizingJobScheduler,
future: Future, replicationFuture: Future) {
+
+ private val log = Logger.get(getClass.getName)
private val selectCompiler = new SelectCompiler(forwardingManager)
private val executeCompiler = new ExecuteCompiler(schedule, forwardingManager)
@@ -43,52 +46,83 @@ class EdgesService(val nameServer: NameServer[shards.Shard],
}
def contains(sourceId: Long, graphId: Int, destinationId: Long): Boolean = {
- forwardingManager.find(sourceId, graphId, Direction.Forward).get(sourceId, destinationId).map { edge =>
- edge.state == State.Normal || edge.state == State.Negative
- }.getOrElse(false)
+ rethrowExceptionsAsThrift {
+ forwardingManager.find(sourceId, graphId, Direction.Forward).get(sourceId, destinationId).map { edge =>
+ edge.state == State.Normal || edge.state == State.Negative
+ }.getOrElse(false)
+ }
}
def get(sourceId: Long, graphId: Int, destinationId: Long): Edge = {
- forwardingManager.find(sourceId, graphId, Direction.Forward).get(sourceId, destinationId).getOrElse {
- throw new FlockException("Record not found: (%d, %d, %d)".format(sourceId, graphId, destinationId))
+ rethrowExceptionsAsThrift {
+ forwardingManager.find(sourceId, graphId, Direction.Forward).get(sourceId, destinationId).getOrElse {
+ throw new FlockException("Record not found: (%d, %d, %d)".format(sourceId, graphId, destinationId))
+ }
}
}
def select(query: SelectQuery): ResultWindow[Long] = select(List(query)).first
def select(queries: Seq[SelectQuery]): Seq[ResultWindow[Long]] = {
- queries.parallel(future).map { query =>
- try {
- selectCompiler(query.operations).select(query.page)
- } catch {
- case e: ShardBlackHoleException =>
- throw new FlockException("Shard is blackholed: " + e)
+ rethrowExceptionsAsThrift {
+ queries.parallel(future).map { query =>
+ try {
+ selectCompiler(query.operations).select(query.page)
+ } catch {
+ case e: ShardBlackHoleException =>
+ throw new FlockException("Shard is blackholed: " + e)
+ }
}
}
}
def selectEdges(queries: Seq[EdgeQuery]): Seq[ResultWindow[Edge]] = {
- queries.parallel(future).map { query =>
- val term = query.term
- val shard = forwardingManager.find(term.sourceId, term.graphId, Direction(term.isForward))
- val states = if (term.states.isEmpty) List(State.Normal) else term.states
+ rethrowExceptionsAsThrift {
+ queries.parallel(future).map { query =>
+ val term = query.term
+ val shard = forwardingManager.find(term.sourceId, term.graphId, Direction(term.isForward))
+ val states = if (term.states.isEmpty) List(State.Normal) else term.states
- if (term.destinationIds.isDefined) {
- val results = shard.intersectEdges(term.sourceId, states, term.destinationIds.get)
- new ResultWindow(results.map { edge => (edge, Cursor(edge.destinationId)) }, query.page.count, query.page.cursor)
- } else {
- shard.selectEdges(term.sourceId, states, query.page.count, query.page.cursor)
+ if (term.destinationIds.isDefined) {
+ val results = shard.intersectEdges(term.sourceId, states, term.destinationIds.get)
+ new ResultWindow(results.map { edge => (edge, Cursor(edge.destinationId)) }, query.page.count, query.page.cursor)
+ } else {
+ shard.selectEdges(term.sourceId, states, query.page.count, query.page.cursor)
+ }
}
}
}
def execute(operations: ExecuteOperations) {
- executeCompiler(operations)
+ rethrowExceptionsAsThrift {
+ executeCompiler(operations)
+ }
}
def count(queries: Seq[Seq[SelectOperation]]): Seq[Int] = {
- queries.parallel(future).map { query =>
- selectCompiler(query).sizeEstimate
+ rethrowExceptionsAsThrift {
+ queries.parallel(future).map { query =>
+ selectCompiler(query).sizeEstimate
+ }
+ }
+ }
+
+ private def countAndRethrow(e: Throwable) = {
+ Stats.incr(e.getClass.getName)
+ throw(new FlockException(e.getMessage))
+ }
+
+ private def rethrowExceptionsAsThrift[A](block: => A): A = {
+ try {
+ block
+ } catch {
+ case e: FlockException => throw(e)
+ case e: com.twitter.gizzard.shards.ShardTimeoutException => countAndRethrow(e)
+ case e: com.twitter.gizzard.shards.ShardDatabaseTimeoutException => countAndRethrow(e)
+ case e: Throwable => {
+ log.error(e, "Unhandled error in EdgesService")
+ throw(new FlockException(e.getMessage))
+ }
}
}
}

0 comments on commit f70aa08

Please sign in to comment.