Skip to content

Commit e035a0d

Browse files
pkuGeosteipete
authored andcommitted
telegram: rebuild transport after stalled polling cycles
1 parent 663ba5a commit e035a0d

File tree

2 files changed

+97
-11
lines changed

2 files changed

+97
-11
lines changed

extensions/telegram/src/monitor.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
9191
const activeRunner = pollingSession?.activeRunner;
9292
if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) {
9393
pollingSession?.markForceRestarted();
94+
pollingSession?.markTransportDirty();
9495
pollingSession?.abortActiveFetch();
9596
void activeRunner.stop().catch(() => {});
97+
log("[telegram][diag] marking transport dirty after polling network failure");
9698
log(
9799
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
98100
);
@@ -180,9 +182,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
180182
}
181183

182184
// Create transport once to preserve sticky IPv4 fallback state across polling restarts
183-
const telegramTransport = resolveTelegramTransport(proxyFetch, {
184-
network: account.config.network,
185-
});
185+
const createTelegramTransportForPolling = () =>
186+
resolveTelegramTransport(proxyFetch, {
187+
network: account.config.network,
188+
});
189+
const telegramTransport = createTelegramTransportForPolling();
186190

187191
pollingSession = new TelegramPollingSession({
188192
token,
@@ -196,6 +200,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
196200
persistUpdateId,
197201
log,
198202
telegramTransport,
203+
createTelegramTransport: createTelegramTransportForPolling,
199204
});
200205
await pollingSession.runUntilAbort();
201206
} finally {

extensions/telegram/src/polling-session.ts

Lines changed: 89 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type TelegramPollingSessionOpts = {
5050
log: (line: string) => void;
5151
/** Pre-resolved Telegram transport to reuse across bot instances */
5252
telegramTransport?: TelegramTransport;
53+
/** Rebuild Telegram transport after stall/network recovery when marked dirty. */
54+
createTelegramTransport?: () => TelegramTransport;
5355
};
5456

5557
export class TelegramPollingSession {
@@ -58,8 +60,12 @@ export class TelegramPollingSession {
5860
#forceRestarted = false;
5961
#activeRunner: ReturnType<typeof run> | undefined;
6062
#activeFetchAbort: AbortController | undefined;
63+
#telegramTransport: TelegramTransport | undefined;
64+
#discardTransportOnRestart = false;
6165

62-
constructor(private readonly opts: TelegramPollingSessionOpts) {}
66+
constructor(private readonly opts: TelegramPollingSessionOpts) {
67+
this.#telegramTransport = opts.telegramTransport;
68+
}
6369

6470
get activeRunner() {
6571
return this.#activeRunner;
@@ -69,6 +75,10 @@ export class TelegramPollingSession {
6975
this.#forceRestarted = true;
7076
}
7177

78+
markTransportDirty() {
79+
this.#discardTransportOnRestart = true;
80+
}
81+
7282
abortActiveFetch() {
7383
this.#activeFetchAbort?.abort();
7484
}
@@ -126,6 +136,15 @@ export class TelegramPollingSession {
126136
async #createPollingBot(): Promise<TelegramBot | undefined> {
127137
const fetchAbortController = new AbortController();
128138
this.#activeFetchAbort = fetchAbortController;
139+
const shouldRebuildTransport = this.#discardTransportOnRestart || !this.#telegramTransport;
140+
const telegramTransport = shouldRebuildTransport
141+
? (this.opts.createTelegramTransport?.() ?? this.#telegramTransport)
142+
: this.#telegramTransport;
143+
if (shouldRebuildTransport && telegramTransport) {
144+
this.opts.log("[telegram][diag] rebuilding transport for next polling cycle");
145+
}
146+
this.#telegramTransport = telegramTransport;
147+
this.#discardTransportOnRestart = false;
129148
try {
130149
return createTelegramBot({
131150
token: this.opts.token,
@@ -138,7 +157,7 @@ export class TelegramPollingSession {
138157
lastUpdateId: this.opts.getLastUpdateId(),
139158
onUpdateId: this.opts.persistUpdateId,
140159
},
141-
telegramTransport: this.opts.telegramTransport,
160+
telegramTransport,
142161
});
143162
} catch (err) {
144163
await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error");
@@ -186,11 +205,49 @@ export class TelegramPollingSession {
186205
await this.#confirmPersistedOffset(bot);
187206

188207
let lastGetUpdatesAt = Date.now();
189-
bot.api.config.use((prev, method, payload, signal) => {
190-
if (method === "getUpdates") {
191-
lastGetUpdatesAt = Date.now();
208+
let lastGetUpdatesStartedAt: number | null = null;
209+
let lastGetUpdatesFinishedAt: number | null = null;
210+
let lastGetUpdatesDurationMs: number | null = null;
211+
let lastGetUpdatesOutcome = "not-started";
212+
let lastGetUpdatesError: string | null = null;
213+
let lastGetUpdatesOffset: number | null = null;
214+
let inFlightGetUpdates = 0;
215+
let stopSequenceLogged = false;
216+
let stallDiagLoggedAt = 0;
217+
218+
bot.api.config.use(async (prev, method, payload, signal) => {
219+
if (method !== "getUpdates") {
220+
return prev(method, payload, signal);
221+
}
222+
223+
const startedAt = Date.now();
224+
lastGetUpdatesAt = startedAt;
225+
lastGetUpdatesStartedAt = startedAt;
226+
lastGetUpdatesOffset =
227+
payload && typeof payload === "object" && "offset" in payload
228+
? ((payload as { offset?: number }).offset ?? null)
229+
: null;
230+
inFlightGetUpdates += 1;
231+
lastGetUpdatesOutcome = "started";
232+
lastGetUpdatesError = null;
233+
234+
try {
235+
const result = await prev(method, payload, signal);
236+
const finishedAt = Date.now();
237+
lastGetUpdatesFinishedAt = finishedAt;
238+
lastGetUpdatesDurationMs = finishedAt - startedAt;
239+
lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
240+
return result;
241+
} catch (err) {
242+
const finishedAt = Date.now();
243+
lastGetUpdatesFinishedAt = finishedAt;
244+
lastGetUpdatesDurationMs = finishedAt - startedAt;
245+
lastGetUpdatesOutcome = "error";
246+
lastGetUpdatesError = formatErrorMessage(err);
247+
throw err;
248+
} finally {
249+
inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1);
192250
}
193-
return prev(method, payload, signal);
194251
});
195252

196253
const runner = run(bot, this.opts.runnerOptions);
@@ -236,11 +293,26 @@ export class TelegramPollingSession {
236293
if (this.opts.abortSignal?.aborted) {
237294
return;
238295
}
239-
const elapsed = Date.now() - lastGetUpdatesAt;
296+
297+
const now = Date.now();
298+
const activeElapsed =
299+
inFlightGetUpdates > 0 && lastGetUpdatesStartedAt != null ? now - lastGetUpdatesStartedAt : 0;
300+
const idleElapsed = inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt);
301+
const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
302+
240303
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
304+
if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) {
305+
return;
306+
}
307+
stallDiagLoggedAt = now;
308+
this.#discardTransportOnRestart = true;
241309
stalledRestart = true;
310+
const elapsedLabel =
311+
inFlightGetUpdates > 0
312+
? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}`
313+
: `no completed getUpdates for ${formatDurationPrecise(elapsed)}`;
242314
this.opts.log(
243-
`[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
315+
`[telegram] Polling stall detected (${elapsedLabel}); forcing restart. [diag inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}]`,
244316
);
245317
void stopRunner();
246318
void stopBot();
@@ -270,6 +342,9 @@ export class TelegramPollingSession {
270342
? "unhandled network error"
271343
: "runner stopped (maxRetryTime exceeded or graceful stop)";
272344
this.#forceRestarted = false;
345+
this.opts.log(
346+
`[telegram][diag] polling cycle finished reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}`,
347+
);
273348
const shouldRestart = await this.#waitBeforeRestart(
274349
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
275350
);
@@ -284,11 +359,17 @@ export class TelegramPollingSession {
284359
this.#webhookCleared = false;
285360
}
286361
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
362+
if (isConflict || isRecoverable) {
363+
this.#discardTransportOnRestart = true;
364+
}
287365
if (!isConflict && !isRecoverable) {
288366
throw err;
289367
}
290368
const reason = isConflict ? "getUpdates conflict" : "network error";
291369
const errMsg = formatErrorMessage(err);
370+
this.opts.log(
371+
`[telegram][diag] polling cycle error reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"} err=${errMsg}${lastGetUpdatesError ? ` lastGetUpdatesError=${lastGetUpdatesError}` : ""}`,
372+
);
292373
const shouldRestart = await this.#waitBeforeRestart(
293374
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
294375
);

0 commit comments

Comments
 (0)