Permalink
Browse files

Base system for FBP

  • Loading branch information...
bergie committed Jun 6, 2011
0 parents commit 04698a77272d9cd552ac57ca511ec8f05696ea40
Showing with 98 additions and 0 deletions.
  1. +25 −0 internalSocket.coffee
  2. +73 −0 noflo.coffee
@@ -0,0 +1,25 @@
# By default we use NoFlo-internal sockets for connections between workers
events = require "events"

class internalSocket extends events.EventEmitter
connected: false

initialize: ->
@emit "initialize", @

connect: ->
@connected = true
@emit "connect", @

send: (data) ->
@emit "data", data

disconnect: ->
@connected = false
@emit "disconnect", @

isConnected: ->
@connected

exports.createSocket = ->
new internalSocket()
@@ -0,0 +1,73 @@
# The main NoFlo runner

internalSocket = require "./internalSocket"

processes = []
connections = []
sockets = []

initializeNode = (node) ->
process = {}

if node.component
process.component = require "./components/#{node.component}"

process.id = node.id

unless node.out
node.out = []

for port of process.component.getOutputs()
if node[port]
connections.push
process: process
from: port
to: node[port]

if node.config and process.component.initialize
process.component.initialize node.config

processes.push process

getProcess = (id) ->
for process in processes
if process.id is id
return process
null

connectProcess = (connection) ->
socket = internalSocket.createSocket()

outputs = connection.process.component.getOutputs()
unless outputs[connection.from]
console.error "No such outbound port #{connection.from} in #{process.id}"
return
outputs[connection.from] socket

target = getProcess connection.to[0]
unless target
console.error "No such process #{connection.to[0]}"
return
inputs = target.component.getInputs()
unless inputs[connection.to[1]]
console.error "No such inbound port #{connection.to[1]} in #{target.id}"
inputs[connection.to[1]] socket

sockets.push socket

socket.on "connect", ->
console.error " CONN #{connection.process.id}:#{connection.from} -> #{target.id}:#{connection.to[1]}"

socket.on "data", (data) ->
console.error " DATA #{connection.process.id}:#{connection.from} -> #{target.id}:#{connection.to[1]}"

socket.on "disconnect", ->
console.error " DISC #{connection.process.id}:#{connection.from} -> #{target.id}:#{connection.to[1]}"

exports.createNetwork = (graph) ->
initializeNode node for node in graph

connectProcess connection for connection in connections

for socket in sockets
socket.initialize()

0 comments on commit 04698a7

Please sign in to comment.