Skip to content

Commit

Permalink
Merge pull request #994 from brizental/1727076-wait-on-timer
Browse files Browse the repository at this point in the history
Bug 1727076 - Have PingUploadWorker sleep in case it gets a Wait task
  • Loading branch information
brizental committed Dec 8, 2021
2 parents 4260b2b + a92aae8 commit dc4b6a5
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,9 @@
* [#984](https://github.com/mozilla/glean.js/pull/984): BUGFIX: Return correct upload result in case an error happens while building a ping request.
* [#988](https://github.com/mozilla/glean.js/pull/988): BUGFIX: Enforce rate limitation at upload time, not at ping submission time.
* Note: This change required a big refactoring of the internal uploading logic.
* [#994](https://github.com/mozilla/glean.js/pull/994): Automatically restart ping upload once the rate limit window is ended.
* Prior to this change, ping uploading would only be resumed once the `.submit()` API was called again, even if Glean was not throttled anymore.
* **Note**: this change does not apply to QML. We used the `setTimeout`/`clearTimeout` APIs in this feature and those are not available on the QML platform. Follow [Bug 1743140](https://bugzilla.mozilla.org/show_bug.cgi?id=1743140) for updates.
* [#1015](https://github.com/mozilla/glean.js/pull/1015): BUGFIX: Make attempting to call the `setUploadEnabled` API before initializing Glean a no-op.
* [#1016](https://github.com/mozilla/glean.js/pull/1016): BUGFIX: Make shutdown a no-op in case Glean is not initialized.

Expand Down
2 changes: 2 additions & 0 deletions glean/src/core/upload/manager.ts
Expand Up @@ -228,6 +228,8 @@ class PingUploadManager implements PingsDatabaseObserver {
*
* This does not interfere in the jobs themselves.
*
* It the rate limit is hit, this will resolve without finishing to process everything.
*
* @returns A promise which resolves once current ongoing upload worker job is complete.
* This should not hang for too long because of the upload limitations.
*/
Expand Down
73 changes: 67 additions & 6 deletions glean/src/core/upload/worker.ts
Expand Up @@ -6,7 +6,7 @@ import { gzipSync, strToU8 } from "fflate";

import type { QueuedPing } from "./manager";
import type Uploader from "./uploader.js";
import type { UploadTask} from "./task.js";
import type { UploadTask } from "./task.js";
import { GLEAN_VERSION } from "../constants.js";
import { Context } from "../context.js";
import log, { LoggingLevel } from "../log.js";
Expand All @@ -27,6 +27,22 @@ class PingBodyOverflowError extends Error {
class PingUploadWorker {
private currentJob?: Promise<void>;

// Whether or not someone is blocking on the currentJob.
private isBlocking = false;

// The id of the current timer running due to a Wait signal.
// If `undefined` no timer is currently running.
//
// This is necessary in case we need to clear the timeout due to aborting the worker.
private waitTimeoutId?: number;

// A resolver for the waiting promise created due to a Wait signal.
//
// This is necessary for the case when the worker is aborted and the timeout cleared.
// In that case the timeout will not resolve the promise itself
// and it will need to be resolved from the outside.
private waitPromiseResolver?: (aborted: boolean) => void;

constructor (
private readonly uploader: Uploader,
private readonly serverEndpoint: string,
Expand Down Expand Up @@ -129,11 +145,33 @@ class PingUploadWorker {
const result = await this.attemptPingUpload(nextTask.ping);
await processUploadResponse(nextTask.ping, result);
continue;
// TODO(bug1727076): Actually set a timer to continue once timeout is complete.
// Note that the timer must be cleared in case an abort signal is issued.
case UploadTaskTypes.Wait:
if (this.isBlocking) {
return;
}

try {
const wasAborted = await new Promise<boolean>(resolve => {
this.waitPromiseResolver = resolve;
this.waitTimeoutId = Context.platform.timer
.setTimeout(() => {
this.waitPromiseResolver = undefined;
this.waitTimeoutId = undefined;
resolve(false);
}, nextTask.remainingTime);
});

if (wasAborted) {
return;
}
} catch(_) {
this.waitPromiseResolver = undefined;
this.waitTimeoutId = undefined;
return;
}

continue;
case UploadTaskTypes.Done:
this.currentJob = undefined;
return;
}
}
Expand All @@ -152,7 +190,17 @@ class PingUploadWorker {
processUploadResponse: (ping: QueuedPing, result: UploadResult) => Promise<void>,
): void {
if (!this.currentJob) {
this.currentJob = this.workInternal(getUploadTask, processUploadResponse);
this.currentJob = this.workInternal(getUploadTask, processUploadResponse)
.then(() => {
this.currentJob = undefined;
})
.catch(error => {
log(
LOG_TAG,
[ "IMPOSSIBLE: Something went wrong while processing ping upload tasks.", error ],
LoggingLevel.Error
);
});
}
}

Expand All @@ -169,7 +217,20 @@ class PingUploadWorker {
*/
async blockOnCurrentJob() {
if (this.currentJob) {
return this.currentJob;
// If we are currently waiting, just cut the timeout short
// and stop the current job.
if (this.waitTimeoutId && this.waitPromiseResolver) {
Context.platform.timer.clearTimeout(this.waitTimeoutId);
this.waitPromiseResolver(true);
this.waitPromiseResolver = undefined;
this.waitTimeoutId = undefined;
}

this.isBlocking = true;
await this.currentJob;
this.isBlocking = false;

return;
}

return Promise.resolve();
Expand Down
1 change: 1 addition & 0 deletions glean/src/platform/browser/web/index.ts
Expand Up @@ -11,6 +11,7 @@ const WebPlaftorm: Platform = {
Storage,
uploader,
info,
timer: { setTimeout, clearTimeout },
name: "web"
};

Expand Down
1 change: 1 addition & 0 deletions glean/src/platform/browser/webext/index.ts
Expand Up @@ -12,6 +12,7 @@ const WebExtPlatform: Platform = {
Storage,
uploader,
info,
timer: { setTimeout, clearTimeout },
name: "webext"
};

Expand Down
9 changes: 7 additions & 2 deletions glean/src/platform/index.ts
Expand Up @@ -18,9 +18,14 @@ interface Platform {
Storage: StorageBuilder,
// The environment specific uploader implementation
uploader: Uploader,
// The environment specifici implemtation of platform information getters
// The environment specific implementation of platform information getters
info: PlatformInfo,
// The name of the platform, useful for logging and debugging purposes.
// The timer functions available on the current platform
timer: {
setTimeout: (cb: () => void, timeout: number) => number,
clearTimeout: (id: number) => void,
},
// The name of the platform, useful for logging and debugging purposes
name: string,
}

Expand Down
1 change: 1 addition & 0 deletions glean/src/platform/node/index.ts
Expand Up @@ -15,6 +15,7 @@ const NodePlatform: Platform = {
...TestPlatform,
uploader: Uploader,
info: PlatformInfo,
timer: { setTimeout, clearTimeout },
name: "node"
};

Expand Down
6 changes: 6 additions & 0 deletions glean/src/platform/qt/index.ts
Expand Up @@ -13,6 +13,12 @@ const QtPlatform: Platform = {
Storage,
uploader,
info,
timer: {
// TODO(bug1743140): Actually implement these functions here.
setTimeout: () => { throw new Error(); },
// eslint-disable-next-line @typescript-eslint/no-empty-function
clearTimeout: () => {}
},
name: "Qt"
};

Expand Down
5 changes: 5 additions & 0 deletions glean/src/platform/test/index.ts
Expand Up @@ -35,10 +35,15 @@ const MockPlatformInfo: PlatformInfo = {
},
};

const safeSetTimeout = typeof setTimeout !== "undefined" ? setTimeout : () => { throw new Error(); };
// eslint-disable-next-line @typescript-eslint/no-empty-function
const safeClearTimeout = typeof clearTimeout !== "undefined" ? clearTimeout : () => {};

const TestPlatform: Platform = {
Storage: MockStorage,
uploader: new MockUploader(),
info: MockPlatformInfo,
timer: { setTimeout: safeSetTimeout, clearTimeout: safeClearTimeout },
name: "test"
};

Expand Down

0 comments on commit dc4b6a5

Please sign in to comment.