Skip to content

Commit ada4691

Browse files
authored
[apps] fix streaming retry logic (#256)
* fix streaming retry logic * Update retry.ts
1 parent de1437f commit ada4691

File tree

3 files changed

+50
-18
lines changed

3 files changed

+50
-18
lines changed

packages/apps/src/plugins/http/plugin.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ export class HttpPlugin implements ISender {
121121
reject(err);
122122
});
123123

124-
this._server.listen(port, () => {
124+
this._server.listen(port, () => {
125125
this.logger.info(`listening on port ${port} 🚀`);
126126
resolve();
127127
});
@@ -194,7 +194,8 @@ export class HttpPlugin implements ISender {
194194
token: this.botToken,
195195
})
196196
),
197-
ref
197+
ref,
198+
this.logger
198199
);
199200
}
200201

packages/apps/src/plugins/http/stream.ts

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
SentActivity,
1111
TypingActivity,
1212
} from '@microsoft/teams.api';
13-
import { EventEmitter } from '@microsoft/teams.common';
13+
import { ConsoleLogger, EventEmitter, ILogger } from '@microsoft/teams.common';
1414

1515
import { IStreamer, IStreamerEvents } from '../../types';
1616
import { promises } from '../../utils';
@@ -30,10 +30,12 @@ export class HttpStream implements IStreamer {
3030

3131
private _result?: SentActivity;
3232
private _timeout?: NodeJS.Timeout;
33+
private _logger: ILogger;
3334

34-
constructor(client: Client, ref: ConversationReference) {
35+
constructor(client: Client, ref: ConversationReference, logger?: ILogger) {
3536
this.client = client;
3637
this.ref = ref;
38+
this._logger = logger?.child('stream') || new ConsoleLogger('@teams/http/stream');
3739
}
3840

3941
emit(activity: Partial<IMessageActivity> | string) {
@@ -54,8 +56,15 @@ export class HttpStream implements IStreamer {
5456
}
5557

5658
async close() {
57-
if (!this.index && !this.queue.length) return;
58-
if (this._result) return this._result;
59+
if (!this.index && !this.queue.length) {
60+
this._logger.debug('closed with no content');
61+
return;
62+
}
63+
64+
if (this._result) {
65+
this._logger.debug('already closed');
66+
return this._result;
67+
}
5968

6069
while (!this.id || this.queue.length) {
6170
await new Promise((resolve) => setTimeout(resolve, 200));
@@ -68,7 +77,10 @@ export class HttpStream implements IStreamer {
6877
.addStreamFinal()
6978
.withChannelData(this.channelData);
7079

71-
const res = await promises.retry(this.send(activity));
80+
const res = await promises.retry(() => this.send(activity), {
81+
logger: this._logger
82+
});
83+
7284
this.events.emit('close', res);
7385

7486
this.index = 0;
@@ -78,6 +90,7 @@ export class HttpStream implements IStreamer {
7890
this.channelData = {};
7991
this.entities = [];
8092
this._result = res;
93+
this._logger.debug(res);
8194
return res;
8295
}
8396

@@ -88,10 +101,9 @@ export class HttpStream implements IStreamer {
88101
this._timeout = undefined;
89102
}
90103

91-
const size = Math.round(this.queue.length / 10);
92104
let i = 0;
93105

94-
while (this.queue.length && i <= size) {
106+
while (this.queue.length && i < 10) {
95107
const activity = this.queue.shift();
96108

97109
if (!activity) continue;
@@ -118,13 +130,18 @@ export class HttpStream implements IStreamer {
118130
i++;
119131
}
120132

121-
this.index++;
133+
if (i === 0) return;
134+
122135
const activity = new TypingActivity({ id: this.id })
123136
.withText(this.text)
124-
.addStreamUpdate(this.index);
137+
.addStreamUpdate(this.index + 1);
138+
139+
const res = await promises.retry(() => this.send(activity), {
140+
logger: this._logger
141+
});
125142

126-
const res = await promises.retry(this.send(activity));
127143
this.events.emit('chunk', res);
144+
this.index++;
128145

129146
if (!this.id) {
130147
this.id = res.id;
Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { ILogger } from '@microsoft/teams.common';
2+
13
export type RetryOptions = {
24
/**
35
* the max number of retry attempts
@@ -7,23 +9,35 @@ export type RetryOptions = {
79

810
/**
911
* the delay in ms per retry
10-
* @default 200
1112
*/
1213
readonly delay?: number;
14+
15+
/**
16+
* the logger to use
17+
*/
18+
readonly logger?: ILogger;
1319
};
1420

15-
export async function retry<T = any>(promise: Promise<T>, options?: RetryOptions) {
16-
const max = options?.max || 3;
17-
const delay = options?.delay || 200;
21+
export async function retry<T = any>(factory: () => Promise<T>, options?: RetryOptions) {
22+
const max = options?.max ?? 5;
23+
const delay = options?.delay ?? 500;
24+
const log = options?.logger?.child('retry');
1825

1926
try {
20-
return await promise;
27+
return await factory();
2128
} catch (err) {
2229
if (max > 0) {
30+
log?.debug(`delaying ${delay}ms...`);
2331
await new Promise((resolve) => setTimeout(resolve, delay));
24-
return retry(promise, { max: max - 1, delay: delay * 2 });
32+
log?.debug('retrying...');
33+
return retry(factory, {
34+
max: max - 1,
35+
delay: delay * 2,
36+
logger: options?.logger,
37+
});
2538
}
2639

40+
log?.error(err);
2741
throw err;
2842
}
2943
}

0 commit comments

Comments
 (0)