-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle suspend/resume error from watch stream read #2136
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,8 @@ import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; | |
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api"; | ||
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object"; | ||
import byline from "byline"; | ||
import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; | ||
import { IKubeWatchEvent } from "./kube-watch-api"; | ||
import { ReadableWebToNodeStream } from "../utils/readableStream"; | ||
|
||
export interface IKubeApiOptions<T extends KubeObject> { | ||
/** | ||
|
@@ -373,7 +373,13 @@ export class KubeApi<T extends KubeObject = any> { | |
opts.abortController = new AbortController(); | ||
} | ||
let errorReceived = false; | ||
let timedRetry: NodeJS.Timeout; | ||
const { abortController, namespace, callback } = opts; | ||
|
||
abortController.signal.addEventListener("abort", () => { | ||
clearTimeout(timedRetry); | ||
}); | ||
|
||
const watchUrl = this.getWatchUrl(namespace); | ||
const responsePromise = this.request.getResponse(watchUrl, null, { | ||
signal: abortController.signal | ||
|
@@ -387,14 +393,17 @@ export class KubeApi<T extends KubeObject = any> { | |
} | ||
const nodeStream = new ReadableWebToNodeStream(response.body); | ||
|
||
nodeStream.on("end", () => { | ||
if (errorReceived) return; // kubernetes errors should be handled in a callback | ||
["end", "close", "error"].forEach((eventName) => { | ||
nodeStream.on(eventName, () => { | ||
if (errorReceived) return; // kubernetes errors should be handled in a callback | ||
|
||
setTimeout(() => { // we did not get any kubernetes errors so let's retry | ||
if (abortController.signal.aborted) return; | ||
clearTimeout(timedRetry); | ||
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry | ||
if (abortController.signal.aborted) return; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this give lines 379-381 above, if the signal has already aborted, shouldn't the watch just stop anyway? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, we don't need this. |
||
|
||
this.watch({...opts, namespace, callback}); | ||
}, 1000); | ||
this.watch({...opts, namespace, callback}); | ||
}, 1000); | ||
}); | ||
}); | ||
|
||
const stream = byline(nodeStream); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,87 @@ | ||||||
import { Readable } from "readable-stream"; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Since we bundle app with Node.js There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, I'll do that in a separate PR. |
||||||
|
||||||
/** | ||||||
* ReadableWebToNodeStream | ||||||
* | ||||||
* Copied from https://github.com/Borewit/readable-web-to-node-stream | ||||||
* | ||||||
* Adds read error handler | ||||||
* | ||||||
* */ | ||||||
export class ReadableWebToNodeStream extends Readable { | ||||||
|
||||||
public bytesRead = 0; | ||||||
public released = false; | ||||||
|
||||||
/** | ||||||
* Default web API stream reader | ||||||
* https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader | ||||||
*/ | ||||||
private reader: ReadableStreamReader; | ||||||
private pendingRead: Promise<any>; | ||||||
|
||||||
/** | ||||||
* | ||||||
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream | ||||||
*/ | ||||||
constructor(stream: ReadableStream) { | ||||||
super(); | ||||||
this.reader = stream.getReader(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Implementation of readable._read(size). | ||||||
* When readable._read() is called, if data is available from the resource, | ||||||
* the implementation should begin pushing that data into the read queue | ||||||
* https://nodejs.org/api/stream.html#stream_readable_read_size_1 | ||||||
*/ | ||||||
public async _read() { | ||||||
// Should start pushing data into the queue | ||||||
// Read data from the underlying Web-API-readable-stream | ||||||
if (this.released) { | ||||||
this.push(null); // Signal EOF | ||||||
|
||||||
return; | ||||||
} | ||||||
|
||||||
try { | ||||||
this.pendingRead = this.reader.read(); | ||||||
const data = await this.pendingRead; | ||||||
|
||||||
// clear the promise before pushing pushing new data to the queue and allow sequential calls to _read() | ||||||
delete this.pendingRead; | ||||||
|
||||||
if (data.done || this.released) { | ||||||
this.push(null); // Signal EOF | ||||||
} else { | ||||||
this.bytesRead += data.value.length; | ||||||
this.push(data.value); // Push new data to the queue | ||||||
} | ||||||
} catch(error) { | ||||||
this.push(null); // Signal EOF | ||||||
} | ||||||
Comment on lines
+60
to
+62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the change? Why do we want to signal There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot even catch errors that are thrown from here. If we could I would not have copied this library to our codebase. All other error cases end up with EOF so it makes sense to push this error to the similar path. I really would like to get this fix into upstream and that means this class itself won't be logging anything. |
||||||
} | ||||||
|
||||||
/** | ||||||
* If there is no unresolved read call to Web-API ReadableStream immediately returns; | ||||||
* otherwise will wait until the read is resolved. | ||||||
*/ | ||||||
public async waitForReadToComplete() { | ||||||
if (this.pendingRead) { | ||||||
await this.pendingRead; | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Close wrapper | ||||||
*/ | ||||||
public async close(): Promise<void> { | ||||||
await this.syncAndRelease(); | ||||||
} | ||||||
|
||||||
private async syncAndRelease() { | ||||||
this.released = true; | ||||||
await this.waitForReadToComplete(); | ||||||
await this.reader.releaseLock(); | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment is wrong. Shouldn't it read something like: