Permalink
Browse files

track state in a Multi job to prevent replay of multiple pages of work

  • Loading branch information...
1 parent 8fd19b3 commit 886f78f5f0948ac59a45123e9d653e4a06ebdf36 @freels freels committed Aug 31, 2011
Showing with 39 additions and 4 deletions.
  1. +39 −4 src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala
@@ -43,38 +43,65 @@ extends JsonJobParser {
Time.fromSeconds(casted("updated_at").toInt),
Priority(casted.get("priority").map(_.toInt).getOrElse(Priority.Low.id)),
aggregateJobPageSize,
+ casted.get("cursor").map( c => Cursor(c.toLong)).getOrElse(Cursor.Start),
forwardingManager,
scheduler
)
}
}
-case class Multi(
+class Multi(
sourceId: Long,
graphId: Int,
direction: Direction,
preferredState: State,
updatedAt: Time,
priority: Priority.Value,
aggregateJobPageSize: Int,
+ var cursor: Cursor,
forwardingManager: ForwardingManager,
scheduler: PrioritizingJobScheduler)
extends JsonJob {
+ def this(
+ sourceId: Long,
+ graphId: Int,
+ direction: Direction,
+ preferredState: State,
+ updatedAt: Time,
+ priority: Priority.Value,
+ aggregateJobPageSize: Int,
+ forwardingManager: ForwardingManager,
+ scheduler: PrioritizingJobScheduler
+ ) = {
+ this(
+ sourceId,
+ graphId,
+ direction,
+ preferredState,
+ updatedAt,
+ priority,
+ aggregateJobPageSize,
+ Cursor.Start,
+ forwardingManager,
+ scheduler
+ )
+ }
+
def toMap = Map(
"source_id" -> sourceId,
"updated_at" -> updatedAt.inSeconds,
"graph_id" -> graphId,
"direction" -> direction.id,
"priority" -> priority.id,
- "state" -> preferredState.id
+ "state" -> preferredState.id,
+ "cursor" -> cursor.position
)
def apply() {
- var cursor = Cursor.Start
val forwardShard = forwardingManager.find(sourceId, graphId, direction)
- try {
+ if (cursor == Cursor.Start) try {
updateMetadata(forwardShard, preferredState)
} catch {
case e: ShardBlackHoleException => return
@@ -90,6 +117,9 @@ extends JsonJob {
scheduler.put(priority.id, new JsonNestedJob(chunkOfTasks))
+ // "commit" the current iteration by saving the next cursor.
+ // if the job blows up in the next round, it will be re-serialized
+ // with this cursor.
cursor = resultWindow.nextCursor
}
}
@@ -105,4 +135,9 @@ extends JsonJob {
case State.Archived => shard.archive(sourceId, updatedAt)
case State.Negative => shard.negate(sourceId, updatedAt)
}
+
+ override def equals(o: Any) = o match {
+ case o: Multi => this.toMap == o.toMap
+ case _ => false
+ }
}

0 comments on commit 886f78f

Please sign in to comment.