Skip to content

Commit

Permalink
Merge branch 'websocket'
Browse files Browse the repository at this point in the history
  • Loading branch information
tedeh committed May 23, 2021
2 parents 1955f3f + 33ee22c commit 85c5b48
Show file tree
Hide file tree
Showing 22 changed files with 930 additions and 560 deletions.
13 changes: 13 additions & 0 deletions examples/websocket/client.js
@@ -0,0 +1,13 @@
const WebSocket = require('isomorphic-ws');
const jayson = require('../../');

const client = jayson.client.websocket({
url: 'ws://localhost:12345',
});

client.ws.on('open', function () {
client.request('add', [1,2,3,4], function (err, result) {
console.log(err, result);
client.ws.close();
});
});
13 changes: 13 additions & 0 deletions examples/websocket/server.js
@@ -0,0 +1,13 @@
const WebSocket = require('ws');
const jayson = require('../../');

const server = new jayson.Server({
add: function (args, done) {
const sum = args.reduce((sum, val) => sum + val, 0);
done(null, sum);
},
});

const wss = server.websocket({
port: 12345,
});
21 changes: 21 additions & 0 deletions index.d.ts
Expand Up @@ -4,6 +4,7 @@ import https = require('https');
import http = require('http');
import events = require('events');
import Stream = require('stream');
import WebSocket = require('isomorphic-ws');
import * as connect from 'connect';

