From 5932dc266fc2c5aa92d13d5c6ecc01d479392b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Galder=20Zamarre=C3=B1o?= Date: Fri, 3 Feb 2012 10:36:15 +0100 Subject: [PATCH] ISPN-1826 Update view after all addresses have been removed --- .../hotrod/AbstractVersionedEncoder.scala | 3 +-- .../CrashedMemberDetectorListener.scala | 10 ++++++- .../infinispan/server/hotrod/Encoder10.scala | 8 +++--- .../infinispan/server/hotrod/Encoder11.scala | 8 +++--- .../server/hotrod/HotRodEncoder.scala | 5 ++-- .../server/hotrod/HotRodServer.scala | 26 +++++++++---------- .../hotrod/CrashedMemberDetectorTest.scala | 8 ++++-- 7 files changed, 38 insertions(+), 30 deletions(-) diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala index 5eea489abfc5..3c68d64b40cd 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedEncoder.scala @@ -23,7 +23,6 @@ import org.jboss.netty.buffer.ChannelBuffer import org.infinispan.Cache import org.infinispan.manager.EmbeddedCacheManager import org.infinispan.remoting.transport.Address -import java.util.concurrent.atomic.AtomicInteger /** * This class represents the work to be done by an encoder of a particular @@ -38,7 +37,7 @@ abstract class AbstractVersionedEncoder { * Write the header to the given channel buffer */ def writeHeader(r: Response, buf: ChannelBuffer, - addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger) + addressCache: Cache[Address, ServerAddress], server: HotRodServer) /** * Write operation response using the given channel buffer diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala index 5c5ef2a399c4..4a86554c43eb 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CrashedMemberDetectorListener.scala @@ -36,7 +36,7 @@ import org.infinispan.context.Flag * @since 5.1 */ @Listener(sync = false) // Use a separate thread to avoid blocking the view handler thread -class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress]) extends Log { +class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress], server: HotRodServer) extends Log { // Let all nodes remove the address from their own cache locally. By doing // this, we can guarantee that transport view id has been updated before @@ -56,13 +56,21 @@ class CrashedMemberDetectorListener(cache: Cache[Address, ServerAddress]) extend val newMembers = collectionAsScalaIterable(e.getNewMembers) val oldMembers = collectionAsScalaIterable(e.getOldMembers) val goneMembers = oldMembers.filterNot(newMembers contains _) + // Consider doing removeAsync and then waiting for all removals... goneMembers.foreach { addr => trace("Remove %s from address cache", addr) addressCache.remove(addr) } + updateViewdId(e) } catch { case t: Throwable => logErrorDetectingCrashedMember(t) } } + protected def updateViewdId(e: ViewChangedEvent) { + // Multiple members could leave at the same time, so delay any view id + // updates until the all addresses have been removed from memory. + server.setViewId(e.getViewId) + } + } \ No newline at end of file diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder10.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder10.scala index 1c0cba1ace83..4c132eccec75 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder10.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder10.scala @@ -41,8 +41,8 @@ import java.util.concurrent.atomic.AtomicInteger object Encoder10 extends AbstractVersionedEncoder with Constants with Log { override def writeHeader(r: Response, buf: ChannelBuffer, - addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger) { - val topologyResp = getTopologyResponse(r, addressCache, viewId) + addressCache: Cache[Address, ServerAddress], server: HotRodServer) { + val topologyResp = getTopologyResponse(r, addressCache, server) buf.writeByte(MAGIC_RES.byteValue) writeUnsignedLong(r.messageId, buf) buf.writeByte(r.operation.id.byteValue) @@ -113,12 +113,12 @@ object Encoder10 extends AbstractVersionedEncoder with Constants with Log { } def getTopologyResponse(r: Response, addressCache: Cache[Address, ServerAddress], - viewId: AtomicInteger): AbstractTopologyResponse = { + server: HotRodServer): AbstractTopologyResponse = { // If clustered, set up a cache for topology information if (addressCache != null) { r.clientIntel match { case 2 | 3 => { - val lastViewId = viewId.get() + val lastViewId = server.getViewId if (r.topologyId != lastViewId) { val cache = getCacheInstance(r.cacheName, addressCache.getCacheManager) val config = cache.getConfiguration diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder11.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder11.scala index d783809911d3..55ff412334f8 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder11.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder11.scala @@ -40,8 +40,8 @@ object Encoder11 extends AbstractVersionedEncoder with Constants with Log { val encoder10 = Encoder10 override def writeHeader(r: Response, buf: ChannelBuffer, - addressCache: Cache[Address, ServerAddress], viewId: AtomicInteger) { - val topologyResp = getTopologyResponse(r, addressCache, viewId) + addressCache: Cache[Address, ServerAddress], server: HotRodServer) { + val topologyResp = getTopologyResponse(r, addressCache, server) buf.writeByte(MAGIC_RES.byteValue) writeUnsignedLong(r.messageId, buf) buf.writeByte(r.operation.id.byteValue) @@ -67,12 +67,12 @@ object Encoder11 extends AbstractVersionedEncoder with Constants with Log { encoder10.writeResponse(this, r, buf, cacheManager) def getTopologyResponse(r: Response, addressCache: Cache[Address, ServerAddress], - viewId: AtomicInteger): AbstractTopologyResponse = { + server: HotRodServer): AbstractTopologyResponse = { // If clustered, set up a cache for topology information if (addressCache != null) { r.clientIntel match { case 2 | 3 => { - val lastViewId = viewId.get() + val lastViewId = server.getViewId if (r.topologyId != lastViewId) { val cache = getCacheInstance(r.cacheName, addressCache.getCacheManager) val config = cache.getConfiguration diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala index 9362639212cd..2c49e6c2cfbe 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala @@ -30,7 +30,6 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder import org.jboss.netty.channel.Channel import org.infinispan.server.core.transport.ExtendedChannelBuffer._ import org.infinispan.remoting.transport.Address -import java.util.concurrent.atomic.AtomicInteger /** * Hot Rod specific encoder. @@ -38,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger * @author Galder ZamarreƱo * @since 4.1 */ -class HotRodEncoder(cacheManager: EmbeddedCacheManager, viewId: AtomicInteger) +class HotRodEncoder(cacheManager: EmbeddedCacheManager, server: HotRodServer) extends OneToOneEncoder with Constants with Log { import HotRodServer._ @@ -59,7 +58,7 @@ class HotRodEncoder(cacheManager: EmbeddedCacheManager, viewId: AtomicInteger) } r.version match { - case VERSION_10 | VERSION_11 => encoder.writeHeader(r, buf, addressCache, viewId) + case VERSION_10 | VERSION_11 => encoder.writeHeader(r, buf, addressCache, server) // if error before reading version, don't send any topology changes // cos the encoding might vary from one version to the other case 0 => encoder.writeHeader(r, buf, null, null) diff --git a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala index 9b784edc537a..704d2278d085 100644 --- a/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala +++ b/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala @@ -58,11 +58,15 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log { private var address: ServerAddress = _ private var addressCache: Cache[Address, ServerAddress] = _ private var topologyUpdateTimeout: Long = _ - private var viewId: AtomicInteger = _ + private var viewId: Int = _ def getAddress: ServerAddress = address - override def getEncoder = new HotRodEncoder(getCacheManager, viewId) + def getViewId: Int = viewId + + def setViewId(viewId: Int) = this.viewId = viewId + + override def getEncoder = new HotRodEncoder(getCacheManager, this) override def getDecoder : HotRodDecoder = { val hotRodDecoder = new HotRodDecoder(getCacheManager, transport) @@ -75,8 +79,6 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log { val defaultPort = 11222 isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null if (isClustered) { - viewId = new AtomicInteger() - val typedProps = TypedProperties.toTypedProperties(properties) defineTopologyCacheConfig(cacheManager, typedProps) // Retrieve host and port early on to populate topology cache @@ -120,7 +122,7 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log { addressCache.getAdvancedCache.getRpcManager.getTransport)) clusterAddress = cacheManager.getAddress address = new ServerAddress(host, port) - cacheManager.addListener(new CrashedMemberDetectorListener(addressCache)) + cacheManager.addListener(new CrashedMemberDetectorListener(addressCache, this)) // Map cluster address to server endpoint address debug("Map %s cluster address with %s server endpoint in address cache", clusterAddress, address) addressCache.put(clusterAddress, address) @@ -166,23 +168,19 @@ class HotRodServer extends AbstractProtocolServer("HotRod") with Log { /** * Listener that provides guarantees for view id updates. So, a view id will * only be considered to have changed once the address cache has been - * updated to add or remove an address from the cache. That way, when the - * encoder makes the view id comparison (client provided vs server side - * view id), it has the guarantees that the address cache has already been - * updated. + * updated to add an address from the cache. That way, when the encoder + * makes the view id comparison (client provided vs server side view id), + * it has the guarantees that the address cache has already been updated. */ @Listener class ViewIdUpdater(transport: Transport) { - @CacheEntryCreated @CacheEntryRemoved + @CacheEntryCreated def viewIdUpdate(event: CacheEntryEvent[Address, ServerAddress]) { // Only update view id once cache has been updated if (!event.isPre) { val localViewId = transport.getViewId - // Could this be a lazySet? We want the value to eventually be set, - // clients could wait a little bit for the view id to be updated - // on the server side... - viewId.set(localViewId) + viewId = localViewId if (isTraceEnabled) { log.tracef("Address cache had %s for key %s. View id is now %d", event.getType, event.getKey, localViewId) diff --git a/server/hotrod/src/test/scala/org/infinispan/server/hotrod/CrashedMemberDetectorTest.scala b/server/hotrod/src/test/scala/org/infinispan/server/hotrod/CrashedMemberDetectorTest.scala index bd6617b9ba38..593ad66926af 100644 --- a/server/hotrod/src/test/scala/org/infinispan/server/hotrod/CrashedMemberDetectorTest.scala +++ b/server/hotrod/src/test/scala/org/infinispan/server/hotrod/CrashedMemberDetectorTest.scala @@ -22,12 +22,12 @@ package org.infinispan.server.hotrod import org.testng.annotations.Test import org.infinispan.test.fwk.TestCacheManagerFactory import org.infinispan.remoting.transport.Address -import org.infinispan.notifications.cachemanagerlistener.event.EventImpl import org.infinispan.notifications.cachemanagerlistener.event.Event.Type import org.infinispan.distribution.TestAddress import java.util.ArrayList import org.testng.AssertJUnit._ import org.infinispan.test.SingleCacheManagerTest +import org.infinispan.notifications.cachemanagerlistener.event.{ViewChangedEvent, EventImpl} /** * Tests crashed or stopped member logic. @@ -47,7 +47,11 @@ class CrashedMemberDetectorTest extends SingleCacheManagerTest { cache.put(new TestAddress(2), new ServerAddress("b", 456)) cache.put(new TestAddress(3), new ServerAddress("c", 789)) - val detector = new CrashedMemberDetectorListener(cache) + val detector = new CrashedMemberDetectorListener(cache, null) { + override protected def updateViewdId(e: ViewChangedEvent) = { + // Do nothing... + } + } val oldMembers = new ArrayList[Address]() oldMembers.add(new TestAddress(1))