-
Notifications
You must be signed in to change notification settings - Fork 11
/
RemoteSubGraph.coffee
162 lines (141 loc) · 5.11 KB
/
RemoteSubGraph.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
noflo = require 'noflo'
connection = require './connection'
fbpClient = require 'fbp-protocol-client'
debug = require('debug') 'noflo-runtime:remotesubgraph'
# NoFlo component that stands in for a graph running on a remote FBP runtime.
#
# The component connects to the runtime described by a definition object
# (see setDefinition), optionally uploads a graph to it, and mirrors the
# ports reported by the runtime as ports of this node. Packets arriving on
# the local inports are forwarded to the runtime; packets reported by the
# runtime are re-emitted on the matching local outports.
class RemoteSubGraph extends noflo.Component
  # metadata - accepted for interface compatibility; not used here.
  constructor: (metadata) ->
    super()
    @runtime = null    # transport client, created in setDefinition
    @ready = false     # true once the remote ports have been mirrored
    @graph = null      # graph set as the runtime's main graph, if any
    @graphName = null  # graph name sent along with outgoing packets

  # Returns whether the remote ports have been mirrored locally.
  isReady: ->
    @ready

  # Updates the ready flag; emits 'ready' when switching to a truthy value.
  setReady: (ready) ->
    debug("#{@nodeId} setting ready to #{ready}")
    @ready = ready
    @emit 'ready' if ready

  # Starts the remote runtime, then invokes callback.
  setUp: (callback) ->
    @runtime.start()
    do callback

  # Stops the remote runtime and disconnects from it, then invokes callback.
  tearDown: (callback) ->
    @runtime.stop()
    @runtime.disconnect()
    do callback

  # Binds this component to a runtime definition and starts connecting.
  #
  # definition - runtime definition; this method reads `protocol`,
  #              `description` and `icon`, and hands the whole object to
  #              the transport constructor.
  #
  # Throws an Error when no transport is available for the protocol.
  setDefinition: (definition) ->
    @definition = definition
    try
      Runtime = fbpClient.getTransport @definition.protocol
    catch e
      throw new Error "'#{@definition.protocol}' protocol not supported: " + e.message
    @runtime = new Runtime @definition

    @description = definition.description || ''
    @setIcon definition.icon if definition.icon

    # Dispatch messages of the runtime protocol to their handlers
    @runtime.on 'runtime', (msg) =>
      switch msg.command
        when 'runtime' then @handleRuntime definition, msg.payload
        when 'ports' then @setupPorts msg.payload
        when 'packet' then @onPacketReceived msg.payload
      return

    # Errors raised before the runtime reported its capabilities are
    # rethrown; later ones are only logged.
    ready = false
    @runtime.on 'error', (err) ->
      throw err unless ready
      console.error err
    @runtime.once 'capabilities', ->
      ready = true

    # Attempt to connect
    @runtime.connect()

  # Reacts to the runtime's description of itself.
  #
  # definition - runtime definition (reads `id` and `graph`).
  # payload    - runtime info (reads `capabilities` and `graph`).
  #
  # Throws an Error when the runtime lacks a required capability. A payload
  # without a capabilities list is treated as having no capabilities, so the
  # descriptive error is raised instead of a TypeError.
  handleRuntime: (definition, payload) ->
    capabilities = payload.capabilities or []
    if 'protocol:runtime' not in capabilities
      throw new Error "runtime #{definition.id} does not allow protocol:runtime"

    if payload.graph and payload.graph is definition.graph
      # Already running the desired graph
      debug "#{@nodeId} runtime is already running desired graph #{payload.graph}"
      @graphName = payload.graph
      @graph = new noflo.Graph payload.graph
      @runtime.setMain @graph
      return

    # No graph to upload, accept what runtime has
    return unless definition.graph

    # Prepare to upload graph
    if 'protocol:graph' not in capabilities
      throw new Error "runtime #{definition.id} does not allow protocol:graph"
    debug "#{@nodeId} sending graph #{definition.graph} to runtime (had #{payload.graph})"
    noflo.graph.loadFile definition.graph, (err, graph) =>
      throw err if err
      graph.properties.id = definition.graph unless graph.properties.id
      @setGraph graph, (err) ->
        throw err if err

  # Makes graph the runtime's main graph and sends it to the runtime.
  #
  # graph    - graph to send; its `name` (or `properties.id`) becomes the
  #            graph name used for outgoing packets.
  # callback - passed on to connection.sendGraph.
  setGraph: (graph, callback) ->
    @graph = graph
    @graphName = graph.name or graph.properties.id
    @runtime.setMain graph
    connection.sendGraph graph, @runtime, callback, true

  # Mirrors the ports reported by the runtime onto this component.
  #
  # ports - payload with `inPorts` and `outPorts` arrays of port
  #         definitions, each carrying an `id`.
  #
  # Does nothing while a graph is configured but not yet set, or while the
  # report is missing any port that the graph exports.
  setupPorts: (ports) ->
    return if @definition?.graph and not @graph
    if @graph
      # We should only emit ready once the remote runtime sent us at least
      # all the ports that the graph exports
      for exported of @graph.inports
        found = ports.inPorts.some (port) -> port.id is exported
        return unless found
      for exported of @graph.outports
        found = ports.outPorts.some (port) -> port.id is exported
        return unless found

    inportNames = ports.inPorts.map (p) -> p.id
    outportNames = ports.outPorts.map (p) -> p.id
    debug "#{@nodeId} received inports #{inportNames.join(', ')}"
    debug "#{@nodeId} received outports #{outportNames.join(', ')}"

    @setReady false
    # Expose remote graph's exported ports as node ports
    @prepareInport port for port in ports.inPorts
    @prepareOutport port for port in ports.outPorts
    @setReady true

  # Converts a remote port definition into local port options.
  # A missing type and the type 'any' both map to the datatype 'all'.
  normalizePort: (definition) ->
    type = definition.type or 'all'
    type = 'all' if type is 'any'
    return {
      datatype: type
      required: definition.required or false
      addressable: definition.addressable or false
    }

  # Adds a local inport for a remote port definition and forwards every
  # information packet received on it to the runtime. An inport that
  # already exists is left untouched.
  prepareInport: (definition) ->
    name = definition.id
    return if @inPorts.ports[name]
    # Send data across to remote graph
    @inPorts.add name, @normalizePort definition
    @inPorts.ports[name].on 'ip', (ip) =>
      event = switch ip.type
        when 'data' then 'data'
        when 'openBracket' then 'begingroup'
        when 'closeBracket' then 'endgroup'
      # Do not forward a packet whose type has no remote event name
      return unless event
      @runtime.sendRuntime 'packet',
        port: name
        event: event
        payload: ip.data
        graph: @graphName

  # Adds a local outport for a remote port definition. An outport that
  # already exists is left untouched, matching prepareInport, so repeated
  # port reports do not re-add it.
  prepareOutport: (definition) ->
    name = definition.id
    return if @outPorts.ports[name]
    @outPorts.add name, @normalizePort definition

  # Re-emits a packet reported by the runtime on the matching local outport.
  #
  # packet - payload with `port`, `event` and `payload`.
  #
  # Packets for ports that are not exposed locally are dropped.
  onPacketReceived: (packet) ->
    name = packet.port
    # Look the port up among registered ports only, the same way
    # prepareInport does, so a name can never resolve to a collection method
    port = @outPorts.ports[name]
    unless port
      debug "#{@nodeId} ignoring #{packet.event} packet for unknown outport #{name}"
      return
    switch packet.event
      when 'connect' then port.connect()
      when 'begingroup' then port.beginGroup packet.payload
      when 'data' then port.send packet.payload
      when 'endgroup' then port.endGroup packet.payload
      when 'disconnect' then port.disconnect()
# Expose the component class itself.
exports.RemoteSubGraph = RemoteSubGraph

# Creates a RemoteSubGraph that has no runtime definition yet.
exports.getComponent = (metadata) ->
  new RemoteSubGraph metadata

# Returns a component factory bound to one runtime definition.
#
# runtime - runtime definition handed to setDefinition of each new instance.
# baseDir - stored on each new instance as `baseDir`.
exports.getComponentForRuntime = (runtime, baseDir) ->
  (metadata) ->
    component = exports.getComponent metadata
    component.baseDir = baseDir
    component.setDefinition runtime
    component