Skip to content

Commit

Permalink
Avoid potential race while fetching a replication lag value
Browse files Browse the repository at this point in the history
  • Loading branch information
Clément GARNIER committed Feb 21, 2014
1 parent 14cd346 commit 7fe26b9
Showing 1 changed file with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,38 @@ class ZookeeperConsistencyPersistence(zk: ZookeeperClient, service: Service, upd
case Some(value) =>
currentLagMap + ((token, slave) -> value)
case None =>
currentLagMap
currentLagMap - ((token, slave))
}
}

// Fetch replication lag for a given slave, watch for changes, and for node creation if doesn't exist
private def fetchReplicationLagValue(token: Long, slave: Node): Option[Int] = {
val path = ZookeeperClusterManager.zkMemberReplicaLagPath(service.name, token, slave)

try {
val callback = lagValueChangedCallbacks.get(path).getOrElse {
val newCallback = (e: NodeValueChanged) => lagMapAgent.send(fetchReplicationLag(token, slave) _)
lagValueChangedCallbacks += (path -> newCallback)
newCallback
}
Some(zk.getInt(path, Some(callback)))
} catch {
case e: KeeperException if e.code == Code.NONODE =>
// Zookeeper node doesn't exist: register a callback triggered when it will be created
val callback = lagStatusChangedCallbacks.get(path).getOrElse {
val newCallback = (e: NodeStatusChanged) => lagMapAgent.send(fetchReplicationLag(token, slave) _)
lagStatusChangedCallbacks += (path -> newCallback)
// Register a callback triggered when the Zookeeper node will be either created or deleted
val statusCallback = lagStatusChangedCallbacks.get(path).getOrElse {
val newCallback = (e: NodeStatusChanged) => lagMapAgent.send(fetchReplicationLag(token, slave) _)
lagStatusChangedCallbacks += (path -> newCallback)
newCallback
}
zk.exists(path, Some(statusCallback)) match {
case true => {
val valueCallback = lagValueChangedCallbacks.get(path).getOrElse {
val newCallback = (e: NodeValueChanged) => lagMapAgent.send(fetchReplicationLag(token, slave) _)
lagValueChangedCallbacks += (path -> newCallback)
newCallback
}
zk.exists(path, Some(callback))
None
case e: Throwable => throw e

try {
Some(zk.getInt(path, Some(valueCallback)))
} catch {
case e: KeeperException if e.code == Code.NONODE =>
// Node has been deleted between exists() and getInt() calls: will be handled by the status callback
None
case e: Throwable => throw e
}
}
case false => None
}
}

Expand Down

0 comments on commit 7fe26b9

Please sign in to comment.