diff --git a/config/development.scala b/config/development.scala index 2260b05e..3ec06391 100644 --- a/config/development.scala +++ b/config/development.scala @@ -124,6 +124,8 @@ new FlockDB { httpPort = Some(9990) } + override def zooKeeperSettings = Some(ZooKeeperSettings("localhost:2181", "/twitter/flock")) + loggers = List(new LoggerConfig { level = Some(Level.INFO) handlers = List( @@ -136,5 +138,9 @@ new FlockDB { } } ) + }, new LoggerConfig { + node = "filtered_jobs" + useParents = true + level = Level.INFO }) } diff --git a/project/build/FlockDBProject.scala b/project/build/FlockDBProject.scala index a1535677..2d34ae08 100644 --- a/project/build/FlockDBProject.scala +++ b/project/build/FlockDBProject.scala @@ -11,6 +11,15 @@ with SubversionPublisher { val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" val gizzard = "com.twitter" % "gizzard" % "3.0.0-beta29" + val zookeeper = "org.apache.zookeeper" % "zookeeper" % "3.3.4" + val commonZk = "com.twitter.common" % "zookeeper" % "0.0.31" + + override def ivyXML = + + + + + val asm = "asm" % "asm" % "1.5.3" % "test" val cglib = "cglib" % "cglib" % "2.2" % "test" diff --git a/src/main/scala/com/twitter/flockdb/Edge.scala b/src/main/scala/com/twitter/flockdb/Edge.scala index 4e078acc..98b1268c 100644 --- a/src/main/scala/com/twitter/flockdb/Edge.scala +++ b/src/main/scala/com/twitter/flockdb/Edge.scala @@ -33,11 +33,11 @@ case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSe val updatedAt = Time.fromSeconds(updatedAtSeconds) - def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = { - scheduler.put(priority, toJob(tableId, forwardingManager)) + def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int, jobFilter: JobFilter) = { + scheduler.put(priority, toJob(tableId, forwardingManager, jobFilter)) } - def toJob(tableId: Int, forwardingManager: 
ForwardingManager) = { + def toJob(tableId: Int, forwardingManager: ForwardingManager, jobFilter: JobFilter) = { new Single( sourceId, tableId, @@ -46,7 +46,8 @@ case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSe state, updatedAt, forwardingManager, - OrderedUuidGenerator + OrderedUuidGenerator, + jobFilter ) } diff --git a/src/main/scala/com/twitter/flockdb/EdgesService.scala b/src/main/scala/com/twitter/flockdb/EdgesService.scala index e1493e16..bd9ab97a 100644 --- a/src/main/scala/com/twitter/flockdb/EdgesService.scala +++ b/src/main/scala/com/twitter/flockdb/EdgesService.scala @@ -31,6 +31,7 @@ import thrift.FlockException class EdgesService( forwardingManager: ForwardingManager, schedule: PrioritizingJobScheduler, + jobFilter: JobFilter, future: Future, intersectionQueryConfig: config.IntersectionQuery, aggregateJobsPageSize: Int) { @@ -38,7 +39,7 @@ class EdgesService( private val log = Logger.get(getClass.getName) private val exceptionLog = Logger.get("exception") private val selectCompiler = new SelectCompiler(forwardingManager, intersectionQueryConfig) - private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize) + private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, jobFilter, aggregateJobsPageSize) def shutdown() { schedule.shutdown() diff --git a/src/main/scala/com/twitter/flockdb/FlockDB.scala b/src/main/scala/com/twitter/flockdb/FlockDB.scala index 094217ad..93b1b07d 100644 --- a/src/main/scala/com/twitter/flockdb/FlockDB.scala +++ b/src/main/scala/com/twitter/flockdb/FlockDB.scala @@ -16,6 +16,9 @@ package com.twitter.flockdb +import com.twitter.common.net.InetSocketAddressHelper +import com.twitter.common.quantity.Amount +import com.twitter.common.zookeeper.{ZooKeeperClient, ZooKeeperUtils} import com.twitter.util.Duration import com.twitter.ostrich.admin.Service import com.twitter.querulous.StatsCollector @@ -28,6 +31,8 @@ import 
com.twitter.flockdb.config.{FlockDB => FlockDBConfig} class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service { + val FILTER_SET_ZK_PATH = "/filters" + object FlockExceptionWrappingProxyFactory extends ExceptionHandlingProxyFactory[thrift.FlockDB.Iface]({ (flock, e) => e match { case _: thrift.FlockException => @@ -48,6 +53,27 @@ class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service override def addGauge(name: String)(gauge: => Double) { Stats.addGauge(name)(gauge) } } + val zkClientAndBasePath: Option[(ZooKeeperClient, String)] = + config.zooKeeperSettings.map { + zkConfig => { + log.info("Using ZooKeeper server " + zkConfig.server) + val zkServer = InetSocketAddressHelper.parse(zkConfig.server) + val zkClient = new ZooKeeperClient(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, zkServer) + (zkClient, zkConfig.basePath) + } + } + + val jobFilter: JobFilter = { + zkClientAndBasePath match { + case Some((zkClient, zkBasePath)) => { + val filterPath = zkBasePath + FILTER_SET_ZK_PATH + log.info("Using ZooKeeperSetFilter with path " + filterPath) + new ZooKeeperSetFilter(zkClient, filterPath) + } + case None => NoOpFilter + } + } + val jobPriorities = List(Priority.Low, Priority.Medium, Priority.High).map(_.id) val copyPriority = Priority.Medium.id @@ -76,8 +102,8 @@ class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service val forwardingManager = new ForwardingManager(nameServer.multiTableForwarder[Shard]) - jobCodec += ("single.Single".r, new jobs.single.SingleJobParser(forwardingManager, OrderedUuidGenerator)) - jobCodec += ("multi.Multi".r, new jobs.multi.MultiJobParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize)) + jobCodec += ("single.Single".r, new jobs.single.SingleJobParser(forwardingManager, OrderedUuidGenerator, jobFilter)) + jobCodec += ("multi.Multi".r, new jobs.multi.MultiJobParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, jobFilter)) jobCodec += 
("jobs\\.(Copy|Migrate)".r, new jobs.CopyParser(nameServer, jobScheduler(Priority.Medium.id))) jobCodec += ("jobs\\.(MetadataCopy|MetadataMigrate)".r, new jobs.MetadataCopyParser(nameServer, jobScheduler(Priority.Medium.id))) @@ -85,19 +111,20 @@ class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service // XXX: remove when old tagged jobs no longer exist. import jobs.LegacySingleJobParser import jobs.LegacyMultiJobParser - jobCodec += ("single.Add".r, LegacySingleJobParser.Add(forwardingManager, OrderedUuidGenerator)) - jobCodec += ("single.Remove".r, LegacySingleJobParser.Remove(forwardingManager, OrderedUuidGenerator)) - jobCodec += ("single.Archive".r, LegacySingleJobParser.Archive(forwardingManager, OrderedUuidGenerator)) - jobCodec += ("single.Negate".r, LegacySingleJobParser.Negate(forwardingManager, OrderedUuidGenerator)) - jobCodec += ("multi.Archive".r, LegacyMultiJobParser.Archive(forwardingManager, jobScheduler, config.aggregateJobsPageSize)) - jobCodec += ("multi.Unarchive".r, LegacyMultiJobParser.Unarchive(forwardingManager, jobScheduler, config.aggregateJobsPageSize)) - jobCodec += ("multi.RemoveAll".r, LegacyMultiJobParser.RemoveAll(forwardingManager, jobScheduler, config.aggregateJobsPageSize)) - jobCodec += ("multi.Negate".r, LegacyMultiJobParser.Negate(forwardingManager, jobScheduler, config.aggregateJobsPageSize)) + jobCodec += ("single.Add".r, LegacySingleJobParser.Add(forwardingManager, OrderedUuidGenerator, jobFilter)) + jobCodec += ("single.Remove".r, LegacySingleJobParser.Remove(forwardingManager, OrderedUuidGenerator, jobFilter)) + jobCodec += ("single.Archive".r, LegacySingleJobParser.Archive(forwardingManager, OrderedUuidGenerator, jobFilter)) + jobCodec += ("single.Negate".r, LegacySingleJobParser.Negate(forwardingManager, OrderedUuidGenerator, jobFilter)) + jobCodec += ("multi.Archive".r, LegacyMultiJobParser.Archive(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize)) + jobCodec += 
("multi.Unarchive".r, LegacyMultiJobParser.Unarchive(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize)) + jobCodec += ("multi.RemoveAll".r, LegacyMultiJobParser.RemoveAll(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize)) + jobCodec += ("multi.Negate".r, LegacyMultiJobParser.Negate(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize)) val flockService = { val edges = new EdgesService( forwardingManager, jobScheduler, + jobFilter, config.readFuture("readFuture"), config.intersectionQuery, config.aggregateJobsPageSize diff --git a/src/main/scala/com/twitter/flockdb/JobFilter.scala b/src/main/scala/com/twitter/flockdb/JobFilter.scala new file mode 100644 index 00000000..44b3cb01 --- /dev/null +++ b/src/main/scala/com/twitter/flockdb/JobFilter.scala @@ -0,0 +1,88 @@ +package com.twitter.flockdb + +import com.twitter.logging.Logger +import com.twitter.common.base.Function +import com.twitter.common.zookeeper.{ZooKeeperClient, ZooKeeperMap, ZooKeeperUtils} +import com.twitter.gizzard.Stats +import java.util.{Set => JSet} +import org.apache.zookeeper.ZooDefs +import scala.collection.JavaConversions + +trait JobFilter { + /** + * Returns true iff the job with the given sourceId, destId, + * and graphId parameters should be executed. + */ + def apply(sourceId: Long, destId: Long, graphId: Int): Boolean +} + +/** + * Default no-op filter. All operations pass. + */ +object NoOpFilter extends JobFilter { + def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = true +} + +/** + * A filter based on an independently-updated Java set. + * + * Set entries should have the form 'SOURCE_ID:DEST_ID:GRAPH_ID', each of + * which specifies either a user ID or a wildcard (*). + * A wildcard may be specified for at most one of SOURCE_ID and DEST_ID. + * + * Ex. Filter all writes from user 1234 on graph 5: "1234:*:5". + * Filter all writes to user 2345 on all graphs: "*:2345:*". 
+ * Filter writes on edges between users 100 and 200 on all graphs: "100:200:*". + */ +class SetFilter(set: JSet[String]) extends JobFilter { + private val WILDCARD = "*" + private val FILTERS = Seq( + (true, true, true), + (true, true, false), + (true, false, true), + (false, true, true), + (false, true, false), + (true, false, false)) + + Stats.addGauge("active-filters") { set.size() } + + def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = { + def filterKey(filter: (Boolean, Boolean, Boolean)) = { + "%s:%s:%s".format( + if (filter._1) sourceId.toString else WILDCARD, + if (filter._2) destId.toString else WILDCARD, + if (filter._3) graphId.toString else WILDCARD + ) + } + + for (filter <- FILTERS) { + if (set.contains(filterKey(filter))) { + Stats.incr("edges-filtered") + return false + } + } + Stats.incr("edges-passed") + true + } +} + +/** + * A filter based on a ZooKeeperMap. Filters are stored as node keys under the given zkPath. + * The ZooKeeper node values are not used. 
+ */ +class ZooKeeperSetFilter(zkClient: ZooKeeperClient, zkPath: String) extends JobFilter { + val log = Logger.get + + private val deserializer = new Function[Array[Byte], Unit]() { + override def apply(data : Array[Byte]) = () + } + + ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath) + private val zkMap: ZooKeeperMap[Unit] = ZooKeeperMap.create(zkClient, zkPath, deserializer) + log.info("Initial filter set: " + ( JavaConversions.asScalaSet(zkMap.keySet()).mkString(", "))) + + private val setFilter = new SetFilter(zkMap.keySet()) + + def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = + setFilter(sourceId, destId, graphId) +} diff --git a/src/main/scala/com/twitter/flockdb/Main.scala b/src/main/scala/com/twitter/flockdb/Main.scala index 1781ba42..085a778e 100644 --- a/src/main/scala/com/twitter/flockdb/Main.scala +++ b/src/main/scala/com/twitter/flockdb/Main.scala @@ -26,6 +26,7 @@ object Main { val service = new FlockDB(config) ServiceTracker.register(service) + log.info("Starting Flock service") service.start() } catch { diff --git a/src/main/scala/com/twitter/flockdb/Metadata.scala b/src/main/scala/com/twitter/flockdb/Metadata.scala index ee21d526..969bea09 100644 --- a/src/main/scala/com/twitter/flockdb/Metadata.scala +++ b/src/main/scala/com/twitter/flockdb/Metadata.scala @@ -52,7 +52,8 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds: tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, - priority: Int + priority: Int, + jobFilter: JobFilter ) = { val job = new Multi( sourceId, @@ -63,7 +64,8 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds: Priority.Medium, 500, forwardingManager, - scheduler + scheduler, + jobFilter ) scheduler.put(priority, job) diff --git a/src/main/scala/com/twitter/flockdb/config/FlockDB.scala b/src/main/scala/com/twitter/flockdb/config/FlockDB.scala index b0776819..8a4f4f8b 100644 --- 
a/src/main/scala/com/twitter/flockdb/config/FlockDB.scala +++ b/src/main/scala/com/twitter/flockdb/config/FlockDB.scala @@ -6,7 +6,7 @@ import com.twitter.querulous.config.{Connection, QueryEvaluator} import com.twitter.util.TimeConversions._ import com.twitter.flockdb.queries.QueryTree import com.twitter.flockdb.queries - +import java.net.InetSocketAddress trait FlockDBServer extends TServer { var name = "flockdb_edges" @@ -37,4 +37,7 @@ trait FlockDB extends GizzardServer { def readFuture: Future def adminConfig: AdminServiceConfig + + case class ZooKeeperSettings(server: String, basePath: String) + def zooKeeperSettings: Option[ZooKeeperSettings] = None } diff --git a/src/main/scala/com/twitter/flockdb/jobs/Legacy.scala b/src/main/scala/com/twitter/flockdb/jobs/Legacy.scala index 09186991..fe141142 100644 --- a/src/main/scala/com/twitter/flockdb/jobs/Legacy.scala +++ b/src/main/scala/com/twitter/flockdb/jobs/Legacy.scala @@ -20,7 +20,7 @@ import com.twitter.logging.Logger import com.twitter.util.Time import com.twitter.gizzard.scheduler._ import com.twitter.gizzard.shards._ -import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction, Priority} +import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction, Priority, JobFilter} import com.twitter.flockdb.conversions.Numeric._ import com.twitter.flockdb.jobs.single.Single import com.twitter.flockdb.jobs.multi.Multi @@ -30,20 +30,20 @@ import com.twitter.flockdb.jobs.multi.Multi // XXX: remove once we're off of the old format, or factor out common code with above. 
object LegacySingleJobParser { - def Add(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator) = { - new LegacySingleJobParser(forwardingManager, uuidGenerator, State.Normal) + def Add(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, jobFilter: JobFilter) = { + new LegacySingleJobParser(forwardingManager, uuidGenerator, jobFilter, State.Normal) } - def Negate(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator) = { - new LegacySingleJobParser(forwardingManager, uuidGenerator, State.Negative) + def Negate(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, jobFilter: JobFilter) = { + new LegacySingleJobParser(forwardingManager, uuidGenerator, jobFilter, State.Negative) } - def Archive(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator) = { - new LegacySingleJobParser(forwardingManager, uuidGenerator, State.Archived) + def Archive(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, jobFilter: JobFilter) = { + new LegacySingleJobParser(forwardingManager, uuidGenerator, jobFilter, State.Archived) } - def Remove(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator) = { - new LegacySingleJobParser(forwardingManager, uuidGenerator, State.Removed) + def Remove(forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, jobFilter: JobFilter) = { + new LegacySingleJobParser(forwardingManager, uuidGenerator, jobFilter, State.Removed) } } @@ -51,39 +51,44 @@ object LegacyMultiJobParser { def Archive( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter, aggregateJobPageSize: Int ) = { - new LegacyMultiJobParser(forwardingManager, scheduler, aggregateJobPageSize, State.Archived) + new LegacyMultiJobParser(forwardingManager, scheduler, jobFilter, aggregateJobPageSize, State.Archived) } def Unarchive( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter, aggregateJobPageSize: 
Int ) = { - new LegacyMultiJobParser(forwardingManager, scheduler, aggregateJobPageSize, State.Normal) + new LegacyMultiJobParser(forwardingManager, scheduler, jobFilter, aggregateJobPageSize, State.Normal) } def RemoveAll( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter, aggregateJobPageSize: Int ) = { - new LegacyMultiJobParser(forwardingManager, scheduler, aggregateJobPageSize, State.Removed) + new LegacyMultiJobParser(forwardingManager, scheduler, jobFilter, aggregateJobPageSize, State.Removed) } def Negate( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter, aggregateJobPageSize: Int ) = { - new LegacyMultiJobParser(forwardingManager, scheduler, aggregateJobPageSize, State.Negative) + new LegacyMultiJobParser(forwardingManager, scheduler, jobFilter, aggregateJobPageSize, State.Negative) } } class LegacySingleJobParser( forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, + jobFilter: JobFilter, state: State) extends JsonJobParser { @@ -100,7 +105,8 @@ extends JsonJobParser { state, // ONLY DIFFERENCE FROM SingleJobParser Time.fromSeconds(casted("updated_at").toInt), forwardingManager, - uuidGenerator + uuidGenerator, + jobFilter ) } } @@ -108,6 +114,7 @@ extends JsonJobParser { class LegacyMultiJobParser( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter, aggregateJobPageSize: Int, state: State) extends JsonJobParser { @@ -124,7 +131,8 @@ extends JsonJobParser { Priority(casted.get("priority").map(_.toInt).getOrElse(Priority.Low.id)), aggregateJobPageSize, forwardingManager, - scheduler + scheduler, + jobFilter ) } } diff --git a/src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala b/src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala index 8ae68eba..0c3cbc75 100644 --- a/src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala +++ b/src/main/scala/com/twitter/flockdb/jobs/multi/Multi.scala 
@@ -20,7 +20,7 @@ import com.twitter.gizzard.scheduler._ import com.twitter.gizzard.shards.ShardBlackHoleException import com.twitter.util.Time import com.twitter.util.TimeConversions._ -import com.twitter.flockdb.{State, ForwardingManager, Cursor, Priority, Direction} +import com.twitter.flockdb.{State, ForwardingManager, Cursor, Priority, Direction, JobFilter} import com.twitter.flockdb.conversions.Numeric._ import com.twitter.flockdb.shards.Shard import com.twitter.flockdb.jobs.single.Single @@ -29,7 +29,8 @@ import com.twitter.flockdb.jobs.single.Single class MultiJobParser( forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, - aggregateJobPageSize: Int) + aggregateJobPageSize: Int, + jobFilter: JobFilter) extends JsonJobParser { def apply(attributes: Map[String, Any]): JsonJob = { @@ -45,7 +46,8 @@ extends JsonJobParser { aggregateJobPageSize, casted.get("cursor").map( c => Cursor(c.toLong)).getOrElse(Cursor.Start), forwardingManager, - scheduler + scheduler, + jobFilter ) } } @@ -60,7 +62,8 @@ class Multi( aggregateJobPageSize: Int, var cursor: Cursor, forwardingManager: ForwardingManager, - scheduler: PrioritizingJobScheduler) + scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter) extends JsonJob { def this( @@ -72,7 +75,8 @@ extends JsonJob { priority: Priority.Value, aggregateJobPageSize: Int, forwardingManager: ForwardingManager, - scheduler: PrioritizingJobScheduler + scheduler: PrioritizingJobScheduler, + jobFilter: JobFilter ) = { this( sourceId, @@ -84,7 +88,8 @@ extends JsonJob { aggregateJobPageSize, Cursor.Start, forwardingManager, - scheduler + scheduler, + jobFilter ) } @@ -126,7 +131,7 @@ extends JsonJob { // XXX: since this job gets immediately serialized, pass null for forwardingManager and uuidGenerator. 
protected def singleEdgeJob(sourceId: Long, graphId: Int, destinationId: Long, state: State) = { - new Single(sourceId, graphId, destinationId, updatedAt.inMillis, state, updatedAt, null, null) + new Single(sourceId, graphId, destinationId, updatedAt.inMillis, state, updatedAt, null, null, jobFilter) } protected def updateMetadata(shard: Shard, state: State) = state match { diff --git a/src/main/scala/com/twitter/flockdb/jobs/single/Single.scala b/src/main/scala/com/twitter/flockdb/jobs/single/Single.scala index ab628542..1fce1cf4 100644 --- a/src/main/scala/com/twitter/flockdb/jobs/single/Single.scala +++ b/src/main/scala/com/twitter/flockdb/jobs/single/Single.scala @@ -18,10 +18,12 @@ package com.twitter.flockdb.jobs.single import com.twitter.logging.Logger import com.twitter.util.{Time, Return, Throw} +import com.twitter.gizzard.config import com.twitter.gizzard.scheduler._ import com.twitter.gizzard.shards._ +import com.twitter.gizzard.Stats import com.twitter.conversions.time._ -import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction} +import com.twitter.flockdb.{State, ForwardingManager, Cursor, UuidGenerator, Direction, JobFilter} import com.twitter.flockdb.conversions.Numeric._ import com.twitter.flockdb.shards.Shard import com.twitter.flockdb.shards.LockingNodeSet._ @@ -29,7 +31,8 @@ import com.twitter.flockdb.shards.LockingNodeSet._ class SingleJobParser( forwardingManager: ForwardingManager, - uuidGenerator: UuidGenerator) + uuidGenerator: UuidGenerator, + jobFilter: JobFilter) extends JsonJobParser { def log = Logger.get @@ -57,11 +60,18 @@ extends JsonJobParser { Time.fromSeconds(casted("updated_at").toInt), forwardingManager, uuidGenerator, + jobFilter, writeSuccesses.toList ) } } +// TODO: move the filteredJobsQueue into a field in the gizzard JobScheduler. 
+object Single { + val filteredJobsLogger = new config.JsonJobLogger{ name = "filtered_jobs" } + val filteredJobsQueue: JobConsumer = filteredJobsLogger() +} + class Single( sourceId: Long, graphId: Int, @@ -71,6 +81,7 @@ class Single( updatedAt: Time, forwardingManager: ForwardingManager, uuidGenerator: UuidGenerator, + jobFilter: JobFilter, var successes: List[ShardId] = Nil) extends JsonJob { @@ -91,7 +102,12 @@ extends JsonJob { } } - def apply() = { + def apply() : Unit = { + if (!jobFilter(sourceId, destinationId, graphId)) { + Stats.incr("single-jobs-filtered") + Single.filteredJobsQueue.put(this) + return + } val forward = forwardingManager.findNode(sourceId, graphId, Direction.Forward).write val backward = forwardingManager.findNode(destinationId, graphId, Direction.Backward).write val uuid = uuidGenerator(position) diff --git a/src/main/scala/com/twitter/flockdb/queries/ExecuteCompiler.scala b/src/main/scala/com/twitter/flockdb/queries/ExecuteCompiler.scala index 1db1457e..6fb43cf1 100644 --- a/src/main/scala/com/twitter/flockdb/queries/ExecuteCompiler.scala +++ b/src/main/scala/com/twitter/flockdb/queries/ExecuteCompiler.scala @@ -29,7 +29,7 @@ import jobs.multi.Multi import operations.{ExecuteOperations, ExecuteOperationType} -class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: ForwardingManager, aggregateJobPageSize: Int) { +class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: ForwardingManager, jobFilter: JobFilter, aggregateJobPageSize: Int) { @throws(classOf[ShardException]) def apply(program: ExecuteOperations) { val now = Time.now @@ -62,7 +62,8 @@ class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: Fo state, time, null, - null + null, + jobFilter ) } { new Multi( @@ -74,7 +75,8 @@ class ExecuteCompiler(scheduler: PrioritizingJobScheduler, forwardingManager: Fo program.priority, aggregateJobPageSize, null, - null + null, + jobFilter ) } } diff --git 
a/src/test/scala/com/twitter/flockdb/integration/FlockFixRegressionSpec.scala b/src/test/scala/com/twitter/flockdb/integration/FlockFixRegressionSpec.scala index b2c7b59a..c6587dcc 100644 --- a/src/test/scala/com/twitter/flockdb/integration/FlockFixRegressionSpec.scala +++ b/src/test/scala/com/twitter/flockdb/integration/FlockFixRegressionSpec.scala @@ -64,7 +64,7 @@ class FlockFixRegressionSpec extends IntegrationSpecification { Thread.sleep(1000) - val job = new Multi(alice, FOLLOWS, Direction.Forward, State.Normal, Time.now, flockdb.Priority.High, pageSize, flock.forwardingManager, flock.jobScheduler) + val job = new Multi(alice, FOLLOWS, Direction.Forward, State.Normal, Time.now, flockdb.Priority.High, pageSize, flock.forwardingManager, flock.jobScheduler, NoOpFilter) job() alicesFollowings().size must eventually(be(10)) diff --git a/src/test/scala/com/twitter/flockdb/integration/OptimisticLockRegressionSpec.scala b/src/test/scala/com/twitter/flockdb/integration/OptimisticLockRegressionSpec.scala index 3fbdac07..8c5152bb 100644 --- a/src/test/scala/com/twitter/flockdb/integration/OptimisticLockRegressionSpec.scala +++ b/src/test/scala/com/twitter/flockdb/integration/OptimisticLockRegressionSpec.scala @@ -53,7 +53,7 @@ class OptimisticLockRegressionSpec extends IntegrationSpecification() { val errors = scheduler.errorQueue // No thrift api for this, so this is the best I know how to do. 
- scheduler.put(new Single(1, FOLLOWS, 5106, 123456, State.Normal, Time.now, flock.forwardingManager, OrderedUuidGenerator)) + scheduler.put(new Single(1, FOLLOWS, 5106, 123456, State.Normal, Time.now, flock.forwardingManager, OrderedUuidGenerator, NoOpFilter)) flockService.execute(Select(1, FOLLOWS, ()).archive.toThrift) diff --git a/src/test/scala/com/twitter/flockdb/unit/EdgeSpec.scala b/src/test/scala/com/twitter/flockdb/unit/EdgeSpec.scala index 93fde6d9..eed9f8d9 100644 --- a/src/test/scala/com/twitter/flockdb/unit/EdgeSpec.scala +++ b/src/test/scala/com/twitter/flockdb/unit/EdgeSpec.scala @@ -33,7 +33,7 @@ object EdgeSpec extends ConfiguredSpecification with JMocker with ClassMocker { "Edge" should { "becomes correct job" in { val edge = new Edge(source, dest, pos, now, count, State.Normal) - edge.toJob(graph, forwardingManager) mustEqual new Single(source, graph, dest, pos, State.Normal, now, forwardingManager, OrderedUuidGenerator) + edge.toJob(graph, forwardingManager, NoOpFilter) mustEqual new Single(source, graph, dest, pos, State.Normal, now, forwardingManager, OrderedUuidGenerator, NoOpFilter) } } } diff --git a/src/test/scala/com/twitter/flockdb/unit/EdgesSpec.scala b/src/test/scala/com/twitter/flockdb/unit/EdgesSpec.scala index d12fefde..5af6bf74 100644 --- a/src/test/scala/com/twitter/flockdb/unit/EdgesSpec.scala +++ b/src/test/scala/com/twitter/flockdb/unit/EdgesSpec.scala @@ -46,11 +46,11 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker { val shard = mock[Shard] val scheduler = mock[PrioritizingJobScheduler] val future = mock[Future] - val flock = new FlockDBThriftAdapter(new EdgesService(forwardingManager, scheduler, future, config.intersectionQuery, config.aggregateJobsPageSize), null) + val flock = new FlockDBThriftAdapter(new EdgesService(forwardingManager, scheduler, NoOpFilter, future, config.intersectionQuery, config.aggregateJobsPageSize), null) "add" in { Time.withCurrentTimeFrozen { time => - val job = 
new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Normal, Time.now, null, null) + val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Normal, Time.now, null, null, NoOpFilter) expect { one(forwardingManager).find(0, FOLLOWS, Direction.Forward) one(scheduler).put(will(beEqual(Priority.High.id)), nestedJob.capture) @@ -62,7 +62,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker { "add_at" in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Normal, Time.now, null, null) + val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Normal, Time.now, null, null, NoOpFilter) expect { one(forwardingManager).find(0, FOLLOWS, Direction.Forward) one(scheduler).put(will(beEqual(Priority.High.id)), nestedJob.capture) @@ -74,7 +74,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker { "remove" in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Removed, Time.now, null, null) + val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Removed, Time.now, null, null, NoOpFilter) expect { one(forwardingManager).find(0, FOLLOWS, Direction.Forward) one(scheduler).put(will(beEqual(Priority.High.id)), nestedJob.capture) @@ -86,7 +86,7 @@ object EdgesSpec extends ConfiguredSpecification with JMocker with ClassMocker { "remove_at" in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Removed, Time.now, null, null) + val job = new Single(bob, FOLLOWS, mary, Time.now.inMillis, State.Removed, Time.now, null, null, NoOpFilter) expect { one(forwardingManager).find(0, FOLLOWS, Direction.Forward) one(scheduler).put(will(beEqual(Priority.High.id)), nestedJob.capture) diff --git a/src/test/scala/com/twitter/flockdb/unit/ExecuteCompilerSpec.scala b/src/test/scala/com/twitter/flockdb/unit/ExecuteCompilerSpec.scala index 
be409a81..e2620082 100644 --- a/src/test/scala/com/twitter/flockdb/unit/ExecuteCompilerSpec.scala +++ b/src/test/scala/com/twitter/flockdb/unit/ExecuteCompilerSpec.scala @@ -57,7 +57,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla doBefore { scheduler = mock[PrioritizingJobScheduler] forwardingManager = mock[ForwardingManager] - executeCompiler = new ExecuteCompiler(scheduler, forwardingManager, config.aggregateJobsPageSize) + executeCompiler = new ExecuteCompiler(scheduler, forwardingManager, NoOpFilter, config.aggregateJobsPageSize) } "without execute_at present" in { @@ -68,7 +68,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Normal, Time.now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Normal, Time.now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -80,7 +80,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, Time.now.inMillis, State.Normal, Time.now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, Time.now.inMillis, State.Normal, Time.now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -101,7 +101,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Normal, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Normal, now, null, null, NoOpFilter)), 
nestedJob.captured.jobs) } "backward" >> { @@ -111,7 +111,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -123,7 +123,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Normal, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Normal, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -133,7 +133,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Normal, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Normal, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -146,8 +146,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(alice, FOLLOWS, bob, now.inMillis, State.Normal, now, null, null), - new Single(alice, FOLLOWS, carl, now.inMillis, State.Normal, now, null, null)), nestedJob.captured.jobs) + new Single(alice, FOLLOWS, bob, now.inMillis, 
State.Normal, now, null, null, NoOpFilter), + new Single(alice, FOLLOWS, carl, now.inMillis, State.Normal, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -158,8 +158,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(bob, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null), - new Single(carl, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null)), nestedJob.captured.jobs) + new Single(bob, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null, NoOpFilter), + new Single(carl, FOLLOWS, alice, now.inMillis, State.Normal, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } } @@ -173,7 +173,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Removed, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Removed, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -183,7 +183,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -195,7 +195,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Removed, now, Priority.Low, 
config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Removed, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -205,7 +205,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Removed, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Removed, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -218,8 +218,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(alice, FOLLOWS, bob, now.inMillis, State.Removed, now, null, null), - new Single(alice, FOLLOWS, carl, now.inMillis, State.Removed, now, null, null)), nestedJob.captured.jobs) + new Single(alice, FOLLOWS, bob, now.inMillis, State.Removed, now, null, null, NoOpFilter), + new Single(alice, FOLLOWS, carl, now.inMillis, State.Removed, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -230,8 +230,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(bob, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null), - new Single(carl, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null)), nestedJob.captured.jobs) + new Single(bob, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null, NoOpFilter), + new Single(carl, FOLLOWS, alice, now.inMillis, State.Removed, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } } @@ -245,7 +245,7 @@ object ExecuteCompilerSpec extends 
ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Archived, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Archived, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -255,7 +255,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -267,7 +267,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Archived, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Archived, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -277,7 +277,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Archived, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Archived, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), 
nestedJob.captured.jobs) } } @@ -290,8 +290,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(alice, FOLLOWS, bob, now.inMillis, State.Archived, now, null, null), - new Single(alice, FOLLOWS, carl, now.inMillis, State.Archived, now, null, null)), nestedJob.captured.jobs) + new Single(alice, FOLLOWS, bob, now.inMillis, State.Archived, now, null, null, NoOpFilter), + new Single(alice, FOLLOWS, carl, now.inMillis, State.Archived, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -302,8 +302,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(bob, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null), - new Single(carl, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null)), nestedJob.captured.jobs) + new Single(bob, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null, NoOpFilter), + new Single(carl, FOLLOWS, alice, now.inMillis, State.Archived, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } } @@ -317,7 +317,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Negative, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(alice, FOLLOWS, bob, now.inMillis, State.Negative, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -327,7 +327,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Single(bob, FOLLOWS, alice, now.inMillis, State.Negative, now, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Single(bob, 
FOLLOWS, alice, now.inMillis, State.Negative, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -339,7 +339,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Negative, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Forward, State.Negative, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -349,7 +349,7 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla one(scheduler).put(will(beEqual(Priority.Low.id)), nestedJob.capture) } executeCompiler(program) - jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Negative, now, Priority.Low, config.aggregateJobsPageSize, null, null)), nestedJob.captured.jobs) + jsonMatching(List(new Multi(alice, FOLLOWS, Direction.Backward, State.Negative, now, Priority.Low, config.aggregateJobsPageSize, null, null, NoOpFilter)), nestedJob.captured.jobs) } } @@ -362,8 +362,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(alice, FOLLOWS, bob, now.inMillis, State.Negative, now, null, null), - new Single(alice, FOLLOWS, carl, now.inMillis, State.Negative, now, null, null)), nestedJob.captured.jobs) + new Single(alice, FOLLOWS, bob, now.inMillis, State.Negative, now, null, null, NoOpFilter), + new Single(alice, FOLLOWS, carl, now.inMillis, State.Negative, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } "backward" >> { @@ -374,8 +374,8 @@ object ExecuteCompilerSpec extends ConfiguredSpecification with JMocker with Cla } executeCompiler(program) jsonMatching(List( - new Single(bob, FOLLOWS, alice, now.inMillis, 
State.Negative, now, null, null), - new Single(carl, FOLLOWS, alice, now.inMillis, State.Negative, now, null, null)), nestedJob.captured.jobs) + new Single(bob, FOLLOWS, alice, now.inMillis, State.Negative, now, null, null, NoOpFilter), + new Single(carl, FOLLOWS, alice, now.inMillis, State.Negative, now, null, null, NoOpFilter)), nestedJob.captured.jobs) } } } diff --git a/src/test/scala/com/twitter/flockdb/unit/JobFilterSpec.scala b/src/test/scala/com/twitter/flockdb/unit/JobFilterSpec.scala new file mode 100644 index 00000000..ea7aafe8 --- /dev/null +++ b/src/test/scala/com/twitter/flockdb/unit/JobFilterSpec.scala @@ -0,0 +1,93 @@ +package com.twitter.flockdb + +import org.specs.Specification +import scala.collection.JavaConversions._ + +object JobFilterSpec extends Specification { + + "NoOpFilter" should { + "always return true" in { + val filter = NoOpFilter + filter(1, 2, 3) mustEqual true + } + } + + "SetFilter" should { + "return true when no filters are configured" in { + val filter = new SetFilter(Set[String]()) + filter(1, 2, 3) mustEqual true + } + + "apply filters" in { + val filter = new SetFilter(Set("1:2:3", "4:5:*", "6:*:7", "8:*:*", "*:9:10", "*:11:*")) + + // Fully-specified filter. + filter(1, 2, 3) mustEqual false + filter(1, 2, 4) mustEqual true + filter(1, 3, 4) mustEqual true + filter(2, 1, 3) mustEqual true + + // Source and destination filter on any graph. + filter(4, 5, 1) mustEqual false + filter(4, 5, 2) mustEqual false + filter(4, 1, 1) mustEqual true + + // Source filter on a specific graph. + filter(6, 1, 7) mustEqual false + filter(6, 2, 7) mustEqual false + filter(6, 1, 8) mustEqual true + + // Source filter on any graph. + filter(8, 1, 1) mustEqual false + filter(8, 1, 2) mustEqual false + filter(8, 2, 1) mustEqual false + filter(8, 2, 2) mustEqual false + filter(7, 1, 1) mustEqual true + filter(7, 1, 2) mustEqual true + filter(7, 2, 1) mustEqual true + filter(7, 2, 2) mustEqual true + + // Destination filter on a specific graph. 
+ filter(1, 9, 10) mustEqual false + filter(2, 9, 10) mustEqual false + filter(1, 9, 1) mustEqual true + + // Destination filter on any graph. + filter(1, 11, 1) mustEqual false + filter(1, 11, 2) mustEqual false + filter(2, 11, 1) mustEqual false + filter(2, 11, 2) mustEqual false + filter(1, 12, 1) mustEqual true + filter(1, 12, 2) mustEqual true + filter(2, 12, 1) mustEqual true + filter(2, 12, 2) mustEqual true + } + + "not support filtering on all sources and destinations" in { + // Invalid filter on all sources and destinations for a single graph. + val filter = new SetFilter(Set("*:*:1")) + filter(1, 1, 1) mustEqual true + filter(1, 2, 1) mustEqual true + filter(2, 1, 1) mustEqual true + + // Invalid filter on everything. + val filter2 = new SetFilter(Set("*:*:*")) + filter2(1, 1, 1) mustEqual true + filter2(1, 2, 1) mustEqual true + filter2(2, 1, 1) mustEqual true + filter2(1, 1, 2) mustEqual true + filter2(1, 2, 2) mustEqual true + filter2(2, 1, 2) mustEqual true + } + + "ignore invalid filters" in { + val filter = new SetFilter(Set("!@#$%", "1:*", "*:2", "7:8:9")) + filter(1, 2, 3) mustEqual true + filter(3, 2, 1) mustEqual true + filter(2, 2, 2) mustEqual true + filter(1, 1, 1) mustEqual true + filter(9, 8, 7) mustEqual true + filter(7, 8, 9) mustEqual false + } + } +} diff --git a/src/test/scala/com/twitter/flockdb/unit/JobSpec.scala b/src/test/scala/com/twitter/flockdb/unit/JobSpec.scala index b87a2d20..9c164aa7 100644 --- a/src/test/scala/com/twitter/flockdb/unit/JobSpec.scala +++ b/src/test/scala/com/twitter/flockdb/unit/JobSpec.scala @@ -59,7 +59,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker { ) = { desc in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, 1, jobState, Time.now, forwardingManager, uuidGenerator) + val job = new Single(bob, FOLLOWS, mary, 1, jobState, Time.now, forwardingManager, uuidGenerator, NoOpFilter) expect { allowing(forwardingManager).findNode(bob, FOLLOWS, 
Forward) willReturn shards(0) @@ -98,7 +98,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker { "Single" should { "toJson" in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, 1, State.Normal, Time.now, forwardingManager, uuidGenerator) + val job = new Single(bob, FOLLOWS, mary, 1, State.Normal, Time.now, forwardingManager, uuidGenerator, NoOpFilter) val json = job.toJson json mustMatch "Single" json mustMatch "\"source_id\":" + bob @@ -111,7 +111,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker { "toJson with successes" in { Time.withCurrentTimeFrozen { time => - val job = new Single(bob, FOLLOWS, mary, 1, State.Normal, Time.now, forwardingManager, uuidGenerator, List(ShardId("host", "prefix"))) + val job = new Single(bob, FOLLOWS, mary, 1, State.Normal, Time.now, forwardingManager, uuidGenerator, NoOpFilter, List(ShardId("host", "prefix"))) val json = job.toJson json mustMatch "Single" json mustMatch "\"source_id\":" + bob @@ -150,7 +150,7 @@ class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker { "Multi" should { "toJson" in { Time.withCurrentTimeFrozen { time => - val job = new Multi(bob, FOLLOWS, Direction.Forward, State.Normal, Time.now, Priority.Low, 500, null, null) + val job = new Multi(bob, FOLLOWS, Direction.Forward, State.Normal, Time.now, Priority.Low, 500, null, null, NoOpFilter) val json = job.toJson json mustMatch "Multi" json mustMatch "\"source_id\":" + bob diff --git a/src/test/scala/com/twitter/flockdb/unit/LegacyJobParserSpec.scala b/src/test/scala/com/twitter/flockdb/unit/LegacyJobParserSpec.scala index cbc6a17d..095352d2 100644 --- a/src/test/scala/com/twitter/flockdb/unit/LegacyJobParserSpec.scala +++ b/src/test/scala/com/twitter/flockdb/unit/LegacyJobParserSpec.scala @@ -19,7 +19,7 @@ package com.twitter.flockdb.unit import com.twitter.util.Time import com.twitter.gizzard.scheduler.JsonCodec import 
com.twitter.flockdb.ConfiguredSpecification -import com.twitter.flockdb.{Direction, State, Priority} +import com.twitter.flockdb.{Direction, State, Priority, NoOpFilter} import com.twitter.flockdb.jobs.single.Single import com.twitter.flockdb.jobs.multi.Multi import com.twitter.flockdb.jobs._ @@ -30,14 +30,14 @@ class LegacyJobParserSpec extends ConfiguredSpecification { val updatedAt = Time.fromSeconds(1111) val codec = new JsonCodec(_ => ()) - codec += ("com.twitter.flockdb.jobs.single.Add".r, LegacySingleJobParser.Add(null, null)) - codec += ("com.twitter.flockdb.jobs.single.Remove".r, LegacySingleJobParser.Remove(null, null)) - codec += ("com.twitter.flockdb.jobs.single.Negate".r, LegacySingleJobParser.Negate(null, null)) - codec += ("com.twitter.flockdb.jobs.single.Archive".r, LegacySingleJobParser.Archive(null, null)) - codec += ("com.twitter.flockdb.jobs.multi.Archive".r, LegacyMultiJobParser.Archive(null, null, 500)) - codec += ("com.twitter.flockdb.jobs.multi.Unarchive".r, LegacyMultiJobParser.Unarchive(null, null, 500)) - codec += ("com.twitter.flockdb.jobs.multi.RemoveAll".r, LegacyMultiJobParser.RemoveAll(null, null, 500)) - codec += ("com.twitter.flockdb.jobs.multi.Negate".r, LegacyMultiJobParser.Negate(null, null, 500)) + codec += ("com.twitter.flockdb.jobs.single.Add".r, LegacySingleJobParser.Add(null, null, NoOpFilter)) + codec += ("com.twitter.flockdb.jobs.single.Remove".r, LegacySingleJobParser.Remove(null, null, NoOpFilter)) + codec += ("com.twitter.flockdb.jobs.single.Negate".r, LegacySingleJobParser.Negate(null, null, NoOpFilter)) + codec += ("com.twitter.flockdb.jobs.single.Archive".r, LegacySingleJobParser.Archive(null, null, NoOpFilter)) + codec += ("com.twitter.flockdb.jobs.multi.Archive".r, LegacyMultiJobParser.Archive(null, null, NoOpFilter, 500)) + codec += ("com.twitter.flockdb.jobs.multi.Unarchive".r, LegacyMultiJobParser.Unarchive(null, null, NoOpFilter, 500)) + codec += ("com.twitter.flockdb.jobs.multi.RemoveAll".r, 
LegacyMultiJobParser.RemoveAll(null, null, NoOpFilter, 500)) + codec += ("com.twitter.flockdb.jobs.multi.Negate".r, LegacyMultiJobParser.Negate(null, null, NoOpFilter, 500)) "LegacySingleJobParser" should { "correctly generate a new style job from an old serialized Add job" in { @@ -51,7 +51,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Normal, updatedAt, null, null) + codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Normal, updatedAt, null, null, NoOpFilter) } "correctly generate a new style job from an old serialized Remove job" in { @@ -65,7 +65,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Removed, updatedAt, null, null) + codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Removed, updatedAt, null, null, NoOpFilter) } "correctly generate a new style job from an old serialized Negate job" in { @@ -79,7 +79,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Negative, updatedAt, null, null) + codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Negative, updatedAt, null, null, NoOpFilter) } "correctly generate a new style job from an old serialized Archive job" in { @@ -93,7 +93,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Archived, updatedAt, null, null) + codec.inflate(map) mustEqual new Single(22, 1, 11, 1111, State.Archived, updatedAt, null, null, NoOpFilter) } } @@ -109,7 +109,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - val job = new Multi(22, 1, Direction.Forward, State.Archived, updatedAt, Priority.Low, 500, null, null) + val job = new Multi(22, 1, Direction.Forward, State.Archived, updatedAt, Priority.Low, 500, null, null, NoOpFilter) codec.inflate(map) 
mustEqual job } @@ -125,7 +125,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - val job = new Multi(22, 1, Direction.Forward, State.Normal, updatedAt, Priority.Low, 500, null, null) + val job = new Multi(22, 1, Direction.Forward, State.Normal, updatedAt, Priority.Low, 500, null, null, NoOpFilter) codec.inflate(map) mustEqual job } @@ -141,7 +141,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - val job = new Multi(22, 1, Direction.Forward, State.Removed, updatedAt, Priority.Low, 500, null, null) + val job = new Multi(22, 1, Direction.Forward, State.Removed, updatedAt, Priority.Low, 500, null, null, NoOpFilter) codec.inflate(map) mustEqual job } @@ -157,7 +157,7 @@ class LegacyJobParserSpec extends ConfiguredSpecification { ) ) - val job = new Multi(22, 1, Direction.Forward, State.Negative, updatedAt, Priority.Low, 500, null, null) + val job = new Multi(22, 1, Direction.Forward, State.Negative, updatedAt, Priority.Low, 500, null, null, NoOpFilter) codec.inflate(map) mustEqual job }