-
Notifications
You must be signed in to change notification settings - Fork 612
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ISPN-2634 Implement cross-site replication based on IRAC
- Loading branch information
Showing
105 changed files
with
6,501 additions
and
296 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 110 additions & 0 deletions
110
core/src/main/java/org/infinispan/commands/irac/IracCleanupKeyCommand.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package org.infinispan.commands.irac; | ||
|
||
import java.io.IOException; | ||
import java.io.ObjectInput; | ||
import java.io.ObjectOutput; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
import org.infinispan.commands.CommandInvocationId; | ||
import org.infinispan.commands.remote.CacheRpcCommand; | ||
import org.infinispan.factories.ComponentRegistry; | ||
import org.infinispan.metadata.impl.IracMetadata; | ||
import org.infinispan.remoting.transport.Address; | ||
import org.infinispan.util.ByteString; | ||
import org.infinispan.util.concurrent.CompletableFutures; | ||
|
||
/** | ||
* Sends a cleanup request from primary owner to backup owners. | ||
* <p> | ||
* Sent after a successfully update of all remote sites. | ||
* | ||
* @author Pedro Ruivo | ||
* @since 11.0 | ||
*/ | ||
public class IracCleanupKeyCommand implements CacheRpcCommand { | ||
|
||
public static final byte COMMAND_ID = 122; | ||
|
||
private ByteString cacheName; | ||
private Object key; | ||
private Object lockOwner; | ||
private IracMetadata tombstone; | ||
|
||
@SuppressWarnings("unused") | ||
public IracCleanupKeyCommand() { | ||
} | ||
|
||
public IracCleanupKeyCommand(ByteString cacheName) { | ||
this.cacheName = cacheName; | ||
} | ||
|
||
public IracCleanupKeyCommand(ByteString cacheName, Object key, Object lockOwner, IracMetadata tombstone) { | ||
this.cacheName = cacheName; | ||
this.key = key; | ||
this.lockOwner = lockOwner; | ||
this.tombstone = tombstone; | ||
} | ||
|
||
@Override | ||
public ByteString getCacheName() { | ||
return cacheName; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Object> invokeAsync(ComponentRegistry componentRegistry) { | ||
componentRegistry.getIracManager().running().cleanupKey(key, lockOwner, tombstone); | ||
return CompletableFutures.completedNull(); | ||
} | ||
|
||
@Override | ||
public byte getCommandId() { | ||
return COMMAND_ID; | ||
} | ||
|
||
@Override | ||
public boolean isReturnValueExpected() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public void writeTo(ObjectOutput output) throws IOException { | ||
output.writeObject(key); | ||
boolean cId = lockOwner instanceof CommandInvocationId; | ||
output.writeBoolean(cId); | ||
if (cId) { | ||
CommandInvocationId.writeTo(output, (CommandInvocationId) lockOwner); | ||
} else { | ||
output.writeObject(lockOwner); | ||
} | ||
} | ||
|
||
@Override | ||
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { | ||
this.key = input.readObject(); | ||
boolean cId = input.readBoolean(); | ||
if (cId) { | ||
lockOwner = CommandInvocationId.readFrom(input); | ||
} else { | ||
this.lockOwner = input.readObject(); | ||
} | ||
} | ||
|
||
@Override | ||
public Address getOrigin() { | ||
//not needed | ||
return null; | ||
} | ||
|
||
@Override | ||
public void setOrigin(Address origin) { | ||
//no-op | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "IracCleanupKeyCommand{" + | ||
"key=" + key + | ||
", lockOwner=" + lockOwner + | ||
'}'; | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
core/src/main/java/org/infinispan/commands/irac/IracClearKeysCommand.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package org.infinispan.commands.irac; | ||
|
||
import java.io.IOException; | ||
import java.io.ObjectInput; | ||
import java.io.ObjectOutput; | ||
import java.util.concurrent.CompletionStage; | ||
|
||
import org.infinispan.util.ByteString; | ||
import org.infinispan.xsite.BackupReceiver; | ||
import org.infinispan.xsite.irac.IracManager; | ||
|
||
/** | ||
* A clear request that is sent to the remote site by {@link IracManager}. | ||
* | ||
* @author Pedro Ruivo | ||
* @since 11.0 | ||
*/ | ||
public class IracClearKeysCommand extends IracUpdateKeyCommand { | ||
|
||
public static final byte COMMAND_ID = 17; | ||
|
||
@SuppressWarnings("unused") | ||
public IracClearKeysCommand() { | ||
super(COMMAND_ID, null); | ||
} | ||
|
||
public IracClearKeysCommand(ByteString cacheName) { | ||
super(COMMAND_ID, cacheName); | ||
} | ||
|
||
public CompletionStage<Void> executeOperation(BackupReceiver receiver) { | ||
return receiver.clearKeys(); | ||
} | ||
|
||
@Override | ||
public byte getCommandId() { | ||
return COMMAND_ID; | ||
} | ||
|
||
@Override | ||
public void writeTo(ObjectOutput output) throws IOException { | ||
|
||
} | ||
|
||
@Override | ||
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { | ||
|
||
} | ||
|
||
public Object getKey() { | ||
return null; | ||
} | ||
|
||
public IracClearKeysCommand copyForCacheName(ByteString cacheName) { | ||
return new IracClearKeysCommand(cacheName); | ||
} | ||
|
||
@Override | ||
public boolean isClear() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "IracClearKeyCommand{" + | ||
"originSite='" + originSite + '\'' + | ||
", cacheName=" + cacheName + | ||
'}'; | ||
} | ||
} |
Oops, something went wrong.