Browse files

New datacollector

  • Loading branch information...
1 parent 44dc056 commit 941b58723499d1a9c93253d48735a3f6d09ecfbe @bergie bergie committed Oct 8, 2013
Showing with 81 additions and 0 deletions.
  1. +2 −0 component.json
  2. +78 −0 components/CollectObject.coffee
  3. +1 −0 package.json
View
2 component.json
@@ -24,6 +24,7 @@
"components/ReadGroup.coffee",
"components/SendByGroup.coffee",
"components/CollectGroups.coffee",
+ "components/CollectObject.coffee",
"components/FirstGroup.coffee",
"components/MapGroup.coffee",
"components/MergeGroups.coffee",
@@ -45,6 +46,7 @@
"ReadGroup": "components/ReadGroup.coffee",
"SendByGroup": "components/SendByGroup.coffee",
"CollectGroups": "components/CollectGroups.coffee",
+ "CollectObject": "components/CollectObject.coffee",
"FirstGroup": "components/FirstGroup.coffee",
"MapGroup": "components/MapGroup.coffee",
"MergeGroups": "components/MergeGroups.coffee",
View
78 components/CollectObject.coffee
@@ -0,0 +1,78 @@
+noflo = require 'noflo'
+
+class CollectObject extends noflo.Component
+ description: 'Collect packets to an object identified by keys organized
+ by connection'
+
+ constructor: ->
+ @keys = []
+ @data = {}
+ @groups = {}
+
+ @inPorts =
+ keys: new noflo.ArrayPort 'string'
+ collect: new noflo.ArrayPort 'all'
+ release: new noflo.Port 'bang'
+ clear: new noflo.Port 'bang'
+ @outPorts =
+ out: new noflo.Port 'object'
+
+ @inPorts.keys.on 'data', (key) =>
+ keys = key.split ','
+ if keys.length > 1
+ @keys = []
+ for key in keys
+ @keys.push key
+
+ @inPorts.collect.once 'connect', =>
+ @subscribeSockets()
+
+ @inPorts.release.on 'data', =>
+ do @release
+ @inPorts.clear.on 'data', =>
+ do @clear
+
+ release: ->
+ @outPorts.out.send @data
+ @outPorts.out.disconnect()
+ @data = @clone @data
+
+ subscribeSockets: ->
+ # Subscribe to sockets individually
+ @inPorts.collect.sockets.forEach (socket, idx) =>
+ @subscribeSocket socket, idx
+
+ subscribeSocket: (socket, id) ->
+ socket.on 'begingroup', (group) =>
+ unless @groups[id]
+ @groups[id] = []
+ @groups[id].push group
+ socket.on 'data', (data) =>
+ return unless @keys[id]
+ groupId = @groupId @groups[id]
+ unless @data[groupId]
+ @data[groupId] = {}
+ @data[groupId][@keys[id]] = data
+ socket.on 'endgroup', =>
+ return unless @groups[id]
+ @groups[id].pop()
+
+ groupId: (groups) ->
+ unless groups.length
+ return 'ungrouped'
+ groups[0]
+
+ clone: (data) ->
+ newData = {}
+ for groupName, groupedData of data
+ newData[groupName] = {}
+ for name, value of groupedData
+ continue unless groupedData.hasOwnProperty name
+ newData[groupName][name] = value
+ newData
+
+ clear: ->
+ @data = {}
+ @groups = {}
+
+exports.getComponent = -> new CollectObject
View
1 package.json
@@ -61,6 +61,7 @@
"ReadGroup": "./components/ReadGroup.coffee",
"SendByGroup": "components/SendByGroup.coffee",
"CollectGroups": "./components/CollectGroups.coffee",
+ "CollectObject": "components/CollectObject.coffee",
"FirstGroup": "./components/FirstGroup.coffee",
"MapGroup": "./components/MapGroup.coffee",
"MergeGroups": "./components/MergeGroups.coffee",

0 comments on commit 941b587

Please sign in to comment.