allow normal edges to have their position changed; this requires Multi operations… #55

Open
wants to merge 1 commit

2 participants

@nkallen

allow normal edges to have their position changed;
this requires Multi operations to select full edges and propagate positions so that we don't reorder when we archive/unarchive;
furthermore, this requires position-decoding (uuid unapply) to be available in all bulk jobs so this was a hellish DI propagation;
a longer-term refactoring is to have a true uuid generator that doesn't require retries;
then, the uuid generator would only be needed in the execute compiler
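
For readers following the DI thread: the generator being threaded through has to work in both directions, minting a position from a timestamp and decoding a position back. A minimal sketch of that interface, assuming a plain trait (the real OrderedUuidGenerator lives in gizzard and may be shaped differently; the identity flavor below matches the name the tests plug in, but its body here is an assumption):

    trait UuidGenerator {
      // Encode an updated-at timestamp (millis) into a sortable edge position.
      def apply(updatedAtMillis: Long): Long
      // Decode a position back into the timestamp it was minted from; this is
      // the "uuid unapply" the message says every bulk job now needs.
      def unapply(position: Long): Option[Long]
    }

    // Plausible identity flavor for tests: position and timestamp coincide.
    object IdentityUuidGenerator extends UuidGenerator {
      def apply(updatedAtMillis: Long) = updatedAtMillis
      def unapply(position: Long) = Some(position)
    }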

Nick Kallen committed 4a3e5f3
@eaceaser eaceaser commented on the diff
src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -242,10 +236,12 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
}
def selectEdges(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor) = {
+ require(!states.isEmpty, "must provide some states")
@eaceaser
eaceaser added a note

Can we provide default behavior when the list is empty? We tend not to rely on runtime assertions.

@nkallen
nkallen added a note

I can default to all states or just normal, your pick. (Note: it was broken before; I'm just making the error message nicer.)
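
For illustration, the default eaceaser asks for could be a one-line guard ahead of the query. A sketch under the assumption we take the "just normal" option nkallen offers (State is flockdb's edge-state enum; the PR as written keeps the require):

    // Hypothetical guard, standalone for illustration: default an empty
    // states list to Normal rather than failing a runtime assertion.
    def defaultStates(states: Seq[State]): Seq[State] =
      if (states.isEmpty) Seq(State.Normal) else states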

Commits on Jun 2, 2011
  1. allow normal edges to have their position changed;
     this requires Multi operations to select full edges and propagate positions so that we don't reorder when we archive/unarchive;
     furthermore, this requires position-decoding (uuid unapply) to be available in all bulk jobs so this was a hellish DI propagation;
     a longer-term refactoring is to have a true uuid generator that doesn't require retries;

     then, the uuid generator would only be needed in the execute compiler

     Nick Kallen committed
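
The retry-free generator floated at the end of the message is not part of this commit. A hypothetical packing scheme sketches the idea, building on the trait sketched earlier: ids become pure, decodable functions of (timestamp, sequence), so generation never collides and bulk jobs only ever decode.

    // Hypothetical sketch (not in this PR): pack millis and a per-millisecond
    // sequence into one sortable Long. Sequence overflow past 2^20 ids in one
    // millisecond is ignored for brevity.
    object SequencedUuidGenerator extends UuidGenerator {
      private[this] val SeqBits = 20        // up to ~1M ids per millisecond
      private[this] var lastMillis = 0L
      private[this] var sequence = 0L

      def apply(nowMillis: Long): Long = synchronized {
        if (nowMillis == lastMillis) sequence += 1
        else { lastMillis = nowMillis; sequence = 0 }
        (nowMillis << SeqBits) | sequence
      }

      def unapply(position: Long): Option[Long] = Some(position >>> SeqBits)
    }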
View
5 src/main/scala/com/twitter/flockdb/EdgesService.scala
@@ -34,11 +34,12 @@ class EdgesService(val nameServer: NameServer[shards.Shard],
val schedule: PrioritizingJobScheduler,
future: Future,
intersectionQueryConfig: config.IntersectionQuery,
- aggregateJobsPageSize: Int) {
+ aggregateJobsPageSize: Int,
+ uuidGenerator: UuidGenerator) {
private val log = Logger.get(getClass.getName)
private val selectCompiler = new SelectCompiler(forwardingManager, intersectionQueryConfig)
- private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize)
+ private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize, uuidGenerator)
def shutdown() {
schedule.shutdown()
View
18 src/main/scala/com/twitter/flockdb/FlockDB.scala
@@ -70,8 +70,8 @@ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards
val copyPriority = Priority.Medium.id
val copyFactory = new jobs.CopyFactory(nameServer, jobScheduler(Priority.Medium.id))
- override val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler)
- override val diffFactory = new jobs.DiffFactory(nameServer, jobScheduler)
+ override val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler, OrderedUuidGenerator)
+ override val diffFactory = new jobs.DiffFactory(nameServer, jobScheduler, OrderedUuidGenerator)
val dbQueryEvaluatorFactory = config.edgesQueryEvaluator(stats)
val materializingQueryEvaluatorFactory = config.materializingQueryEvaluator(stats)
@@ -87,23 +87,23 @@ class FlockDB(config: FlockDBConfig, w3c: W3CStats) extends GizzardServer[shards
jobCodec += ("single.Remove".r, new jobs.single.RemoveParser(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Archive".r, new jobs.single.ArchiveParser(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Negate".r, new jobs.single.NegateParser(forwardingManager, OrderedUuidGenerator))
- jobCodec += ("multi.Archive".r, new jobs.multi.ArchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.Unarchive".r, new jobs.multi.UnarchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.RemoveAll".r, new jobs.multi.RemoveAllParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
- jobCodec += ("multi.Negate".r, new jobs.multi.NegateParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
+ jobCodec += ("multi.Archive".r, new jobs.multi.ArchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.Unarchive".r, new jobs.multi.UnarchiveParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.RemoveAll".r, new jobs.multi.RemoveAllParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
+ jobCodec += ("multi.Negate".r, new jobs.multi.NegateParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, OrderedUuidGenerator))
jobCodec += ("jobs\\.(Copy|Migrate)".r, new jobs.CopyParser(nameServer, jobScheduler(Priority.Medium.id)))
jobCodec += ("jobs\\.(MetadataCopy|MetadataMigrate)".r, new jobs.MetadataCopyParser(nameServer, jobScheduler(Priority.Medium.id)))
jobCodec += ("jobs.Repair".r, new jobs.RepairParser(nameServer, jobScheduler))
- jobCodec += ("jobs.MetadataRepair".r, new jobs.MetadataRepairParser(nameServer, jobScheduler))
+ jobCodec += ("jobs.MetadataRepair".r, new jobs.MetadataRepairParser(nameServer, jobScheduler, OrderedUuidGenerator))
jobCodec += ("jobs.Diff".r, new jobs.DiffParser(nameServer, jobScheduler))
- jobCodec += ("jobs.MetadataDiff".r, new jobs.MetadataDiffParser(nameServer, jobScheduler))
+ jobCodec += ("jobs.MetadataDiff".r, new jobs.MetadataDiffParser(nameServer, jobScheduler, OrderedUuidGenerator))
val flockService = {
val future = config.readFuture("readFuture")
- val edges = new EdgesService(nameServer, forwardingManager, copyFactory, jobScheduler, future, config.intersectionQuery, config.aggregateJobsPageSize)
+ val edges = new EdgesService(nameServer, forwardingManager, copyFactory, jobScheduler, future, config.intersectionQuery, config.aggregateJobsPageSize, OrderedUuidGenerator)
val scheduler = jobScheduler
new FlockDBThriftAdapter(edges, scheduler)
}
View
5 src/main/scala/com/twitter/flockdb/Metadata.scala
@@ -47,7 +47,7 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:
def max(other: Metadata) = if (this > other) this else other
- def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = {
+ def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int, uuidGenerator: UuidGenerator) = {
val job = state match {
case State.Normal => Unarchive
case State.Removed => RemoveAll
@@ -55,7 +55,8 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:
case State.Negative => Negate
}
- scheduler.put(priority, job(sourceId, tableId, if (tableId > 0) Direction.Forward else Direction.Backward, updatedAt, Priority.Medium, 500, forwardingManager, scheduler))
+ scheduler.put(
+ priority, job(sourceId, tableId, if (tableId > 0) Direction.Forward else Direction.Backward, updatedAt, Priority.Medium, 500, forwardingManager, scheduler, uuidGenerator))
}
def similar(other: Metadata) = {
View
16 src/main/scala/com/twitter/flockdb/jobs/Diff.scala
@@ -29,10 +29,10 @@ import com.twitter.gizzard.shards.{ShardDatabaseTimeoutException, ShardTimeoutEx
import collection.mutable.ListBuffer
import shards.{Shard}
-class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class DiffFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobFactory[Shard] {
override def apply(shardIds: Seq[ShardId]) = {
- new MetadataDiff(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
+ new MetadataDiff(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler, uuidGenerator)
}
}
@@ -69,17 +69,17 @@ class Diff(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
}
}
-class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
- extends MetadataRepairParser(nameServer, scheduler) {
+class MetadataDiffParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends MetadataRepairParser(nameServer, scheduler, uuidGenerator) {
override def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
- new MetadataDiff(shardIds, cursor, count, nameServer, scheduler)
+ new MetadataDiff(shardIds, cursor, count, nameServer, scheduler, uuidGenerator)
}
}
class MetadataDiff(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
- extends MetadataRepair(shardIds, cursor, count, nameServer, scheduler) {
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends MetadataRepair(shardIds, cursor, count, nameServer, scheduler, uuidGenerator) {
private val log = Logger.get(getClass.getName)
@@ -96,7 +96,7 @@ class MetadataDiff(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor,
override def nextRepair(lowestCursor: MetadataRepair.RepairCursor) = {
Some(lowestCursor match {
case MetadataRepair.END => new Diff(shardIds, Repair.START, Repair.COUNT, nameServer, scheduler)
- case _ => new MetadataDiff(shardIds, lowestCursor, count, nameServer, scheduler)
+ case _ => new MetadataDiff(shardIds, lowestCursor, count, nameServer, scheduler, uuidGenerator)
})
}
}
View
16 src/main/scala/com/twitter/flockdb/jobs/Repair.scala
@@ -37,10 +37,10 @@ object Repair {
val PRIORITY = Priority.Medium.id
}
-class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobFactory[Shard] {
def apply(shardIds: Seq[ShardId]) = {
- new MetadataRepair(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler)
+ new MetadataRepair(shardIds, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler, uuidGenerator)
}
}
@@ -131,16 +131,16 @@ object MetadataRepair {
val PRIORITY = Priority.Medium.id
}
-class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int) = {
val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
- new MetadataRepair(shardIds, cursor, count, nameServer, scheduler)
+ new MetadataRepair(shardIds, cursor, count, nameServer, scheduler, uuidGenerator)
}
}
class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
- nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler)
+ nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends MultiShardRepair[Shard, Metadata, MetadataRepair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {
private val log = Logger.get(getClass.getName)
@@ -150,16 +150,16 @@ class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor
def nextRepair(lowestCursor: MetadataRepair.RepairCursor) = {
Some(lowestCursor match {
case MetadataRepair.END => new Repair(shardIds, Repair.START, Repair.COUNT, nameServer, scheduler)
- case _ => new MetadataRepair(shardIds, lowestCursor, count, nameServer, scheduler)
+ case _ => new MetadataRepair(shardIds, lowestCursor, count, nameServer, scheduler, uuidGenerator)
})
}
def scheduleDifferent(list: (Shard, ListBuffer[Metadata], MetadataRepair.RepairCursor), tableId: Int, item: Metadata) = {
- item.schedule(tableId, forwardingManager, scheduler, priority)
+ item.schedule(tableId, forwardingManager, scheduler, priority, uuidGenerator)
}
def scheduleMissing(list: (Shard, ListBuffer[Metadata], MetadataRepair.RepairCursor), tableId: Int, item: Metadata) = {
- if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority)
+ if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority, uuidGenerator)
}
def scheduleBulk(otherShards: Seq[Shard], items: Seq[Metadata]) = {
View
62 src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala
@@ -25,6 +25,7 @@ import com.twitter.util.TimeConversions._
import net.lag.configgy.Configgy
import conversions.Numeric._
import shards.Shard
+import State._
abstract class MultiJobParser extends JsonJobParser {
def apply(attributes: Map[String, Any]): JsonJob = {
@@ -40,33 +41,33 @@ abstract class MultiJobParser extends JsonJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value): Multi
}
-class ArchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class ArchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Archive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Archive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class UnarchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class UnarchiveParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Unarchive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Unarchive(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class RemoveAllParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class RemoveAllParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new RemoveAll(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new RemoveAll(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
-class NegateParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int) extends MultiJobParser {
+class NegateParser(forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) extends MultiJobParser {
protected def createJob(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time, priority: Priority.Value) = {
- new Negate(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler)
+ new Negate(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator)
}
}
abstract class Multi(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
extends JsonJob {
private val config = Configgy.config
@@ -82,12 +83,15 @@ abstract class Multi(sourceId: Long, graphId: Int, direction: Direction, updated
case e: ShardBlackHoleException =>
return
}
+ val states = Seq(Normal, Archived, Negative) // Removed edges are never bulk-updated
while (cursor != Cursor.End) {
- val resultWindow = forwardShard.selectIncludingArchived(sourceId, aggregateJobPageSize, cursor)
+ val resultWindow = forwardShard.selectEdges(sourceId, states, aggregateJobPageSize, cursor)
- val chunkOfTasks = resultWindow.map { destinationId =>
+ val chunkOfTasks = resultWindow.map { edge =>
+ val destinationId = edge.destinationId
val (a, b) = if (direction == Direction.Backward) (destinationId, sourceId) else (sourceId, destinationId)
- update(a, graphId, b)
+ val uuidGenerator(uuid) = edge.position
+ update(a, graphId, b, uuid)
}
scheduler.put(priority.id, new JsonNestedJob(chunkOfTasks))
@@ -95,43 +99,43 @@ abstract class Multi(sourceId: Long, graphId: Int, direction: Direction, updated
}
}
- protected def update(sourceId: Long, graphId: Int, destinationId: Long): JsonJob
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long): JsonJob
protected def updateMetadata(shard: Shard)
}
case class Archive(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Archive(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Archive(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.archive(sourceId, updatedAt)
}
case class Unarchive(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) = {
- new jobs.single.Add(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) = {
+ new jobs.single.Add(sourceId, graphId, destinationId, position, updatedAt, null, null)
}
protected def updateMetadata(shard: Shard) = shard.add(sourceId, updatedAt)
}
case class RemoveAll(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Remove(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Remove(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.remove(sourceId, updatedAt)
}
case class Negate(sourceId: Long, graphId: Int, direction: Direction, updatedAt: Time,
priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager,
- scheduler: PrioritizingJobScheduler)
- extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler) {
- protected def update(sourceId: Long, graphId: Int, destinationId: Long) =
- new jobs.single.Negate(sourceId, graphId, destinationId, updatedAt.inMillis, updatedAt, null, null)
+ scheduler: PrioritizingJobScheduler, uuidGenerator: UuidGenerator)
+ extends Multi(sourceId, graphId, direction, updatedAt, priority, aggregateJobPageSize, forwardingManager, scheduler, uuidGenerator) {
+ protected def update(sourceId: Long, graphId: Int, destinationId: Long, position: Long) =
+ new jobs.single.Negate(sourceId, graphId, destinationId, position, updatedAt, null, null)
protected def updateMetadata(shard: Shard) = shard.negate(sourceId, updatedAt)
}
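
A subtlety in the Multi hunk above: `val uuidGenerator(uuid) = edge.position` is an extractor pattern in a val definition. It compiles to a call to `uuidGenerator.unapply` and throws a MatchError at runtime if the position fails to decode. A tiny standalone illustration, reusing the identity sketch from earlier:

    // Illustrative only: how Scala desugars the val-pattern line in Multi.
    object ExtractorDemo {
      def main(args: Array[String]) {
        val gen = IdentityUuidGenerator  // identity sketch from above
        val gen(uuid) = 1234567890L      // calls gen.unapply(1234567890L)
        println(uuid)                    // prints 1234567890
      }
    }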
View
1 src/main/scala/com/twitter/flockdb/jobs/single/Single.scala
@@ -26,7 +26,6 @@ import shards.Shard
case class NodePair(sourceId: Long, destinationId: Long)
-
abstract class SingleJobParser extends JsonJobParser {
def apply(attributes: Map[String, Any]): JsonJob = {
val casted = attributes.asInstanceOf[Map[String, AnyVal]]
View
10 src/main/scala/com/twitter/flockdb/queries/ExecuteCompiler.scala
@@ -28,7 +28,7 @@ import jobs.multi
import operations.{ExecuteOperations, ExecuteOperationType}
-class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: ForwardingManager, aggregateJobPageSize: Int) {
+class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: ForwardingManager, aggregateJobPageSize: Int, uuidGenerator: UuidGenerator) {
@throws(classOf[ShardException])
def apply(program: ExecuteOperations) {
val now = Time.now
@@ -49,25 +49,25 @@ class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: Fo
processDestinations(term) { (sourceId, destinationId) =>
single.Add(sourceId, term.graphId, destinationId, position, time, null, null)
} {
- multi.Unarchive(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null)
+ multi.Unarchive(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null, uuidGenerator)
}
case ExecuteOperationType.Remove =>
processDestinations(term) { (sourceId, destinationId) =>
single.Remove(sourceId, term.graphId, destinationId, position, time, null, null)
} {
- multi.RemoveAll(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null)
+ multi.RemoveAll(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null, uuidGenerator)
}
case ExecuteOperationType.Archive =>
processDestinations(term) { (sourceId, destinationId) =>
single.Archive(sourceId, term.graphId, destinationId, position, time, null, null)
} {
- multi.Archive(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null)
+ multi.Archive(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null, uuidGenerator)
}
case ExecuteOperationType.Negate =>
processDestinations(term) { (sourceId, destinationId) =>
single.Negate(sourceId, term.graphId, destinationId, position, time, null, null)
} {
- multi.Negate(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null)
+ multi.Negate(term.sourceId, term.graphId, Direction(term.isForward), time, program.priority, aggregateJobPageSize, null, null, uuidGenerator)
}
case n =>
throw new InvalidQueryException("Unknown operation " + n)
View
1 src/main/scala/com/twitter/flockdb/shards/ReadWriteShardAdapter.scala
@@ -24,7 +24,6 @@ import com.twitter.util.TimeConversions._
class ReadWriteShardAdapter(shard: shards.ReadWriteShard[Shard])
extends shards.ReadWriteShardAdapter(shard) with Shard with Optimism {
- def selectIncludingArchived(sourceId: Long, count: Int, cursor: Cursor) = shard.readOperation(_.selectIncludingArchived(sourceId, count, cursor))
def intersect(sourceId: Long, states: Seq[State], destinationIds: Seq[Long]) = shard.readOperation(_.intersect(sourceId, states, destinationIds))
def intersectEdges(sourceId: Long, states: Seq[State], destinationIds: Seq[Long]) = shard.readOperation(_.intersectEdges(sourceId, states, destinationIds))
def getMetadata(sourceId: Long) = shard.readOperation(_.getMetadata(sourceId))
View
1 src/main/scala/com/twitter/flockdb/shards/Shard.scala
@@ -39,7 +39,6 @@ trait Shard extends shards.Shard {
@throws(classOf[shards.ShardException]) def selectAll(cursor: (Cursor, Cursor), count: Int): (Seq[Edge], (Cursor, Cursor))
@throws(classOf[shards.ShardException]) def selectAllMetadata(cursor: Cursor, count: Int): (Seq[Metadata], Cursor)
- @throws(classOf[shards.ShardException]) def selectIncludingArchived(sourceId: Long, count: Int, cursor: Cursor): ResultWindow[Long]
@throws(classOf[shards.ShardException]) def selectByDestinationId(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor): ResultWindow[Long]
@throws(classOf[shards.ShardException]) def selectByPosition(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor): ResultWindow[Long]
@throws(classOf[shards.ShardException]) def selectEdges(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor): ResultWindow[Edge]
View
16 src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -202,12 +202,6 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
List(sourceId, states.map(_.id).toList): _*)
}
- def selectIncludingArchived(sourceId: Long, count: Int, cursor: Cursor) = {
- select(SelectModify, "destination_id", "unique_source_id_destination_id", count, cursor,
- "source_id = ? AND state != ?",
- sourceId, Removed.id)
- }
-
def selectByPosition(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor) = {
select("position", "PRIMARY", count, cursor,
"source_id = ? AND state IN (?)",
@@ -242,10 +236,12 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
}
def selectEdges(sourceId: Long, states: Seq[State], count: Int, cursor: Cursor) = {
+ require(!states.isEmpty, "must provide some states")
+
val conditions = "source_id = ? AND state IN (?)"
val order = if (cursor < Cursor.Start) "ASC" else "DESC"
val inequality = if (order == "DESC") "<" else ">"
- val args = sourceId :: states.map(_.id).toList
+ val args = sourceId :: List(states.map(_.id))
val (edgesQuery, args1) = query("*", "position", "PRIMARY", count + 1, cursor, order, inequality, conditions, args)
val (continueCursorQuery, args2) = query("*", "position", "PRIMARY", 1, cursor, opposite(order), opposite(inequality), conditions, args)
@@ -395,11 +391,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
oldEdge: Edge): Int = {
if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) return 0
- val updatedRows = if (
- oldEdge.state != Archived && // Only update position when coming from removed or negated into normal
- oldEdge.state != Normal &&
- edge.state == Normal
- ) {
+ val updatedRows = if (edge.state == Normal) { // only update position if Normal (current business rule)
transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " +
"position = ?, count = 0, state = ? " +
"WHERE source_id = ? AND destination_id = ? AND " +
View
2 src/test/scala/com/twitter/flockdb/integration/FlockFixRegressionSpec.scala
@@ -65,7 +65,7 @@ class FlockFixRegressionSpec extends IntegrationSpecification {
Thread.sleep(1000)
- val job = Unarchive(alice, FOLLOWS, Direction.Forward, Time.now, flockdb.Priority.High, pageSize, flock.edges.forwardingManager, flock.edges.schedule)
+ val job = Unarchive(alice, FOLLOWS, Direction.Forward, Time.now, flockdb.Priority.High, pageSize, flock.edges.forwardingManager, flock.edges.schedule, OrderedUuidGenerator)
job()
alicesFollowings().size must eventually(be(10))
View
2 src/test/scala/com/twitter/flockdb/unit/EdgesSpec.scala
@@ -50,7 +50,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker {
val scheduler = mock[PrioritizingJobScheduler]
val future = mock[Future]
val copyFactory = mock[CopyJobFactory[Shard]]
- val flock = new FlockDBThriftAdapter(new EdgesService(nameServer, forwardingManager, copyFactory, scheduler, future, config.intersectionQuery, config.aggregateJobsPageSize), null)
+ val flock = new FlockDBThriftAdapter(new EdgesService(nameServer, forwardingManager, copyFactory, scheduler, future, config.intersectionQuery, config.aggregateJobsPageSize, IdentityUuidGenerator), null)
"add" in {
Time.withCurrentTimeFrozen { time =>
View
18 src/test/scala/com/twitter/flockdb/unit/ExecuteCompilerSpec.scala
@@ -57,7 +57,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
doBefore {
scheduler = mock[PrioritizingJobScheduler]
forwardingManager = mock[ForwardingManager]
- executeCompiler = new ExecuteCompiler(scheduler, forwardingManager, config.aggregateJobsPageSize)
+ executeCompiler = new ExecuteCompiler(scheduler, forwardingManager, config.aggregateJobsPageSize, IdentityUuidGenerator)
}
"without execute_at present" in {
@@ -123,7 +123,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Unarchive(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Unarchive(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
"backward" >> {
@@ -133,7 +133,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Unarchive(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Unarchive(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
}
@@ -195,7 +195,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.RemoveAll(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.RemoveAll(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
"backward" >> {
@@ -205,7 +205,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.RemoveAll(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.RemoveAll(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
}
@@ -267,7 +267,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Archive(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Archive(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
"backward" >> {
@@ -277,7 +277,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Archive(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Archive(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
}
@@ -339,7 +339,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Negate(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Negate(alice, FOLLOWS, Direction.Forward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
"backward" >> {
@@ -349,7 +349,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla
one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture)
}
executeCompiler(program)
- jsonMatching(List(multi.Negate(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs)
+ jsonMatching(List(multi.Negate(alice, FOLLOWS, Direction.Backward, now, Priority.Low, config.aggregateJobsPageSize, null, null, IdentityUuidGenerator)), nestedJob.captured.jobs)
}
}
View
10 src/test/scala/com/twitter/flockdb/unit/JobSpec.scala
@@ -170,8 +170,8 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
// Input Before After Resulting
// Job Bob Mary Bob Mary Job
test("normal archive", Archived, Normal, Normal, Normal, Normal, Archived, _.apply)
- test("archive removed", Archived, Normal, Removed, Normal, Removed, Removed, _.apply)
- test("archive removed", Archived, Removed, Normal, Removed, Normal, Removed, _.apply)
+ test("NOT archive removed", Archived, Normal, Removed, Normal, Removed, Removed, _.apply)
+ test("NOT archive removed", Archived, Removed, Normal, Removed, Normal, Removed, _.apply)
"toJson" in {
@@ -198,7 +198,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"toJson" in {
Time.withCurrentTimeFrozen { time =>
- val job = new jobs.multi.Archive(bob, FOLLOWS, Direction.Forward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler)
+ val job = new jobs.multi.Archive(bob, FOLLOWS, Direction.Forward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler, IdentityUuidGenerator)
val json = job.toJson
json mustMatch "Archive"
json mustMatch "\"source_id\":" + bob
@@ -220,7 +220,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"toJson" in {
Time.withCurrentTimeFrozen { time =>
- val job = new Unarchive(bob, FOLLOWS, Direction.Forward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler)
+ val job = new Unarchive(bob, FOLLOWS, Direction.Forward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler, IdentityUuidGenerator)
val json = job.toJson
json mustMatch "Unarchive"
json mustMatch "\"source_id\":" + bob
@@ -242,7 +242,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"toJson" in {
Time.withCurrentTimeFrozen { time =>
- val job = RemoveAll(bob, FOLLOWS, Direction.Backward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler)
+ val job = RemoveAll(bob, FOLLOWS, Direction.Backward, Time.now, Priority.Low, config.aggregateJobsPageSize, forwardingManager, scheduler, IdentityUuidGenerator)
val json = job.toJson
json mustMatch "RemoveAll"
json mustMatch "\"source_id\":" + bob
View
19 src/test/scala/com/twitter/flockdb/unit/SqlShardSpec.scala
@@ -295,21 +295,15 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
}
}
- "includingArchived" >> {
- shard.add(alice, bob, 1, now)
- shard.add(alice, carl, 2, now)
- shard.add(carl, darcy, 1, now)
- shard.archive(alice, earl, 1, now)
-
- shard.selectIncludingArchived(alice, 5, Cursor.Start).toThrift mustEqual new Results(List[Long](earl, carl, bob).pack, Cursor.End.position, Cursor.End.position)
- }
-
"get edge objects" >> {
shard.add(alice, bob, 3, now)
shard.add(alice, carl, 5, now)
+ shard.remove(alice, alice, 7, now)
val aliceBob = new Edge(alice, bob, 3, now, 0, State.Normal).toThrift
val aliceCarl = new Edge(alice, carl, 5, now, 0, State.Normal).toThrift
+ val aliceAlice = new Edge(alice, alice, 7, now, 0, State.Removed).toThrift
+
shard.selectEdges(alice, List(State.Normal), 1, Cursor.Start).toEdgeResults mustEqual new EdgeResults(List(aliceCarl), 5, Cursor.End.position)
shard.selectEdges(alice, List(State.Normal), 5, Cursor.Start).toEdgeResults mustEqual new EdgeResults(List(aliceCarl, aliceBob), Cursor.End.position, Cursor.End.position)
shard.selectEdges(alice, List(State.Normal), 1, Cursor(5)).toEdgeResults mustEqual new EdgeResults(List(aliceBob), Cursor.End.position, -3)
@@ -317,6 +311,8 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
shard.selectEdges(alice, List(State.Normal), 3, Cursor(4)).toEdgeResults mustEqual new EdgeResults(List(aliceBob), Cursor.End.position, -3)
shard.selectEdges(bob, List(State.Normal), 5, Cursor.Start).toEdgeResults mustEqual new EdgeResults(List[thrift.Edge](), Cursor.End.position, Cursor.End.position)
+ shard.selectEdges(alice, List(State.Normal, State.Removed), 3, Cursor.Start).toEdgeResults mustEqual new EdgeResults(List(aliceAlice, aliceCarl, aliceBob), Cursor.End.position, Cursor.End.position)
+
shard.selectEdges(alice, List(State.Normal), 1, Cursor(-5)).toEdgeResults mustEqual new EdgeResults(List[thrift.Edge](), Cursor.End.position, Cursor.End.position)
shard.selectEdges(alice, List(State.Normal), 1, Cursor(-3)).toEdgeResults mustEqual new EdgeResults(List(aliceCarl), 5, Cursor.End.position)
shard.selectEdges(alice, List(State.Normal), 1, Cursor(-4)).toEdgeResults mustEqual new EdgeResults(List(aliceCarl), 5, Cursor.End.position)
@@ -347,19 +343,18 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
"when the row already exists" >> {
"when the already-existing row is older than the row to be inserted" >> {
- // Flock-fix redefines a re-insert as a no-op
"when the already existing row is not deleted" >> {
shard.add(alice, bob, 1, now)
shard.add(alice, bob, 2, now + 10.seconds)
- shard.get(alice, bob) mustEqual Some(new Edge(alice, bob, 1, now + 10.seconds, 0, State.Normal))
+ shard.get(alice, bob) mustEqual Some(new Edge(alice, bob, 2, now + 10.seconds, 0, State.Normal))
}
"when the already existing row is not archived" >> {
shard.archive(alice, bob, 1, now)
shard.add(alice, bob, 2, now + 10.seconds)
- shard.get(alice, bob) mustEqual Some(new Edge(alice, bob, 1, now + 10.seconds, 0, State.Normal))
+ shard.get(alice, bob) mustEqual Some(new Edge(alice, bob, 2, now + 10.seconds, 0, State.Normal))
}
}