export interface UtilsJSONParseOptions {
Expand Down Expand Up @@ -221,6 +222,7 @@ export declare class Server extends events.EventEmitter {
https(options?: HttpsServerOptions): HttpsServer;
tcp(options?: TcpServerOptions): TcpServer;
tls(options?: TlsServerOptions): TlsServer;
websocket(options?: WebsocketServerOptions): WebsocketServer;
middleware(options?: MiddlewareServerOptions): connect.HandleFunction;

method(name: string, definition: MethodLike): void;
Expand Down Expand Up @@ -265,6 +267,14 @@ declare class TlsServer extends tls.Server {
constructor(server: Server, options?: TlsServerOptions);
}

export interface WebsocketServerOptions extends ServerOptions, WebSocket.ServerOptions {
wss?: WebSocket.Server;
}

declare class WebsocketServer {
constructor(server: Server, options?: WebsocketServerOptions);
}

type JSONParseReviver = (key: string, value: any) => any;
type JSONStringifyReplacer = (key: string, value: any) => any;

Expand Down Expand Up @@ -306,6 +316,16 @@ declare class HttpsClient extends Client {
constructor(options?: HttpsClientOptions);
}

export interface WebsocketClientOptions extends ClientOptions {
url?: string;
ws?: WebSocket;
timeout?: number;
}

declare class WebsocketClient extends Client {
constructor(options?: WebsocketClientOptions);
}

export declare class Client extends events.EventEmitter {
constructor(server: Server, options?: ClientOptions);
constructor(options: ClientOptions);
Expand All @@ -314,6 +334,7 @@ export declare class Client extends events.EventEmitter {
static https(options?: HttpsClientOptions): HttpsClient;
static tcp(options?: TcpClientOptions): TcpClient;
static tls(options?: TlsClientOptions): TlsClient;
static websocket(options?: WebsocketClientOptions): WebsocketClient;

request(method: string, params: RequestParamsLike, id?: string | null, callback?: JSONRPCCallbackType): JSONRPCRequest;
request(method: string, params: RequestParamsLike, callback?: JSONRPCCallbackType): JSONRPCRequest;
Expand Down
9 changes: 8 additions & 1 deletion lib/client/index.js
Expand Up @@ -80,6 +80,13 @@ Client.tls = require('./tls');
*/
Client.browser = require('./browser');

/**
* Websocket client constructor
* @type ClientWebsocket
* @static
*/
Client.websocket = require('./websocket');

/**
* Creates a request and dispatches it if given a callback.
* @param {String|Array} method A batch request if passed an Array, or a method name if passed a String
Expand Down Expand Up @@ -173,7 +180,7 @@ Client.prototype._request = function(request, callback) {
};

/**
* Parses a response from a server
* Parses a response from a server, taking care of sugaring
* @param {Object} err Error to pass on that is unrelated to the actual response
* @param {Object} response JSON-RPC 1.0 or 2.0 response
* @param {Function} callback Callback that will receive different arguments depending on the amount of parameters
Expand Down
133 changes: 133 additions & 0 deletions lib/client/websocket.js
@@ -0,0 +1,133 @@
'use strict';

const WebSocket = require('isomorphic-ws');
const utils = require('../utils');
const delay = require('delay');
const Client = require('../client');

/**
* Constructor for a Jayson Websocket Client
* @class ClientWebsocket
* @constructor
* @extends Client
* @param {Object} [options]
* @param {String} [options.url] When options.ws not provided this will be the URL to open the websocket to
* @param {ws.WebSocket} [options.ws] When not provided will create a WebSocket instance with options.url
* @param {Number} [options.timeout] Will wait this long in ms until callbacking with an error
* @return {ClientWebsocket}
*/
const ClientWebsocket = function(options) {
if(!(this instanceof ClientWebsocket)) {
return new ClientWebsocket(options);
}
Client.call(this, options);

const defaults = utils.merge(this.options, {});
this.options = utils.merge(defaults, options || {});

const self = this;

this.ws = this.options.ws || new WebSocket(this.options.url);
this.outstandingRequests = [];
this.handlers = {};

this.handlers.message = function (str) {
utils.JSON.parse(str, self.options, function(err, response) {
if (err) {
// invalid JSON is ignored
return;
}

if (Array.isArray(response)) {

// we have a batch reply
const matchingRequest = self.outstandingRequests.find(function ([request]) {
if (Array.isArray(request)) {
// a batch is considered matching if at least one response id matches one request id
return response.some(function (resp) {
if (utils.Response.isValidResponse(resp)) {
return request.some(function (req) {
return req.id === resp.id;
});
}
return false;
});
}
});

if (matchingRequest) {
const [ request, resolve ] = matchingRequest;
return resolve(response);
}

} else if (utils.Response.isValidResponse(response)) {

const matchingRequest = self.outstandingRequests.find(function ([request]) {
return !Array.isArray(request) && request.id === response.id;
});

if (matchingRequest) {
const [ request, resolve ] = matchingRequest;
return resolve(response);
}
}

});
};

this.ws.on('message', this.handlers.message);
};
require('util').inherits(ClientWebsocket, Client);

module.exports = ClientWebsocket;

/**
* @desc Removes all event listeners from Websocket instance which cancels all outstanding requests too
*/
ClientWebsocket.prototype.unlisten = function () {
for (const eventName in this.handlers) {
this.ws.off(eventName, this.handlers[eventName]);
}
};

ClientWebsocket.prototype._request = function(request, callback) {
const self = this;
const { ws, options } = this;

// we have to remove the object representing this request when the promise resolves/rejects
let outstandingItem;

Promise.race([
options.timeout > 0 ? delay(options.timeout).then(function () {
throw new Error('timeout reached after ' + options.timeout + ' ms');
}) : null,
new Promise(function (resolve, reject) {
utils.JSON.stringify(request, options, function(err, body) {
if (err) {
return resolve(err);
}

ws.send(body);

if (utils.Request.isNotification(request)) {
// notifications callback immediately since they don't have a reply
return resolve();
}

outstandingItem = [request, resolve, reject];
self.outstandingRequests.push(outstandingItem);
});
}),
].filter(v => v !== null)).then(function (result) {
removeOutstandingRequest();
callback(null, result);
}).catch(function (err) {
removeOutstandingRequest();
callback(err);
});

function removeOutstandingRequest () {
if (!outstandingItem) return;
self.outstandingRequests = self.outstandingRequests.filter(v => v !== outstandingItem);
}
};
10 changes: 5 additions & 5 deletions lib/server/http.js
Expand Up @@ -11,16 +11,16 @@ const utils = require('../utils');
* @param {Object} [options] Options for this instance
* @return {ServerHttp}
*/
const HttpServer = function(server, options) {
if(!(this instanceof HttpServer)) {
return new HttpServer(server, options);
const ServerHttp = function(server, options) {
if(!(this instanceof ServerHttp)) {
return new ServerHttp(server, options);
}

this.options = utils.merge(server.options, options || {});

const listener = utils.getHttpListener(this, server);
http.Server.call(this, listener);
};
require('util').inherits(HttpServer, http.Server);
require('util').inherits(ServerHttp, http.Server);

module.exports = HttpServer;
module.exports = ServerHttp;
10 changes: 5 additions & 5 deletions lib/server/https.js
Expand Up @@ -11,16 +11,16 @@ const utils = require('../utils');
* @param {Object} [options] Options for this instance
* @return {ServerHttps}
*/
const HttpsServer = function(server, options) {
if(!(this instanceof HttpsServer)) {
return new HttpsServer(server, options);
const ServerHttps = function(server, options) {
if(!(this instanceof ServerHttps)) {
return new ServerHttps(server, options);
}

this.options = utils.merge(server.options, options || {});

const listener = utils.getHttpListener(this, server);
https.Server.call(this, this.options, listener);
};
require('util').inherits(HttpsServer, https.Server);
require('util').inherits(ServerHttps, https.Server);

module.exports = HttpsServer;
module.exports = ServerHttps;
1 change: 1 addition & 0 deletions lib/server/index.js
Expand Up @@ -83,6 +83,7 @@ Server.interfaces = {
https: require('./https'),
tcp: require('./tcp'),
tls: require('./tls'),
websocket: require('./websocket'),
middleware: require('./middleware')
};

Expand Down
13 changes: 6 additions & 7 deletions lib/server/tcp.js
Expand Up @@ -11,18 +11,18 @@ const utils = require('../utils');
* @param {Object} [options] Options for this instance
* @return {ServerTcp}
*/
const TcpServer = function(server, options) {
if(!(this instanceof TcpServer)) {
return new TcpServer(server, options);
const ServerTcp = function(server, options) {
if(!(this instanceof ServerTcp)) {
return new ServerTcp(server, options);
}

this.options = utils.merge(server.options, options || {});

net.Server.call(this, getTcpListener(this, server));
};
require('util').inherits(TcpServer, net.Server);
require('util').inherits(ServerTcp, net.Server);

module.exports = TcpServer;
module.exports = ServerTcp;

/**
* Returns a TCP connection listener bound to the server in the argument.
Expand Down Expand Up @@ -58,8 +58,7 @@ function getTcpListener(self, server) {

// ends the request with an error code
function respondError(err) {
const Server = require('../server');
const error = server.error(Server.errors.PARSE_ERROR, null, String(err));
const error = server.error(-32700, null, String(err));
const response = utils.response(error, undefined, undefined, self.options.version);
utils.JSON.stringify(response, options, function(err, body) {
if(err) {
Expand Down
13 changes: 6 additions & 7 deletions lib/server/tls.js
Expand Up @@ -11,18 +11,18 @@ const utils = require('../utils');
* @param {Object} [options] Options for this instance
* @return {ServerTls}
*/
const TlsServer = function(server, options) {
if(!(this instanceof TlsServer)) {
return new TlsServer(server, options);
const ServerTls = function(server, options) {
if(!(this instanceof ServerTls)) {
return new ServerTls(server, options);
}

this.options = utils.merge(server.options, options || {});

tls.Server.call(this, this.options, getTlsListener(this, server));
};
require('util').inherits(TlsServer, tls.Server);
require('util').inherits(ServerTls, tls.Server);

module.exports = TlsServer;
module.exports = ServerTls;

/**
* Returns a TLS-encrypted TCP connection listener bound to the server in the argument.
Expand Down Expand Up @@ -58,8 +58,7 @@ function getTlsListener(self, server) {

// ends the request with an error code
function respondError(err) {
const Server = require('../server');
const error = server.error(Server.errors.PARSE_ERROR, null, String(err));
const error = server.error(-32700, null, String(err));
const response = utils.response(error, undefined, undefined, self.options.version);
utils.JSON.stringify(response, options, function(err, body) {
if(err) {
Expand Down

0 comments on commit 85c5b48

Please sign in to comment.