Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
DamonOehlman committed Oct 24, 2014
0 parents commit 2723d5b
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
node_modules
Empty file added index.js
Empty file.
38 changes: 38 additions & 0 deletions package.json
@@ -0,0 +1,38 @@
{
"name": "pull-ws",
"version": "1.0.0",
"description": "Simple pull-streams for websocket client connections",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "https://github.com/DamonOehlman/pull-ws.git"
},
"keywords": [
"pull-stream",
"websocket",
"ws"
],
"author": "Damon Oehlman <damon.oehlman@gmail.com>",
"license": "ISC",
"bugs": {
"url": "https://github.com/DamonOehlman/pull-ws/issues"
},
"homepage": "https://github.com/DamonOehlman/pull-ws",
"devDependencies": {
"mapleTree": "^0.5.1",
"pull-stream": "^2.26.0",
"tape": "^3.0.1",
"ws": "git://github.com/DamonOehlman/ws#close-terminate-initial-request",
"wsurl": "^1.0.0"
},
"dependencies": {
"pull-core": "^1.0.0"
},
"testling": {
"files": "test/all.js",
"server": "test/server.js"
}
}
70 changes: 70 additions & 0 deletions source.js
@@ -0,0 +1,70 @@
var pull = require('pull-core');
var EOF = [];

/**
### `pull-ws/source(socket)`
Create a pull-stream `Source` that will read data from the `socket`.
<<< examples/read.js
**/
module.exports = pull.Source(function(socket) {
var buffer = [];
var receiver;

socket.addEventListener('message', function(evt) {
if (receiver) {
return receiver(null, evt.data);
}

buffer[buffer.length] = evt.data;
});

socket.addEventListener('close', function(evt) {
if (receiver) {
return receiver(true);
}

buffer.push(EOF);
});

function read(end, cb) {

function handleOpen(evt) {
socket.removeEventListener('open', handleOpen);
read(end, cb);
}

// reset the receiver
receiver = null;

// if ended, abort
if (end) {
return cb && cb(end);
}

// if connecting then wait
if (socket.readyState === 0) {
return socket.addEventListener('open', handleOpen);
}

// if the socket is closing or closed, return end
if (socket.readyState >= 2) {
return cb(true);
}

// read from the socket
if (buffer.length > 0) {
if (buffer[0] === EOF) {
return cb(true);
}

return cb(null, buffer.shift());
}

receiver = cb;
};

return read;
});
1 change: 1 addition & 0 deletions test/all.js
@@ -0,0 +1 @@
require('./read');
3 changes: 3 additions & 0 deletions test/helpers/wsurl.js
@@ -0,0 +1,3 @@
var wsurl = require('wsurl');

module.exports = wsurl(typeof window != 'undefined' ? location.origin : 'http://localhost:3000');
22 changes: 22 additions & 0 deletions test/read.js
@@ -0,0 +1,22 @@
var test = require('tape');
var WebSocket = require('ws');
var endpoint = require('./helpers/wsurl') + '/read';
var pull = require('pull-stream');
var ws = require('../source');
var socket;

test('create a websocket connection to the server', function(t) {
t.plan(1);

socket = new WebSocket(endpoint);
socket.onopen = t.pass.bind(t, 'socket ready');
});

test('read values from the socket and end normally', function(t) {
t.plan(2);

ws(socket).pipe(pull.collect(function(err, values) {
t.ifError(err);
t.deepEqual(values, ['a', 'b', 'c', 'd']);
}));
});
26 changes: 26 additions & 0 deletions test/server.js
@@ -0,0 +1,26 @@
var WebSocketServer = require('ws').Server;
var mapleTree = require('mapleTree');
var port = process.env.ZUUL_PORT || process.env.PORT || 3000;
var wss = new WebSocketServer({ port: port });
var router = new mapleTree.RouteTree();

router.define('/read', function(ws) {
var values = ['a', 'b', 'c', 'd'];
var timer = setInterval(function() {
var next = values.shift();
if (next) {
ws.send(next);
}
else {
clearInterval(next);
ws.close();
}
}, 100);
});

wss.on('connection', function(ws) {
var match = router.match(ws.upgradeReq.url);
if (match && typeof match.fn == 'function') {
match.fn(ws);
}
});

0 comments on commit 2723d5b

Please sign in to comment.