diff --git a/src/GremlinClient.js b/src/GremlinClient.js index 29201b9..23f4b55 100644 --- a/src/GremlinClient.js +++ b/src/GremlinClient.js @@ -17,6 +17,22 @@ const hasCode = (filterCode) => ({ status: { code } }) => code === filterCode; const isErrorMessage = ({ status: { code }}) => [200, 204, 206].indexOf(code) === -1; +const serializeToBinary = (message, accept) => { + let serializedMessage = accept + JSON.stringify(message); + serializedMessage = unescape(encodeURIComponent(serializedMessage)); + + // Let's start packing the message into binary + // mimeLength(1) + mimeType Length + serializedMessage Length + let binaryMessage = new Uint8Array(1 + serializedMessage.length); + binaryMessage[0] = accept.length; + + for (let i = 0; i < serializedMessage.length; i++) { + binaryMessage[i + 1] = serializedMessage.charCodeAt(i); + } + + return binaryMessage; +} + class GremlinClient extends EventEmitter { constructor(port = 8182, host = 'localhost', options = {}) { super(); @@ -49,11 +65,6 @@ class GremlinClient extends EventEmitter { this.commands = {}; - const connection = this.createConnection({ - port, - host, - path: this.options.path - }); this.commands$ = new Rx.Subject(); this.commands$.subscribe((command) => { @@ -61,26 +72,30 @@ class GremlinClient extends EventEmitter { this.commands[requestId] = command }); - this.registerConnection(connection); - } + const connection = this.createConnection({ + port, + host, + path: this.options.path + }); - createConnection({ port, host, path }) { - return new WebSocketGremlinConnection({ port, host, path }); - } + const connections$ = Rx.Observable.create((observer) => observer.next(connection)); - registerConnection(connection) { - this.connection = connection; + const open$ = connections$ + .flatMap((connection) => Rx.Observable.fromEvent(connection, 'open')); - const open$ = Rx.Observable.fromEvent(connection, 'open'); - const error$ = Rx.Observable.fromEvent(connection, 'error'); - const incomingMessages$ = Rx.Observable.fromEvent(connection, 'message') + const error$ = connections$ + .flatMap((connection) => Rx.Observable.fromEvent(connection, 'error')); + + const incomingMessages$ = connections$ + .flatMap((connection) => Rx.Observable.fromEvent(connection, 'message')) .map(({ data }) => { const buffer = new Buffer(data, 'binary'); const rawMessage = JSON.parse(buffer.toString('utf-8')); return rawMessage; }); - const close$ = Rx.Observable.fromEvent(connection, 'close'); + const close$ = connections$ + .flatMap((connection) => Rx.Observable.fromEvent(connection, 'close')); const canSend$ = Rx.Observable.merge( open$.map(true), @@ -97,11 +112,18 @@ class GremlinClient extends EventEmitter { close$.subscribe((event) => this.handleDisconnection(event)); const outgoingMessages$ = this.commands$ - .map(({ message }) => message) - .pausableBuffered(canSend$); + .map(({ message }) => serializeToBinary(message, this.options.accept)) + .pausableBuffered(canSend$) + .combineLatest(connections$); outgoingMessages$ - .subscribe((message) => this.sendMessage(message)); + .subscribe(([binaryMessage, connection]) => + connection.sendMessage(binaryMessage) + ); + } + + createConnection({ port, host, path }) { + return new WebSocketGremlinConnection({ port, host, path }); } closeConnection() { @@ -190,22 +212,6 @@ class GremlinClient extends EventEmitter { return message; }; - sendMessage(message) { - let serializedMessage = this.options.accept + JSON.stringify(message); - serializedMessage = unescape(encodeURIComponent(serializedMessage)); - - // Let's start packing the message into binary - // mimeLength(1) + mimeType Length + serializedMessage Length - let binaryMessage = new Uint8Array(1 + serializedMessage.length); - binaryMessage[0] = this.options.accept.length; - - for (let i = 0; i < serializedMessage.length; i++) { - binaryMessage[i + 1] = serializedMessage.charCodeAt(i); - } - - this.connection.sendMessage(binaryMessage); - }; - /** * Asynchronously send a script to Gremlin Server for execution and fire * the provided callback when all results have been fetched.