Skip to content

Commit

Permalink
Merge pull request #285 from wajam/clement-change-master
Browse files Browse the repository at this point in the history
Implement changeMasterServiceMember in ZookeeperConsistencyPersistence
  • Loading branch information
clementgarnier committed Feb 24, 2014
2 parents c750a9b + 482a31e commit f09dac7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 3 deletions.
Expand Up @@ -27,6 +27,12 @@ trait ZookeeperTestHelpers {
zk.ensureAllExists(votePath, "", CreateMode.PERSISTENT)
}

def zkGetServiceMember(service: Service, token: Long): ServiceMember = {
val path = ZookeeperClusterManager.zkMemberPath(service.name, token)
val node = zk.getString(path)
ServiceMember.fromString(node)
}

def zkDeleteServiceMember(service: Service, serviceMember: ServiceMember) {
val path = ZookeeperClusterManager.zkMemberPath(service.name, serviceMember.token)
zk.deleteRecursive(path)
Expand Down
Expand Up @@ -12,6 +12,7 @@ import com.wajam.nrv.cluster.{Cluster, LocalNode, Node}
import com.wajam.nrv.zookeeper.{ZookeeperTestHelpers, ZookeeperClient}
import com.wajam.nrv.zookeeper.cluster.ZookeeperClusterManager
import com.wajam.commons.ControlableCurrentTime
import org.scalatest.concurrent.Eventually

@RunWith(classOf[JUnitRunner])
class TestZookeeperConsistencyPersistence extends FlatSpec with BeforeAndAfter with Eventually {
Expand Down Expand Up @@ -265,4 +266,34 @@ class TestZookeeperConsistencyPersistence extends FlatSpec with BeforeAndAfter w
checkCachedAndPersistedLagValues(service, token, slave, newLag)
}
}

it should "change the master service member if the node provided is a slave on this shard" in new Builder {
consistency.start()

val token = 0

val node = new Node("localhost", Map("nrv" -> 12346))
val serviceMember = new ServiceMember(token, node)

consistency.changeMasterServiceMember(token, node)

zkGetServiceMember(service, token) should be(serviceMember)

eventually {
service.getMemberAtToken(token) should be(Some(serviceMember))
}
}

it should "NOT change the master service member if the node provided is NOT a slave on this shard" in new Builder {
consistency.start()

val token = 0

val node = new Node("localhost", Map("nrv" -> 12347))
val serviceMember = new ServiceMember(token, node)

intercept[IllegalArgumentException] {
consistency.changeMasterServiceMember(token, node)
}
}
}
Expand Up @@ -166,15 +166,28 @@ class ZookeeperConsistencyPersistence(zk: ZookeeperClient, service: Service, upd
}
}

override def replicationLagSeconds(token: Long, node: Node) = {
def replicationLagSeconds(token: Long, node: Node) = {
lagMapAgent().get((token, node))
}

override def updateReplicationLagSeconds(token: Long, node: Node, lag: Int): Unit = {
def updateReplicationLagSeconds(token: Long, node: Node, lag: Int): Unit = {
lagMapAgent.send(persistReplicationLag(token, node, lag) _)
}

def changeMasterServiceMember(token: Long, node: Node) = Unit
def changeMasterServiceMember(token: Long, node: Node): Unit = {
import ZookeeperClient.string2bytes

replicasMapping.get(token) match {
case Some(nodes) if nodes.contains(node) => {
val path = ZookeeperClusterManager.zkMemberPath(service.name, token)
val serviceMember = new ServiceMember(token, new Node(node.hostname, Map("nrv" -> node.ports("nrv"))))

zk.set(path, serviceMember.toString)
}
case Some(_) => throw new IllegalArgumentException("Node is not a slave on this shard")
case None => throw new IllegalArgumentException("Token not found")
}
}

private def mappingFuture = mappingFetcher.ask(Fetch).mapTo[(Int, ReplicasMapping)]

Expand Down

0 comments on commit f09dac7

Please sign in to comment.