Skip to content

Commit

Permalink
feat: Implement the Solid WebSocket protocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Nov 19, 2020
1 parent 2e95499 commit 3b9cda4
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 0 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Expand Up @@ -38,6 +38,7 @@ module.exports = {
'class-methods-use-this': 'off', // conflicts with functions from interfaces that sometimes don't require `this`
'comma-dangle': ['error', 'always-multiline'],
'dot-location': ['error', 'property'],
'lines-around-comment': 'off', // conflicts with padded-blocks
'lines-between-class-members': ['error', 'always', { exceptAfterSingleLine: true }],
'max-len': ['error', { code: 120, ignoreUrls: true }],
'new-cap': 'off', // used for RDF constants
Expand Down
128 changes: 128 additions & 0 deletions src/ldp/UnsecureWebSocketsProtocol.ts
@@ -0,0 +1,128 @@
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { getLoggerFor } from '../logging/LogUtil';
import type { HttpRequest } from '../server/HttpRequest';
import type { WebSocketHandler } from '../server/WebSocketHandler';
import type { ResourceIdentifier } from './representation/ResourceIdentifier';

const VERSION = 'solid/0.1.0-alpha';

/**
* Implementation of Solid WebSockets API Spec solid/0.1.0-alpha
* at https://github.com/solid/solid-spec/blob/master/api-websockets.md
*/
class WebSocketListener extends EventEmitter {
private host = '';
private protocol = '';
private readonly socket: WebSocket;
private readonly subscribedPaths = new Set<string>();
private readonly logger = getLoggerFor(this);

public constructor(socket: WebSocket) {
super();
this.socket = socket;
socket.addListener('error', (): void => this.stop());
socket.addListener('close', (): void => this.stop());
socket.addListener('message', (message: string): void => this.onMessage(message));
}

public start(upgradeRequest: HttpRequest): void {
// Greet the client
this.sendMessage('protocol', VERSION);

// Verify the WebSocket protocol version
const protocolHeader = upgradeRequest.headers['sec-websocket-protocol'];
if (!protocolHeader) {
this.sendMessage('warning', `Missing Sec-WebSocket-Protocol header, expected value '${VERSION}'`);
} else {
const supportedProtocols = protocolHeader.split(/\s*,\s*/u);
if (!supportedProtocols.includes(VERSION)) {
this.sendMessage('error', `Client does not support protocol ${VERSION}`);
this.stop();
}
}

// Store the HTTP host and protocol
this.host = upgradeRequest.headers.host ?? '';
this.protocol = (upgradeRequest.socket as any).secure ? 'https' : 'http';
}

private stop(): void {
try {
this.socket.close();
} catch {
// Ignore
}
this.subscribedPaths.clear();
this.socket.removeAllListeners();
this.emit('closed');
}

public onResourceChanged({ path }: ResourceIdentifier): void {
if (this.subscribedPaths.has(path)) {
this.sendMessage('pub', path);
}
}

private onMessage(message: string): void {
// Parse the message
const match = /^(\w+)\s+(.+)$/u.exec(message);
if (!match) {
this.sendMessage('warning', `Unrecognized message format: ${message}`);
return;
}

// Process the message
const [ , type, value ] = match;
switch (type) {
// Subscription request for the resource with the given URL
case 'sub':
// Ensure correct protocol and host on the URL
try {
const url = new URL(value, `${this.protocol}:${this.host}`);
url.protocol = this.protocol;
url.host = this.host;
this.subscribedPaths.add(url.toString());
} catch (error: unknown) {
this.logger.warn(`Error subscribing to ${value}: ${(error as any).message}`);
}
break;

// Unsupported message type
default:
this.sendMessage('warning', `Unrecognized message type: ${type}`);
}
}

private sendMessage(type: string, value: string): void {
this.socket.send(`${type} ${value}`);
}
}

export class UnsecureWebSocketsProtocol implements WebSocketHandler {
private readonly logger = getLoggerFor(this);
private readonly listeners = new Set<WebSocketListener>();

public constructor(source: EventEmitter) {
source.on('changed', (changed: ResourceIdentifier): void => this.onResourceChanged(changed));
}

public handle(socket: WebSocket, upgradeRequest: HttpRequest): void {
const listener = new WebSocketListener(socket);
this.listeners.add(listener);
this.logger.info(`New WebSocket added, ${this.listeners.size} in total`);

listener.on('closed', (): void => {
this.listeners.delete(listener);
this.logger.info(`WebSocket closed, ${this.listeners.size} remaining`);
});

listener.start(upgradeRequest);
}

private onResourceChanged(changed: ResourceIdentifier): void {
for (const listener of this.listeners) {
listener.onResourceChanged(changed);
}
}
}
189 changes: 189 additions & 0 deletions test/unit/ldp/UnsecureWebSocketsProtocol.test.ts
@@ -0,0 +1,189 @@
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { UnsecureWebSocketsProtocol } from '../../../src/ldp/UnsecureWebSocketsProtocol';
import type { HttpRequest } from '../../../src/server/HttpRequest';

class DummySocket extends EventEmitter {
public readonly messages = new Array<string>();
public readonly close = jest.fn();

public send(message: string): void {
this.messages.push(message);
}
}

describe('An UnsecureWebSocketsProtocol', (): void => {
const source = new EventEmitter();
const protocol = new UnsecureWebSocketsProtocol(source);

describe('after registering a socket', (): void => {
const socket = new DummySocket();

beforeAll((): void => {
const upgradeRequest = {
headers: {
host: 'mypod.example',
'sec-websocket-protocol': 'solid/0.1.0-alpha, other/1.0.0',
},
socket: {
secure: true,
},
} as any as HttpRequest;
protocol.handle(socket as any as WebSocket, upgradeRequest);
});

afterEach((): void => {
socket.messages.length = 0;
});

it('sends a protocol message.', (): void => {
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('protocol solid/0.1.0-alpha');
});

it('warns when receiving an unexpected message.', (): void => {
socket.emit('message', 'unexpected');
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('warning Unrecognized message format: unexpected');
});

it('warns when receiving an unexpected message type.', (): void => {
socket.emit('message', 'unknown 1 2 3');
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('warning Unrecognized message type: unknown');
});

describe('before subscribing to resources', (): void => {
it('does not emit pub messages.', (): void => {
source.emit('changed', { path: 'https://mypod.example/foo/bar' });
expect(socket.messages).toHaveLength(0);
});
});

describe('after subscribing to a resource', (): void => {
beforeAll((): void => {
socket.emit('message', 'sub https://mypod.example/foo/bar');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/foo/bar' });
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('pub https://mypod.example/foo/bar');
});
});

describe('after subscribing to a resource via a relative URL', (): void => {
beforeAll((): void => {
socket.emit('message', 'sub /relative/foo');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/relative/foo' });
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('pub https://mypod.example/relative/foo');
});
});

describe('after subscribing to a resource with the wrong host name', (): void => {
beforeAll((): void => {
socket.emit('message', 'sub https://wrong.example/host/foo');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/host/foo' });
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('pub https://mypod.example/host/foo');
});
});

describe('after subscribing to a resource with the wrong protocol', (): void => {
beforeAll((): void => {
socket.emit('message', 'sub http://pod.example/protocol/foo');
});

it('emits pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/protocol/foo' });
expect(socket.messages).toHaveLength(1);
expect(socket.messages.shift()).toBe('pub https://mypod.example/protocol/foo');
});
});

describe('after subscribing to an invalid URL', (): void => {
beforeAll((): void => {
socket.emit('message', 'sub \u0000');
});

it('does not emit pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/\u0000' });
expect(socket.messages).toHaveLength(0);
});
});

