-
Notifications
You must be signed in to change notification settings - Fork 8
/
CollectGroups.coffee
94 lines (82 loc) · 3.23 KB
/
CollectGroups.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
noflo = require 'noflo'
# Collects every IP received on `in` during one connection into a nested
# object keyed by group name, and sends that object on `out` at disconnect.
#
# Shape of the result:
#   * data IPs received at a given level are appended to that level's `$data`
#     array;
#   * each closed group becomes a property named after the group on the
#     enclosing level; when the same group name is closed more than once at
#     the same level, the property becomes an array of the collected levels.
class CollectGroups extends noflo.Component
  description: 'Collect packets into object keyed by its groups'

  constructor: ->
    # Working variable for incoming IPs (the level currently being filled)
    @data = {}
    # Breadcrumb of incoming groups
    @groups = []
    # Breadcrumb of each level of IPs as partitioned by groups
    @parents = []
    # One flag per currently open group, in nesting order: `true` when the
    # group was rejected (named `$data`) and therefore never got a level of
    # its own. Lets `endgroup` tell which closings must not pop a level.
    @rejected = []

    @inPorts = new noflo.InPorts
      in:
        datatype: 'all'
        description: 'IPs to collect'
    @outPorts = new noflo.OutPorts
      out:
        datatype: 'object'
        description: 'An object containing input IPs sorted by their group
          names'
      error:
        datatype: 'object'

    @inPorts.in.on 'connect', =>
      # Make sure working memory is clean. The breadcrumbs are reset as well
      # so that a previous connection that ended with unclosed groups cannot
      # leak stale levels into this one.
      @data = {}
      @groups = []
      @parents = []
      @rejected = []

    @inPorts.in.on 'begingroup', (group) =>
      # We use the attribute name `$data` to indicate data IPs in the outgoing
      # structure, so no `$data` please
      if group is '$data'
        # Record the rejection *before* reporting it, so the matching
        # `endgroup` is recognised and ignored instead of closing the
        # enclosing level.
        @rejected.push true
        @error 'groups cannot be named \'$data\''
        return
      @rejected.push false
      # Save whatever in the working memory right now into its own level
      @parents.push @data
      # Save the current group
      @groups.push group
      # Clear working memory for new level
      @data = {}

    @inPorts.in.on 'data', (data) =>
      @setData data

    @inPorts.in.on 'endgroup', =>
      # This closes a group that was rejected on `begingroup`: no level was
      # opened for it, so there is nothing to fold back.
      return if @rejected.pop()
      # Unbalanced `endgroup` (no open level): ignore it rather than replacing
      # the working memory with `undefined`.
      return unless @parents.length
      # Keep a handle on the level that is being closed
      closed = @data
      # Take out the previous level; it becomes the working memory again
      @data = @parents.pop()
      # Put the closed level into the previous level under its group name
      @addChild @data, @groups.pop(), closed
      # NOTE: collating into working memory (`@data`) works because this is
      # ending a level. If what follows is a disconnect, the working memory
      # that gets flushed is the entire structure. If what follows is a new
      # group, the working memory is pushed onto `@parents`. If it's a data
      # IP, it's appended to the `$data` attribute of this level.

    @inPorts.in.on 'disconnect', =>
      # Flush everything down the drain
      @outPorts.out.send @data
      @outPorts.out.disconnect()

  # Put a collected level (`data`) into the enclosing level (`parent`) under
  # the group name (`child`).
  #
  # Returns whatever the chosen branch evaluates to: `data` on first insert,
  # the new array length when appending, or the freshly created array.
  addChild: (parent, child, data) ->
    # If `child` (i.e. the group) doesn't exist yet, put the level in as-is.
    # Only *own* properties count: `of` would also match names inherited from
    # Object.prototype (e.g. a group called `toString` or `constructor`).
    unless Object::hasOwnProperty.call(parent, child)
      return parent[child] = data
    # *OR*, if it's already an array, append to it
    return parent[child].push data if Array.isArray parent[child]
    # *OR*, if something already exists in place but isn't appendable, make it
    # so by having whatever is in it as the first element of the array
    parent[child] = [ parent[child], data ]

  # Append a data IP to the `$data` array of the current level.
  setData: (data) ->
    # Initialize our data IPs storage as an array if it doesn't exist
    @data.$data ?= []
    # Save the IP
    @data.$data.push data

  # Error handling: send an `Error` built from `msg` on the `error` port when
  # that port is attached, otherwise throw it.
  error: (msg) ->
    if @outPorts.error.isAttached()
      @outPorts.error.send new Error msg
      @outPorts.error.disconnect()
      return
    throw new Error msg
# Module factory: every call builds and returns a fresh CollectGroups
# instance (presumably consumed by the NoFlo component loader — confirm
# against the runtime in use).
exports.getComponent = ->
  new CollectGroups()