Skip to content

Commit

Permalink
Inject handlers into LensProxy to remove circular-deps
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Malton <sebastian@malton.name>
  • Loading branch information
Nokel81 committed Jul 30, 2021
1 parent 4b969b7 commit fd0a4d8
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 117 deletions.
21 changes: 15 additions & 6 deletions src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as LensExtensionsMainApi from "../extensions/main-api";
import { app, autoUpdater, dialog, powerMonitor } from "electron";
import { appName, isMac, productName } from "../common/vars";
import path from "path";
import { LensProxy } from "./proxy/lens-proxy";
import { LensProxy } from "./lens-proxy";
import { WindowManager } from "./window-manager";
import { ClusterManager } from "./cluster-manager";
import { shellSync } from "./shell-sync";
Expand All @@ -49,7 +49,6 @@ import { pushCatalogToRenderer } from "./catalog-pusher";
import { catalogEntityRegistry } from "./catalog";
import { HelmRepoManager } from "./helm/helm-repo-manager";
import { syncGeneralEntities, syncWeblinks, KubeconfigSyncManager } from "./catalog-sources";
import { handleWsUpgrade } from "./proxy/ws-upgrade";
import configurePackages from "../common/configure-packages";
import { PrometheusProviderRegistry } from "./prometheus";
import * as initializers from "./initializers";
Expand All @@ -61,6 +60,10 @@ import { ExtensionsStore } from "../extensions/extensions-store";
import { FilesystemProvisionerStore } from "./extension-filesystem";
import { SentryInit } from "../common/sentry";
import { DetectorRegistry } from "./cluster-detectors/detector-registry";
import { Router } from "./router";
import { initMenu } from "./menu";
import { initTray } from "./tray";
import { kubeApiRequest, shellApiRequest } from "./proxy-functions";

