Skip to content

Commit

Permalink
ISPN-1826 Update view after all addresses have been removed
Browse files Browse the repository at this point in the history
  • Loading branch information
galderz committed Feb 3, 2012
1 parent 242187a commit 5932dc2
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 30 deletions.
Expand Up @@ -23,7 +23,6 @@ import org.jboss.netty.buffer.ChannelBuffer
import org.infinispan.Cache
import org.infinispan.manager.EmbeddedCacheManager
import org.infinispan.remoting.transport.Address
import java.util.concurrent.atomic.AtomicInteger

/**
* This class represents the work to be done by an encoder of a particular
Expand All @@ -38,7 +37,7 @@ abstract class AbstractVersionedEncoder {
* Write the header to the given channel buffer
*/
def writeHeader(r: Response, buf: ChannelBuffer,
addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger)
addressCache: Cache[Address, ServerAddress], server: HotRodServer)

/**
* Write operation response using the given channel buffer
Expand Down
Expand Up @@ -36,7 +36,7 @@ import org.infinispan.context.Flag
* @since 5.1
*/
@Listener(sync = false) // Use a separate thread to avoid blocking the view handler thread
class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress]) extends Log {
class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress], server: HotRodServer) extends Log {

// Let all nodes remove the address from their own cache locally. By doing
// this, we can guarantee that transport view id has been updated before
Expand All @@ -56,13 +56,21 @@ class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress]) extend
val newMembers = collectionAsScalaIterable(e.getNewMembers)
val oldMembers = collectionAsScalaIterable(e.getOldMembers)
val goneMembers = oldMembers.filterNot(newMembers contains _)
// Consider doing removeAsync and then waiting for all removals...
goneMembers.foreach { addr =>
trace("Remove %s from address cache", addr)
addressCache.remove(addr)
}
updateViewdId(e)
} catch {
case t: Throwable => logErrorDetectingCrashedMember(t)
}
}

