allow normal edges to have their position changed; this requires Multi o #55

Closed · wants to merge 1 commit · +96 −106
@@ -34,11 +34,12 @@ class EdgesService(val nameServer: NameServer[shards.Shard],
val schedule: PrioritizingJobScheduler,
future: Future,
intersectionQueryConfig: config.IntersectionQuery,
- aggregateJobsPageSize: Int) {
+ aggregateJobsPageSize: Int,
+ uuidGenerator: UuidGenerator) {
private val log = Logger.get(getClass.getName)
private val selectCompiler = new SelectCompiler(forwardingManager, intersectionQueryConfig)
- private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize)
+ private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize, uuidGenerator)
def shutdown() {
schedule.shutdown()
@@ -70,8 +70,8 @@ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards
val copyPriority = Priority.Medium.id
val copyFactory = new jobs.CopyFactory(nameServer, jobScheduler(Priority.Medium.id))
- override val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler)
- override val diffFactory = new jobs.DiffFactory(nameServer, jobScheduler)
+ override val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler, OrderedUuidGenerator)
+ override val diffFactory = new jobs.DiffFactory(nameServer, jobScheduler, OrderedUuidGenerator)
val dbQueryEvaluatorFactory = config.edgesQueryEvaluator(stats)
val materializingQueryEvaluatorFactory = config.materializingQueryEvaluator(stats)
@@ -87,23 +87,23 @@ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards
jobCodec += ("single.Remove".r, new jobs.single.RemoveParser(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Archive".r, new jobs.single.ArchiveParser(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Negate".r, new jobs.single.NegateParser(forwardingManager, OrderedUuidGenerator))
- jobCodec += ("multi.Archive".r, new jobs.multi.ArchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.Unarchive".r, new jobs.multi.UnarchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.RemoveAll".r, new jobs.multi.RemoveAllParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.Negate".r, new jobs.multi.NegateParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
+ jobCodec += ("multi.Archive".r, new jobs.multi.ArchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.Unarchive".r, new jobs.multi.UnarchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.RemoveAll".r, new jobs.multi.RemoveAllParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.Negate".r, new jobs.multi.NegateParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
jobCodec += ("jobs\\.(Copy|Migrate)".r, new jobs.CopyParser(nameServer, jobScheduler(Priority.Medium.id)))
jobCodec += ("jobs\\.(MetadataCopy|MetadataMigrate)".r, new jobs.MetadataCopyParser(nameServer, jobScheduler(Priority.Medium.id)))
jobCodec += ("jobs.Repair".r, new jobs.RepairParser(nameServer, jobScheduler))
- jobCodec += ("jobs.MetadataRepair".r, new jobs.MetadataRepairParser(nameServer, jobScheduler))
+ jobCodec += ("jobs.MetadataRepair".r, new jobs.MetadataRepairParser(nameServer, jobScheduler, OrderedUuidGenerator))
jobCodec += ("jobs.Diff".r, new jobs.DiffParser(nameServer, jobScheduler))
- jobCodec += ("jobs.MetadataDiff".r, new jobs.MetadataDiffParser(nameServer, jobScheduler))
+ jobCodec += ("jobs.MetadataDiff".r, new jobs.MetadataDiffParser(nameServer, jobScheduler, OrderedUuidGenerator))
val flockService = {
val future = config.readFuture("readFuture")
- val edges = new EdgesService(nameServer, forwardingManager, copyFactory, jobScheduler, future, config.intersectionQuery, config.aggregateJobsPageSize)
+ val edges = new EdgesService(nameServer, forwardingManager, copyFactory, jobScheduler, future, config.intersectionQuery, config.aggregateJobsPageSize, OrderedUuidGenerator)
val scheduler = jobScheduler
new FlockDBThriftAdapter(edges, scheduler)
}
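With the generator injected through constructors rather than referenced as a global, callers can substitute their own implementation, for example in tests. A hypothetical sketch (FakeUuidGenerator is invented here for illustration, not part of this PR):

// Hypothetical deterministic generator: the position is the timestamp itself,
// so assertions on scheduled jobs become reproducible.
object FakeUuidGenerator extends UuidGenerator {
  def apply(updatedAt: Long) = updatedAt
  def unapply(uuid: Long)    = Some(uuid)
}

// e.g. new EdgesService(nameServer, forwardingManager, copyFactory, jobScheduler,
//                       future, config.intersectionQuery, config.aggregateJobsPageSize,
//                       FakeUuidGenerator)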
@@ -47,15 +47,16 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:
def max(other: Metadata) = if (this > other) this else other
- def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = {
+ def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int, uuidGenerator: UuidGenerator) = {
val job = state match {
case State.Normal => Unarchive
case State.Removed => RemoveAll
case State.Archived => Archive
case State.Negative => Negate
}
- scheduler.put(priority, job(sourceId, tableId, if (tableId > 0) Direction.Forward else Direction.Backward, updatedAt, Priority.Medium, 500, forwardingManager, scheduler))
+ scheduler.put(
+ priority, job(sourceId, tableId, if (tableId > 0) Direction.Forward else Direction.Backward, updatedAt, Priority.Medium, 500, forwardingManager, scheduler, uuidGenerator))
}
def similar(other: Metadata) = {
@@ -29,10 +29,10 @@ import com.twitter.gizzard.shards.{ShardDatabaseTimeoutException, ShardTimeoutEx
import collection.mutable.ListBuffer
import shards.{Shard}
-class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobFactory[Shard] {
override def apply(shardIds: Seq[ShardId]) = {
- new MetadataDiff(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
+ new MetadataDiff(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler, uuidGenerator)
}
}
@@ -69,17 +69,17 @@ class Diff(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
}
}
-class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
- extends MetadataRepairParser(nameServer, scheduler) {
+class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends MetadataRepairParser(nameServer, scheduler, uuidGenerator) {
override def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
- new MetadataDiff(shardIds, cursor, count, nameServer, scheduler)
+ new MetadataDiff(shardIds, cursor, count, nameServer, scheduler, uuidGenerator)
}
}
class MetadataDiff(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
- extends MetadataRepair(shardIds, cursor, count, nameServer, scheduler) {
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends MetadataRepair(shardIds, cursor, count, nameServer, scheduler, uuidGenerator) {
private val log = Logger.get(getClass.getName)
@@ -96,7 +96,7 @@ class MetadataDiff(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor,
override def nextRepair(lowestCursor: MetadataRepair.RepairCursor) = {
Some(lowestCursor match {
case MetadataRepair.END => new Diff(shardIds, Repair.START, Repair.COUNT, nameServer, scheduler)
- case _ => new MetadataDiff(shardIds, lowestCursor, count, nameServer, scheduler)
+ case _ => new MetadataDiff(shardIds, lowestCursor, count, nameServer, scheduler, uuidGenerator)
})
}
}
@@ -37,10 +37,10 @@ object Repair {
val PRIORITY = Priority.Medium.id
}
-class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobFactory[Shard] {
def apply(shardIds: Seq[ShardId]) = {
- new MetadataRepair(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
+ new MetadataRepair(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler, uuidGenerator)
}
}
@@ -131,16 +131,16 @@ object MetadataRepair {
val PRIORITY = Priority.Medium.id
}
-class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
- new MetadataRepair(shardIds, cursor, count, nameServer, scheduler)
+ new MetadataRepair(shardIds, cursor, count, nameServer, scheduler, uuidGenerator)
}
}
class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends MultiShardRepair[Shard, Metadata, MetadataRepair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {
private val log = Logger.get(getClass.getName)
@@ -150,16 +150,16 @@ class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor
def nextRepair(lowestCursor: MetadataRepair.RepairCursor) = {
Some(lowestCursor match {
case MetadataRepair.END => new Repair(shardIds, Repair.START, Repair.COUNT, nameServer, scheduler)
- case _ => new MetadataRepair(shardIds, lowestCursor, count, nameServer, scheduler)
+ case _ => new MetadataRepair(shardIds, lowestCursor, count, nameServer, scheduler, uuidGenerator)
})
}
def scheduleDifferent(list: (Shard, ListBuffer[Metadata], MetadataRepair.RepairCursor), tableId: Int, item: Metadata) = {
- item.schedule(tableId, forwardingManager, scheduler, priority)
+ item.schedule(tableId, forwardingManager, scheduler, priority, uuidGenerator)
}
def scheduleMissing(list: (Shard, ListBuffer[Metadata], MetadataRepair.RepairCursor), tableId: Int, item: Metadata) = {
- if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority)
+ if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority, uuidGenerator)
}
def scheduleBulk(otherShards: Seq[Shard], items: Seq[Metadata]) = {
@@ -25,6 +25,7 @@ import com.twitter.util.TimeConversions._
import net.lag.configgy.Configgy
import conversions.Numeric._
import shards.Shard
+import State._
abstract class MultiJobParser extends JsonJobParser {
def apply(attributes: Map[String, Any]): JsonJob = {
@@ -40,33 +41,33 @@ abstract class MultiJobParser extends JsonJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value): Multi
}
-class ArchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class ArchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Archive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Archive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class UnarchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class UnarchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Unarchive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Unarchive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class RemoveAllParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class RemoveAllParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new RemoveAll(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new RemoveAll(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class NegateParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class NegateParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Negate(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Negate(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
abstract class Multi(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends JsonJob {
private val config = Configgy.config
@@ -82,56 +83,59 @@ abstract class Multi(sourceId: Long, graphId: Int, direction: Direction, updated
case e: ShardBlackHoleException =>
return
}
+ val states = Seq(Normal, Archived, Negative) // Removed edges are never bulk-updated
while (cursor != Cursor.End) {
- val resultWindow = forwardShard.selectIncludingArchived(sourceId, aggregateJobPageSize, cursor)
+ val resultWindow = forwardShard.selectEdges(sourceId, states, aggregateJobPageSize, cursor)
- val chunkOfTasks = resultWindow.map { destinationId =>
+ val chunkOfTasks = resultWindow.map { edge =>
+ val destinationId = edge.destinationId
val (a, b) = if (direction == Direction.Backward) (destinationId, sourceId) else (sourceId, destinationId)
- update(a, graphId, b)
+ val uuidGenerator(uuid) = edge.position
+ update(a, graphId, b, uuid)
}
scheduler.put(priority.id, new JsonNestedJob(chunkOfTasks))
cursor = resultWindow.nextCursor
}
}
- protected def update(sourceId: Long, graphId: Int, destinationId: Long): JsonJob
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long): JsonJob
protected def updateMetadata(shard: Shard)
}
case class Archive(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Archive(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Archive(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.archive(sourceId, updatedAt)
}
case class Unarchive(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) = {
- new jobs.single.Add(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) = {
+ new jobs.single.Add(sourceId, graphId, destinationId, position, updatedAt, null, null)
}
protected def updateMetadata(shard: Shard) = shard.add(sourceId, updatedAt)
}
case class RemoveAll(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Remove(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Remove(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.remove(sourceId, updatedAt)
}
case class Negate(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Negate(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Negate(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.negate(sourceId, updatedAt)
}