-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
progress.ts
127 lines (113 loc) · 4.22 KB
/
progress.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { TimeoutError } from '../utils/errors';
import { assert, monotonicTime } from '../utils/utils';
import { LogName } from '../utils/debugLogger';
import { CallMetadata, Instrumentation, SdkObject } from './instrumentation';
/**
 * Handle passed to the task executed by `ProgressController.run()`.
 * The member notes below describe the implementation that
 * `ProgressController.run()` builds in this file.
 */
export interface Progress {
/**
 * Reports a log message. The message is appended to `metadata.log` only while
 * the progress is running, but is forwarded to the instrumentation in any state.
 */
log(message: string): void;
/**
 * Milliseconds remaining until the deadline, or 2147483647 when no deadline is
 * set. The value is not clamped, so it can be negative once the deadline passed.
 */
timeUntilDeadline(): number;
/** True while the task is running, i.e. it has neither finished nor been aborted. */
isRunning(): boolean;
/**
 * Registers a cleanup that runs only if the progress is aborted. When the
 * progress is no longer running, the cleanup is invoked right away instead.
 */
cleanupWhenAborted(cleanup: () => any): void;
/** Throws if the progress has been aborted; otherwise does nothing. */
throwIfAborted(): void;
/** Awaits the instrumentation's `onBeforeInputAction` hook. */
beforeInputAction(): Promise<void>;
/** Awaits the instrumentation's `onAfterInputAction` hook. */
afterInputAction(): Promise<void>;
/** Metadata of the call this progress belongs to. */
metadata: CallMetadata;
}
/**
 * Runs a single task under an optional timeout and allows aborting it from the
 * outside.
 *
 * A controller is single-use: `run()` moves the state from 'before' to
 * 'running', and then to either 'finished' (task resolved) or 'aborted'
 * (task rejected, timed out, or `abort()` was called).
 */
export class ProgressController {
  // Callback that forcefully aborts the progress by rejecting `_forceAbortPromise`.
  private _forceAbort: (error: Error) => void = () => {};
  // Never resolves; it is rejected on abort or timeout.
  private _forceAbortPromise: Promise<never>;

  // Cleanups to be run only in the case of abort.
  private _cleanups: (() => any)[] = [];

  private _logName = 'api';
  private _state: 'before' | 'running' | 'aborted' | 'finished' = 'before';
  // Deadline on the `monotonicTime()` clock; 0 means "no deadline".
  private _deadline: number = 0;
  // Timeout passed to `run()`, kept for the timeout error message.
  private _timeout: number = 0;

  readonly metadata: CallMetadata;
  readonly instrumentation: Instrumentation;
  readonly sdkObject: SdkObject;

  /**
   * @param metadata Metadata of the call; log messages are appended to `metadata.log`.
   * @param sdkObject Object the call is made on; its `instrumentation` receives the hooks.
   */
  constructor(metadata: CallMetadata, sdkObject: SdkObject) {
    this.metadata = metadata;
    this.sdkObject = sdkObject;
    this.instrumentation = sdkObject.instrumentation;
    this._forceAbortPromise = new Promise<never>((_resolve, reject) => this._forceAbort = reject);
    this._forceAbortPromise.catch(e => null); // Prevent unhandled promise rejection.
  }

  /** Sets the log name passed to `instrumentation.onCallLog` (defaults to 'api'). */
  setLogName(logName: LogName) {
    this._logName = logName;
  }

  /**
   * Executes `task`, racing it against the timeout and `abort()`.
   *
   * @param task Receives the `Progress` handle for this run.
   * @param timeout Timeout in milliseconds; when omitted or 0 no deadline is set.
   * @returns The value `task` resolved with.
   * @throws The task's own error, the error passed to `abort()`, or a
   *     `TimeoutError` when the deadline is reached. In all of these cases the
   *     registered cleanups are run before the error is rethrown.
   */
  async run<T>(task: (progress: Progress) => Promise<T>, timeout?: number): Promise<T> {
    // Validate the state before touching any field, so that an illegal second
    // call cannot clobber the deadline of a run that is already in flight.
    assert(this._state === 'before');
    if (timeout) {
      this._timeout = timeout;
      this._deadline = monotonicTime() + timeout;
    }
    this._state = 'running';

    const progress: Progress = {
      log: message => {
        if (this._state === 'running')
          this.metadata.log.push(message);
        // Note: we might be sending logs after progress has finished, for example browser logs.
        this.instrumentation.onCallLog(this._logName, message, this.sdkObject, this.metadata);
      },
      timeUntilDeadline: () => this._deadline ? this._deadline - monotonicTime() : 2147483647, // 2^31-1 safe setTimeout in Node.
      isRunning: () => this._state === 'running',
      cleanupWhenAborted: (cleanup: () => any) => {
        if (this._state === 'running')
          this._cleanups.push(cleanup);
        else
          runCleanup(cleanup);
      },
      throwIfAborted: () => {
        if (this._state === 'aborted')
          throw new AbortedError();
      },
      beforeInputAction: async () => {
        await this.instrumentation.onBeforeInputAction(this.sdkObject, this.metadata);
      },
      afterInputAction: async () => {
        await this.instrumentation.onAfterInputAction(this.sdkObject, this.metadata);
      },
      metadata: this.metadata
    };

    // The error is created up front, before the task starts.
    const timeoutError = new TimeoutError(`Timeout ${this._timeout}ms exceeded.`);
    const timer = setTimeout(() => this._forceAbort(timeoutError), progress.timeUntilDeadline());
    try {
      const promise = task(progress);
      const result = await Promise.race([promise, this._forceAbortPromise]);
      this._state = 'finished';
      return result;
    } catch (e) {
      this._state = 'aborted';
      // splice(0) empties the list, so each cleanup runs at most once.
      await Promise.all(this._cleanups.splice(0).map(cleanup => runCleanup(cleanup)));
      throw e;
    } finally {
      clearTimeout(timer);
    }
  }

  /** Forcefully aborts the progress; a pending `run()` rejects with `error`. */
  abort(error: Error) {
    this._forceAbort(error);
  }
}
/**
 * Invokes one cleanup callback on a best-effort basis.
 *
 * The callback is called synchronously; if it returns a promise, that promise
 * is awaited. Any error it throws or rejects with is swallowed, so the returned
 * promise always resolves.
 */
async function runCleanup(cleanup: () => any): Promise<void> {
  try {
    await cleanup();
  } catch (ignored) {
    // Best-effort: a failing cleanup must not mask the error that caused the abort.
  }
}
/**
 * Error thrown by `Progress.throwIfAborted()` once the progress has been aborted.
 * Sets `name` explicitly so the error is distinguishable from a plain `Error`
 * in logs and when inspected by name.
 */
class AbortedError extends Error {
  constructor(message?: string) {
    super(message);
    this.name = 'AbortedError';
  }
}