// This has to be called before start using winton-based logger
// For example, before any logger.log
Expand Down Expand Up @@ -160,10 +163,11 @@ app.on("ready", async () => {

HelmRepoManager.createInstance(); // create the instance

const lensProxy = LensProxy.createInstance(
handleWsUpgrade,
req => ClusterManager.getInstance().getClusterForRequest(req),
);
const lensProxy = LensProxy.createInstance(new Router(), {
getClusterForRequest: req => ClusterManager.getInstance().getClusterForRequest(req),
kubeApiRequest,
shellApiRequest,
});

ClusterManager.createInstance().init();
KubeconfigSyncManager.createInstance();
Expand Down Expand Up @@ -208,6 +212,11 @@ app.on("ready", async () => {
logger.info("🖥️ Starting WindowManager");
const windowManager = WindowManager.createInstance();

cleanup.push(
initMenu(windowManager),
initTray(windowManager),
);

installDeveloperTools();

if (!startHidden) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/k8s-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import request, { RequestPromiseOptions } from "request-promise-native";
import { apiKubePrefix } from "../common/vars";
import type { IMetricsReqParams } from "../renderer/api/endpoints/metrics.api";
import { LensProxy } from "./proxy/lens-proxy";
import { LensProxy } from "./lens-proxy";
import type { Cluster } from "./cluster";

export async function k8sRequest<T = any>(cluster: Cluster, path: string, options: RequestPromiseOptions = {}): Promise<T> {
Expand Down
2 changes: 1 addition & 1 deletion src/main/kubeconfig-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import path from "path";
import fs from "fs-extra";
import { dumpConfigYaml } from "../common/kube-helpers";
import logger from "./logger";
import { LensProxy } from "./proxy/lens-proxy";
import { LensProxy } from "./lens-proxy";

export class KubeconfigManager {
protected configDir = app.getPath("temp");
Expand Down
106 changes: 31 additions & 75 deletions src/main/proxy/lens-proxy.ts → src/main/lens-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,59 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

import net from "net";
import type net from "net";
import type http from "http";
import spdy from "spdy";
import httpProxy from "http-proxy";
import url from "url";
import { apiPrefix, apiKubePrefix } from "../../common/vars";
import { Router } from "../router";
import type { ContextHandler } from "../context-handler";
import logger from "../logger";
import { Singleton } from "../../common/utils";
import type { Cluster } from "../cluster";

type WSUpgradeHandler = (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => void;
import { apiPrefix, apiKubePrefix } from "../common/vars";
import type { Router } from "./router";
import type { ContextHandler } from "./context-handler";
import logger from "./logger";
import { Singleton } from "../common/utils";
import type { Cluster } from "./cluster";
import type { ProxyApiRequestArgs } from "./proxy-functions";

type GetClusterForRequest = (req: http.IncomingMessage) => Cluster | null;

export interface LensProxyFunctions {
getClusterForRequest: GetClusterForRequest,
shellApiRequest: (args: ProxyApiRequestArgs) => void | Promise<void>;
kubeApiRequest: (args: ProxyApiRequestArgs) => void | Promise<void>;
}

export class LensProxy extends Singleton {
protected origin: string;
protected proxyServer: http.Server;
protected router = new Router();
protected closed = false;
protected retryCounters = new Map<string, number>();
protected proxy = this.createProxy();
protected getClusterForRequest: GetClusterForRequest;

public port: number;

constructor(handleWsUpgrade: WSUpgradeHandler, protected getClusterForRequest: (req: http.IncomingMessage) => Cluster | undefined) {
constructor(protected router: Router, functions: LensProxyFunctions) {
super();

const proxy = this.createProxy();
const { shellApiRequest, kubeApiRequest } = functions;

this.getClusterForRequest = functions.getClusterForRequest;

this.proxyServer = spdy.createServer({
spdy: {
plain: true,
protocols: ["http/1.1", "spdy/3.1"]
}
}, (req: http.IncomingMessage, res: http.ServerResponse) => {
this.handleRequest(proxy, req, res);
this.handleRequest(req, res);
});

this.proxyServer
.on("upgrade", (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => {
if (req.url.startsWith(`${apiPrefix}?`)) {
handleWsUpgrade(req, socket, head);
} else {
this.handleProxyUpgrade(proxy, req, socket, head)
.catch(error => logger.error(`[LENS-PROXY]: failed to handle proxy upgrade: ${error}`));
}
const isInternal = req.url.startsWith(`${apiPrefix}?`);
const reqHandler = isInternal ? shellApiRequest : kubeApiRequest;

(async () => reqHandler({ req, socket, head }))()
.catch(error => logger.error(logger.error(`[LENS-PROXY]: failed to handle proxy upgrade: ${error}`)));
});
}

Expand Down Expand Up @@ -104,58 +112,6 @@ export class LensProxy extends Singleton {
this.closed = true;
}

protected async handleProxyUpgrade(proxy: httpProxy, req: http.IncomingMessage, socket: net.Socket, head: Buffer) {
const cluster = this.getClusterForRequest(req);

if (cluster) {
const proxyUrl = await cluster.contextHandler.resolveAuthProxyUrl() + req.url.replace(apiKubePrefix, "");
const apiUrl = url.parse(cluster.apiUrl);
const pUrl = url.parse(proxyUrl);
const connectOpts = { port: parseInt(pUrl.port), host: pUrl.hostname };
const proxySocket = new net.Socket();

proxySocket.connect(connectOpts, () => {
proxySocket.write(`${req.method} ${pUrl.path} HTTP/1.1\r\n`);
proxySocket.write(`Host: ${apiUrl.host}\r\n`);

for (let i = 0; i < req.rawHeaders.length; i += 2) {
const key = req.rawHeaders[i];

if (key !== "Host" && key !== "Authorization") {
proxySocket.write(`${req.rawHeaders[i]}: ${req.rawHeaders[i+1]}\r\n`);
}
}
proxySocket.write("\r\n");
proxySocket.write(head);
});

proxySocket.setKeepAlive(true);
socket.setKeepAlive(true);
proxySocket.setTimeout(0);
socket.setTimeout(0);

proxySocket.on("data", function (chunk) {
socket.write(chunk);
});
proxySocket.on("end", function () {
socket.end();
});
proxySocket.on("error", function () {
socket.write(`HTTP/${req.httpVersion} 500 Connection error\r\n\r\n`);
socket.end();
});
socket.on("data", function (chunk) {
proxySocket.write(chunk);
});
socket.on("end", function () {
proxySocket.end();
});
socket.on("error", function () {
proxySocket.end();
});
}
}

protected createProxy(): httpProxy {
const proxy = httpProxy.createProxyServer();

Expand Down Expand Up @@ -195,7 +151,7 @@ export class LensProxy extends Singleton {
logger.debug(`Retrying proxy request to url: ${reqId}`);
setTimeout(() => {
this.retryCounters.set(reqId, retryCount + 1);
this.handleRequest(proxy, req, res)
this.handleRequest(req, res)
.catch(error => logger.error(`[LENS-PROXY]: failed to handle request on proxy error: ${error}`));
}, timeoutMs);
}
Expand Down Expand Up @@ -226,7 +182,7 @@ export class LensProxy extends Singleton {
return req.headers.host + req.url;
}

protected async handleRequest(proxy: httpProxy, req: http.IncomingMessage, res: http.ServerResponse) {
protected async handleRequest(req: http.IncomingMessage, res: http.ServerResponse) {
const cluster = this.getClusterForRequest(req);

if (cluster) {
Expand All @@ -237,7 +193,7 @@ export class LensProxy extends Singleton {
// this should be safe because we have already validated cluster uuid
res.setHeader("Access-Control-Allow-Origin", "*");

return proxy.web(req, res, proxyTarget);
return this.proxy.web(req, res, proxyTarget);
}
}
this.router.route(cluster, req, res);
Expand Down
7 changes: 3 additions & 4 deletions src/main/proxy/index.ts → src/main/proxy-functions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

// Don't export the contents here
// It will break the extension webpack

export default {};
export * from "./shell-api-request";
export * from "./kube-api-request";
export * from "./types";
84 changes: 84 additions & 0 deletions src/main/proxy-functions/kube-api-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright (c) 2021 OpenLens Authors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

import { chunk } from "lodash";
import net from "net";
import url from "url";
import { apiKubePrefix } from "../../common/vars";
import { ClusterManager } from "../cluster-manager";
import type { ProxyApiRequestArgs } from "./types";

const skipRawHeaders = new Set(["Host", "Authorization"]);

export async function kubeApiRequest({ req, socket, head }: ProxyApiRequestArgs) {
const cluster = ClusterManager.getInstance().getClusterForRequest(req);

if (!cluster) {
return;
}

const proxyUrl = await cluster.contextHandler.resolveAuthProxyUrl() + req.url.replace(apiKubePrefix, "");
const apiUrl = url.parse(cluster.apiUrl);
const pUrl = url.parse(proxyUrl);
const connectOpts = { port: parseInt(pUrl.port), host: pUrl.hostname };
const proxySocket = new net.Socket();

proxySocket.connect(connectOpts, () => {
proxySocket.write(`${req.method} ${pUrl.path} HTTP/1.1\r\n`);
proxySocket.write(`Host: ${apiUrl.host}\r\n`);

for (const [key, value] of chunk(req.rawHeaders, 2)) {
if (skipRawHeaders.has(key)) {
continue;
}

proxySocket.write(`${key}: ${value}\r\n`);
}

proxySocket.write("\r\n");
proxySocket.write(head);
});

proxySocket.setKeepAlive(true);
socket.setKeepAlive(true);
proxySocket.setTimeout(0);
socket.setTimeout(0);

proxySocket.on("data", function (chunk) {
socket.write(chunk);
});
proxySocket.on("end", function () {
socket.end();
});
proxySocket.on("error", function () {
socket.write(`HTTP/${req.httpVersion} 500 Connection error\r\n\r\n`);
socket.end();
});
socket.on("data", function (chunk) {
proxySocket.write(chunk);
});
socket.on("end", function () {
proxySocket.end();
});
socket.on("error", function () {
proxySocket.end();
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,18 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

/**
* This file is here so that the "../shell-session" import can be injected into
* LensProxy at creation time. So that the `pty.node` extension isn't loaded
* into Lens Extension webpack bundle.
*/

import * as WebSocket from "ws";
import type http from "http";
import type net from "net";
import url from "url";
import logger from "../logger";
import * as WebSocket from "ws";
import { NodeShellSession, LocalShellSession } from "../shell-session";
import type { ProxyApiRequestArgs } from "./types";
import { ClusterManager } from "../cluster-manager";
import logger from "../logger";

function createWsListener(): WebSocket.Server {
export function shellApiRequest({ req, socket, head }: ProxyApiRequestArgs) {
const ws = new WebSocket.Server({ noServer: true });

return ws.on("connection", ((socket: WebSocket, req: http.IncomingMessage) => {
ws.on("connection", ((socket: WebSocket, req: http.IncomingMessage) => {
const cluster = ClusterManager.getInstance().getClusterForRequest(req);
const nodeParam = url.parse(req.url, true).query["node"]?.toString();
const shell = nodeParam
Expand All @@ -46,12 +40,8 @@ function createWsListener(): WebSocket.Server {
shell.open()
.catch(error => logger.error(`[SHELL-SESSION]: failed to open: ${error}`, { error }));
}));
}

export async function handleWsUpgrade(req: http.IncomingMessage, socket: net.Socket, head: Buffer) {
const wsServer = createWsListener();

wsServer.handleUpgrade(req, socket, head, (con) => {
wsServer.emit("connection", con, req);
ws.handleUpgrade(req, socket, head, (con) => {
ws.emit("connection", con, req);
});
}

0 comments on commit fd0a4d8

Please sign in to comment.