describe('after subscribing with an invalid host', (): void => {
let newSocket: DummySocket;

beforeAll((): void => {
newSocket = new DummySocket();
protocol.handle(newSocket as any as WebSocket, { headers: {}, socket: {}} as any);
newSocket.emit('message', 'sub \u0000');
});

it('does not emit pub messages for that resource.', (): void => {
source.emit('changed', { path: 'https://mypod.example/\u0000' });
expect(newSocket.messages).toHaveLength(2);
});
});
});

it('unsubscribes when a socket closes.', (): void => {
const newSocket = new DummySocket();
protocol.handle(newSocket as any as WebSocket, { headers: {}, socket: {}} as any);
expect(newSocket.listenerCount('message')).toBe(1);
newSocket.emit('close');
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});

it('unsubscribes when a socket errors.', (): void => {
const newSocket = new DummySocket();
protocol.handle(newSocket as any as WebSocket, { headers: {}, socket: {}} as any);
expect(newSocket.listenerCount('message')).toBe(1);
newSocket.emit('error');
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});

it('emits a warning when no Sec-WebSocket-Protocol is supplied.', (): void => {
const newSocket = new DummySocket();
const upgradeRequest = {
headers: {},
socket: {},
} as any as HttpRequest;
protocol.handle(newSocket as any as WebSocket, upgradeRequest);
expect(newSocket.messages).toHaveLength(2);
expect(newSocket.messages.shift()).toBe('protocol solid/0.1.0-alpha');
expect(newSocket.messages.shift())
.toBe('warning Missing Sec-WebSocket-Protocol header, expected value \'solid/0.1.0-alpha\'');
expect(newSocket.close).toHaveBeenCalledTimes(0);
});

it('emits an error and closes the connection when the wrong Sec-WebSocket-Protocol is supplied.', (): void => {
const newSocket = new DummySocket();
const upgradeRequest = {
headers: {
'sec-websocket-protocol': 'solid/1.0.0, other',
},
socket: {},
} as any as HttpRequest;
protocol.handle(newSocket as any as WebSocket, upgradeRequest);
expect(newSocket.messages).toHaveLength(2);
expect(newSocket.messages.shift()).toBe('protocol solid/0.1.0-alpha');
expect(newSocket.messages.shift()).toBe('error Client does not support protocol solid/0.1.0-alpha');
expect(newSocket.close).toHaveBeenCalledTimes(1);
expect(newSocket.listenerCount('message')).toBe(0);
expect(newSocket.listenerCount('close')).toBe(0);
expect(newSocket.listenerCount('error')).toBe(0);
});
});

0 comments on commit 3b9cda4

Please sign in to comment.