Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[13.0.x] ISPN-13430 IRAC tombstones are leaking #9629

Merged
merged 1 commit into from Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracCleanupTombstoneCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
Expand Down Expand Up @@ -438,7 +439,7 @@ default SingleRpcCommand buildSingleRpcCommand(ReplicableCommand call) {
LockControlCommand buildLockControlCommand(Collection<?> keys, long flagsBitSet, GlobalTransaction gtx);

/**
* Same as {@link #buildLockControlCommand(Object, long, GlobalTransaction)}
* Same as {@link #buildLockControlCommand(Collection, long, GlobalTransaction)}
* but for locking a single key vs a collection of keys.
*/
LockControlCommand buildLockControlCommand(Object key, long flagsBitSet, GlobalTransaction gtx);
Expand Down Expand Up @@ -641,7 +642,9 @@ <K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(String r

IracClearKeysCommand buildIracClearKeysCommand();

IracCleanupKeyCommand buildIracCleanupKeyCommand(int segment, Object key, Object lockOwner, IracMetadata tombstone);
IracCleanupKeyCommand buildIracCleanupKeyCommand(int segment, Object key, Object lockOwner);

IracCleanupTombstoneCommand buildIracCleanupTombstoneCommand(Object key, IracMetadata tombstone);

IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment, IracEntryVersion versionSeen);

Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracCleanupTombstoneCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
Expand Down Expand Up @@ -728,8 +729,13 @@ public IracClearKeysCommand buildIracClearKeysCommand() {
}

@Override
public IracCleanupKeyCommand buildIracCleanupKeyCommand(int segment, Object key, Object lockOwner, IracMetadata tombstone) {
return new IracCleanupKeyCommand(cacheName, segment, key, lockOwner, tombstone);
public IracCleanupKeyCommand buildIracCleanupKeyCommand(int segment, Object key, Object lockOwner) {
return new IracCleanupKeyCommand(cacheName, segment, key, lockOwner);
}

@Override
public IracCleanupTombstoneCommand buildIracCleanupTombstoneCommand(Object key, IracMetadata tombstone) {
return new IracCleanupTombstoneCommand(cacheName, key, tombstone);
}

@Override
Expand Down
Expand Up @@ -16,6 +16,7 @@
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracCleanupTombstoneCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
Expand Down Expand Up @@ -487,6 +488,9 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case XSiteSetStateTransferModeCommand.COMMAND_ID:
command = new XSiteSetStateTransferModeCommand(cacheName);
break;
case IracCleanupTombstoneCommand.COMMAND_ID:
command = new IracCleanupTombstoneCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
Expand Up @@ -7,16 +7,16 @@

import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.util.Util;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CompletableFutures;

/**
* Sends a cleanup request from primary owner to backup owners.
* <p>
* Sent after a successfully update of all remote sites.
* Sent after a successful update of all remote sites.
*
* @author Pedro Ruivo
* @since 11.0
Expand All @@ -29,7 +29,6 @@ public class IracCleanupKeyCommand implements CacheRpcCommand {
private int segment;
private Object key;
private Object lockOwner;
private IracMetadata tombstone;

@SuppressWarnings("unused")
public IracCleanupKeyCommand() {
Expand All @@ -39,12 +38,11 @@ public IracCleanupKeyCommand(ByteString cacheName) {
this.cacheName = cacheName;
}

public IracCleanupKeyCommand(ByteString cacheName, int segment, Object key, Object lockOwner, IracMetadata tombstone) {
public IracCleanupKeyCommand(ByteString cacheName, int segment, Object key, Object lockOwner) {
this.cacheName = cacheName;
this.segment = segment;
this.key = key;
this.lockOwner = lockOwner;
this.tombstone = tombstone;
}

@Override
Expand All @@ -54,7 +52,7 @@ public ByteString getCacheName() {

@Override
public CompletableFuture<Object> invokeAsync(ComponentRegistry componentRegistry) {
componentRegistry.getIracManager().running().cleanupKey(segment, key, lockOwner, tombstone);
componentRegistry.getIracManager().running().cleanupKey(segment, key, lockOwner);
return CompletableFutures.completedNull();
}

Expand Down Expand Up @@ -107,7 +105,8 @@ public void setOrigin(Address origin) {
@Override
public String toString() {
return "IracCleanupKeyCommand{" +
"key=" + key +
"cacheName=" + cacheName +
", key=" + Util.toStr(key) +
", lockOwner=" + lockOwner +
'}';
}
Expand Down
@@ -0,0 +1,144 @@
package org.infinispan.commands.irac;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.concurrent.CompletionStage;

import org.infinispan.commons.util.Util;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ValidSingleResponseCollector;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.infinispan.xsite.irac.IracManager;

/**
* A {@link XSiteReplicateCommand} to check and cleanup tombstones for IRAC algorithm.
* <p>
* This command has 2 modes: (1, when tombstone==null) when it is sent to a remote site, it checks if the key exists in
* the {@link IracManager}. This check is performed in the primary owner of the key; (2, when tombstone!=null) if it is
* sent from primary owner to backup owners, the backup owners remove the tombstone.
*
* @since 14.0
*/
public class IracCleanupTombstoneCommand extends XSiteReplicateCommand<Boolean> {

public static final byte COMMAND_ID = 37;

private Object key;
private IracMetadata tombstone;

@SuppressWarnings("unused")
public IracCleanupTombstoneCommand() {
super(COMMAND_ID, null);
}

public IracCleanupTombstoneCommand(ByteString cacheName) {
super(COMMAND_ID, cacheName);
}

public IracCleanupTombstoneCommand(ByteString cacheName, Object key, IracMetadata tombstone) {
super(COMMAND_ID, cacheName);
this.key = key;
this.tombstone = tombstone;
}

@Override
public ByteString getCacheName() {
return cacheName;
}

@Override
public CompletionStage<Boolean> invokeAsync(ComponentRegistry registry) {
if (tombstone == null) {
// command received from a remote site.
// check if the key exists in IracManager
return isKeyInIracManager(registry);
}
// removes the tombstone
registry.getIracTombstoneCleaner().running().removeTombstone(key, tombstone);
return CompletableFutures.completedNull();
}

@Override
public byte getCommandId() {
return COMMAND_ID;
}

@Override
public CompletionStage<Boolean> performInLocalSite(ComponentRegistry registry, boolean preserveOrder) {
DistributionInfo distribution = registry.getDistributionManager().getCacheTopology().getDistribution(key);
if (distribution.isPrimary()) {
return isKeyInIracManager(registry);
} else {
RpcManager manager = registry.getRpcManager().running();
return manager.invokeCommand(distribution.primary(), this, new BooleanResponseCollector(), manager.getSyncRpcOptions());
}
}

@Override
public CompletionStage<Boolean> performInLocalSite(BackupReceiver receiver, boolean preserveOrder) {
throw new IllegalStateException("Should never be invoked!");
}

@Override
public boolean isReturnValueExpected() {
return true;
}

@Override
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
IracMetadata.writeTo(output, tombstone);
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
this.key = input.readObject();
this.tombstone = IracMetadata.readFrom(input);
}

@Override
public Address getOrigin() {
//not needed
return null;
}

@Override
public void setOrigin(Address origin) {
//no-op
}

@Override
public String toString() {
return "IracCleanupTombstoneCommand{" +
"cacheName=" + cacheName +
", key=" + Util.toStr(key) +
", tombstone=" + tombstone +
'}';
}

private CompletionStage<Boolean> isKeyInIracManager(ComponentRegistry registry) {
return CompletableFutures.booleanStage(registry.getIracManager().running().containsKey(key));
}

private static final class BooleanResponseCollector extends ValidSingleResponseCollector<Boolean> {

@Override
protected Boolean withValidResponse(Address sender, ValidResponse response) {
return (Boolean) response.getResponseValue();
}

@Override
protected Boolean targetNotFound(Address sender) {
return Boolean.TRUE;
}
}
}
Expand Up @@ -58,7 +58,7 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
output.writeObject(value);
output.writeObject(metadata);
iracMetadata.writeTo(output);
IracMetadata.writeTo(output, iracMetadata);
}

@Override
Expand Down
Expand Up @@ -52,7 +52,7 @@ public byte getCommandId() {
@Override
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
iracMetadata.writeTo(output);
IracMetadata.writeTo(output, iracMetadata);
output.writeBoolean(expiration);
}

Expand Down
Expand Up @@ -72,11 +72,7 @@ public void writeTo(ObjectOutput output) throws IOException {
} else {
output.writeObject(lockOwner);
}
boolean nullTombstone = tombstone == null;
output.writeBoolean(nullTombstone);
if (!nullTombstone) {
tombstone.writeTo(output);
}
IracMetadata.writeTo(output, tombstone);
}

@Override
Expand All @@ -88,11 +84,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
} else {
this.lockOwner = input.readObject();
}
if (input.readBoolean()) {
tombstone = null;
} else {
tombstone = IracMetadata.readFrom(input);
}
this.tombstone = IracMetadata.readFrom(input);
}

@Override
Expand Down