Skip to content

Commit

Permalink
feat(transport): bump up msgpack to latest official (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and billyvg committed Dec 30, 2019
1 parent 3894156 commit 669e1d9
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 59 deletions.
3 changes: 1 addition & 2 deletions packages/neovim/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
"main": "./lib/index",
"typings": "./lib/index",
"dependencies": {
"@msgpack/msgpack": "^1.9.3",
"lodash.defaults": "^4.2.0",
"lodash.omit": "^4.5.0",
"msgpack-lite": "^0.1.26",
"semver": "^7.1.1",
"winston": "3.2.1"
},
Expand All @@ -55,7 +55,6 @@
"@types/jest": "^24.0.24",
"@types/lodash.defaults": "^4.2.6",
"@types/lodash.omit": "^4.5.6",
"@types/msgpack-lite": "^0.1.7",
"@types/node": "10.17.x",
"@types/which": "^1.3.2",
"babel-jest": "^24.9.0",
Expand Down
120 changes: 63 additions & 57 deletions packages/neovim/src/utils/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import { EventEmitter } from 'events';

import * as msgpack from 'msgpack-lite';

import Buffered from './buffered';
import { encode, decode, ExtensionCodec, decodeStream } from '@msgpack/msgpack';
import { Metadata } from '../api/types';

class Response {
Expand All @@ -25,13 +23,16 @@ class Response {
if (this.sent) {
throw new Error(`Response to id ${this.requestId} already sent`);
}

const encoded = encode([
1,
this.requestId,
isError ? resp : null,
!isError ? resp : null,
]);

this.encoder.write(
msgpack.encode([
1,
this.requestId,
isError ? resp : null,
!isError ? resp : null,
])
Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength)
);
this.sent = true;
}
Expand All @@ -42,86 +43,89 @@ class Transport extends EventEmitter {

private nextRequestId = 1;

private encodeStream: any;

private decodeStream: any;

private reader: NodeJS.ReadableStream;

private writer: NodeJS.WritableStream;

protected codec: msgpack.Codec;
private readonly extensionCodec: ExtensionCodec = this.initializeExtensionCodec();

// Neovim client that holds state
private client: any;

constructor() {
super();

const codec = this.setupCodec();
this.encodeStream = msgpack.createEncodeStream({ codec });
this.decodeStream = msgpack.createDecodeStream({ codec });
this.decodeStream.on('data', (msg: any[]) => {
this.parseMessage(msg);
});
this.decodeStream.on('end', () => {
this.detach();
this.emit('detach');
});
}

setupCodec() {
const codec = msgpack.createCodec();
private initializeExtensionCodec() {
const codec = new ExtensionCodec();

Metadata.forEach(({ constructor }, id: number): void => {
codec.addExtPacker(id, constructor, (obj: any) =>
msgpack.encode(obj.data)
);
codec.addExtUnpacker(
id,
data =>
new constructor({
codec.register({
type: id,
encode: (input: any) => {
if (input instanceof constructor) {
return encode(input.data);
}
return null;
},
decode: data => {
return new constructor({
transport: this,
client: this.client,
data: msgpack.decode(data),
})
);
data: decode(data),
});
},
});
});

this.codec = codec;
return this.codec;
return codec;
}

private encodeToBuffer(value: unknown) {
const encoded = encode(value, { extensionCodec: this.extensionCodec });
return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength);
}

attach(
writer: NodeJS.WritableStream,
reader: NodeJS.ReadableStream,
client: any
) {
this.encodeStream = this.encodeStream.pipe(writer);
const buffered = new Buffered();
reader.pipe(buffered).pipe(this.decodeStream);
this.writer = writer;
this.reader = reader;
this.client = client;
}

detach() {
this.encodeStream.unpipe(this.writer);
this.reader.unpipe(this.decodeStream);
this.reader.on('end', () => {
this.emit('detach');
});

const asyncDecodeGenerator = decodeStream(this.reader as any, {
extensionCodec: this.extensionCodec,
});

// naively iterate async generator created via decodeStream.
// when runtime / polyfill allows replace to `for await (const val of asyncDecodeGenerator)`
// syntax instead.
const resolveGeneratorRecursively = (iter: AsyncGenerator) => {
iter.next().then(resolved => {
if (!resolved.done) {
this.parseMessage(resolved.value);
return resolveGeneratorRecursively(iter);
}
return Promise.resolve();
});
};

resolveGeneratorRecursively(asyncDecodeGenerator);
}

request(method: string, args: any[], cb: Function) {
this.nextRequestId = this.nextRequestId + 1;
this.encodeStream.write(
msgpack.encode([0, this.nextRequestId, method, args], {
codec: this.codec,
})
this.writer.write(
this.encodeToBuffer([0, this.nextRequestId, method, args])
);

this.pending.set(this.nextRequestId, cb);
}

notify(method: string, args: any[]) {
this.encodeStream.write([2, method, args]);
this.writer.write(this.encodeToBuffer([2, method, args]));
}

parseMessage(msg: any[]) {
Expand All @@ -136,7 +140,7 @@ class Transport extends EventEmitter {
'request',
msg[2].toString(),
msg[3],
new Response(this.encodeStream, msg[1])
new Response(this.writer, msg[1])
);
} else if (msgType === 1) {
// response to a previous request:
Expand All @@ -153,7 +157,9 @@ class Transport extends EventEmitter {
// - msg[2]: arguments
this.emit('notification', msg[1].toString(), msg[2]);
} else {
this.encodeStream.write([1, 0, 'Invalid message type', null]);
this.writer.write(
this.encodeToBuffer([1, 0, 'Invalid message type', null])
);
}
}
}
Expand Down

0 comments on commit 669e1d9

Please sign in to comment.