Permalink
Browse files

check graph ids before throwing an execute into the worker pool. if t…

…here are no forwardings for that graph, throw a flock exception immediately.
  • Loading branch information...
1 parent a18ca26 commit 13c9c327b54ad605a99a83592f60690e01eac18a Robey Pointer committed Jul 6, 2010
@@ -33,7 +33,7 @@ class EdgesService(val nameServer: NameServer[shards.Shard],
val schedule: PrioritizingJobScheduler,
future: Future, replicationFuture: Future) {
private val selectCompiler = new SelectCompiler(forwardingManager)
- private val executeCompiler = new ExecuteCompiler(schedule)
+ private val executeCompiler = new ExecuteCompiler(schedule, forwardingManager)
def shutdown() {
schedule.shutdown()
@@ -23,7 +23,7 @@ import com.twitter.gizzard.Future
import com.twitter.gizzard.jobs._
import com.twitter.gizzard.scheduler.{KestrelMessageQueue, JobScheduler, PrioritizingJobScheduler}
import com.twitter.gizzard.nameserver
-import com.twitter.gizzard.shards.{ShardInfo, ReplicatingShard}
+import com.twitter.gizzard.shards.{ShardException, ShardInfo, ReplicatingShard}
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.results.{Cursor, ResultWindow}
import com.twitter.ostrich.{Stats, W3CStats}
@@ -124,7 +124,12 @@ class FlockDB(val edges: EdgesService) extends thrift.FlockDB.Iface {
}
def execute(operations: thrift.ExecuteOperations) = {
- edges.execute(operations.fromThrift)
+ try {
+ edges.execute(operations.fromThrift)
+ } catch {
+ case e: ShardException =>
+ throw new FlockException(e.toString)
+ }
}
@deprecated
@@ -23,6 +23,7 @@ import shards.Shard
class ForwardingManager(nameServer: NameServer[Shard]) {
+ @throws(classOf[ShardException])
def find(sourceId: Long, graphId: Int, direction: Direction) = {
nameServer.findCurrentForwarding(translate(graphId, direction), sourceId)
}
@@ -35,6 +36,7 @@ class ForwardingManager(nameServer: NameServer[Shard]) {
}
}
+ @throws(classOf[ShardException])
def findCurrentForwarding(tableId: List[Int], id: Long): Shard = {
find(id, tableId(0), if (tableId(1) > 0) Direction.Forward else Direction.Backward)
}
@@ -19,6 +19,7 @@ package com.twitter.flockdb.queries
import scala.collection.mutable
import com.twitter.gizzard.jobs.{Schedulable, SchedulableWithTasks}
import com.twitter.gizzard.scheduler.PrioritizingJobScheduler
+import com.twitter.gizzard.shards.ShardException
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
@@ -27,7 +28,8 @@ import jobs.multi
import flockdb.operations.{ExecuteOperations, ExecuteOperationType}
-class ExecuteCompiler(schedule: PrioritizingJobScheduler) {
+class ExecuteCompiler(schedule: PrioritizingJobScheduler, forwardingManager: ForwardingManager) {
+ @throws(classOf[ShardException])
def apply(program: ExecuteOperations) {
val now = Time.now
val operations = program.operations
@@ -39,6 +41,9 @@ class ExecuteCompiler(schedule: PrioritizingJobScheduler) {
val time = program.executeAt.map { x => Time(x.seconds) }.getOrElse(Time.now)
val position = op.position.getOrElse(Time.now.inMillis)
+ // force an exception for nonexistent graphs
+ forwardingManager.find(0, term.graphId, Direction.Forward)
+
results ++= (op.operationType match {
case ExecuteOperationType.Add =>
processDestinations(term) { (sourceId, destinationId) =>
@@ -34,6 +34,7 @@ class EdgesSpec extends ConfiguredSpecification with EdgesDatabase {
import StaticEdges._
val FOLLOWS = 1
+ val BORKEN = 900
val alice = 1L
val bob = 2L
@@ -50,18 +51,24 @@ class EdgesSpec extends ConfiguredSpecification with EdgesDatabase {
}
"add" in {
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- val term = new QueryTerm(alice, FOLLOWS, true)
- term.setDestination_ids(List[Long](bob).pack)
- term.setState_ids(List[Int](State.Normal.id).toJavaList)
- val op = new SelectOperation(SelectOperationType.SimpleQuery)
- op.setTerm(term)
- val page = new Page(1, Cursor.Start.position)
- flock.select(List(op).toJavaList, page).ids.size must eventually(be_>(0))
- Time.advance(1.second)
- flock.execute(Select(alice, FOLLOWS, bob).remove.toThrift)
- flock.select(List(op).toJavaList, page).ids.size must eventually(be_==(0))
- flock.count(Select(alice, FOLLOWS, Nil).toThrift) mustEqual 0
+ "existing graph" in {
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ val term = new QueryTerm(alice, FOLLOWS, true)
+ term.setDestination_ids(List[Long](bob).pack)
+ term.setState_ids(List[Int](State.Normal.id).toJavaList)
+ val op = new SelectOperation(SelectOperationType.SimpleQuery)
+ op.setTerm(term)
+ val page = new Page(1, Cursor.Start.position)
+ flock.select(List(op).toJavaList, page).ids.size must eventually(be_>(0))
+ Time.advance(1.second)
+ flock.execute(Select(alice, FOLLOWS, bob).remove.toThrift)
+ flock.select(List(op).toJavaList, page).ids.size must eventually(be_==(0))
+ flock.count(Select(alice, FOLLOWS, Nil).toThrift) mustEqual 0
+ }
+
+ "nonexistent graph" in {
+ flock.execute(Select(alice, BORKEN, bob).add.toThrift) must throwA[FlockException]
+ }
}
"remove" in {
@@ -53,6 +53,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker {
Time.freeze()
val job = Add(bob, FOLLOWS, mary, Time.now.inMillis, Time.now)
expect {
+ one(forwardingManager).find(0, FOLLOWS, Direction.Forward)
one(scheduler).apply(Priority.High.id, new SchedulableWithTasks(List(job)))
}
flock.execute(Select(bob, FOLLOWS, mary).add.toThrift)
@@ -61,6 +62,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"add_at" in {
val job = Add(bob, FOLLOWS, mary, Time.now.inMillis, Time.now)
expect {
+ one(forwardingManager).find(0, FOLLOWS, Direction.Forward)
one(scheduler).apply(Priority.High.id, new SchedulableWithTasks(List(job)))
}
flock.execute(Select(bob, FOLLOWS, mary).addAt(Time.now).toThrift)
@@ -70,6 +72,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker {
Time.freeze()
val job = Remove(bob, FOLLOWS, mary, Time.now.inMillis, Time.now)
expect {
+ one(forwardingManager).find(0, FOLLOWS, Direction.Forward)
one(scheduler).apply(Priority.High.id, new SchedulableWithTasks(List(job)))
}
flock.execute(Select(bob, FOLLOWS, mary).remove.toThrift)
@@ -78,6 +81,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"remove_at" in {
val job = Remove(bob, FOLLOWS, mary, Time.now.inMillis, Time.now)
expect {
+ one(forwardingManager).find(0, FOLLOWS, Direction.Forward)
one(scheduler).apply(Priority.High.id, new SchedulableWithTasks(List(job)))
}
flock.execute(Select(bob, FOLLOWS, mary).removeAt(Time.now).toThrift)
Oops, something went wrong.

0 comments on commit 13c9c32

Please sign in to comment.