Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Box: support realtime event stream #222

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion box/box.html
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@
<label for="node-input-filepattern"><i class="fa fa-file"></i> <span data-i18n="box.label.pattern"></span></label>
<input type="text" id="node-input-filepattern" data-i18n="[placeholder]box.placeholder.pattern">
</div>
<div class="form-row">
<label for="node-input-interval"><i class="fa fa-clock"></i> <span data-i18n="box.label.interval"></span></label>
<input type="number" id="node-input-interval"/>
</div>
<div class="form-row">
<label for="node-input-longpolling"><i class="fa fa-rocket"></i> <span data-i18n="box.label.longpolling"></span></label>
<input type="checkbox" id="node-input-longpolling"/>
</div>
<div class="form-row">
<label for="node-input-name"><i class="fa fa-tag"></i> <span data-i18n="box.label.name"></span></label>
<input type="text" id="node-input-name" data-i18n="[placeholder]box.placeholder.name">
Expand All @@ -176,13 +184,20 @@
defaults: {
box: {type:"box-credentials",required:true},
filepattern: {value:""},
name: {value:""}
name: {value:""},
longpolling: {value: false},
interval: {value: 600}
},
inputs:0,
outputs:1,
icon: "box.png",
label: function() {
return this.name||this.filepattern||'Box';
},
oneditprepare: function() {
$('#node-input-longpolling').change(function () {
$('#node-input-interval').prop('disabled', $(this).prop('checked'));
});
}
});
</script>
Expand Down
254 changes: 191 additions & 63 deletions box/box.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ module.exports = function(RED) {
return;
}
if (data.error) {
node.error(RED._("box.error.refresh-token-error",{message:data.error.message}));
node.error(RED._("box.error.refresh-token-error",{message:data.error}));
return;
}
// console.log("refreshed: " + require('util').inspect(data));
Expand Down Expand Up @@ -210,11 +210,14 @@ module.exports = function(RED) {
};

function constructFullPath(entry) {
var parentPath = entry.path_collection.entries
.filter(function (e) { return e.id !== "0"; })
.map(function (e) { return e.name; })
.join('/');
return (parentPath !== "" ? parentPath+'/' : "") + entry.name;
if (entry.path_collection) {
var parentPath = entry.path_collection.entries
.filter(function (e) { return e.id !== "0"; })
.map(function (e) { return e.name; })
.join('/');
return (parentPath !== "" ? parentPath+'/' : "") + entry.name;
}
return entry.name;
}

RED.httpAdmin.get('/box-credentials/auth', function(req, res) {
Expand Down Expand Up @@ -315,80 +318,85 @@ module.exports = function(RED) {
res.send(RED._("box.error.authorized"));
});
});
});
1 });

function BoxInNode(n) {
RED.nodes.createNode(this,n);
this.filepattern = n.filepattern || "";
this.interval = n.interval || 60;
this.longpolling = Boolean(n.longpolling);
this.box = RED.nodes.getNode(n.box);
this.seenEvents = {};
var node = this;
if (!this.box || !this.box.credentials.accessToken) {
this.warn(RED._("box.warn.missing-credentials"));
return;
}
node.status({fill:"blue",shape:"dot",text:"box.status.initializing"});
this.box.request({
url: 'https://api.box.com/2.0/events?stream_position=now&stream_type=changes',
}, function (err, data) {
if (err) {
node.error(RED._("box.error.event-stream-initialize-failed",{err:err.toString()}));
node.status({fill:"red",shape:"ring",text:"box.status.failed"});
return;

// this will fire once the initial stream position is determined
node.on('ready', function () {
if (node.longpolling) {
node.startLongPolling();
} else {
node.startIntervalPolling();
}
node.state = data.next_stream_position;
node.status({});
node.on("input", function(msg) {
node.status({fill:"blue",shape:"dot",text:"box.status.checking-for-events"});
node.box.request({
url: 'https://api.box.com/2.0/events?stream_position='+node.state+'&stream_type=changes',
}, function(err, data) {
if (err) {
node.error(RED._("box.error.events-fetch-failed",{err:err.toString()}),msg);
node.status({});
return;
}
});

// this fires if either the long-poller says we should have data, or on an interval.
node.on('check-events', function() {
node.status({fill:"blue",shape:"dot",text:"box.status.checking-for-events"});
node.box.request({
url: 'https://api.box.com/2.0/events?stream_position='+node.state+'&stream_type=changes',
}, function(err, data) {
if (err) {
node.error(RED._("box.error.events-fetch-failed",{err:err.toString()}),{});
node.status({});
node.state = data.next_stream_position;
for (var i = 0; i < data.entries.length; i++) {
// TODO: support other event types
// TODO: suppress duplicate events
// for both of the above see:
// https://developers.box.com/docs/#events
var event;
if (data.entries[i].event_type === 'ITEM_CREATE') {
event = 'add';
} else if (data.entries[i].event_type === 'ITEM_UPLOAD') {
event = 'add';
} else if (data.entries[i].event_type === 'ITEM_RENAME') {
event = 'add';
// TODO: emit delete event?
} else if (data.entries[i].event_type === 'ITEM_TRASH') {
// need to find old path
node.lookupOldPath({}, data.entries[i], 'delete');
/* strictly speaking the {} argument above should
* be clone(msg) but:
* - it must be {}
* - if there was any possibility of a different
* msg then it should be cloned using the
* node-red/red/nodes/Node.js cloning function
*/
continue;
} else {
event = 'unknown';
}
//console.log(JSON.stringify(data.entries[i], null, 2));
node.sendEvent(msg, data.entries[i], event);
return;
}
node.status({});
node.state = data.next_stream_position;
node.emit('ready');
for (var i = 0; i < data.entries.length; i++) {
// TODO: support other event types
// TODO: suppress duplicate events
// for both of the above see:
// https://developers.box.com/docs/#events
var event;
if (node.seenEvents[data.entries[i].event_id]) {
continue;
}
});
});
var interval = setInterval(function() {
node.emit("input", {});
}, 600000); // 10 minutes
node.on("close", function() {
if (interval !== null) { clearInterval(interval); }
node.seenEvents[data.entries[i].event_id] = true;
if (data.entries[i].event_type === 'ITEM_CREATE') {
event = 'add';
} else if (data.entries[i].event_type === 'ITEM_UPLOAD') {
event = 'add';
} else if (data.entries[i].event_type === 'ITEM_RENAME') {
event = 'add';
// TODO: emit delete event?
} else if (data.entries[i].event_type === 'ITEM_TRASH') {
// need to find old path
node.lookupOldPath({}, data.entries[i], 'delete');
/* strictly speaking the {} argument above should
* be clone(msg) but:
* - it must be {}
* - if there was any possibility of a different
* msg then it should be cloned using the
* node-red/red/nodes/Node.js cloning function
*/
continue;
} else {
event = 'unknown';
}
//console.log(JSON.stringify(data.entries[i], null, 2));
node.sendEvent({}, data.entries[i], event);
}
});
});

node.getInitialStreamPosition();
}

RED.nodes.registerType("box in", BoxInNode);

BoxInNode.prototype.sendEvent = function(msg, entry, event, path) {
Expand Down Expand Up @@ -424,6 +432,126 @@ module.exports = function(RED) {
});
};

/**
* Init long-polling to retrieve events from Box in real-time
*
* If long polling is enabled, we make an OPTIONS request to the "events"
* endpoint. this endpoint will return another URL to poll, in addition
* to a "timeout" value. we will then make a GET request to the supplied URL
* and wait for a response. if the response is "new_change", we hit the
* "events" endpoint with the usual GET request (and stream position). if the
* response is "retry_timeout", we retry the operation again from the OPTIONS
* request. if the "timeout" value is exceeded, we retry the GET request.
* if "max_retries" is exceeded, we start again from OPTIONS.
* @see https://developer.box.com/v2.0/reference#long-polling
* @private
*/
BoxInNode.prototype.startLongPolling = function() {
var node = this;
node.box.request({
url: 'https://api.box.com/2.0/events',
method: 'OPTIONS'
}, function (err, data) {
if (err) {
node.error(RED._('box.error.long-polling-failed'), {
err: err.toString()
});
return;
}
if (!(data.entries && data.entries.length)) {
node.error(RED._('box.error.invalid-response'), {
err: err.toString()
});
return;
}
node.longpoll(data.entries.shift());
});
}

/**
* Initializes default (interval-based) polling
*/
BoxInNode.prototype.startIntervalPolling = function () {
var node = this;

if (!node.pollingInterval) {
node.pollingInterval = setInterval(function() {
node.emit('check-events');
}, node.interval * 1000); // interval in ms

node.on("close", function() {
clearInterval(node.pollingInterval);
});
}
};

/**
* Gets initial stream position from events endpoint
* Saves as "state" property. When ready, emits "ready" event, which will
* initialize long-polling or interval-based polling, depending on Node config.
*/
BoxInNode.prototype.getInitialStreamPosition = function () {
var node = this;
node.box.request({
url: 'https://api.box.com/2.0/events?stream_position=now&stream_type=changes',
}, function (err, data) {
if (err) {
node.error(RED._('box.error.event-stream-initialize-failed', {err: err.toString()}));
node.status({
fill: 'red',
shape: 'ring',
text: 'box.status.failed'
});
return;
}
node.state = data.next_stream_position;
node.status({});
node.emit('ready');
});
};

/**
* Long-poll Box for new events
* This function calls itself recursively until config.max_retries is hit. At that point
* it will call BoxInNode#startPolling again.
* @private
* @param {Object} config Object returned by Box's "events" endpoint when hit with OPTIONS method
* @param {number} config.max_retries Number of retries allowed
* @param {number} config.retry_timeout Retry the GET request after this many seconds
* @param {string} config.url Endpoint of GET request
* @param {number} [count=0] Current retry count
*/
BoxInNode.prototype.longpoll = function (config, count) {
var node = this;
count = count || 0;
node.box.request({
url: config.url,
timeout: parseInt(config.retry_timeout, 10) * 1000
}, function (err, data) {
if (err) {
if (err.code === 'ESOCKETTIMEDOUT') {
if (count === parseInt(config.max_retries, 10) - 1) {
node.startLongPolling();
return;
}
process.nextTick(function() {
node.longpoll(config, ++count);
});
return;
}
node.error(RED._('box.error.long-polling-failed'), {err: err.toString()});
return;
}
if (data.message === 'new_change') {
node.emit('check-events');
} else if (data.message === 'reconnect') {
node.startLongPolling();
} else {
// ???
}
});
};

function BoxQueryNode(n) {
RED.nodes.createNode(this,n);
this.filename = n.filename || "";
Expand Down
11 changes: 8 additions & 3 deletions box/locales/en-US/box.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"clientid": "Client Id",
"secret": "Secret",
"authenticate": "Authenticate with Box",
"boxuser": "Box User"
"boxuser": "Box User",
"interval": "Polling Interval (seconds)",
"longpolling": "Real-Time Updates?"
},
"placeholder": {
"pattern": "Filepattern",
Expand All @@ -30,7 +32,8 @@
"resolving-path": "resolving path",
"downloading": "downloading",
"uploading": "uploading",
"overwriting": "overwriting"
"overwriting": "overwriting",
"waiting": "waiting"
},
"warn": {
"refresh-token": "trying to refresh token due to expiry",
Expand All @@ -55,7 +58,9 @@
"no-filename-specified": "No filename specified",
"path-resolve-failed": "failed to resolve path: __err__",
"download-failed": "download failed: __err__",
"upload-failed": "failed upload: __err__"
"upload-failed": "failed upload: __err__",
"long-polling-failed": "long-polling request failed: __err__",
"invalid-response": "invalid response from Box when polling: __err__"
}
}
}
Loading