Skip to content

Commit

Permalink
feat: add WS proxy system
Browse files Browse the repository at this point in the history
  • Loading branch information
loopingz committed May 31, 2023
1 parent 5513a21 commit fdc394d
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 232 deletions.
3 changes: 2 additions & 1 deletion packages/core/package.json
Expand Up @@ -70,7 +70,8 @@
"sinon": "^15.0.0",
"source-map-support": "^0.5.19",
"ts-node": "^10.1.0",
"typescript": "~4.9.3"
"typescript": "~4.9.3",
"ws": "^8.13.0"
},
"files": [
"lib",
Expand Down
115 changes: 114 additions & 1 deletion packages/core/src/services/proxy.spec.ts
@@ -1,10 +1,123 @@
import { suite, test } from "@testdeck/mocha";
import * as assert from "assert";
import { EventEmitter } from "events";
import * as http from "http";
import sinon from "sinon";
import { WritableStreamBuffer } from "stream-buffers";
import { WebSocket, WebSocketServer } from "ws";
import { WebdaError } from "../errors";
import { WebdaTest } from "../test";
import { ProxyService } from "./proxy";
import { HttpContext } from "../utils/httpcontext";
import { createHttpHeader, ProxyService } from "./proxy";

@suite
class WSProxyTest extends WebdaTest {
@test
cov() {
assert.strictEqual(createHttpHeader("plop", { test: ["1", "2"] }), `plop\r\ntest: 1\r\ntest: 2\r\n\r\n`);
}

@test
async wsProxy() {
const proxyService = new ProxyService(this.webda, "proxy", {
url: "/proxy",
backend: "http://localhost:28888/"
});
const socket: any = new EventEmitter();
socket.destroy = sinon.stub();
socket.end = sinon.stub();
proxyService.proxyWS({ url: "/toto" }, socket, undefined);
assert.ok(!socket.destroy.calledOnce);
proxyService.proxyWS({ url: "/proxy/", method: "POST" }, socket, undefined);
assert.ok(socket.destroy.calledOnce);
socket.destroy.resetHistory();
proxyService.proxyWS({ url: "/proxy/", method: "GET", headers: {} }, socket, undefined);
assert.ok(socket.destroy.calledOnce);
socket.destroy.resetHistory();
proxyService.proxyWS({ url: "/proxy/", method: "GET", headers: { upgrade: "toto" } }, socket, undefined);
assert.ok(socket.destroy.calledOnce);
const proxyRequestSocket: any = <any>new EventEmitter();
proxyRequestSocket.end = sinon.stub();
const createRequest = sinon.stub(proxyService, "createWSRequest").callsFake(() => {
return <any>proxyRequestSocket;
});
const ctx = await this.newContext();
await proxyService.rawProxyWS(ctx, "", socket);
proxyRequestSocket.emit("error");
assert.ok(socket.end.calledOnce);

socket.write = sinon.stub();
proxyRequestSocket.emit("response", {
pipe: sinon.stub(),
headers: {},
httpVersion: "1.0",
statusCode: 200,
statusMessage: "OK"
});
assert.ok(socket.write.calledOnce);
createRequest.restore();
return new Promise<void>(async (resolve, reject) => {
try {
console.log("WS server constructor");
const wss = new WebSocketServer({ port: 28888 });

wss.on("connection", function connection(ws) {
ws.on("error", console.error);

ws.on("message", function message(data) {
console.log("received: %s", data);
ws.send(data);
});

ws.send("something");
});

this.registerService(proxyService);
await proxyService.resolve().init();
console.log("Http server constructor");
const httpServer = http
.createServer((req, res) => {
console.log("Got request", req.url);
})
.listen("28887");
this.webda.emit("Webda.Init.Http", httpServer);
// @ts-ignore
this.webda.getContextFromRequest = async (req, res) =>
this.webda.newWebContext(
new HttpContext("localhost", "GET", "/proxy/", "http", "28887", req.headers),
res,
true
);

const ws = new WebSocket("ws://localhost:28887/proxy/");

ws.on("error", console.error);

ws.on("open", function open() {
console.log("connected");
ws.send(Date.now());
});

ws.on("close", function close() {
console.log("disconnected");
});

ws.on("message", function message(data: any) {
if (isNaN(data)) {
return;
}
console.log(`Round-trip time: ${Date.now() - data} ms`);
ws.close();
wss.close();
httpServer.close();
resolve();
});
} catch (err) {
console.error(err);
}
});
}
}

@suite
class ProxyTest extends WebdaTest {
Expand Down
177 changes: 159 additions & 18 deletions packages/core/src/services/proxy.ts
Expand Up @@ -3,8 +3,32 @@ import * as https from "https";
import { Counter, Gauge, Histogram } from "../core";
import { WebdaError } from "../errors";
import { WebContext } from "../utils/context";
import { HttpContext } from "../utils/httpcontext";
import { Route, Service, ServiceParameters } from "./service";

export function createHttpHeader(line, headers) {
return (
Object.keys(headers)
.reduce(
function (head, key) {
var value = headers[key];

if (!Array.isArray(value)) {
head.push(key + ": " + value);
return head;
}

for (var i = 0; i < value.length; i++) {
head.push(key + ": " + value[i]);
}
return head;
},
[line]
)
.join("\r\n") + "\r\n\r\n"
);
}

/**
* Proxy to a backend service
*/
Expand Down Expand Up @@ -84,6 +108,16 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
return new ProxyParameters(params);
}

resolve() {
// Register the proxy on the 'upgrade' event of http socket
this.getWebda().on("Webda.Init.Http", (evt: any) => {
evt.on("upgrade", (req, socket, head) => {
this.proxyWS(req, socket, head);
});
});
return super.resolve();
}

/**
* Create the request to the backend
* @param url
Expand All @@ -92,7 +126,13 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
* @param callback
* @returns
*/
createRequest(url: string, method: string, headers: any, callback: (response: http.IncomingMessage) => void) {
createRequest(
url: string,
method: string,
headers: any,
callback: (response: http.IncomingMessage) => void,
options: { timeout?: number } = { timeout: 30000 }
) {
let mod: any = http;
if (url.startsWith("https://")) {
mod = https;
Expand All @@ -101,7 +141,8 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
url,
{
method,
headers
headers,
...options
},
callback
);
Expand Down Expand Up @@ -141,26 +182,22 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
* @param context
* @returns
*/
getRequestHeaders(context: WebContext) {
const headers = { ...context.getHttpContext().getHeaders() };
getRequestHeaders(context: HttpContext) {
const headers = { ...context.getHeaders() };
if (this.parameters.proxyHeaders) {
let xff = context.getHttpContext().getHeader("x-forwarded-for");
let xff = context.getHeader("x-forwarded-for");
if (!xff) {
xff += `, ${context.getHttpContext().getClientIp()}`;
xff += `, ${context.getClientIp()}`;
} else {
xff = context.getHttpContext().getClientIp();
xff = context.getClientIp();
}
const protocol = context.getHttpContext().getProtocol();
headers["X-Rewrite-URL"] = context.getHttpContext().getRelativeUri();
headers["X-Forwarded-Host"] = context
.getHttpContext()
.getHeader("x-forwarded-host", `${context.getHttpContext().getHost()}`);
headers["X-Forwarded-Proto"] = context
.getHttpContext()
.getHeader("x-forwarded-proto", protocol.substring(0, protocol.length - 1));
const protocol = context.getProtocol();
headers["X-Rewrite-URL"] = context.getRelativeUri();
headers["X-Forwarded-Host"] = context.getHeader("x-forwarded-host", `${context.getHost()}`);
headers["X-Forwarded-Proto"] = context.getHeader("x-forwarded-proto", protocol.substring(0, protocol.length - 1));
headers["X-Forwarded-For"] = xff;
}
return context.getHttpContext().getHeaders();
return headers;
}

/**
Expand All @@ -174,6 +211,110 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
return this.rawProxy(ctx, host, subUrl);
}

/**
* Proxy request to the backend errored
* @param e
*/
onError(e) {
this.log("ERROR", "Proxying error", e);
}

async proxyWS(req, socket, head) {
if (!req.url.startsWith(this.parameters.url)) {
return;
}
if (req.method !== "GET" || !req.headers.upgrade) {
socket.destroy();
}

if (req.headers.upgrade.toLowerCase() !== "websocket") {
socket.destroy();
}
// Proxy WS Only works with a WebdaServer from @webda/shell for now
const webdaContext = await (<any>this.getWebda()).getContextFromRequest(req);
await webdaContext.init();
return this.rawProxyWS(
webdaContext,
`${this.parameters.backend}${req.url.substring(this.parameters.url.length)}`,
socket
);
}

/**
* Create the request to the WS backend
* @param url
* @param context
* @returns
*/
createWSRequest(url: string, context: WebContext): http.ClientRequest {
const Url = new URL(url);
return (Url.protocol === "http:" ? http : https).request({
path: Url.pathname + Url.search,
method: "GET",
headers: { ...this.getRequestHeaders(context.getHttpContext()), host: Url.host },
host: Url.hostname,
port: Url.port
});
}

/**
*
* @param context
* @param url
* @param socket
*/
async rawProxyWS(context: WebContext, url: string, socket: any) {
this.log("DEBUG", "Proxying upgrade request", `${url}`);
const proxyReq = this.createWSRequest(url, context);
proxyReq.on("response", res => {
// @ts-ignore
if (!res.upgrade) {
socket.write(
createHttpHeader("HTTP/" + res.httpVersion + " " + res.statusCode + " " + res.statusMessage, res.headers)
);
res.pipe(socket);
}
});

const onError = err => {
this.log("ERROR", err);
socket.end();
};

proxyReq.on("error", onError);

proxyReq.on("upgrade", (proxyRes, proxySocket, proxyHead) => {
proxySocket.on("error", onError);

// Allow us to listen when the websocket has completed
proxySocket.on("end", function () {
socket.end();
});

// The pipe below will end proxySocket if socket closes cleanly, but not
// if it errors (eg, vanishes from the net and starts returning
// EHOSTUNREACH). We need to do that explicitly.
socket.on("error", function () {
/* c8 ignore next 2 */
proxySocket.end();
});

proxySocket.setTimeout(0);
proxySocket.setNoDelay(true);
proxySocket.setKeepAlive(true, 0);

if (proxyHead && proxyHead.length) proxySocket.unshift(proxyHead);

//
// Remark: Handle writing the headers to the socket when switching protocols
// Also handles when a header is an array
//
socket.write(createHttpHeader("HTTP/1.1 101 Switching Protocols", proxyRes.headers));
proxySocket.pipe(socket).pipe(proxySocket);
});
proxyReq.end();
}

/**
* Proxy an url to the response directly
* @param ctx
Expand All @@ -194,7 +335,7 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
const onError = e => {
this.metrics.http_request_in_flight.dec();
this.metrics.http_errors.inc({ ...labels, statuscode: -1 });
this.log("ERROR", "Proxying error", e);
this.onError(e);
resolve();
};
let Host;
Expand All @@ -208,7 +349,7 @@ export class ProxyService<T extends ProxyParameters = ProxyParameters> extends S
`${host}${url}`,
ctx.getHttpContext().getMethod(),
{
...this.getRequestHeaders(ctx),
...this.getRequestHeaders(ctx.getHttpContext()),
Host,
...headers
},
Expand Down

0 comments on commit fdc394d

Please sign in to comment.