Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-14192 Do not send empty batches with IRAC #10367

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ class ConcurrentSmallIntSet implements IntSet {
private final AtomicInteger currentSize = new AtomicInteger();

/**
* Creates a new, empty map which can accommodate ints in value up to {@code minCapacityExclusive - 1}. This number
* Creates a new, empty map which can accommodate ints in value up to {@code maxCapacityExclusive - 1}. This number
* will be rounded up to the nearest 32.
* @param minCapacityExclusive The implementation performs sizing to ensure values up to this can be stored
* @param maxCapacityExclusive The implementation performs sizing to ensure values up to this can be stored
*/
public ConcurrentSmallIntSet(int minCapacityExclusive) {
public ConcurrentSmallIntSet(int maxCapacityExclusive) {
if (maxCapacityExclusive < 1) {
throw new IllegalArgumentException("maxCapacityExclusive (" + maxCapacityExclusive + ") < 1");
}
// We add 31 as that is 2^5 -1 so we round up
int intLength = intIndex(minCapacityExclusive + 31);
int intLength = intIndex(maxCapacityExclusive + 31);
array = new AtomicIntegerArray(intLength);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public static IntSet mutableSet(int value1, int value2) {
* @return concurrent set
*/
public static IntSet concurrentSet(int maxExclusive) {
return new ConcurrentSmallIntSet(maxExclusive);
// if maxExclusive = 0; then we have an empty set
return maxExclusive == 0 ? immutableEmptySet() : new ConcurrentSmallIntSet(maxExclusive);
}

/**
Expand All @@ -189,6 +190,10 @@ public static IntSet concurrentSet(int maxExclusive) {
* @return concurrent copy
*/
public static IntSet concurrentCopyFrom(IntSet intSet, int maxExclusive) {
// if maxExclusive = 0; then we have an empty set
if (maxExclusive == 0) {
return immutableEmptySet();
}
ConcurrentSmallIntSet cis = new ConcurrentSmallIntSet(maxExclusive);
intSet.forEach((IntConsumer) cis::set);
return cis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ private static Update readUpdateFrom(ObjectInput input) throws IOException, Clas
throw new IllegalStateException();
}

public boolean isEmpty() {
return updateList.isEmpty();
}

private interface Update {
byte getType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,26 @@ private CompletableSource sendUpdateBatch(Collection<? extends IracStateData> ba
cmd.addUpdate(data.state.getKey(), data.entry.getValue(), data.entry.getMetadata(), data.entry.getInternalMetadata().iracMetadata());
}
}
IracResponseCollector rspCollector = new IracResponseCollector(commandsFactory.getCacheName(), validState, this::onBatchResponse);
try {
for (IracXSiteBackup backup : asyncBackups) {
if (takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
continue; // backup is offline

IracResponseCollector rspCollector = null;
if (!cmd.isEmpty()) {
rspCollector = new IracResponseCollector(commandsFactory.getCacheName(), validState, this::onBatchResponse);
try {
for (IracXSiteBackup backup : asyncBackups) {
if (takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
continue; // backup is offline
}
rspCollector.dependsOn(backup, sendToRemoteSite(backup, cmd));
}
rspCollector.dependsOn(backup, sendToRemoteSite(backup, cmd));
}
} catch (Throwable throwable) {
// marshalling error? JGroups error?
for (IracStateData data : batch) {
data.state.retry();
} catch (Throwable throwable) {
// safety net; should never happen
// onUnexpectedThrowable() log the exception!
for (IracStateData data : batch) {
data.state.retry();
}
onUnexpectedThrowable(throwable);
rspCollector = null;
}
onUnexpectedThrowable(throwable);
return Completable.complete();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This slightly changes how exception handling operates. With the new changes if there is a Throwable, we now have to wait for the registered remote site backup commands to complete. Do we want that? It is probably a very small chance of this happening, but just wanted to make sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the invalidState list being cleaned up now and not before?

There is no reason to keep the invalid state around more time than necessary to be cleaned up in the "next round".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite, If you have more than one backup if the second errors, we would have to wait until the first completes as rspCollector would have a stage dependency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I'm setting rspCollector = null; to avoid waiting although, I think, this code will never be executed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes I missed that sorry. So we are good.

}

if (!invalidState.isEmpty()) {
Expand All @@ -461,7 +466,7 @@ private CompletableSource sendUpdateBatch(Collection<? extends IracStateData> ba
removeStateFromCluster(invalidState);
}

return Completable.fromCompletionStage(rspCollector.freeze());
return rspCollector == null ? Completable.complete() : Completable.fromCompletionStage(rspCollector.freeze());
}

private CompletionStage<Void> sendClearUpdate() {
Expand Down