From 2723d5beb4a7bd093aa15470b6fda405a059f171 Mon Sep 17 00:00:00 2001 From: Damon Oehlman Date: Fri, 24 Oct 2014 12:47:04 +1100 Subject: [PATCH] Initial commit --- .gitignore | 1 + index.js | 0 package.json | 38 +++++++++++++++++++++++ source.js | 70 +++++++++++++++++++++++++++++++++++++++++++ test/all.js | 1 + test/helpers/wsurl.js | 3 ++ test/read.js | 22 ++++++++++++++ test/server.js | 26 ++++++++++++++++ 8 files changed, 161 insertions(+) create mode 100644 .gitignore create mode 100644 index.js create mode 100644 package.json create mode 100644 source.js create mode 100644 test/all.js create mode 100644 test/helpers/wsurl.js create mode 100644 test/read.js create mode 100644 test/server.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/index.js b/index.js new file mode 100644 index 0000000..e69de29 diff --git a/package.json b/package.json new file mode 100644 index 0000000..f77e699 --- /dev/null +++ b/package.json @@ -0,0 +1,38 @@ +{ + "name": "pull-ws", + "version": "1.0.0", + "description": "Simple pull-streams for websocket client connections", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "repository": { + "type": "git", + "url": "https://github.com/DamonOehlman/pull-ws.git" + }, + "keywords": [ + "pull-stream", + "websocket", + "ws" + ], + "author": "Damon Oehlman ", + "license": "ISC", + "bugs": { + "url": "https://github.com/DamonOehlman/pull-ws/issues" + }, + "homepage": "https://github.com/DamonOehlman/pull-ws", + "devDependencies": { + "mapleTree": "^0.5.1", + "pull-stream": "^2.26.0", + "tape": "^3.0.1", + "ws": "git://github.com/DamonOehlman/ws#close-terminate-initial-request", + "wsurl": "^1.0.0" + }, + "dependencies": { + "pull-core": "^1.0.0" + }, + "testling": { + "files": "test/all.js", + "server": "test/server.js" + } +} diff --git a/source.js b/source.js new file mode 100644 index 0000000..b7ec923 --- /dev/null +++ b/source.js @@ -0,0 +1,70 @@ +var pull = require('pull-core'); +var EOF = []; + +/** + ### `pull-ws/source(socket)` + + Create a pull-stream `Source` that will read data from the `socket`. + + <<< examples/read.js + +**/ +module.exports = pull.Source(function(socket) { + var buffer = []; + var receiver; + + socket.addEventListener('message', function(evt) { + if (receiver) { + return receiver(null, evt.data); + } + + buffer[buffer.length] = evt.data; + }); + + socket.addEventListener('close', function(evt) { + if (receiver) { + return receiver(true); + } + + buffer.push(EOF); + }); + + function read(end, cb) { + + function handleOpen(evt) { + socket.removeEventListener('open', handleOpen); + read(end, cb); + } + + // reset the receiver + receiver = null; + + // if ended, abort + if (end) { + return cb && cb(end); + } + + // if connecting then wait + if (socket.readyState === 0) { + return socket.addEventListener('open', handleOpen); + } + + // if the socket is closing or closed, return end + if (socket.readyState >= 2) { + return cb(true); + } + + // read from the socket + if (buffer.length > 0) { + if (buffer[0] === EOF) { + return cb(true); + } + + return cb(null, buffer.shift()); + } + + receiver = cb; + }; + + return read; +}); diff --git a/test/all.js b/test/all.js new file mode 100644 index 0000000..b057609 --- /dev/null +++ b/test/all.js @@ -0,0 +1 @@ +require('./read'); diff --git a/test/helpers/wsurl.js b/test/helpers/wsurl.js new file mode 100644 index 0000000..9ef0fed --- /dev/null +++ b/test/helpers/wsurl.js @@ -0,0 +1,3 @@ +var wsurl = require('wsurl'); + +module.exports = wsurl(typeof window != 'undefined' ? location.origin : 'http://localhost:3000'); diff --git a/test/read.js b/test/read.js new file mode 100644 index 0000000..b243213 --- /dev/null +++ b/test/read.js @@ -0,0 +1,22 @@ +var test = require('tape'); +var WebSocket = require('ws'); +var endpoint = require('./helpers/wsurl') + '/read'; +var pull = require('pull-stream'); +var ws = require('../source'); +var socket; + +test('create a websocket connection to the server', function(t) { + t.plan(1); + + socket = new WebSocket(endpoint); + socket.onopen = t.pass.bind(t, 'socket ready'); +}); + +test('read values from the socket and end normally', function(t) { + t.plan(2); + + ws(socket).pipe(pull.collect(function(err, values) { + t.ifError(err); + t.deepEqual(values, ['a', 'b', 'c', 'd']); + })); +}); diff --git a/test/server.js b/test/server.js new file mode 100644 index 0000000..7087e1f --- /dev/null +++ b/test/server.js @@ -0,0 +1,26 @@ +var WebSocketServer = require('ws').Server; +var mapleTree = require('mapleTree'); +var port = process.env.ZUUL_PORT || process.env.PORT || 3000; +var wss = new WebSocketServer({ port: port }); +var router = new mapleTree.RouteTree(); + +router.define('/read', function(ws) { + var values = ['a', 'b', 'c', 'd']; + var timer = setInterval(function() { + var next = values.shift(); + if (next) { + ws.send(next); + } + else { + clearInterval(next); + ws.close(); + } + }, 100); +}); + +wss.on('connection', function(ws) { + var match = router.match(ws.upgradeReq.url); + if (match && typeof match.fn == 'function') { + match.fn(ws); + } +});