Implement Zookeeper-based job filtering #80

Open
wants to merge 7 commits into from

2 participants

@jcorwin

No description provided.

@jsirois

See http://twitter.github.com/commons/apidocs/#com.twitter.common.zookeeper.ZooKeeperClient - the fat zookeeper jar is now broken up into components. the twitter-commons docs are stale by a few days too, and you may want to check http://maven.twttr.com directly for latest revs of each or use the internal apidocs site which has up2date versions.

@rvpgithub rvpgithub commented on the diff Mar 1, 2012
config/development.scala
@@ -124,6 +124,8 @@ new FlockDB {
httpPort = Some(9990)
}
+ override def zooKeeperSettings = Some(ZooKeeperSettings("localhost:2181", "/twitter/flock"))

Seems like this path should be set in a specific instance of flockdb and not here?

@jcorwin
jcorwin added a note Mar 1, 2012

We do have a separate specification for the production config.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rvpgithub rvpgithub and 1 other commented on an outdated diff Mar 1, 2012
src/main/scala/com/twitter/flockdb/JobFilter.scala
+object NoOpFilter extends JobFilter {
+ def apply(sourceId: Long, destId: Long, graphId: Int): Boolean = true
+}
+
+/**
+ * A filter based on an independently-updated Java set.
+ *
+ * Set entries should have the form 'SOURCE_ID:DEST_ID:GRAPH_ID', each of
+ * which specifies either a user ID or a wildcard (*).
+ * A wildcard may be specified for At most one of SOURCE_ID and DEST_ID.
+ *
+ * Ex. Filter all writes from user 1234 on graph 5: "1234:*:5".
+ * Filter all writes to user 2345 on all graphs: "*:2345:*".
+ * Filter writes on edges between users 100 and 200 on all graphs: "100:200:*".
+ */
+class SetFilter(set: JSet[String]) extends JobFilter {

More natural to use a Scala set in this class?

@jcorwin
jcorwin added a note Mar 1, 2012

The zookeeper map gives us a view of a java set, so it seems easiest to just take that. I guess we could wrap it in a scala set.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@stuhood stuhood commented on the diff Mar 1, 2012
src/main/scala/com/twitter/flockdb/JobFilter.scala
+/**
+ * A filter based on a ZooKeeperMap. Filters are stored as node keys under the given zkPath.
+ * The Zookeeper node values are not used.
+ */
+class ZooKeeperSetFilter(zkClient: ZooKeeperClient, zkPath: String) extends JobFilter {
+ val log = Logger.get
+
+ private val deserializer = new Function[Array[Byte], Unit]() {
+ override def apply(data : Array[Byte]) = ()
+ }
+
+ ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath)
+ private val zkMap: ZooKeeperMap[Unit] = ZooKeeperMap.create(zkClient, zkPath, deserializer)
+ log.info("Initial filter set: " + ( JavaConversions.asScalaSet(zkMap.keySet()).mkString(", ")))
+
+ private val setFilter = new SetFilter(zkMap.keySet())
@stuhood
Twitter, Inc. member
stuhood added a note Mar 1, 2012

I don't see where this is ever updated... should there be a ZK watch somewhere to refresh it?

@stuhood
Twitter, Inc. member
stuhood added a note Mar 1, 2012

John pointed out that the keySet is a mutable view, so it is automatically refreshed from ZK.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment