Skip to content

Commit

Permalink
ZAPI-662: Initial changefeed module implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
richardkiene committed Oct 29, 2015
1 parent 69a3902 commit 9590e79
Show file tree
Hide file tree
Showing 13 changed files with 1,051 additions and 27 deletions.
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
[submodule "deps/jsstyle"]
path = deps/jsstyle
url = https://github.com/davepacheco/jsstyle.git
[submodule "javascriptlint"]
path = javascriptlint
url = https://github.com/davepacheco/javascriptlint.git
[submodule "deps/javascriptlint"]
path = deps/javascriptlint
url = https://github.com/davepacheco/javascriptlint.git
7 changes: 2 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ TAP := ./node_modules/.bin/tap
#
# Files
#
DOC_FILES = index.md boilerplateapi.md
DOC_FILES = index.md
JS_FILES := $(shell ls *.js) $(shell find lib test -name '*.js')
JSON_FILES = package.json
JSL_CONF_NODE = tools/jsl.node.conf
JSL_FILES_NODE = $(JS_FILES)
JSSTYLE_FILES = $(JS_FILES)
JSSTYLE_FLAGS = -f tools/jsstyle.conf
REPO_MODULES = src/node-dummy
SMF_MANIFESTS_IN = smf/manifests/bapi.xml.in

NODE_PREBUILT_VERSION=v0.8.28

Expand All @@ -53,13 +52,12 @@ ifeq ($(shell uname -s),SunOS)
else
include ./tools/mk/Makefile.node.defs
endif
include ./tools/mk/Makefile.smf.defs

#
# Repo-specific targets
#
.PHONY: all
all: $(SMF_MANIFESTS) | $(TAP) $(REPO_DEPS)
all: $(TAP) $(REPO_DEPS)
$(NPM) rebuild

$(TAP): | $(NPM_EXEC)
Expand All @@ -77,5 +75,4 @@ ifeq ($(shell uname -s),SunOS)
else
include ./tools/mk/Makefile.node.targ
endif
include ./tools/mk/Makefile.smf.targ
include ./tools/mk/Makefile.targ
1 change: 1 addition & 0 deletions deps/javascriptlint
Submodule javascriptlint added at 040bf5
130 changes: 130 additions & 0 deletions lib/listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

/*
* Copyright (c) 2015, Joyent, Inc.
*/

var mod_assert = require('assert');
var mod_assertplus = require('assert');
var mod_bunyan = require('bunyan');
var mod_http = require('http');
var mod_inherits = require('util').inherits;
var mod_readable = require('stream').Readable;
var mod_watershed = require('watershed').Watershed;

var shed = new mod_watershed();

/*
* Listener module constructor that takes an options object.
*
* Example options object:
* var options = {
* log: new mod_bunyan({
* name: 'my_logger',
* level: 'info',
* stream: process.stderr
* }),
* endpoint: '127.0.0.1',
* instance: '<UUID>',
* service: '<service name>',
* changeKind: {
* resource: '<resource name>',
* subResources: ['subresource1', 'subresource2']
* }
* };
*/
function Listener(options) {
var self = this;

mod_readable.call(self, { objectMode: true });

self.log = options.log;
self.endpoint = options.endpoint;
self.instance = options.instance;
self.service = options.service;
self.changeKind = options.changeKind;

self.initBootstrap = true;
}

mod_inherits(Listener, mod_readable);

/*
* Registers the listener with publisher endpoint specified in options.
* Once registered, this method also handles pushing stream data to the consumer
* of the Listener object.
*/
Listener.prototype.register = function register() {
var self = this;
var registration = {
instance: self.instance,
service: self.service,
changeKind: self.changeKind
};
var wskey = shed.generateKey();
var options = {
port: 8080,
hostname: self.endpoint,
headers: {
'connection': 'upgrade',
'upgrade': 'websocket',
'Sec-WebSocket-Key': wskey
}
};
var req = mod_http.request(options);
req.end();
req.on('upgrade', function _upgrade(res, socket, head) {
var wsc = self.wsc = shed.connect(res, socket, head, wskey);

// Send registration
try {
wsc.send(JSON.stringify(registration));
} catch (ex) {
self.log.error('ex: %s', ex.message);
}

wsc.on('end', function _end() {
self.emit('connection-end');
self.log.info('websocket ended');
});

wsc.on('connectionReset', function _connectionReset() {
self.emit('connection-end');
self.log.info('websocket ended');
});

// Handles publisher change feed items and bootstrap response.
// The only valid response from the publisher when the listener is in
// the initBootstrap state, is a bootstrap object. From that point
// forward it is expected that all items are change feed items.
wsc.on('text', function _recieveRegistration(text) {
var item = JSON.parse(text);
if (self.initBootstrap && item.hasOwnProperty('bootstrapRoute')) {
self.initBootstrap = false;
self.emit('bootstrap', item);
} else if (!self.initBootstrap) {
self.push(item);
} else {
self.log.error(
'Invalid socket state! text: %s initBootstrap: %s',
text,
self.initBootstrap);
self.emit('error');
}
});
});
};

Listener.prototype._read = function _read() {
// This function is required, but I'm not sure we should do anything
};

Listener.prototype._endSocket = function _endSocket() {
this.wsc.end();
};

module.exports = Listener;
Loading

0 comments on commit 9590e79

Please sign in to comment.