Skip to content

Commit

Permalink
ISPN-12177 Persist IRAC version during shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and wburns committed Oct 23, 2020
1 parent 133086b commit 971632d
Show file tree
Hide file tree
Showing 33 changed files with 946 additions and 195 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.irac.IracTouchKeyCommand;
import org.infinispan.commands.irac.IracUpdateVersionCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
Expand Down Expand Up @@ -88,6 +89,7 @@
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.irac.IracEntryVersion;
import org.infinispan.encoding.DataConversion;
import org.infinispan.expiration.impl.TouchCommand;
import org.infinispan.factories.ComponentRegistry;
Expand Down Expand Up @@ -635,7 +637,7 @@ <K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(String r

IracCleanupKeyCommand buildIracCleanupKeyCommand(Object key, Object lockOwner, IracMetadata tombstone);

IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment);
IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment, IracEntryVersion versionSeen);

IracRequestStateCommand buildIracRequestStateCommand(IntSet segments);

Expand All @@ -645,4 +647,6 @@ IracPutKeyValueCommand buildIracPutKeyValueCommand(Object key, int segment, Obje
PrivateMetadata privateMetadata);

IracTouchKeyCommand buildIracTouchCommand(Object key);

IracUpdateVersionCommand buildIracUpdateVersionCommand(Map<Integer, IracEntryVersion> segmentsVersion);
}
Expand Up @@ -34,6 +34,7 @@
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.irac.IracTouchKeyCommand;
import org.infinispan.commands.irac.IracUpdateVersionCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
Expand Down Expand Up @@ -97,6 +98,7 @@
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.irac.IracEntryVersion;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.encoding.DataConversion;
import org.infinispan.expiration.impl.TouchCommand;
Expand Down Expand Up @@ -723,8 +725,8 @@ public IracCleanupKeyCommand buildIracCleanupKeyCommand(Object key, Object lockO
}

@Override
public IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment) {
return new IracMetadataRequestCommand(cacheName, segment);
public IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment, IracEntryVersion versionSeen) {
return new IracMetadataRequestCommand(cacheName, segment, versionSeen);
}

@Override
Expand All @@ -747,4 +749,9 @@ public IracPutKeyValueCommand buildIracPutKeyValueCommand(Object key, int segmen
public IracTouchKeyCommand buildIracTouchCommand(Object key) {
return new IracTouchKeyCommand(cacheName, key);
}

@Override
public IracUpdateVersionCommand buildIracUpdateVersionCommand(Map<Integer, IracEntryVersion> segmentsVersion) {
return new IracUpdateVersionCommand(cacheName, segmentsVersion);
}
}
Expand Up @@ -23,6 +23,7 @@
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.irac.IracTouchKeyCommand;
import org.infinispan.commands.irac.IracUpdateVersionCommand;
import org.infinispan.commands.module.ModuleCommandFactory;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
Expand Down Expand Up @@ -467,6 +468,9 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case IracTouchKeyCommand.COMMAND_ID:
command = new IracTouchKeyCommand(cacheName);
break;
case IracUpdateVersionCommand.COMMAND_ID:
command = new IracUpdateVersionCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
Expand Up @@ -9,6 +9,7 @@

import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.container.versioning.irac.IracEntryVersion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.transport.Address;
Expand All @@ -27,18 +28,19 @@ public class IracMetadataRequestCommand implements CacheRpcCommand, TopologyAffe
private ByteString cacheName;
private int segment;
private int topologyId = -1;
private IracEntryVersion versionSeen;

@SuppressWarnings("unused")
public IracMetadataRequestCommand() {
}

public IracMetadataRequestCommand(ByteString cacheName) {
this.cacheName = cacheName;
}

public IracMetadataRequestCommand(ByteString cacheName, int segment) {
public IracMetadataRequestCommand(ByteString cacheName, int segment, IracEntryVersion versionSeen) {
this.cacheName = cacheName;
this.segment = segment;
this.versionSeen = versionSeen;
}

@Override
Expand All @@ -48,7 +50,7 @@ public ByteString getCacheName() {

@Override
public CompletionStage<?> invokeAsync(ComponentRegistry registry) throws Throwable {
return completedFuture(registry.getIracVersionGenerator().running().generateNewMetadata(segment));
return completedFuture(registry.getIracVersionGenerator().running().generateNewMetadata(segment, versionSeen));
}

@Override
Expand All @@ -64,13 +66,13 @@ public boolean isReturnValueExpected() {
@Override
public void writeTo(ObjectOutput output) throws IOException {
output.writeInt(segment);
output.writeInt(topologyId);
output.writeObject(versionSeen);
}

@Override
public void readFrom(ObjectInput input) throws IOException {
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
this.segment = input.readInt();
this.topologyId = input.readInt();
this.versionSeen = (IracEntryVersion) input.readObject();
}

@Override
Expand All @@ -90,6 +92,7 @@ public String toString() {
"cacheName=" + cacheName +
", segment=" + segment +
", topologyId=" + topologyId +
", versionSeen=" + versionSeen +
'}';
}

Expand Down
@@ -0,0 +1,92 @@
package org.infinispan.commands.irac;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.container.versioning.irac.IracEntryVersion;
import org.infinispan.container.versioning.irac.IracVersionGenerator;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CompletableFutures;

/**
* It transfers the current versions stored in {@link IracVersionGenerator} to the other nodes when joins/leaving events
* occurs.
*
* @author Pedro Ruivo
* @since 12.0
*/
public class IracUpdateVersionCommand implements CacheRpcCommand {
public static final byte COMMAND_ID = 32;
private final ByteString cacheName;
private Map<Integer, IracEntryVersion> versions;

public IracUpdateVersionCommand() {
this(null);
}

public IracUpdateVersionCommand(ByteString cacheName) {
this.cacheName = cacheName;
}

public IracUpdateVersionCommand(ByteString cacheName, Map<Integer, IracEntryVersion> segmentsVersion) {
this(cacheName);
this.versions = segmentsVersion;
}

@Override
public CompletionStage<?> invokeAsync(ComponentRegistry registry) throws Throwable {
IracVersionGenerator versionGenerator = registry.getIracVersionGenerator().running();
versions.forEach(versionGenerator::updateVersion);
return CompletableFutures.completedNull();
}

@Override
public ByteString getCacheName() {
return cacheName;
}

@Override
public byte getCommandId() {
return COMMAND_ID;
}

@Override
public boolean isReturnValueExpected() {
return false;
}

@Override
public Address getOrigin() {
//no-op, not required.
return null;
}

@Override
public void setOrigin(Address origin) {
//no-op, not required.
}

@Override
public void writeTo(ObjectOutput output) throws IOException {
MarshallUtil.marshallMap(versions, DataOutput::writeInt, ObjectOutput::writeObject, output);
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
versions = MarshallUtil.unmarshallMap(input, DataInput::readInt, IracUpdateVersionCommand::read, HashMap::new);
}

private static IracEntryVersion read(ObjectInput input) throws IOException, ClassNotFoundException {
return (IracEntryVersion) input.readObject();
}
}
Expand Up @@ -155,6 +155,7 @@ public String toString() {
.append(", putIfAbsent=").append(putIfAbsent)
.append(", valueMatcher=").append(valueMatcher)
.append(", metadata=").append(metadata)
.append(", internalMetadata=").append(internalMetadata)
.append(", successful=").append(successful)
.append(", topologyId=").append(getTopologyId())
.append("}")
Expand Down

0 comments on commit 971632d

Please sign in to comment.