From fdc394de666d74e9130d29fb6d4ddd67b650430f Mon Sep 17 00:00:00 2001 From: Remi Cattiau Date: Wed, 31 May 2023 09:55:57 -0700 Subject: [PATCH] feat: add WS proxy system --- packages/core/package.json | 3 +- packages/core/src/services/proxy.spec.ts | 115 ++++++++++++- packages/core/src/services/proxy.ts | 177 +++++++++++++++++--- packages/core/webda.module.json | 76 ++++++++- packages/gcp/webda.module.json | 198 ----------------------- packages/graphql/webda.module.json | 12 +- packages/mongodb/webda.module.json | 6 +- sample-app/package.json | 2 +- 8 files changed, 357 insertions(+), 232 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index f6cdbd238..66c6dfbbd 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -70,7 +70,8 @@ "sinon": "^15.0.0", "source-map-support": "^0.5.19", "ts-node": "^10.1.0", - "typescript": "~4.9.3" + "typescript": "~4.9.3", + "ws": "^8.13.0" }, "files": [ "lib", diff --git a/packages/core/src/services/proxy.spec.ts b/packages/core/src/services/proxy.spec.ts index 29b104a6f..9216f2516 100644 --- a/packages/core/src/services/proxy.spec.ts +++ b/packages/core/src/services/proxy.spec.ts @@ -1,10 +1,123 @@ import { suite, test } from "@testdeck/mocha"; import * as assert from "assert"; +import { EventEmitter } from "events"; import * as http from "http"; +import sinon from "sinon"; import { WritableStreamBuffer } from "stream-buffers"; +import { WebSocket, WebSocketServer } from "ws"; import { WebdaError } from "../errors"; import { WebdaTest } from "../test"; -import { ProxyService } from "./proxy"; +import { HttpContext } from "../utils/httpcontext"; +import { createHttpHeader, ProxyService } from "./proxy"; + +@suite +class WSProxyTest extends WebdaTest { + @test + cov() { + assert.strictEqual(createHttpHeader("plop", { test: ["1", "2"] }), `plop\r\ntest: 1\r\ntest: 2\r\n\r\n`); + } + + @test + async wsProxy() { + const proxyService = new ProxyService(this.webda, "proxy", { + url: "/proxy", + backend: "http://localhost:28888/" + }); + const socket: any = new EventEmitter(); + socket.destroy = sinon.stub(); + socket.end = sinon.stub(); + proxyService.proxyWS({ url: "/toto" }, socket, undefined); + assert.ok(!socket.destroy.calledOnce); + proxyService.proxyWS({ url: "/proxy/", method: "POST" }, socket, undefined); + assert.ok(socket.destroy.calledOnce); + socket.destroy.resetHistory(); + proxyService.proxyWS({ url: "/proxy/", method: "GET", headers: {} }, socket, undefined); + assert.ok(socket.destroy.calledOnce); + socket.destroy.resetHistory(); + proxyService.proxyWS({ url: "/proxy/", method: "GET", headers: { upgrade: "toto" } }, socket, undefined); + assert.ok(socket.destroy.calledOnce); + const proxyRequestSocket: any = new EventEmitter(); + proxyRequestSocket.end = sinon.stub(); + const createRequest = sinon.stub(proxyService, "createWSRequest").callsFake(() => { + return proxyRequestSocket; + }); + const ctx = await this.newContext(); + await proxyService.rawProxyWS(ctx, "", socket); + proxyRequestSocket.emit("error"); + assert.ok(socket.end.calledOnce); + + socket.write = sinon.stub(); + proxyRequestSocket.emit("response", { + pipe: sinon.stub(), + headers: {}, + httpVersion: "1.0", + statusCode: 200, + statusMessage: "OK" + }); + assert.ok(socket.write.calledOnce); + createRequest.restore(); + return new Promise(async (resolve, reject) => { + try { + console.log("WS server constructor"); + const wss = new WebSocketServer({ port: 28888 }); + + wss.on("connection", function connection(ws) { + ws.on("error", console.error); + + ws.on("message", function message(data) { + console.log("received: %s", data); + ws.send(data); + }); + + ws.send("something"); + }); + + this.registerService(proxyService); + await proxyService.resolve().init(); + console.log("Http server constructor"); + const httpServer = http + .createServer((req, res) => { + console.log("Got request", req.url); + }) + .listen("28887"); + this.webda.emit("Webda.Init.Http", httpServer); + // @ts-ignore + this.webda.getContextFromRequest = async (req, res) => + this.webda.newWebContext( + new HttpContext("localhost", "GET", "/proxy/", "http", "28887", req.headers), + res, + true + ); + + const ws = new WebSocket("ws://localhost:28887/proxy/"); + + ws.on("error", console.error); + + ws.on("open", function open() { + console.log("connected"); + ws.send(Date.now()); + }); + + ws.on("close", function close() { + console.log("disconnected"); + }); + + ws.on("message", function message(data: any) { + if (isNaN(data)) { + return; + } + console.log(`Round-trip time: ${Date.now() - data} ms`); + ws.close(); + wss.close(); + httpServer.close(); + resolve(); + }); + } catch (err) { + console.error(err); + } + }); + } +} @suite class ProxyTest extends WebdaTest { diff --git a/packages/core/src/services/proxy.ts b/packages/core/src/services/proxy.ts index c7e516716..120d4706c 100644 --- a/packages/core/src/services/proxy.ts +++ b/packages/core/src/services/proxy.ts @@ -3,8 +3,32 @@ import * as https from "https"; import { Counter, Gauge, Histogram } from "../core"; import { WebdaError } from "../errors"; import { WebContext } from "../utils/context"; +import { HttpContext } from "../utils/httpcontext"; import { Route, Service, ServiceParameters } from "./service"; +export function createHttpHeader(line, headers) { + return ( + Object.keys(headers) + .reduce( + function (head, key) { + var value = headers[key]; + + if (!Array.isArray(value)) { + head.push(key + ": " + value); + return head; + } + + for (var i = 0; i < value.length; i++) { + head.push(key + ": " + value[i]); + } + return head; + }, + [line] + ) + .join("\r\n") + "\r\n\r\n" + ); +} + /** * Proxy to a backend service */ @@ -84,6 +108,16 @@ export class ProxyService extends S return new ProxyParameters(params); } + resolve() { + // Register the proxy on the 'upgrade' event of http socket + this.getWebda().on("Webda.Init.Http", (evt: any) => { + evt.on("upgrade", (req, socket, head) => { + this.proxyWS(req, socket, head); + }); + }); + return super.resolve(); + } + /** * Create the request to the backend * @param url @@ -92,7 +126,13 @@ export class ProxyService extends S * @param callback * @returns */ - createRequest(url: string, method: string, headers: any, callback: (response: http.IncomingMessage) => void) { + createRequest( + url: string, + method: string, + headers: any, + callback: (response: http.IncomingMessage) => void, + options: { timeout?: number } = { timeout: 30000 } + ) { let mod: any = http; if (url.startsWith("https://")) { mod = https; @@ -101,7 +141,8 @@ export class ProxyService extends S url, { method, - headers + headers, + ...options }, callback ); @@ -141,26 +182,22 @@ export class ProxyService extends S * @param context * @returns */ - getRequestHeaders(context: WebContext) { - const headers = { ...context.getHttpContext().getHeaders() }; + getRequestHeaders(context: HttpContext) { + const headers = { ...context.getHeaders() }; if (this.parameters.proxyHeaders) { - let xff = context.getHttpContext().getHeader("x-forwarded-for"); + let xff = context.getHeader("x-forwarded-for"); if (!xff) { - xff += `, ${context.getHttpContext().getClientIp()}`; + xff += `, ${context.getClientIp()}`; } else { - xff = context.getHttpContext().getClientIp(); + xff = context.getClientIp(); } - const protocol = context.getHttpContext().getProtocol(); - headers["X-Rewrite-URL"] = context.getHttpContext().getRelativeUri(); - headers["X-Forwarded-Host"] = context - .getHttpContext() - .getHeader("x-forwarded-host", `${context.getHttpContext().getHost()}`); - headers["X-Forwarded-Proto"] = context - .getHttpContext() - .getHeader("x-forwarded-proto", protocol.substring(0, protocol.length - 1)); + const protocol = context.getProtocol(); + headers["X-Rewrite-URL"] = context.getRelativeUri(); + headers["X-Forwarded-Host"] = context.getHeader("x-forwarded-host", `${context.getHost()}`); + headers["X-Forwarded-Proto"] = context.getHeader("x-forwarded-proto", protocol.substring(0, protocol.length - 1)); headers["X-Forwarded-For"] = xff; } - return context.getHttpContext().getHeaders(); + return headers; } /** @@ -174,6 +211,110 @@ export class ProxyService extends S return this.rawProxy(ctx, host, subUrl); } + /** + * Proxy request to the backend errored + * @param e + */ + onError(e) { + this.log("ERROR", "Proxying error", e); + } + + async proxyWS(req, socket, head) { + if (!req.url.startsWith(this.parameters.url)) { + return; + } + if (req.method !== "GET" || !req.headers.upgrade) { + socket.destroy(); + } + + if (req.headers.upgrade.toLowerCase() !== "websocket") { + socket.destroy(); + } + // Proxy WS Only works with a WebdaServer from @webda/shell for now + const webdaContext = await (this.getWebda()).getContextFromRequest(req); + await webdaContext.init(); + return this.rawProxyWS( + webdaContext, + `${this.parameters.backend}${req.url.substring(this.parameters.url.length)}`, + socket + ); + } + + /** + * Create the request to the WS backend + * @param url + * @param context + * @returns + */ + createWSRequest(url: string, context: WebContext): http.ClientRequest { + const Url = new URL(url); + return (Url.protocol === "http:" ? http : https).request({ + path: Url.pathname + Url.search, + method: "GET", + headers: { ...this.getRequestHeaders(context.getHttpContext()), host: Url.host }, + host: Url.hostname, + port: Url.port + }); + } + + /** + * + * @param context + * @param url + * @param socket + */ + async rawProxyWS(context: WebContext, url: string, socket: any) { + this.log("DEBUG", "Proxying upgrade request", `${url}`); + const proxyReq = this.createWSRequest(url, context); + proxyReq.on("response", res => { + // @ts-ignore + if (!res.upgrade) { + socket.write( + createHttpHeader("HTTP/" + res.httpVersion + " " + res.statusCode + " " + res.statusMessage, res.headers) + ); + res.pipe(socket); + } + }); + + const onError = err => { + this.log("ERROR", err); + socket.end(); + }; + + proxyReq.on("error", onError); + + proxyReq.on("upgrade", (proxyRes, proxySocket, proxyHead) => { + proxySocket.on("error", onError); + + // Allow us to listen when the websocket has completed + proxySocket.on("end", function () { + socket.end(); + }); + + // The pipe below will end proxySocket if socket closes cleanly, but not + // if it errors (eg, vanishes from the net and starts returning + // EHOSTUNREACH). We need to do that explicitly. + socket.on("error", function () { + /* c8 ignore next 2 */ + proxySocket.end(); + }); + + proxySocket.setTimeout(0); + proxySocket.setNoDelay(true); + proxySocket.setKeepAlive(true, 0); + + if (proxyHead && proxyHead.length) proxySocket.unshift(proxyHead); + + // + // Remark: Handle writing the headers to the socket when switching protocols + // Also handles when a header is an array + // + socket.write(createHttpHeader("HTTP/1.1 101 Switching Protocols", proxyRes.headers)); + proxySocket.pipe(socket).pipe(proxySocket); + }); + proxyReq.end(); + } + /** * Proxy an url to the response directly * @param ctx @@ -194,7 +335,7 @@ export class ProxyService extends S const onError = e => { this.metrics.http_request_in_flight.dec(); this.metrics.http_errors.inc({ ...labels, statuscode: -1 }); - this.log("ERROR", "Proxying error", e); + this.onError(e); resolve(); }; let Host; @@ -208,7 +349,7 @@ export class ProxyService extends S `${host}${url}`, ctx.getHttpContext().getMethod(), { - ...this.getRequestHeaders(ctx), + ...this.getRequestHeaders(ctx.getHttpContext()), Host, ...headers }, diff --git a/packages/core/webda.module.json b/packages/core/webda.module.json index ff3188a26..700af5280 100644 --- a/packages/core/webda.module.json +++ b/packages/core/webda.module.json @@ -37,7 +37,24 @@ "models": { "graph": { "Webda/AclModel": {}, - "Webda/CoreModel": {}, + "Webda/Comment": { + "parent": { + "attribute": "target", + "model": "Webda/CoreModel" + }, + "links": [ + { + "attribute": "author", + "model": "Webda/User", + "type": "LINK" + } + ] + }, + "Webda/CoreModel": { + "children": [ + "Webda/Comment" + ] + }, "Webda/Ident": { "links": [ { @@ -71,6 +88,7 @@ }, "tree": { "Webda/AclModel": {}, + "Webda/Comment": {}, "Webda/RoleModel": {}, "Webda/User": { "Webda/SimpleUser": {} @@ -84,6 +102,7 @@ "plurals": {}, "list": { "Webda/AclModel": "lib/models/aclmodel:default", + "Webda/Comment": "lib/models/comment:Comment", "Webda/CoreModel": "lib/models/coremodel:CoreModel", "Webda/Ident": "lib/models/ident:Ident", "Webda/OwnerModel": "lib/models/ownermodel:OwnerModel", @@ -107,6 +126,21 @@ "_lastUpdate": "Date", "___deleted": "boolean" }, + "Webda/Comment": { + "target": "ModelParent", + "author": "ModelLink", + "title": "string", + "description": "string", + "___class": "CoreModelDefinition", + "___type": "string", + "___types": "string[]", + "___ctx": "OperationContext", + "___store": "Store", + "___dirty": "Set", + "_creationDate": "Date", + "_lastUpdate": "Date", + "___deleted": "boolean" + }, "Webda/CoreModel": { "___class": "CoreModelDefinition", "___type": "string", @@ -652,6 +686,46 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "BinaryService" }, + "Webda/Comment": { + "type": "object", + "properties": { + "_creationDate": { + "type": "string", + "format": "date-time", + "description": "Creation date", + "readOnly": true + }, + "_lastUpdate": { + "type": "string", + "format": "date-time", + "description": "Last update date", + "readOnly": true + }, + "target": { + "type": "string" + }, + "author": { + "type": "string" + }, + "title": { + "type": "string", + "description": "Title" + }, + "description": { + "type": "string", + "description": "Comment content" + } + }, + "required": [ + "target", + "author", + "title", + "description" + ], + "description": "Generic comment class", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Comment" + }, "Webda/ConfigurationService": { "type": "object", "properties": { diff --git a/packages/gcp/webda.module.json b/packages/gcp/webda.module.json index b22b84439..5b3150529 100644 --- a/packages/gcp/webda.module.json +++ b/packages/gcp/webda.module.json @@ -486,194 +486,6 @@ ], "description": "Subscription bigqueryConfig" }, - "cloudStorageConfig": { - "anyOf": [ - { - "type": "object", - "properties": { - "bucket": { - "type": [ - "string", - "null" - ], - "description": "CloudStorageConfig bucket" - }, - "filenamePrefix": { - "type": [ - "string", - "null" - ], - "description": "CloudStorageConfig filenamePrefix" - }, - "filenameSuffix": { - "type": [ - "string", - "null" - ], - "description": "CloudStorageConfig filenameSuffix" - }, - "textConfig": { - "anyOf": [ - { - "type": "object", - "description": "Properties of a TextConfig." - }, - { - "type": "null" - } - ], - "description": "CloudStorageConfig textConfig" - }, - "avroConfig": { - "anyOf": [ - { - "type": "object", - "properties": { - "writeMetadata": { - "type": [ - "boolean", - "null" - ], - "description": "AvroConfig writeMetadata" - } - }, - "description": "Properties of an AvroConfig." - }, - { - "type": "null" - } - ], - "description": "CloudStorageConfig avroConfig" - }, - "maxDuration": { - "anyOf": [ - { - "type": "object", - "properties": { - "seconds": { - "anyOf": [ - { - "type": "number" - }, - { - "type": "object", - "properties": { - "high": { - "type": "number", - "description": "The high 32 bits as a signed value." - }, - "low": { - "type": "number", - "description": "The low 32 bits as a signed value." - }, - "unsigned": { - "type": "boolean", - "description": "Whether unsigned or not." - } - }, - "required": [ - "high", - "low", - "unsigned" - ] - }, - { - "type": "string" - }, - { - "type": "null" - } - ], - "description": "Duration seconds" - }, - "nanos": { - "type": [ - "number", - "null" - ], - "description": "Duration nanos" - } - }, - "description": "Properties of a Duration." - }, - { - "type": "null" - } - ], - "description": "CloudStorageConfig maxDuration" - }, - "maxBytes": { - "anyOf": [ - { - "type": "number" - }, - { - "type": "object", - "properties": { - "high": { - "type": "number", - "description": "The high 32 bits as a signed value." - }, - "low": { - "type": "number", - "description": "The low 32 bits as a signed value." - }, - "unsigned": { - "type": "boolean", - "description": "Whether unsigned or not." - } - }, - "required": [ - "high", - "low", - "unsigned" - ] - }, - { - "type": "string" - }, - { - "type": "null" - } - ], - "description": "CloudStorageConfig maxBytes" - }, - "state": { - "anyOf": [ - { - "$ref": "#/definitions/google.pubsub.v1.CloudStorageConfig.State" - }, - { - "type": "string", - "const": "STATE_UNSPECIFIED" - }, - { - "type": "string", - "const": "ACTIVE" - }, - { - "type": "string", - "const": "PERMISSION_DENIED" - }, - { - "type": "string", - "const": "NOT_FOUND" - }, - { - "type": "null" - } - ], - "description": "CloudStorageConfig state" - } - }, - "description": "Properties of a CloudStorageConfig." - }, - { - "type": "null" - } - ], - "description": "Subscription cloudStorageConfig" - }, "ackDeadlineSeconds": { "type": [ "number", @@ -1169,16 +981,6 @@ ], "description": "State enum." }, - "google.pubsub.v1.CloudStorageConfig.State": { - "type": "number", - "enum": [ - 0, - 1, - 2, - 3 - ], - "description": "State enum." - }, "google.pubsub.v1.Subscription.State": { "type": "number", "enum": [ diff --git a/packages/graphql/webda.module.json b/packages/graphql/webda.module.json index 18a429de0..173de92df 100644 --- a/packages/graphql/webda.module.json +++ b/packages/graphql/webda.module.json @@ -1,16 +1,10 @@ { - "beans": {}, - "deployers": {}, "moddas": { "Webda/GraphQLService": "lib/graphql:GraphQLService" }, - "models": { - "graph": {}, - "tree": {}, - "plurals": {}, - "list": {}, - "reflections": {} - }, + "beans": {}, + "models": {}, + "deployers": {}, "schemas": { "Webda/GraphQLService": { "type": "object", diff --git a/packages/mongodb/webda.module.json b/packages/mongodb/webda.module.json index b4d62bfe6..3a3a4ee8a 100644 --- a/packages/mongodb/webda.module.json +++ b/packages/mongodb/webda.module.json @@ -155,7 +155,7 @@ }, "useBigInt64": { "type": "boolean", - "description": "when deserializing a Long return as a BigInt." + "description": "when deserializing a Long will return as a BigInt." }, "promoteLongs": { "type": "boolean", @@ -187,11 +187,11 @@ }, "serializeFunctions": { "type": "boolean", - "description": "serialize the javascript functions" + "description": "serialize the javascript functions **(default:false)**." }, "ignoreUndefined": { "type": "boolean", - "description": "serialize will not emit undefined fields note that the driver sets this to `false`" + "description": "serialize will not emit undefined fields **(default:true)**" }, "enableUtf8Validation": { "type": "boolean", diff --git a/sample-app/package.json b/sample-app/package.json index dda45b1c0..255c704d9 100644 --- a/sample-app/package.json +++ b/sample-app/package.json @@ -49,4 +49,4 @@ "@webda/shell" ] } -} +} \ No newline at end of file