Skip to content

Commit

Permalink
ISPN-14192 Do not send empty batches with IRAC
Browse files Browse the repository at this point in the history
* Includes change to ConcurrentSmallIntSet preventing wrong constructor
  argument
  • Loading branch information
pruivo committed Oct 4, 2022
1 parent 8e25772 commit 94653e8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class ConcurrentSmallIntSet implements IntSet {
* @param minCapacityExclusive The implementation performs sizing to ensure values up to this can be stored
*/
public ConcurrentSmallIntSet(int minCapacityExclusive) {
if (minCapacityExclusive < 1) {
throw new IllegalArgumentException("minCapacityExclusive (" + minCapacityExclusive + ") < 1");
}
// We add 31 as that is 2^5 -1 so we round up
int intLength = intIndex(minCapacityExclusive + 31);
array = new AtomicIntegerArray(intLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public static IntSet mutableSet(int value1, int value2) {
* @return concurrent set
*/
public static IntSet concurrentSet(int maxExclusive) {
return new ConcurrentSmallIntSet(maxExclusive);
// if maxExclusive = 0; then we have an empty set
return maxExclusive < 1 ? immutableEmptySet() : new ConcurrentSmallIntSet(maxExclusive);
}

/**
Expand All @@ -189,6 +190,10 @@ public static IntSet concurrentSet(int maxExclusive) {
* @return concurrent copy
*/
public static IntSet concurrentCopyFrom(IntSet intSet, int maxExclusive) {
// if maxExclusive = 0; then we have an empty set
if (maxExclusive < 1) {
return immutableEmptySet();
}
ConcurrentSmallIntSet cis = new ConcurrentSmallIntSet(maxExclusive);
intSet.forEach((IntConsumer) cis::set);
return cis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ private static Update readUpdateFrom(ObjectInput input) throws IOException, Clas
throw new IllegalStateException();
}

public boolean isEmpty() {
return updateList.isEmpty();
}

private interface Update {
byte getType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,24 @@ private CompletableSource sendUpdateBatch(Collection<? extends IracStateData> ba
cmd.addUpdate(data.state.getKey(), data.entry.getValue(), data.entry.getMetadata(), data.entry.getInternalMetadata().iracMetadata());
}
}
IracResponseCollector rspCollector = new IracResponseCollector(commandsFactory.getCacheName(), validState, this::onBatchResponse);
try {
for (IracXSiteBackup backup : asyncBackups) {
if (takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
continue; // backup is offline

IracResponseCollector rspCollector = null;
if (!cmd.isEmpty()) {
rspCollector = new IracResponseCollector(commandsFactory.getCacheName(), validState, this::onBatchResponse);
try {
for (IracXSiteBackup backup : asyncBackups) {
if (takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
continue; // backup is offline
}
rspCollector.dependsOn(backup, sendToRemoteSite(backup, cmd));
}
rspCollector.dependsOn(backup, sendToRemoteSite(backup, cmd));
}
} catch (Throwable throwable) {
// marshalling error? JGroups error?
for (IracStateData data : batch) {
data.state.retry();
} catch (Throwable throwable) {
// marshalling error? JGroups error?
for (IracStateData data : batch) {
data.state.retry();
}
onUnexpectedThrowable(throwable);
}
onUnexpectedThrowable(throwable);
return Completable.complete();
}

if (!invalidState.isEmpty()) {
Expand All @@ -461,7 +464,7 @@ private CompletableSource sendUpdateBatch(Collection<? extends IracStateData> ba
removeStateFromCluster(invalidState);
}

return Completable.fromCompletionStage(rspCollector.freeze());
return rspCollector == null ? Completable.complete() : Completable.fromCompletionStage(rspCollector.freeze());
}

private CompletionStage<Void> sendClearUpdate() {
Expand Down

0 comments on commit 94653e8

Please sign in to comment.