Skip to content

Commit

Permalink
Merge pull request #40 from telehash/0.3.28
Browse files Browse the repository at this point in the history
npmjs updated to 0.3.29
  • Loading branch information
Ryan Bennett committed Jul 9, 2015
2 parents d61a39d + 8127d2a commit 9251f4a
Show file tree
Hide file tree
Showing 22 changed files with 1,778 additions and 1,030 deletions.
9 changes: 6 additions & 3 deletions ext/path.js
Expand Up @@ -24,10 +24,13 @@ exports.mesh = function(mesh, cbExt)
link.addPath(path,pong);
});
}

ext.link = function(link, cbLink)
{
// util to force a path sync
/** force a path sync to confirm connectivity and record latency for paths
* @memberOf TLink
* @param {function} done - called when/if the first 'pong' is received, receives status and latency as arguments
*/
link.ping = function(done)
{
if(typeof done != 'function') done = function(){};
Expand All @@ -53,7 +56,7 @@ exports.mesh = function(mesh, cbExt)
}

// auto-ping on first status
link.onStatus.push(link.ping);
link.on('status',link.ping);

cbLink();
}
Expand Down
15 changes: 10 additions & 5 deletions ext/peer.js
Expand Up @@ -12,7 +12,7 @@ exports.mesh = function(mesh, cbExt)
var pipes = [];
var peer = {};
peer.open = {};

// actually create/return the pipe
function piper(to, cbPiper)
{
Expand All @@ -24,13 +24,13 @@ exports.mesh = function(mesh, cbExt)
pipe.path = {type:'peer',hn:to};

// handle any peer delivery through the router
pipe.onSend = function(packet, link, cbSend)
function peer_send(packet, link, cbSend)
{
var router = mesh.index[to];
if(!router) return cbSend('cannot peer to an unknown router: '+pipe.to);
if(!router.x) return cbSend('cannot peer yet via this router: '+pipe.to);
if(!link) return cbSend('requires link');

// no packet means try to send our keys
if(!packet)
{
Expand All @@ -44,7 +44,7 @@ exports.mesh = function(mesh, cbExt)
});
return;
}

// if it's an encrypted channel packet, pass through direct to router
if(packet.head.length == 0) return router.x.sending(packet);

Expand All @@ -56,6 +56,11 @@ exports.mesh = function(mesh, cbExt)
cbSend();
}


pipe.on('send', function(context,a1, a2, a3, a4){
peer_send.call(context,a1,a2,a3,a4)
})

cbPiper(pipe);
}

Expand Down Expand Up @@ -94,7 +99,7 @@ exports.mesh = function(mesh, cbExt)
log.debug('sending connect to',to.hashname,json,open.body);
to.x.send({json:json,body:open.body});
}

