From 503f9544709619965913d32e24b3d057467b6301 Mon Sep 17 00:00:00 2001 From: Jari Kolehmainen Date: Fri, 12 Feb 2021 10:23:44 +0200 Subject: [PATCH 1/2] handle suspend/resume error from stream read Signed-off-by: Jari Kolehmainen --- package.json | 2 +- src/renderer/api/kube-api.ts | 23 +++++--- src/renderer/utils/readableStream.ts | 87 ++++++++++++++++++++++++++++ yarn.lock | 26 ++------- 4 files changed, 109 insertions(+), 29 deletions(-) create mode 100644 src/renderer/utils/readableStream.ts diff --git a/package.json b/package.json index f663accadcbf..fcbc51dd656c 100644 --- a/package.json +++ b/package.json @@ -222,7 +222,7 @@ "react": "^17.0.1", "react-dom": "^17.0.1", "react-router": "^5.2.0", - "readable-web-to-node-stream": "^3.0.1", + "readable-stream": "^3.6.0", "request": "^2.88.2", "request-promise-native": "^1.0.8", "semver": "^7.3.2", diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index 97cfb0522b74..a880cc2406f2 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -10,8 +10,8 @@ import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api"; import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object"; import byline from "byline"; -import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; import { IKubeWatchEvent } from "./kube-watch-api"; +import { ReadableWebToNodeStream } from "../utils/readableStream"; export interface IKubeApiOptions { /** @@ -373,7 +373,13 @@ export class KubeApi { opts.abortController = new AbortController(); } let errorReceived = false; + let timedRetry: NodeJS.Timeout; const { abortController, namespace, callback } = opts; + + abortController.signal.addEventListener("abort", () => { + clearTimeout(timedRetry); + }); + const watchUrl = this.getWatchUrl(namespace); const responsePromise = this.request.getResponse(watchUrl, null, { signal: abortController.signal @@ -387,14 +393,17 @@ export class KubeApi { } const nodeStream = new ReadableWebToNodeStream(response.body); - nodeStream.on("end", () => { - if (errorReceived) return; // kubernetes errors should be handled in a callback + ["end", "close", "error"].forEach((eventName) => { + nodeStream.on(eventName, () => { + if (errorReceived) return; // kubernetes errors should be handled in a callback - setTimeout(() => { // we did not get any kubernetes errors so let's retry - if (abortController.signal.aborted) return; + clearTimeout(timedRetry); + timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry + if (abortController.signal.aborted) return; - this.watch({...opts, namespace, callback}); - }, 1000); + this.watch({...opts, namespace, callback}); + }, 1000); + }); }); const stream = byline(nodeStream); diff --git a/src/renderer/utils/readableStream.ts b/src/renderer/utils/readableStream.ts new file mode 100644 index 000000000000..3b511064271b --- /dev/null +++ b/src/renderer/utils/readableStream.ts @@ -0,0 +1,87 @@ +import { Readable } from "readable-stream"; + +/** + * ReadableWebToNodeStream + * + * Copied from https://github.com/Borewit/readable-web-to-node-stream + * + * Adds read error handler + * + * */ +export class ReadableWebToNodeStream extends Readable { + + public bytesRead = 0; + public released = false; + + /** + * Default web API stream reader + * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader + */ + private reader: ReadableStreamReader; + private pendingRead: Promise; + + /** + * + * @param stream Readable​Stream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream + */ + constructor(stream: ReadableStream) { + super(); + this.reader = stream.getReader(); + } + + /** + * Implementation of readable._read(size). + * When readable._read() is called, if data is available from the resource, + * the implementation should begin pushing that data into the read queue + * https://nodejs.org/api/stream.html#stream_readable_read_size_1 + */ + public async _read() { + // Should start pushing data into the queue + // Read data from the underlying Web-API-readable-stream + if (this.released) { + this.push(null); // Signal EOF + + return; + } + + try { + this.pendingRead = this.reader.read(); + const data = await this.pendingRead; + + // clear the promise before pushing pushing new data to the queue and allow sequential calls to _read() + delete this.pendingRead; + + if (data.done || this.released) { + this.push(null); // Signal EOF + } else { + this.bytesRead += data.value.length; + this.push(data.value); // Push new data to the queue + } + } catch(error) { + this.push(null); // Signal EOF + } + } + + /** + * If there is no unresolved read call to Web-API Readable​Stream immediately returns; + * otherwise will wait until the read is resolved. + */ + public async waitForReadToComplete() { + if (this.pendingRead) { + await this.pendingRead; + } + } + + /** + * Close wrapper + */ + public async close(): Promise { + await this.syncAndRelease(); + } + + private async syncAndRelease() { + this.released = true; + await this.waitForReadToComplete(); + await this.reader.releaseLock(); + } +} diff --git a/yarn.lock b/yarn.lock index cd3384eba509..db3e7c7e2c5d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1615,14 +1615,6 @@ "@types/prop-types" "*" csstype "^3.0.2" -"@types/readable-stream@^2.3.9": - version "2.3.9" - resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.9.tgz#40a8349e6ace3afd2dd1b6d8e9b02945de4566a9" - integrity sha512-sqsgQqFT7HmQz/V5jH1O0fvQQnXAJO46Gg9LRO/JPfjmVmGUlcx831TZZO3Y3HtWhIkzf3kTsNT0Z0kzIhIvZw== - dependencies: - "@types/node" "*" - safe-buffer "*" - "@types/relateurl@*": version "0.2.28" resolved "https://registry.yarnpkg.com/@types/relateurl/-/relateurl-0.2.28.tgz#6bda7db8653fa62643f5ee69e9f69c11a392e3a6" @@ -11464,14 +11456,6 @@ readable-stream@~1.1.10: isarray "0.0.1" string_decoder "~0.10.x" -readable-web-to-node-stream@^3.0.1: - version "3.0.1" - resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845" - integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA== - dependencies: - "@types/readable-stream" "^2.3.9" - readable-stream "^3.6.0" - readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309" @@ -11890,16 +11874,16 @@ rxjs@^6.5.2: dependencies: tslib "^1.9.0" -safe-buffer@*, safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: - version "5.2.1" - resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" - integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== - safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== +safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: + version "5.2.1" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" + integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== + safe-regex@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/safe-regex/-/safe-regex-1.1.0.tgz#40a3669f3b077d1e943d44629e157dd48023bf2e" From 4dfc90bd2e778ac10913f537e3920b077335924c Mon Sep 17 00:00:00 2001 From: Jari Kolehmainen Date: Fri, 12 Feb 2021 10:35:11 +0200 Subject: [PATCH 2/2] add missing types Signed-off-by: Jari Kolehmainen --- package.json | 1 + yarn.lock | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index fcbc51dd656c..05da6d14a524 100644 --- a/package.json +++ b/package.json @@ -277,6 +277,7 @@ "@types/react-router-dom": "^5.1.6", "@types/react-select": "^3.0.13", "@types/react-window": "^1.8.2", + "@types/readable-stream": "^2.3.9", "@types/request": "^2.48.5", "@types/request-promise-native": "^1.0.17", "@types/semver": "^7.2.0", diff --git a/yarn.lock b/yarn.lock index db3e7c7e2c5d..16ac2ca4f8d1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1615,6 +1615,14 @@ "@types/prop-types" "*" csstype "^3.0.2" +"@types/readable-stream@^2.3.9": + version "2.3.9" + resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.9.tgz#40a8349e6ace3afd2dd1b6d8e9b02945de4566a9" + integrity sha512-sqsgQqFT7HmQz/V5jH1O0fvQQnXAJO46Gg9LRO/JPfjmVmGUlcx831TZZO3Y3HtWhIkzf3kTsNT0Z0kzIhIvZw== + dependencies: + "@types/node" "*" + safe-buffer "*" + "@types/relateurl@*": version "0.2.28" resolved "https://registry.yarnpkg.com/@types/relateurl/-/relateurl-0.2.28.tgz#6bda7db8653fa62643f5ee69e9f69c11a392e3a6" @@ -11874,16 +11882,16 @@ rxjs@^6.5.2: dependencies: tslib "^1.9.0" +safe-buffer@*, safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: + version "5.2.1" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" + integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== + safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== -safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: - version "5.2.1" - resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" - integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== - safe-regex@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/safe-regex/-/safe-regex-1.1.0.tgz#40a3669f3b077d1e943d44629e157dd48023bf2e"