Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix that failing bucket requests marked buckets as missing on server #7477

Merged
merged 8 commits into from
Dec 5, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { BucketAddress } from "oxalis/constants";
import type DataCube from "oxalis/model/bucket_data_handling/data_cube";
import type { DataStoreInfo } from "oxalis/store";
import Store from "oxalis/store";
import { asAbortable } from "libs/utils";
import { asAbortable, sleep } from "libs/utils";

export type PullQueueItem = {
priority: number;
Expand All @@ -19,6 +19,7 @@ export const PullQueueConstants = {
};
const BATCH_SIZE = 6;
const PULL_ABORTION_ERROR = new DOMException("Pull aborted.", "AbortError");
const MAX_RETRY_DELAY = 5000;

class PullQueue {
cube: DataCube;
Expand All @@ -27,6 +28,8 @@ class PullQueue {
layerName: string;
datastoreInfo: DataStoreInfo;
abortController: AbortController;
consecutiveErrorCount: number;
isRetryScheduled: boolean;

constructor(cube: DataCube, layerName: string, datastoreInfo: DataStoreInfo) {
this.cube = cube;
Expand All @@ -37,13 +40,13 @@ class PullQueue {
comparator: (b, a) => b.priority - a.priority,
});
this.batchCount = 0;
this.consecutiveErrorCount = 0;
this.isRetryScheduled = false;
this.abortController = new AbortController();
}

pull(): Array<Promise<void>> {
// Starting to download some buckets
const promises = [];

pull(): void {
// Start to download some buckets
while (this.batchCount < PullQueueConstants.BATCH_LIMIT && this.priorityQueue.length > 0) {
const batch = [];

Expand All @@ -58,11 +61,9 @@ class PullQueue {
}

if (batch.length > 0) {
promises.push(this.pullBatch(batch));
this.pullBatch(batch);
}
}

return promises;
}

abortRequests() {
Expand All @@ -78,6 +79,7 @@ class PullQueue {
const layerInfo = getLayerByName(dataset, this.layerName);
const { renderMissingDataBlack } = Store.getState().datasetConfiguration;

let hasErrored = false;
try {
const bucketBuffers = await asAbortable(
requestWithFallback(layerInfo, batch),
Expand Down Expand Up @@ -118,13 +120,41 @@ class PullQueue {
if (!(error instanceof DOMException && error.name === "AbortError")) {
// AbortErrors are deliberate. Don't show them on the console.
console.error(error);
hasErrored = true;
}
} finally {
if (hasErrored) {
this.consecutiveErrorCount++;
} else {
this.consecutiveErrorCount = 0;
}
this.batchCount--;
this.pull();

if (!hasErrored) {
// Continue to process the pull queue without delay.
this.pull();
} else {
// The current batch failed and we schedule a retry. However,
// parallel batches might fail, too, and also schedule a retry.
// To avoid that pull() is called X times in Y seconds, we only
// initiate a retry after a sleep if no concurrent invocation
// "claimed" the `isRetryScheduled` boolean.
if (!this.isRetryScheduled) {
this.isRetryScheduled = true;
sleep(this.getRetryDelay()).then(() => {
this.isRetryScheduled = false;
this.pull();
});
}
}
}
}

private getRetryDelay(): number {
const exponentialBackOff = 25 * 2 ** (this.consecutiveErrorCount / 10);
philippotto marked this conversation as resolved.
Show resolved Hide resolved
return Math.min(exponentialBackOff, MAX_RETRY_DELAY);
}

private handleBucket(
bucketAddress: BucketAddress,
bucketData: Uint8Array | null | undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ class TemporalBucketManager {
this.loadedPromises.push(this.makeLoadedPromise(bucket));
}

pullBucket(bucket: DataBucket): Array<Promise<void>> {
philippotto marked this conversation as resolved.
Show resolved Hide resolved
pullBucket(bucket: DataBucket): void {
this.pullQueue.add({
bucket: bucket.zoomedAddress,
priority: PullQueueConstants.PRIORITY_HIGHEST,
});
return this.pullQueue.pull();
this.pullQueue.pull();
}

makeLoadedPromise(bucket: DataBucket): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export async function requestFromStore(
detailedError,
isOnline: window.navigator.onLine,
});
return batch.map((_val) => null);
throw errorResponse;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,8 @@ class DatasetSettings extends React.PureComponent<DatasetSettingsProps, State> {
};

reloadLayerData = async (layerName: string): Promise<void> => {
await clearCache(this.props.dataset, layerName);
this.props.reloadHistogram(layerName);
// await clearCache(this.props.dataset, layerName);
philippotto marked this conversation as resolved.
Show resolved Hide resolved
// this.props.reloadHistogram(layerName);
await api.data.reloadBuckets(layerName);
Toast.success(`Successfully reloaded data of layer ${layerName}.`);
};
Expand Down