diff --git a/.travis.yml b/.travis.yml index 2060f4d..bf31eea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,5 +4,4 @@ language: node_js node_js: - "node" - - "8" - - "6" + - "10" diff --git a/index.js b/index.js index 77ca861..2c7aa48 100644 --- a/index.js +++ b/index.js @@ -2,7 +2,7 @@ var hat = require('hat') var pws = require('peer-wire-swarm') var bncode = require('bncode') var crypto = require('crypto') -var bitfield = require('bitfield') +var Bitfield = require('bitfield') var parseTorrent = require('parse-torrent') var mkdirp = require('mkdirp') var events = require('events') @@ -10,11 +10,11 @@ var path = require('path') var fs = require('fs') var os = require('os') var eos = require('end-of-stream') -var piece = require('torrent-piece') +var Piece = require('torrent-piece') var rimraf = require('rimraf') var FSChunkStore = require('fs-chunk-store') var ImmediateChunkStore = require('immediate-chunk-store') -var peerDiscovery = require('torrent-discovery') +var PeerDiscovery = require('torrent-discovery') var bufferFrom = require('buffer-from') var blocklist = require('ip-set') @@ -24,7 +24,7 @@ var fileStream = require('./lib/file-stream') var MAX_REQUESTS = 5 var CHOKE_TIMEOUT = 5000 var REQUEST_TIMEOUT = 30000 -var SPEED_THRESHOLD = 3 * piece.BLOCK_LENGTH +var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH var DEFAULT_PORT = 6881 var BAD_PIECE_STRIKES_MAX = 3 @@ -33,7 +33,7 @@ var BAD_PIECE_STRIKES_DURATION = 120000 // 2 minutes var RECHOKE_INTERVAL = 10000 var RECHOKE_OPTIMISTIC_DURATION = 2 -var TMP = fs.existsSync('/tmp') ? '/tmp' : os.tmpDir() +var TMP = fs.existsSync('/tmp') ? '/tmp' : os.tmpdir() var noop = function () {} @@ -108,27 +108,39 @@ var torrentStream = function (link, opts, cb) { engine._flood = opts.flood engine._pulse = opts.pulse - var discovery = peerDiscovery({ - peerId: bufferFrom(opts.id), - dht: (opts.dht !== undefined) ? opts.dht : true, - tracker: (opts.tracker !== undefined) ? opts.tracker : true, - port: DEFAULT_PORT, - announce: opts.trackers - }) + var discovery = null + var blocked = blocklist(opts.blocklist) - discovery.on('peer', function (addr) { - if (blocked.contains(addr.split(':')[0])) { - engine.emit('blocked-peer', addr) - } else { - engine.emit('peer', addr) - engine.connect(addr) - } + var startDiscovery = function () { + discovery = new PeerDiscovery({ + infoHash: engine.infoHash, + peerId: bufferFrom(opts.id), + dht: (opts.dht !== undefined) ? opts.dht : true, + tracker: (opts.tracker !== undefined) ? opts.tracker : true, + port: engine.port || DEFAULT_PORT, + announce: opts.trackers || link.announce + }) + + discovery.on('peer', function (addr) { + if (blocked.contains(addr.split(':')[0])) { + engine.emit('blocked-peer', addr) + } else { + engine.emit('peer', addr) + engine.connect(addr) + } + }) + } + + process.nextTick(function () { + // Gives the user a chance to call engine.listen(PORT) on the same tick, + // so discovery will start using the correct torrent port. + startDiscovery() }) var ontorrent = function (torrent) { var storage = opts.storage || FSChunkStore - engine.store = ImmediateChunkStore(storage(torrent.pieceLength, { + engine.store = new ImmediateChunkStore(storage(torrent.pieceLength, { files: torrent.files.map(function (file) { return { path: path.join(opts.path, file.path), @@ -138,24 +150,18 @@ var torrentStream = function (link, opts, cb) { }) })) engine.torrent = torrent - engine.bitfield = bitfield(torrent.pieces.length) + engine.bitfield = new Bitfield(torrent.pieces.length) var pieceLength = torrent.pieceLength var pieceRemainder = (torrent.length % pieceLength) || pieceLength var pieces = torrent.pieces.map(function (hash, i) { - return piece(i === torrent.pieces.length - 1 ? pieceRemainder : pieceLength) + return new Piece(i === torrent.pieces.length - 1 ? pieceRemainder : pieceLength) }) var reservations = torrent.pieces.map(function () { return [] }) - process.nextTick(function () { - // Gives the user a chance to call engine.listen(PORT) on the same tick, - // so discovery will start using the correct torrent port. - discovery.setTorrent(torrent) - }) - engine.files = torrent.files.map(function (file) { file = Object.create(file) var offsetPiece = (file.offset / torrent.pieceLength) | 0 @@ -236,7 +242,7 @@ var torrentStream = function (link, opts, cb) { var onhotswap = opts.hotswap === false ? falsy : function (wire, index) { var speed = wire.downloadSpeed() - if (speed < piece.BLOCK_LENGTH) return + if (speed < Piece.BLOCK_LENGTH) return if (!reservations[index] || !pieces[index]) return var r = reservations[index] @@ -264,7 +270,7 @@ var torrentStream = function (link, opts, cb) { for (i = 0; i < min.requests.length; i++) { var req = min.requests[i] if (req.piece !== index) continue - pieces[index].cancel((req.offset / piece.BLOCK_SIZE) | 0) + pieces[index].cancel((req.offset / Piece.BLOCK_SIZE) | 0) } engine.emit('hotswap', min, wire, index) @@ -309,7 +315,7 @@ var torrentStream = function (link, opts, cb) { var buffer = p.flush() if (sha1(buffer) !== torrent.pieces[index]) { - pieces[index] = piece(p.length) + pieces[index] = new Piece(p.length) engine.emit('invalid-piece', index, buffer) onupdatetick() @@ -353,7 +359,7 @@ var torrentStream = function (link, opts, cb) { var speed = wire.downloadSpeed() || 1 if (speed > SPEED_THRESHOLD) return thruthy - var secs = MAX_REQUESTS * piece.BLOCK_LENGTH / speed + var secs = MAX_REQUESTS * Piece.BLOCK_LENGTH / speed var tries = 10 var ptr = 0 @@ -614,14 +620,12 @@ var torrentStream = function (link, opts, cb) { if (destroyed) return swarm.resume() - // We know only infoHash here, not full infoDictionary. - // But infoHash is enough to connect to trackers and get peers. - if (!buf) return discovery.setTorrent(link) + if (!buf) return var torrent = parseTorrent(buf) // Bad cache file - fetch it again - if (torrent.infoHash !== infoHash) return discovery.setTorrent(link) + if (torrent.infoHash !== infoHash) return if (!torrent.announce || !torrent.announce.length) { opts.trackers = [].concat(opts.trackers || []).concat(link.announce || []) @@ -751,7 +755,7 @@ var torrentStream = function (link, opts, cb) { destroyed = true swarm.destroy() clearInterval(rechokeIntervalId) - discovery.stop() + if (discovery) discovery.destroy() if (engine.store && engine.store.close) { engine.store.close(cb) } else if (cb) { @@ -780,7 +784,6 @@ var torrentStream = function (link, opts, cb) { if (!port) return findPort(opts.port || DEFAULT_PORT, cb) engine.port = port swarm.listen(engine.port, cb) - discovery.updatePort(engine.port) } return engine diff --git a/lib/exchange-metadata.js b/lib/exchange-metadata.js index 96c8af8..bc8ca92 100644 --- a/lib/exchange-metadata.js +++ b/lib/exchange-metadata.js @@ -47,10 +47,10 @@ module.exports = function (engine, callback) { if (message.msg_type === 2) return if (message.msg_type === 0) { - if (!metadata) return wire.extended(channel, {msg_type: 2, piece: piece}) + if (!metadata) return wire.extended(channel, { msg_type: 2, piece: piece }) var offset = piece * METADATA_BLOCK_SIZE var buf = metadata.slice(offset, offset + METADATA_BLOCK_SIZE) - wire.extended(channel, Buffer.concat([bncode.encode({msg_type: 1, piece: piece}), buf])) + wire.extended(channel, Buffer.concat([bncode.encode({ msg_type: 1, piece: piece }), buf])) return } @@ -77,11 +77,11 @@ module.exports = function (engine, callback) { for (var i = 0; i * METADATA_BLOCK_SIZE < size; i++) { if (metadataPieces[i]) continue - wire.extended(channel, {msg_type: 0, piece: i}) + wire.extended(channel, { msg_type: 0, piece: i }) } }) if (!wire.peerExtensions.extended) return - wire.extended(0, metadata ? {m: {ut_metadata: 1}, metadata_size: metadata.length} : {m: {ut_metadata: 1}}) + wire.extended(0, metadata ? { m: { ut_metadata: 1 }, metadata_size: metadata.length } : { m: { ut_metadata: 1 } }) } } diff --git a/package.json b/package.json index 5866ffa..ec52b57 100644 --- a/package.json +++ b/package.json @@ -4,29 +4,29 @@ "description": "Low level streaming torrent client that exposes files as node.js streams and downloads pieces based on demand", "repository": "git://github.com/mafintosh/torrent-stream.git", "scripts": { - "test": "standard && tap test/*.js" + "test": "standard && tap --no-coverage --jobs=1 test/*.js" }, "dependencies": { - "bitfield": "^0.1.0", - "bncode": "^0.5.2", - "buffer-from": "^1.0.0", - "end-of-stream": "^0.1.4", - "fs-chunk-store": "^1.3.0", + "bitfield": "^3.0.0", + "bncode": "^0.5.3", + "buffer-from": "^1.1.1", + "end-of-stream": "^1.4.4", + "fs-chunk-store": "^2.0.1", "hat": "0.0.3", - "immediate-chunk-store": "^1.0.5", - "ip-set": "^1.0.0", - "mkdirp": "^0.3.5", - "parse-torrent": "^4.0.0", - "peer-wire-swarm": "^0.12.0", - "rimraf": "^2.2.5", - "torrent-discovery": "^5.2.0", - "torrent-piece": "^1.0.0" + "immediate-chunk-store": "^2.1.0", + "ip-set": "^1.0.2", + "mkdirp": "^0.5.1", + "parse-torrent": "^7.0.1", + "peer-wire-swarm": "^0.12.2", + "rimraf": "^3.0.0", + "torrent-discovery": "^9.2.1", + "torrent-piece": "^2.0.0" }, "devDependencies": { - "buffer-alloc": "^1.1.0", - "bittorrent-tracker": "^7.7.0", - "fs-extra": "^0.26.4", - "standard": "^5.1.0", - "tap": "^0.4.8" + "buffer-alloc": "^1.2.0", + "bittorrent-tracker": "^9.14.4", + "fs-extra": "^8.1.0", + "standard": "^14.3.1", + "tap": "^14.10.5" } } diff --git a/test/auto-block.js b/test/auto-block.js index 6617847..7908796 100644 --- a/test/auto-block.js +++ b/test/auto-block.js @@ -21,7 +21,7 @@ test('fixture can verify the torrent', function (t) { t.plan(2) fixture.on('ready', function () { t.ok(true, 'seed should be ready') - t.deepEqual(fixture.bitfield.buffer.toString('hex'), 'c0', 'should verify all the pieces') + t.deepEqual(fixture.bitfield.buffer[0], 192, 'should verify all the pieces') }) }) diff --git a/test/meta-exchange.js b/test/meta-exchange.js index 01721da..62e3720 100644 --- a/test/meta-exchange.js +++ b/test/meta-exchange.js @@ -25,7 +25,7 @@ test('fixture should be ready', function (t) { }) test('torrent should be emitted', function (t) { - t.plan(1) + t.plan(2) var e = engine() e.on('torrent', function (torrent) { t.equal(torrent.infoHash, 'ef330b39f4801d25b4245212e75a38634bfc856e') diff --git a/test/storage.js b/test/storage.js index a2d7137..2115e45 100644 --- a/test/storage.js +++ b/test/storage.js @@ -15,7 +15,7 @@ test('fixture can verify the torrent', function (t) { t.plan(2) fixture.once('ready', function () { t.ok(true, 'should be ready') - t.deepEqual(fixture.bitfield.buffer.toString('hex'), 'c0', 'should verify all the pieces') + t.deepEqual(fixture.bitfield.buffer[0], 192, 'should verify all the pieces') }) }) @@ -31,7 +31,7 @@ test('fixture can read the file contents', function (t) { }) }) t.test('can read from stream with offset', function (t) { - var stream = file.createReadStream({start: 36109}) + var stream = file.createReadStream({ start: 36109 }) stream.setEncoding('ascii') t.plan(1) stream.once('readable', function () { @@ -55,15 +55,15 @@ test('fixture can read the file contents', function (t) { }) t.test('can read from storage with offset', function (t) { t.plan(6) - fixture.store.get(0, {length: 11}, function (_, buffer) { + fixture.store.get(0, { length: 11 }, function (_, buffer) { t.equal(buffer.length, 11) t.equal(buffer.toString('ascii'), 'Lorem ipsum') }) - fixture.store.get(0, {offset: 588, length: 10}, function (_, buffer) { + fixture.store.get(0, { offset: 588, length: 10 }, function (_, buffer) { t.equal(buffer.length, 10) t.equal(buffer.toString('ascii'), 'Vestibulum') }) - fixture.store.get(1, {offset: 3341}, function (_, buffer) { + fixture.store.get(1, { offset: 3341 }, function (_, buffer) { t.equal(buffer.length, 6) t.equal(buffer.toString('ascii'), 'amet. ') })