// handle incoming connect requests
peer.open.connect = function(args, open, cbOpen){
var via = this;
Expand Down
137 changes: 137 additions & 0 deletions ext/stream.class.js
@@ -0,0 +1,137 @@
var Duplex = require('stream').Duplex;
var util = require("util");
var lob = require('lob-enc');

module.exports = ChanStream


function ChanStream(chan, encoding){
if(!encoding) encoding = 'binary';
if(typeof chan != 'object' || !chan.isChannel)
{
mesh.log.warn('invalid channel passed to streamize');
return false;
}

var allowHalfOpen = (chan.type === "thtp") ? true : false;

Duplex.call(this,{allowHalfOpen: allowHalfOpen, objectMode:true})
this.on('finish',function(){
console.log("finish")
chan.send({json:{end:true}});
});

this.on('error',function(err){
if(err == chan.err) return; // ignore our own generated errors
console.log('streamized error',err);
chan.send({json:{err:err.toString()}});
});
var stream = this

this.on('pipe', function(from){
from.on('end',function(){
console.log("srteam from pipe end")
stream.end()
})
})


chan.receiving = chan_to_stream(this);

this._chan = chan;
this._encoding = encoding;

return this;
}

util.inherits(ChanStream, Duplex)



ChanStream.prototype._read = function(size){
if(this._getNextPacket) this._getNextPacket();

this._getNextPacket = false;
};

ChanStream.prototype._write = function(data,enc,cbWrite)
{
if(this._chan.state == 'gone') return cbWrite('closed');
// switch to our default encoding syntax
// dynamically detect object streams and change encoding
if(!Buffer.isBuffer(data) && typeof data != 'string')
{
data = JSON.stringify(data);
this._encoding = 'json';
}
// fragment it
while(data.length)
{
var frag = data.slice(0,1000);
data = data.slice(1000);
var packet = {json:{},body:frag};
// last packet gets continuation callback
if(!data.length)
{
if(enc != 'binary') packet.json.enc = this._encoding;
packet.callback = cbWrite;
}else{
packet.json.frag = true;
}
this._chan.send(packet);
}
}

function chan_to_stream (stream){
var data = new Buffer(0);

return function receiving(err, packet, getNextPacket) {
// was a wait writing, let it through

if(err)
stream.emit('error',err);

if(packet.body.length || data.length)
{
data = Buffer.concat([data,packet.body]);
if(!packet.json.frag)
{
var body = data;
data = new Buffer(0);
if(packet.json.enc == 'json') try{
body = JSON.parse(body)
}catch(E){
console.log('stream json frag parse error',E,body.toString());
err = E;
}
if(packet.json.enc == 'lob')
{
var packet = lob.decode(body);
if(!packet)
{
mesh.log.warn('stream lob frag decode error',body.toString('hex'));
err = 'lob decode failed';
}else{
body = packet;
}
}


// stream consumer is not ready for another packet yet, so hold on
// before getting more to send to readable...
if(!err && !stream.push(body))
stream._getNextPacket = getNextPacket;
}
}

//the packet has been read by stream consumer, so get the next one
if(!stream._getNextPacket)
getNextPacket();

//close the stream if this is the last packet
if(packet.json.end)
stream.push(null);


};
}
114 changes: 19 additions & 95 deletions ext/stream.js
@@ -1,5 +1,6 @@
var Duplex = require('stream').Duplex;

var lob = require('lob-enc');
var ChannelStream = require("./stream.class.js")

// implements https://github.com/telehash/telehash.org/blob/v3/v3/channels/stream.md
exports.name = 'stream';
Expand All @@ -8,107 +9,25 @@ exports.mesh = function(mesh, cbExt)
{
var ext = {open:{}};

// incoming stream requests go here
/** attach an incoming stream handler to the mesh
* @memberOf Mesh
* @param {function} onStream - handler for incoming streams
*/
mesh.stream = function(onStream)
{
mesh.log.debug('adding onStream handler',typeof onStream);
ext.onStream = onStream;
}

// takes any channel and returns a Duplex stream, oneshot is thtp style (one packet/channel)
/** takes any channel and returns a Duplex stream,
* @memberOf Mesh
* @param {Channel} channel - the channel to streamify
* @param {string} encoding - 'binary' or 'json'
* @return {ChannelStream}
*/
mesh.streamize = function(chan, encoding)
{
if(!encoding) encoding = 'binary';
if(typeof chan != 'object' || !chan.isChannel)
{
mesh.log.warn('invalid channel passed to streamize');
return false;
}

var stream = new Duplex({allowHalfOpen:true, objectMode:true});
stream.on('finish',function(){
chan.send({json:{end:true}});
});

stream.on('error',function(err){
if(err == chan.err) return; // ignore our own generated errors
mesh.log.debug('streamized error',err);
chan.send({json:{err:err.toString()}});
});

stream._write = function(data,enc,cbWrite)
{
if(chan.state == 'gone') return cbWrite('closed');
// switch to our default encoding syntax
enc = encoding;
// dynamically detect object streams and change encoding
if(!Buffer.isBuffer(data) && typeof data != 'string')
{
data = JSON.stringify(data);
enc = 'json';
}
// fragment it
while(data.length)
{
var frag = data.slice(0,1000);
data = data.slice(1000);
var packet = {json:{},body:frag};
// last packet gets continuation callback
if(!data.length)
{
if(enc != 'binary') packet.json.enc = enc;
packet.callback = cbWrite;
}else{
packet.json.frag = true;
}
chan.send(packet);
}
}

// handle backpressure flag from the pipe.push()
var more = false;
stream._read = function(size){
if(more) more();
more = false;
};

var data = new Buffer(0);
chan.receiving = function(err, packet, cbMore) {
// was a wait writing, let it through
if(packet.body.length || data.length)
{
data = Buffer.concat([data,packet.body]);
if(!packet.json.frag)
{
var body = data;
data = new Buffer(0);
if(packet.json.enc == 'json') try{
body = JSON.parse(body)
}catch(E){
mesh.log.warn('stream json frag parse error',E,body.toString());
err = E;
}
if(packet.json.enc == 'lob')
{
var packet = mesh.lib.lob.decode(body);
if(!packet)
{
mesh.log.warn('stream lob frag decode error',body.toString('hex'));
err = 'lob decode failed';
}else{
body = packet;
}
}

if(!err && !stream.push(body)) more = cbMore;
}
}
if(err) return stream.emit('error',err);
if(packet.json.end) stream.push(null);
if(!more) cbMore();
}

return stream;
return new ChannelStream(chan, encoding);
}

// new incoming stream open request
Expand All @@ -126,7 +45,12 @@ exports.mesh = function(mesh, cbExt)

ext.link = function(link, cbLink)
{
// create a new stream to this link
/** create a new stream to this link, and send the first packet
* @memberOf TLink
* @param {Buffer|object=} packet - binary/json packet body
* @param {string} encoding - 'binary' or 'json'
* @return {ChannelStream}
*/
link.stream = function(packet, encoding)
{
var open = {json:{type:'stream'},body:packet};
Expand Down

0 comments on commit 9251f4a

Please sign in to comment.