Skip to content

Commit

Permalink
Propagate edge connection errors to caller
Browse files Browse the repository at this point in the history
  • Loading branch information
bergie committed Oct 20, 2017
1 parent f4cdef9 commit 1ffcec8
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions src/lib/Network.coffee
Expand Up @@ -242,32 +242,40 @@ class Network extends EventEmitter
# Start with node creators
nodes "Node"

connectPort: (socket, process, port, index, inbound) ->
connectPort: (socket, process, port, index, inbound, callback) ->
if inbound
socket.to =
process: process
port: port
index: index

unless process.component.inPorts and process.component.inPorts[port]
throw new Error "No inport '#{port}' defined in process #{process.id} (#{socket.getId()})"
callback new Error "No inport '#{port}' defined in process #{process.id} (#{socket.getId()})"
return
if process.component.inPorts[port].isAddressable()
return process.component.inPorts[port].attach socket, index
return process.component.inPorts[port].attach socket
process.component.inPorts[port].attach socket, index
do callback
return
process.component.inPorts[port].attach socket
do callback
return

socket.from =
process: process
port: port
index: index

unless process.component.outPorts and process.component.outPorts[port]
throw new Error "No outport '#{port}' defined in process #{process.id} (#{socket.getId()})"
callback new Error "No outport '#{port}' defined in process #{process.id} (#{socket.getId()})"
return

if process.component.outPorts[port].isAddressable()
return process.component.outPorts[port].attach socket, index
process.component.outPorts[port].attach socket, index
do callback
return
process.component.outPorts[port].attach socket
do callback
return

subscribeGraph: ->
# A NoFlo graph may change after network initialization.
Expand Down Expand Up @@ -448,11 +456,13 @@ class Network extends EventEmitter
# Subscribe to events from the socket
@subscribeSocket socket, from

@connectPort socket, to, edge.to.port, edge.to.index, true
@connectPort socket, from, edge.from.port, edge.from.index, false
@connectPort socket, to, edge.to.port, edge.to.index, true, (err) =>
return callback err if err
@connectPort socket, from, edge.from.port, edge.from.index, false, (err) =>
return callback err if err

@connections.push socket
callback()
@connections.push socket
callback()

removeEdge: (edge, callback) ->
for connection in @connections
Expand All @@ -466,7 +476,6 @@ class Network extends EventEmitter
do callback

addDefaults: (node, callback) ->

process = @processes[node.id]

unless process.component.isReady()
Expand All @@ -486,7 +495,7 @@ class Network extends EventEmitter
# Subscribe to events from the socket
@subscribeSocket socket

@connectPort socket, process, key, undefined, true
@connectPort socket, process, key, undefined, true, ->

@connections.push socket

Expand All @@ -511,26 +520,27 @@ class Network extends EventEmitter
@addInitial initializer, callback
return

@connectPort socket, to, initializer.to.port, initializer.to.index, true

@connections.push socket

init =
socket: socket
data: initializer.from.data
@initials.push init
@nextInitials.push init
@connectPort socket, to, initializer.to.port, initializer.to.index, true, (err) =>
return callback err if err

if @isRunning()
# Network is running now, send initials immediately
do @sendInitials
else if not @isStopped()
# Network has finished but hasn't been stopped, set
# started and set
@setStarted true
do @sendInitials
@connections.push socket

callback()
init =
socket: socket
data: initializer.from.data
@initials.push init
@nextInitials.push init

if @isRunning()
# Network is running now, send initials immediately
do @sendInitials
else if not @isStopped()
# Network has finished but hasn't been stopped, set
# started and set
@setStarted true
do @sendInitials

callback()

removeInitial: (initializer, callback) ->
for connection in @connections
Expand Down

0 comments on commit 1ffcec8

Please sign in to comment.