Skip to content
This repository has been archived by the owner on Sep 23, 2021. It is now read-only.

Commit

Permalink
Starting to develop improved network definitions - currently broken!
Browse files Browse the repository at this point in the history
  • Loading branch information
jpaulm committed Jun 1, 2016
1 parent 200da1d commit 361a4ac
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 25 deletions.
7 changes: 7 additions & 0 deletions .project
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>jsfbp</name>
<comment>JavaScript Implementation of Flow-Based Programming (FBP), which is a particular form of dataflow programming based on bounded buffers, information packets with defined lifetimes, named ports, and separate definition of connections.</comment>
<projects>
</projects>
</projectDescription>
2 changes: 1 addition & 1 deletion core/InputPort.js
Expand Up @@ -8,7 +8,7 @@ var IP = require('./IP')
var InputPort = module.exports = function () {
this.name = null;
this.conn = null; // either ProcessConnection or IIPConnection
//this.closed = false;
//this.closed = false;
};

InputPort.prototype.setRuntime = function (runtime) {
Expand Down
81 changes: 76 additions & 5 deletions core/Network.js
Expand Up @@ -3,13 +3,80 @@ var IIPConnection = require('./IIPConnection')
, OutputPort = require('./OutputPort')
, path = require('path')
, Process = require('./Process')
, ProcessConnection = require('./ProcessConnection');
, ProcessConnection = require('./ProcessConnection')
, parseFBP = require('parsefbp')
, trace = require('./trace');

var Network = module.exports = function () {
this._processes = [];
};

Network.prototype.run = function (runtime, options, callback) {
/*
* This function provides support for loading
* - components that come _with_ this module -> './components/copier.js'
* - components that are inside a package -> 'package/component'
* - components that are simply a node module -> 'component'
*/
function loadComponent(componentName) {
var moduleLocation = componentName;
var componentField;
if(componentName.match('^[.]{1,2}/')) {
moduleLocation = path.resolve(path.join(__dirname, '..', componentName));
} else if(componentName.indexOf('/') >= 0) {
moduleLocation = componentName.slice(0,componentName.indexOf('/'));
componentField = componentName.slice(componentName.indexOf('/')+1);
if(moduleLocation === 'jsfbp') {
moduleLocation = path.resolve(path.join(__dirname, '../components/', componentField +'.js'));
componentField = undefined;
}
}
var component = require(moduleLocation);
if(componentField) {
return component[componentField]
} else {
return component;
}
}

function getPort(connectionEnd) {
var port = connectionEnd.port;
if('index' in connectionEnd) {
port += '['+connectionEnd.index+']';
}
return port;
}

Network.createFromGraph = function(graphString) {
var graphDefinition = parseFBP(graphString, {caseSensitive: true});

var network = new Network();
var processes = {};

Object.keys(graphDefinition.processes).forEach(function(processName) {
var processDefinition = graphDefinition.processes[processName];
processes[processName] = network.defProc(processDefinition.component, processName);
});

graphDefinition.connections.forEach(function(connection){
var target = connection.tgt;
if('data' in connection) {
network.initialize(processes[target.process], getPort(target), connection.data);
} else {
var source = connection.src;
network.connect(processes[source.process], getPort(source), processes[target.process], getPort(target));
}

});
return network;
};

Network.prototype.getProcessByName = function (processName) {
return this._processes.find(function(currentProcess) {
return currentProcess.name === processName;
});
};

Network.prototype.run = function(runtime, options, callback) {
options = options || {};
function setPortRuntime(port) {
port[1].setRuntime(runtime);
Expand All @@ -19,15 +86,19 @@ Network.prototype.run = function (runtime, options, callback) {
process.inports.forEach(setPortRuntime);
process.outports.forEach(setPortRuntime);
});
runtime.run(this._processes, options, callback || function () {
});
runtime.run(this._processes, options, callback || function(){});
};

Network.prototype.defProc = function (func, name) {
if (typeof func === "string") {
func = require(path.resolve(path.join(__dirname, '..', func)));
func = loadComponent(func);
}
if(!func) {
throw new Error("No function passed to defProc");
}
var proc = new Process(name || func.name, func);
trace('Created Process with name: ' + proc.name);

this._processes.push(proc);
return proc;
};
Expand Down
4 changes: 2 additions & 2 deletions core/OutputPort.js
Expand Up @@ -6,7 +6,7 @@ var Fiber = require('fibers')
var OutputPort = module.exports = function () {
this.name = null;
this.conn = null;
this.closed = false;
this.closed = false;
};

OutputPort.prototype.setRuntime = function (runtime) {
Expand Down Expand Up @@ -37,7 +37,7 @@ OutputPort.prototype.send = function (ip) {
conn.down.status == ProcessStatus.DORMANT ||
conn.down.status == ProcessStatus.WAITING_TO_FIPE) {
conn.down.status = ProcessStatus.READY_TO_EXECUTE;
this._runtime.pushToQueue(conn.down);
this._runtime.pushToQueue(conn.down); // xxxxxx
}
if (conn.usedslots == conn.array.length) {
proc.status = ProcessStatus.WAITING_TO_SEND;
Expand Down
44 changes: 31 additions & 13 deletions core/runtimes/FiberRuntime/index.js
Expand Up @@ -7,7 +7,7 @@ Fiber.prototype.fbpProc = null;
var FiberRuntime = module.exports = function () {
this._queue = [];
this._count = null;
this._tracing = false;
this._tracing = false;
};

FiberRuntime.prototype.isTracing = function () {
Expand Down Expand Up @@ -92,9 +92,9 @@ FiberRuntime.prototype.runAsyncCallback = function (cb) {
};

FiberRuntime.prototype.run = function (processes, options, callback) {
this._list = processes;
this._list = processes;
this._count = this._list.length;

this._tracing = global.tracing = Boolean(options.trace);

var self = this;
Expand All @@ -119,7 +119,7 @@ FiberRuntime.prototype._createFiber = function (process) {
process.fiber = new Fiber(process.func.bind(process, this));
process.fiber.fbpProc = process;
process.status = Process.Status.ACTIVE;

return process;
};

Expand All @@ -139,20 +139,28 @@ FiberRuntime.prototype._hasDeadLock = function () {
FiberRuntime.prototype._genInitialQueue = function () {
var self = this;
var queue = [];

for (var i = 0; i < self._list.length; i++) {
var keys = Object.keys(self._list);
var m = keys.length;
//console.log(m); //xxxx
//console.log(self._list);
for (var i = 0; i < m; i++) {
var selfstarting = true;
for (var j = 0; j < self._list[i].inports.length; j++) {
var k = self._list[i].inports[j];
var prop = keys[i];
//console.log(self._list[prop]); //?
for (var j = 0; j < Object.keys(self._list[prop].inports).length; j++) {
var k = self._list[prop].inports[j];
//console.log(k); //xxx
if (!(k[1].conn instanceof IIPConnection)) {
selfstarting = false;
}
}

if (selfstarting) {
queue.push(self._list[i]);
//console.log(self._list[prop]);
queue.push(self._list[prop]);
}
}

return queue;
};

Expand Down Expand Up @@ -189,7 +197,17 @@ FiberRuntime.prototype._procState = function (proc) {
// Fibre running scheduler
FiberRuntime.prototype._actualRun = function () {
this._queue = this._genInitialQueue();

function setPortRuntime(port) {
console.log('setportruntime');
console.log(runtime);
port[1].setRuntime(runtime);
}
console.log('setportruntimes');
console.log(this._list);
this._list.forEach(function (process) {
process.inports.forEach(setPortRuntime).bind(this);
process.outports.forEach(setPortRuntime).bind(this);
}.bind(this));
while (true) {
this._tick();

Expand All @@ -199,7 +217,7 @@ FiberRuntime.prototype._actualRun = function () {

if (this._hasDeadLock()) {
console.log('Deadlock detected');
for (var i = 0; i < this._list.length; i++) {
for (var i = 0; i < Object.keys(this._list).length; i++) {
console.log('- Process status: '
+ this._list[i].getStatusString() + ' - '
+ this._list[i].name);
Expand All @@ -211,8 +229,8 @@ FiberRuntime.prototype._actualRun = function () {
};

FiberRuntime.prototype._tick = function () {

var x = this._queue.shift();
var x = this._queue.shift();

while (x != undefined) {

Expand Down
1 change: 1 addition & 0 deletions examples/components/gendata.js
Expand Up @@ -9,6 +9,7 @@ module.exports = function gendata() {
//console.log(count);
for (var i = 0; i < count; i++) {
ip = this.createIP(i + 'abcd');
console.log(outport);
if (-1 == outport.send(ip)) {
return;
}
Expand Down
11 changes: 7 additions & 4 deletions examples/fbptest01.js
Expand Up @@ -3,15 +3,18 @@ var fbp = require('..');
// --- define network ---
var network = new fbp.Network();

var gendata = network.defProc('./examples/components/gendata.js');
var copier = network.defProc('./components/copier.js');
var recvr = network.defProc('./components/recvr.js');
// var recvr = fbp.defProc(require('../components/recvr.js'), 'recvr'); // equivalent
var gendata = network.defProc('./examples/components/gendata.js', 'Gen');
var copier = network.defProc('./components/copier.js', 'Copy');
var recvr = network.defProc('./components/recvr.js', 'Recvr');

network.initialize(gendata, 'COUNT', '2000');
network.connect(gendata, 'OUT', copier, 'IN', 5);
network.connect(copier, 'OUT', recvr, 'IN', 5);

//network.sinitialize('Gen.COUNT', '2000');
//network.sconnect('Gen.OUT', 'Copy.IN', 5);
//network.sconnect('Copy.OUT', 'Recvr.IN', 5);

// --- run ---
var fiberRuntime = new fbp.FiberRuntime();
network.run(fiberRuntime, {trace: false});

0 comments on commit 361a4ac

Please sign in to comment.