Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Implement Zookeeper-based job filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
John Corwin committed Mar 1, 2012
1 parent 0f6c8d2 commit 2a40641
Show file tree
Hide file tree
Showing 21 changed files with 372 additions and 110 deletions.
6 changes: 6 additions & 0 deletions config/development.scala
Expand Up @@ -124,6 +124,8 @@ new FlockDB {
httpPort = Some(9990)
}

override def zooKeeperSettings = Some(ZooKeeperSettings("localhost:2181", "/twitter/flock"))

loggers = List(new LoggerConfig {
level = Some(Level.INFO)
handlers = List(
Expand All @@ -136,5 +138,9 @@ new FlockDB {
}
}
)
}, new LoggerConfig {
node = "filtered_jobs"
useParents = true
level = Level.INFO
})
}
9 changes: 9 additions & 0 deletions project/build/FlockDBProject.scala
Expand Up @@ -11,6 +11,15 @@ with SubversionPublisher {
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1"

val gizzard = "com.twitter" % "gizzard" % "3.0.0-beta29"
val zookeeper = "org.apache.zookeeper" % "zookeeper" % "3.3.4"
val commonZk = "com.twitter.common" % "zookeeper" % "0.0.31"

This comment has been minimized.

Copy link
@jsirois

jsirois Mar 1, 2012

See http://twitter.github.com/commons/apidocs/#com.twitter.common.zookeeper.ZooKeeperClient - the fat zookeeper jar is now broken up into components. the twitter-commons docs are stale by a few days too, and you may want to check http://maven.twttr.com directly for latest revs of each or use the internal apidocs site which has up2date versions.


override def ivyXML =
<dependencies>
<exclude org="com.sun.jmx" module="jmxri" />
<exclude org="com.sun.jdmk" module="jmxtools" />
<exclude org="javax.jms" module="jms" />
</dependencies>

val asm = "asm" % "asm" % "1.5.3" % "test"
val cglib = "cglib" % "cglib" % "2.2" % "test"
Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/com/twitter/flockdb/Edge.scala
Expand Up @@ -33,11 +33,11 @@ case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSe

val updatedAt = Time.fromSeconds(updatedAtSeconds)

def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int) = {
scheduler.put(priority, toJob(tableId, forwardingManager))
def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler, priority: Int, jobFilter: JobFilter) = {
scheduler.put(priority, toJob(tableId, forwardingManager, jobFilter))
}

