diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..66a4d2a --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2015 Mathias Buus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..29a5a9d --- /dev/null +++ b/README.md @@ -0,0 +1,64 @@ +# discovery-swarm + +A network swarm that uses discovery-channel to find peers + +``` +npm install discovery-swarm +``` + +## Usage + +``` js +var swarm = require('discovery-swarm') + +var sw = swarm() + +sw.listen(1000) +sw.add('hello') // a name + +sw.on('connection', function (connection) { + console.log('found peer') +}) +``` + +## API + +#### `var sw = swarm()` + +Create a new swarm + +#### `sw.add(name)` + +Join a swarm + +#### `sw.remove(name)` + +Leave a swarm + +#### `sw.peersQueued` + +Number of peers discovered but not connected to yet + +#### `sw.peersConnecting` + +Number of peers we are trying to connect to + +#### `sw.peersConnected` + +Number of peers we are actively connected to. Same as `sw.connections.length`. + +#### `sw.connections` + +List of active connections to other peers + +#### `sw.on('connection', connection)` + +Emitted when you connect to another peer + +#### `sw.listen(port)` + +Listen on a specific port. Should be called before add + +## License + +MIT diff --git a/example.js b/example.js new file mode 100644 index 0000000..488fc1e --- /dev/null +++ b/example.js @@ -0,0 +1,14 @@ +var swarm = require('./') + +var ids = [1,2,3,4,5] + +ids.forEach(function (id) { + var s = swarm({maxConnections: 2}) + + s.listen(10000 + id) + s.add(Buffer('hello')) + + s.on('connection', function (connection, type) { + console.log(id, 'connect', type) + }) +}) diff --git a/index.js b/index.js new file mode 100644 index 0000000..43dec10 --- /dev/null +++ b/index.js @@ -0,0 +1,179 @@ +var DC = require('discovery-channel') +var net = require('net') +try { + var utp = require('utp-native') +} catch { + var utp = null +} +var connections = require('connections') +var lpmessage = require('length-prefixed-message') +var crypto = require('crypto') +var events = require('events') +var util = require('util') + +var CONNECT_TIMEOUT = 3000 + +module.exports = Swarm + +function Swarm (opts) { + if (!(this instanceof Swarm)) return new Swarm(opts) + if (!opts) opts = {} + + events.EventEmitter.call(this) + + var self = this + + this._discovery = opts.discovery || DC(opts) + this._discovery.on('peer', function (hash, peer) { + var peerId = peer.host + ':' + peer.port + + if (self._peersSeen[peerId]) return + self._peersSeen[peerId] = true + self._peersQueued.push(peer) + connectPeer() + }) + + this._peersQueued = [] + this._peersSeen = {} + + this._tcpServer = net.createServer(onconnection) + this._utpServer = utp && opts.utp && utp.createServer(onconnection) + this._outboundConnections = {} + this._inboundConnections = {} + this._banned = {} + this._port = 0 + + this.maxConnections = opts.maxConnections || 100 + this._tcpServer.maxConnections = this.maxConnections + if (this._utpServer) this._utpServer.maxConnections = this.maxConnections + + this.id = opts.id || crypto.randomBytes(32) + var servers = this._utpServer ? [this._tcpServer, this._utpServer] : [this._tcpServer] + + this.allConnections = connections(servers) + this._connections = connections([]) + + this.allConnections.on('close', connectPeer) + + this._connections.on('close', function (connection) { + delete self._outboundConnections[connection.remoteId] + delete self._inboundConnections[connection.remoteId] + connectPeer() + }) + + this._connections.on('connection', function (connection) { + var type = self._utpServer && (connection._utp === self._utpServer ? 'utp' : 'tcp') + self.emit('connection', connection, type) + }) + + this.connections = this._connections.sockets + + function connectPeer () { + if (self.connections.length >= self.maxConnections) return + if (self.allConnections.length >= self.maxConnections) return + + var peer = self._peersQueued.shift() + if (!peer) return + + var tcpSocket = net.connect(peer.port, peer.host) + var utpSocket = self._utpServer && self._utpServer.connect(peer.port, peer.host) + var timeout = setTimeout(ontimeout, CONNECT_TIMEOUT) + + if (utpSocket) { + self.allConnections.add(utpSocket) + utpSocket.on('connect', onconnect) + utpSocket.on('error', onerror) + } + + self.allConnections.add(tcpSocket) + tcpSocket.on('connect', onconnect) + tcpSocket.on('error', onerror) + + function onconnect () { + clearTimeout(timeout) + var other = this === tcpSocket ? utpSocket : tcpSocket + if (other) other.destroy() + self.allConnections.add(this) + onconnection(this, peer) + } + + function onerror (err) { + this.destroy() + } + + function ontimeout () { + tcpSocket.destroy() + utpSocket.destroy() + } + } + + function onconnection (connection, peer) { + connection.on('error', function () { + connection.destroy() + }) + + lpmessage.write(connection, self.id) + lpmessage.read(connection, function (remoteId) { + var idHex = self.id.toString('hex') + var remoteIdHex = remoteId.toString('hex') + + if (idHex === remoteIdHex) { + if (peer) self.banPeer(peer) + connection.destroy() + return + } + + if (self._inboundConnections[remoteIdHex] || self._outboundConnections[remoteIdHex]) { + // TODO: maybe destroy the old one? + connection.destroy() + return + } + + if (peer) { + self._outboundConnections[remoteIdHex] = true + } else { + self._inboundConnections[remoteIdHex] = true + } + + connection.peer = peer + connection.remoteId = remoteId + self._connections.add(connection) + }) + } +} + +util.inherits(Swarm, events.EventEmitter) + +Swarm.prototype.__defineGetter__('peersQueued', function () { + return this._peersQueued.length +}) + +Swarm.prototype.__defineGetter__('peersConnected', function () { + return this.connections.length +}) + +Swarm.prototype.__defineGetter__('peersConnecting', function () { + return this.allConnections.length - this.connections.length +}) + +Swarm.prototype.banPeer = function (peer) { + this._banned[peer.host + ':' + peer.port] = true +} + +Swarm.prototype.addPeer = function (peer) { + this._peersQueued.push(peer) +} + +Swarm.prototype.add = function (hash) { + this._discovery.add(hash, this._port) +} + +Swarm.prototype.remove = function (hash) { + this._discovery.remove(hash, this._port) +} + +Swarm.prototype.listen = function (port) { + this._port = port + this._tcpServer.listen(port) + if (this._utpServer) this._utpServer.listen(port) +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..2380cb6 --- /dev/null +++ b/package.json @@ -0,0 +1,20 @@ +{ + "name": "discovery-swarm", + "version": "0.0.0", + "description": "A network swarm that uses discovery-channel to find peers", + "main": "index.js", + "dependencies": { + "discovery-channel": "^2.1.0" + }, + "devDependencies": {}, + "repository": { + "type": "git", + "url": "https://github.com/mafintosh/discovery-swarm.git" + }, + "author": "Mathias Buus (@mafintosh)", + "license": "MIT", + "bugs": { + "url": "https://github.com/mafintosh/discovery-swarm/issues" + }, + "homepage": "https://github.com/mafintosh/discovery-swarm" +}