Skip to content

Commit

Permalink
ISPN-2634 Implement cross-site replication based on IRAC
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and ryanemerson committed May 12, 2020
1 parent ae4491d commit 84f3366
Show file tree
Hide file tree
Showing 105 changed files with 6,501 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public interface ProtoStreamTypeIds {
int META_PARAMS_INTERNAL_METADATA = CORE_LOWER_BOUND + 3;
int REMOTE_METADATA = CORE_LOWER_BOUND + 4;
int UUID = CORE_LOWER_BOUND + 5;
int IRAC_VERSION = UUID + 1;
int IRAC_SITE_VERSION = IRAC_VERSION + 1;
int IRAC_VERSION_ENTRY = IRAC_SITE_VERSION + 1;
int IRAC_METADATA = IRAC_VERSION_ENTRY + 1;

// Counter range 4000 -> 4199
int COUNTERS_LOWER_BOUND = CORE_LOWER_BOUND + 3000;
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
import org.infinispan.commands.irac.IracRemoveKeyCommand;
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
Expand Down Expand Up @@ -78,6 +85,7 @@
import org.infinispan.commons.tx.XidImpl;
import org.infinispan.commons.util.IntSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.encoding.DataConversion;
import org.infinispan.expiration.impl.TouchCommand;
import org.infinispan.factories.ComponentRegistry;
Expand All @@ -89,6 +97,7 @@
import org.infinispan.functional.EntryView.WriteEntryView;
import org.infinispan.functional.impl.Params;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
Expand Down Expand Up @@ -614,4 +623,18 @@ <K, I, R> InitialPublisherCommand<K, I, R> buildInitialPublisherCommand(String r
CheckTransactionRpcCommand buildCheckTransactionRpcCommand(Collection<GlobalTransaction> globalTransactions);

TouchCommand buildTouchCommand(Object key, int segment);

<K,V> IracPutKeyCommand buildIracPutKeyCommand(InternalCacheEntry<K, V> entry);

IracRemoveKeyCommand buildIracRemoveKeyCommand(Object key, IracMetadata iracMetadata);

IracClearKeysCommand buildIracClearKeysCommand();

IracCleanupKeyCommand buildIracCleanupKeyCommand(Object key, Object lockOwner, IracMetadata tombstone);

IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment);

IracRequestStateCommand buildIracRequestStateCommand(IntSet segments);

IracStateResponseCommand buildIracStateResponseCommand(Object key, Object lockOwner, IracMetadata tombstone);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
import org.infinispan.commands.irac.IracRemoveKeyCommand;
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetAllCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
Expand Down Expand Up @@ -87,6 +94,7 @@
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.encoding.DataConversion;
import org.infinispan.expiration.impl.TouchCommand;
Expand All @@ -106,6 +114,8 @@
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.GlobalMarshaller;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.metadata.impl.PrivateMetadata;
import org.infinispan.notifications.cachelistener.cluster.ClusterEvent;
import org.infinispan.notifications.cachelistener.cluster.MultiClusterEventCommand;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
Expand Down Expand Up @@ -686,4 +696,42 @@ public CheckTransactionRpcCommand buildCheckTransactionRpcCommand(Collection<Glo
public TouchCommand buildTouchCommand(Object key, int segment) {
return new TouchCommand(cacheName, key, segment);
}

@Override
public <K, V> IracPutKeyCommand buildIracPutKeyCommand(InternalCacheEntry<K, V> entry) {
PrivateMetadata internalMetadata = entry.getInternalMetadata();
assert internalMetadata != null : "[IRAC] Metadata to send to remote site is null! key=" + entry.getKey();
IracMetadata iracMetadata = internalMetadata.iracMetadata();
return new IracPutKeyCommand(cacheName, entry.getKey(), entry.getValue(), entry.getMetadata(), iracMetadata);
}

@Override
public IracRemoveKeyCommand buildIracRemoveKeyCommand(Object key, IracMetadata iracMetadata) {
return new IracRemoveKeyCommand(cacheName, key, iracMetadata);
}

@Override
public IracClearKeysCommand buildIracClearKeysCommand() {
return new IracClearKeysCommand(cacheName);
}

@Override
public IracCleanupKeyCommand buildIracCleanupKeyCommand(Object key, Object lockOwner, IracMetadata tombstone) {
return new IracCleanupKeyCommand(cacheName, key, lockOwner, tombstone);
}

@Override
public IracMetadataRequestCommand buildIracMetadataRequestCommand(int segment) {
return new IracMetadataRequestCommand(cacheName, segment);
}

@Override
public IracRequestStateCommand buildIracRequestStateCommand(IntSet segments) {
return new IracRequestStateCommand(cacheName, segments);
}

@Override
public IracStateResponseCommand buildIracStateResponseCommand(Object key, Object lockOwner, IracMetadata tombstone) {
return new IracStateResponseCommand(cacheName, key, lockOwner, tombstone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
import org.infinispan.commands.functional.WriteOnlyKeyValueCommand;
import org.infinispan.commands.functional.WriteOnlyManyCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracClearKeysCommand;
import org.infinispan.commands.irac.IracMetadataRequestCommand;
import org.infinispan.commands.irac.IracRemoveKeyCommand;
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.irac.IracPutKeyCommand;
import org.infinispan.commands.module.ModuleCommandFactory;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
Expand Down Expand Up @@ -431,6 +438,27 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case TouchCommand.COMMAND_ID:
command = new TouchCommand(cacheName);
break;
case IracPutKeyCommand.COMMAND_ID:
command = new IracPutKeyCommand(cacheName);
break;
case IracCleanupKeyCommand.COMMAND_ID:
command = new IracCleanupKeyCommand(cacheName);
break;
case IracMetadataRequestCommand.COMMAND_ID:
command = new IracMetadataRequestCommand(cacheName);
break;
case IracRequestStateCommand.COMMAND_ID:
command = new IracRequestStateCommand(cacheName);
break;
case IracStateResponseCommand.COMMAND_ID:
command = new IracStateResponseCommand(cacheName);
break;
case IracRemoveKeyCommand.COMMAND_ID:
command = new IracRemoveKeyCommand(cacheName);
break;
case IracClearKeysCommand.COMMAND_ID:
command = new IracClearKeysCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.infinispan.commands.irac;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.concurrent.CompletableFuture;

import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CompletableFutures;

/**
* Sends a cleanup request from primary owner to backup owners.
* <p>
* Sent after a successfully update of all remote sites.
*
* @author Pedro Ruivo
* @since 11.0
*/
public class IracCleanupKeyCommand implements CacheRpcCommand {

public static final byte COMMAND_ID = 122;

private ByteString cacheName;
private Object key;
private Object lockOwner;
private IracMetadata tombstone;

@SuppressWarnings("unused")
public IracCleanupKeyCommand() {
}

public IracCleanupKeyCommand(ByteString cacheName) {
this.cacheName = cacheName;
}

public IracCleanupKeyCommand(ByteString cacheName, Object key, Object lockOwner, IracMetadata tombstone) {
this.cacheName = cacheName;
this.key = key;
this.lockOwner = lockOwner;
this.tombstone = tombstone;
}

@Override
public ByteString getCacheName() {
return cacheName;
}

@Override
public CompletableFuture<Object> invokeAsync(ComponentRegistry componentRegistry) {
componentRegistry.getIracManager().running().cleanupKey(key, lockOwner, tombstone);
return CompletableFutures.completedNull();
}

@Override
public byte getCommandId() {
return COMMAND_ID;
}

@Override
public boolean isReturnValueExpected() {
return false;
}

@Override
public void writeTo(ObjectOutput output) throws IOException {
output.writeObject(key);
boolean cId = lockOwner instanceof CommandInvocationId;
output.writeBoolean(cId);
if (cId) {
CommandInvocationId.writeTo(output, (CommandInvocationId) lockOwner);
} else {
output.writeObject(lockOwner);
}
}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
this.key = input.readObject();
boolean cId = input.readBoolean();
if (cId) {
lockOwner = CommandInvocationId.readFrom(input);
} else {
this.lockOwner = input.readObject();
}
}

@Override
public Address getOrigin() {
//not needed
return null;
}

@Override
public void setOrigin(Address origin) {
//no-op
}

@Override
public String toString() {
return "IracCleanupKeyCommand{" +
"key=" + key +
", lockOwner=" + lockOwner +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.infinispan.commands.irac;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.concurrent.CompletionStage;

import org.infinispan.util.ByteString;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.irac.IracManager;

/**
* A clear request that is sent to the remote site by {@link IracManager}.
*
* @author Pedro Ruivo
* @since 11.0
*/
public class IracClearKeysCommand extends IracUpdateKeyCommand {

public static final byte COMMAND_ID = 17;

@SuppressWarnings("unused")
public IracClearKeysCommand() {
super(COMMAND_ID, null);
}

public IracClearKeysCommand(ByteString cacheName) {
super(COMMAND_ID, cacheName);
}

public CompletionStage<Void> executeOperation(BackupReceiver receiver) {
return receiver.clearKeys();
}

@Override
public byte getCommandId() {
return COMMAND_ID;
}

@Override
public void writeTo(ObjectOutput output) throws IOException {

}

@Override
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {

}

public Object getKey() {
return null;
}

public IracClearKeysCommand copyForCacheName(ByteString cacheName) {
return new IracClearKeysCommand(cacheName);
}

@Override
public boolean isClear() {
return true;
}

@Override
public String toString() {
return "IracClearKeyCommand{" +
"originSite='" + originSite + '\'' +
", cacheName=" + cacheName +
'}';
}
}

0 comments on commit 84f3366

Please sign in to comment.