def toJob(tableId: Int, forwardingManager: ForwardingManager) = {
def toJob(tableId: Int, forwardingManager: ForwardingManager, jobFilter: JobFilter) = {
new Single(
sourceId,
tableId,
Expand All @@ -46,7 +46,8 @@ case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSe
state,
updatedAt,
forwardingManager,
OrderedUuidGenerator
OrderedUuidGenerator,
jobFilter
)
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/com/twitter/flockdb/EdgesService.scala
Expand Up @@ -31,14 +31,15 @@ import thrift.FlockException
class EdgesService(
forwardingManager: ForwardingManager,
schedule: PrioritizingJobScheduler,
jobFilter: JobFilter,
future: Future,
intersectionQueryConfig: config.IntersectionQuery,
aggregateJobsPageSize: Int) {

private val log = Logger.get(getClass.getName)
private val exceptionLog = Logger.get("exception")
private val selectCompiler = new SelectCompiler(forwardingManager, intersectionQueryConfig)
private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, aggregateJobsPageSize)
private var executeCompiler = new ExecuteCompiler(schedule, forwardingManager, jobFilter, aggregateJobsPageSize)

def shutdown() {
schedule.shutdown()
Expand Down
47 changes: 37 additions & 10 deletions src/main/scala/com/twitter/flockdb/FlockDB.scala
Expand Up @@ -16,6 +16,9 @@

package com.twitter.flockdb

import com.twitter.common.net.InetSocketAddressHelper
import com.twitter.common.quantity.Amount
import com.twitter.common.zookeeper.{ZooKeeperClient, ZooKeeperUtils}
import com.twitter.util.Duration
import com.twitter.ostrich.admin.Service
import com.twitter.querulous.StatsCollector
Expand All @@ -28,6 +31,8 @@ import com.twitter.flockdb.config.{FlockDB => FlockDBConfig}


class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service {
val FILTER_SET_ZK_PATH = "/filters"

object FlockExceptionWrappingProxyFactory extends ExceptionHandlingProxyFactory[thrift.FlockDB.Iface]({ (flock, e) =>
e match {
case _: thrift.FlockException =>
Expand All @@ -48,6 +53,27 @@ class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service
override def addGauge(name: String)(gauge: => Double) { Stats.addGauge(name)(gauge) }
}

// Optional ZooKeeper connection: present only when the config supplies
// zooKeeperSettings. Pairs the connected client with the configured base
// (chroot) path under which filter nodes live.
val zkClientAndBasePath: Option[(ZooKeeperClient, String)] =
config.zooKeeperSettings.map {
zkConfig => {
log.info("Using ZooKeeper server " + zkConfig.server)
// zkConfig.server is parsed as a "host:port" socket address.
val zkServer = InetSocketAddressHelper.parse(zkConfig.server)
// NOTE(review): uses the library's default session timeout — confirm it
// is appropriate for production deployments.
val zkClient = new ZooKeeperClient(ZooKeeperUtils.DEFAULT_ZK_SESSION_TIMEOUT, zkServer)
(zkClient, zkConfig.basePath)
}
}

// Selects the job filter implementation: a ZooKeeper-backed set filter when
// a ZooKeeper connection was configured above, otherwise a pass-through.
val jobFilter: JobFilter = {
zkClientAndBasePath match {
case Some((zkClient, zkBasePath)) => {
// Filter entries are stored under <basePath><FILTER_SET_ZK_PATH>.
val filterPath = zkBasePath + FILTER_SET_ZK_PATH
log.info("Using ZooKeeperSetFilter with path " + filterPath)
new ZooKeeperSetFilter(zkClient, filterPath)
}
case None => NoOpFilter
}
}

val jobPriorities = List(Priority.Low, Priority.Medium, Priority.High).map(_.id)
val copyPriority = Priority.Medium.id

Expand Down Expand Up @@ -76,28 +102,29 @@ class FlockDB(config: FlockDBConfig) extends GizzardServer(config) with Service

val forwardingManager = new ForwardingManager(nameServer.multiTableForwarder[Shard])

jobCodec += ("single.Single".r, new jobs.single.SingleJobParser(forwardingManager, OrderedUuidGenerator))
jobCodec += ("multi.Multi".r, new jobs.multi.MultiJobParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
jobCodec += ("single.Single".r, new jobs.single.SingleJobParser(forwardingManager, OrderedUuidGenerator, jobFilter))
jobCodec += ("multi.Multi".r, new jobs.multi.MultiJobParser(forwardingManager, jobScheduler, config.aggregateJobsPageSize, jobFilter))

jobCodec += ("jobs\\.(Copy|Migrate)".r, new jobs.CopyParser(nameServer, jobScheduler(Priority.Medium.id)))
jobCodec += ("jobs\\.(MetadataCopy|MetadataMigrate)".r, new jobs.MetadataCopyParser(nameServer, jobScheduler(Priority.Medium.id)))

// XXX: remove when old tagged jobs no longer exist.
import jobs.LegacySingleJobParser
import jobs.LegacyMultiJobParser
jobCodec += ("single.Add".r, LegacySingleJobParser.Add(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Remove".r, LegacySingleJobParser.Remove(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Archive".r, LegacySingleJobParser.Archive(forwardingManager, OrderedUuidGenerator))
jobCodec += ("single.Negate".r, LegacySingleJobParser.Negate(forwardingManager, OrderedUuidGenerator))
jobCodec += ("multi.Archive".r, LegacyMultiJobParser.Archive(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
jobCodec += ("multi.Unarchive".r, LegacyMultiJobParser.Unarchive(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
jobCodec += ("multi.RemoveAll".r, LegacyMultiJobParser.RemoveAll(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
jobCodec += ("multi.Negate".r, LegacyMultiJobParser.Negate(forwardingManager, jobScheduler, config.aggregateJobsPageSize))
jobCodec += ("single.Add".r, LegacySingleJobParser.Add(forwardingManager, OrderedUuidGenerator, jobFilter))
jobCodec += ("single.Remove".r, LegacySingleJobParser.Remove(forwardingManager, OrderedUuidGenerator, jobFilter))
jobCodec += ("single.Archive".r, LegacySingleJobParser.Archive(forwardingManager, OrderedUuidGenerator, jobFilter))
jobCodec += ("single.Negate".r, LegacySingleJobParser.Negate(forwardingManager, OrderedUuidGenerator, jobFilter))
jobCodec += ("multi.Archive".r, LegacyMultiJobParser.Archive(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize))
jobCodec += ("multi.Unarchive".r, LegacyMultiJobParser.Unarchive(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize))
jobCodec += ("multi.RemoveAll".r, LegacyMultiJobParser.RemoveAll(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize))
jobCodec += ("multi.Negate".r, LegacyMultiJobParser.Negate(forwardingManager, jobScheduler, jobFilter, config.aggregateJobsPageSize))

val flockService = {
val edges = new EdgesService(
forwardingManager,
jobScheduler,
jobFilter,
config.readFuture("readFuture"),
config.intersectionQuery,
config.aggregateJobsPageSize
Expand Down
88 changes: 88 additions & 0 deletions src/main/scala/com/twitter/flockdb/JobFilter.scala
@@ -0,0 +1,88 @@
package com.twitter.flockdb

import com.twitter.logging.Logger
import com.twitter.common.base.Function
import com.twitter.common.zookeeper.{ZooKeeperClient, ZooKeeperMap, ZooKeeperUtils}
import com.twitter.gizzard.Stats
import java.util.{Set => JSet}
import org.apache.zookeeper.ZooDefs
import scala.collection.JavaConversions

trait JobFilter {
/**
* Returns true iff the job with the given sourceId, destId,
* and graphId parameters should be executed (i.e. is not filtered out).
*/
def apply(sourceId: Long, destId: Long, graphId: Int): Boolean
}

/**
 * Default filter used when no ZooKeeper configuration is present: every
 * operation passes unconditionally.
 */
object NoOpFilter extends JobFilter {
  override def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = true
}

/**
 * A filter based on an independently-updated Java set.
 *
 * Set entries have the form 'SOURCE_ID:DEST_ID:GRAPH_ID', where each field is
 * either a user ID or a wildcard (*). A wildcard may be specified for at most
 * one of SOURCE_ID and DEST_ID.
 *
 * Ex. Filter all writes from user 1234 on graph 5: "1234:*:5".
 *     Filter all writes to user 2345 on all graphs: "*:2345:*".
 *     Filter writes on edges between users 100 and 200 on all graphs: "100:200:*".
 */
class SetFilter(set: JSet[String]) extends JobFilter {
  private val WILDCARD = "*"

  // Each triple selects which of (sourceId, destId, graphId) is matched by
  // its concrete value (true) vs. by wildcard (false). (false, false, _) is
  // deliberately absent: both endpoints may not be wildcards at once.
  private val FILTERS = Seq(
    (true, true, true),
    (true, true, false),
    (true, false, true),
    (false, true, true),
    (false, true, false),
    (true, false, false))

  // Gauge reads the live size of the externally-mutated set on each sample.
  Stats.addGauge("active-filters") { set.size() }

  /**
   * Returns true iff no filter entry matches the given edge, i.e. the job
   * should be executed. Increments the "edges-filtered" or "edges-passed"
   * counter accordingly (exactly one per call).
   */
  def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = {
    // Renders the set key for one concrete/wildcard selection.
    def filterKey(filter: (Boolean, Boolean, Boolean)) = {
      "%s:%s:%s".format(
        if (filter._1) sourceId.toString else WILDCARD,
        if (filter._2) destId.toString else WILDCARD,
        if (filter._3) graphId.toString else WILDCARD
      )
    }

    // `exists` short-circuits on the first matching entry. This replaces the
    // previous `return` inside a for-comprehension, which is a non-local
    // return implemented by throwing NonLocalReturnControl (deprecated in
    // newer Scala versions and costly on the match path).
    val filtered = FILTERS.exists { filter => set.contains(filterKey(filter)) }
    if (filtered) Stats.incr("edges-filtered") else Stats.incr("edges-passed")
    !filtered
  }
}

/**
 * A filter backed by a ZooKeeperMap. Filter entries are stored as child node
 * names under the given zkPath; the ZooKeeper node values are not used.
 */
class ZooKeeperSetFilter(zkClient: ZooKeeperClient, zkPath: String) extends JobFilter {
val log = Logger.get

// Node payloads are irrelevant; every value deserializes to Unit.
private val deserializer = new Function[Array[Byte], Unit]() {
override def apply(data : Array[Byte]) = ()
}

// Create zkPath (with open ACLs) if absent — presumably required so the
// path exists before ZooKeeperMap.create attaches to it; verify ordering.
ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath)
// Live-updating view of the children of zkPath; its key set tracks
// additions/removals made by external tooling.
private val zkMap: ZooKeeperMap[Unit] = ZooKeeperMap.create(zkClient, zkPath, deserializer)
log.info("Initial filter set: " + ( JavaConversions.asScalaSet(zkMap.keySet()).mkString(", ")))

// Matching logic is delegated to SetFilter over the map's live key set.
private val setFilter = new SetFilter(zkMap.keySet())

def apply(sourceId: Long, destId: Long, graphId: Int): Boolean =
setFilter(sourceId, destId, graphId)
}
1 change: 1 addition & 0 deletions src/main/scala/com/twitter/flockdb/Main.scala
Expand Up @@ -26,6 +26,7 @@ object Main {
val service = new FlockDB(config)

ServiceTracker.register(service)
log.info("Starting Flock service")
service.start()

} catch {
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/twitter/flockdb/Metadata.scala
Expand Up @@ -52,7 +52,8 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:
tableId: Int,
forwardingManager: ForwardingManager,
scheduler: PrioritizingJobScheduler,
priority: Int
priority: Int,
jobFilter: JobFilter
) = {
val job = new Multi(
sourceId,
Expand All @@ -63,7 +64,8 @@ case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds:
Priority.Medium,
500,
forwardingManager,
scheduler
scheduler,
jobFilter
)

scheduler.put(priority, job)
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/com/twitter/flockdb/config/FlockDB.scala
Expand Up @@ -6,7 +6,7 @@ import com.twitter.querulous.config.{Connection, QueryEvaluator}
import com.twitter.util.TimeConversions._
import com.twitter.flockdb.queries.QueryTree
import com.twitter.flockdb.queries

import java.net.InetSocketAddress

trait FlockDBServer extends TServer {
var name = "flockdb_edges"
Expand Down Expand Up @@ -37,4 +37,7 @@ trait FlockDB extends GizzardServer {
def readFuture: Future

def adminConfig: AdminServiceConfig

// Connection settings for ZooKeeper-based job filtering:
// server is a "host:port" spec; basePath is the chroot under which
// filter nodes are stored.
case class ZooKeeperSettings(server: String, basePath: String)
// Override to enable ZooKeeper-based job filtering; None (the default)
// disables it and selects the no-op filter.
def zooKeeperSettings: Option[ZooKeeperSettings] = None
}

0 comments on commit 2a40641

Please sign in to comment.