protected def updateViewdId(e: ViewChangedEvent) {
// Multiple members could leave at the same time, so delay any view id
// updates until the all addresses have been removed from memory.
server.setViewId(e.getViewId)
}

}
Expand Up @@ -41,8 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger
object Encoder10 extends AbstractVersionedEncoder with Constants with Log {

override def writeHeader(r: Response, buf: ChannelBuffer,
addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger) {
val topologyResp = getTopologyResponse(r, addressCache, viewId)
addressCache: Cache[Address, ServerAddress], server: HotRodServer) {
val topologyResp = getTopologyResponse(r, addressCache, server)
buf.writeByte(MAGIC_RES.byteValue)
writeUnsignedLong(r.messageId, buf)
buf.writeByte(r.operation.id.byteValue)
Expand Down Expand Up @@ -113,12 +113,12 @@ object Encoder10 extends AbstractVersionedEncoder with Constants with Log {
}

def getTopologyResponse(r: Response, addressCache: Cache[Address, ServerAddress],
viewId: AtomicInteger): AbstractTopologyResponse = {
server: HotRodServer): AbstractTopologyResponse = {
// If clustered, set up a cache for topology information
if (addressCache != null) {
r.clientIntel match {
case 2 | 3 => {
val lastViewId = viewId.get()
val lastViewId = server.getViewId
if (r.topologyId != lastViewId) {
val cache = getCacheInstance(r.cacheName, addressCache.getCacheManager)
val config = cache.getConfiguration
Expand Down
Expand Up @@ -40,8 +40,8 @@ object Encoder11 extends AbstractVersionedEncoder with Constants with Log {
val encoder10 = Encoder10

override def writeHeader(r: Response, buf: ChannelBuffer,
addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger) {
val topologyResp = getTopologyResponse(r, addressCache, viewId)
addressCache: Cache[Address, ServerAddress], server: HotRodServer) {
val topologyResp = getTopologyResponse(r, addressCache, server)
buf.writeByte(MAGIC_RES.byteValue)
writeUnsignedLong(r.messageId, buf)
buf.writeByte(r.operation.id.byteValue)
Expand All @@ -67,12 +67,12 @@ object Encoder11 extends AbstractVersionedEncoder with Constants with Log {
encoder10.writeResponse(this, r, buf, cacheManager)

def getTopologyResponse(r: Response, addressCache: Cache[Address, ServerAddress],
viewId: AtomicInteger): AbstractTopologyResponse = {
server: HotRodServer): AbstractTopologyResponse = {
// If clustered, set up a cache for topology information
if (addressCache != null) {
r.clientIntel match {
case 2 | 3 => {
val lastViewId = viewId.get()
val lastViewId = server.getViewId
if (r.topologyId != lastViewId) {
val cache = getCacheInstance(r.cacheName, addressCache.getCacheManager)
val config = cache.getConfiguration
Expand Down
Expand Up @@ -30,15 +30,14 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
import org.jboss.netty.channel.Channel
import org.infinispan.server.core.transport.ExtendedChannelBuffer._
import org.infinispan.remoting.transport.Address
import java.util.concurrent.atomic.AtomicInteger

/**
* Hot Rod specific encoder.
*
* @author Galder Zamarreño
* @since 4.1
*/
class HotRodEncoder(cacheManager: EmbeddedCacheManager, viewId: AtomicInteger)
class HotRodEncoder(cacheManager: EmbeddedCacheManager, server: HotRodServer)
extends OneToOneEncoder with Constants with Log {
import HotRodServer._

Expand All @@ -59,7 +58,7 @@ class HotRodEncoder(cacheManager: EmbeddedCacheManager, viewId: AtomicInteger)
}

r.version match {
case VERSION_10 | VERSION_11 => encoder.writeHeader(r, buf, addressCache, viewId)
case VERSION_10 | VERSION_11 => encoder.writeHeader(r, buf, addressCache, server)
// if error before reading version, don't send any topology changes
// cos the encoding might vary from one version to the other
case 0 => encoder.writeHeader(r, buf, null, null)
Expand Down
Expand Up @@ -58,11 +58,15 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log {
private var address: ServerAddress = _
private var addressCache: Cache[Address, ServerAddress] = _
private var topologyUpdateTimeout: Long = _
private var viewId: AtomicInteger = _
private var viewId: Int = _

def getAddress: ServerAddress = address

override def getEncoder = new HotRodEncoder(getCacheManager, viewId)
def getViewId: Int = viewId

def setViewId(viewId: Int) = this.viewId = viewId

override def getEncoder = new HotRodEncoder(getCacheManager, this)

override def getDecoder : HotRodDecoder = {
val hotRodDecoder = new HotRodDecoder(getCacheManager, transport)
Expand All @@ -75,8 +79,6 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log {
val defaultPort = 11222
isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null
if (isClustered) {
viewId = new AtomicInteger()

val typedProps = TypedProperties.toTypedProperties(properties)
defineTopologyCacheConfig(cacheManager, typedProps)
// Retrieve host and port early on to populate topology cache
Expand Down Expand Up @@ -120,7 +122,7 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log {
addressCache.getAdvancedCache.getRpcManager.getTransport))
clusterAddress = cacheManager.getAddress
address = new ServerAddress(host, port)
cacheManager.addListener(new CrashedMemberDetectorListener(addressCache))
cacheManager.addListener(new CrashedMemberDetectorListener(addressCache, this))
// Map cluster address to server endpoint address
debug("Map %s cluster address with %s server endpoint in address cache", clusterAddress, address)
addressCache.put(clusterAddress, address)
Expand Down Expand Up @@ -166,23 +168,19 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log {
/**
* Listener that provides guarantees for view id updates. So, a view id will
* only be considered to have changed once the address cache has been
* updated to add or remove an address from the cache. That way, when the
* encoder makes the view id comparison (client provided vs server side
* view id), it has the guarantees that the address cache has already been
* updated.
* updated to add an address from the cache. That way, when the encoder
* makes the view id comparison (client provided vs server side view id),
* it has the guarantees that the address cache has already been updated.
*/
@Listener
class ViewIdUpdater(transport: Transport) {

@CacheEntryCreated @CacheEntryRemoved
@CacheEntryCreated
def viewIdUpdate(event: CacheEntryEvent[Address, ServerAddress]) {
// Only update view id once cache has been updated
if (!event.isPre) {
val localViewId = transport.getViewId
// Could this be a lazySet? We want the value to eventually be set,
// clients could wait a little bit for the view id to be updated
// on the server side...
viewId.set(localViewId)
viewId = localViewId
if (isTraceEnabled) {
log.tracef("Address cache had %s for key %s. View id is now %d",
event.getType, event.getKey, localViewId)
Expand Down
Expand Up @@ -22,12 +22,12 @@ package org.infinispan.server.hotrod
import org.testng.annotations.Test
import org.infinispan.test.fwk.TestCacheManagerFactory
import org.infinispan.remoting.transport.Address
import org.infinispan.notifications.cachemanagerlistener.event.EventImpl
import org.infinispan.notifications.cachemanagerlistener.event.Event.Type
import org.infinispan.distribution.TestAddress
import java.util.ArrayList
import org.testng.AssertJUnit._
import org.infinispan.test.SingleCacheManagerTest
import org.infinispan.notifications.cachemanagerlistener.event.{ViewChangedEvent, EventImpl}

/**
* Tests crashed or stopped member logic.
Expand All @@ -47,7 +47,11 @@ class CrashedMemberDetectorTest extends SingleCacheManagerTest {
cache.put(new TestAddress(2), new ServerAddress("b", 456))
cache.put(new TestAddress(3), new ServerAddress("c", 789))

val detector = new CrashedMemberDetectorListener(cache)
val detector = new CrashedMemberDetectorListener(cache, null) {
override protected def updateViewdId(e: ViewChangedEvent) = {
// Do nothing...
}
}

val oldMembers = new ArrayList[Address]()
oldMembers.add(new TestAddress(1))
Expand Down

0 comments on commit 5932dc2

Please sign in to comment.