Skip to content

Commit

Permalink
Merge 1241d7e into fa43e3b
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Mar 13, 2016
2 parents fa43e3b + 1241d7e commit 8dcce62
Show file tree
Hide file tree
Showing 30 changed files with 100 additions and 85 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -11,7 +11,9 @@ before_install:
- npm i -g npm@^2.0.0
before_script:
- npm prune
- npm run lint
after_success:
- nyc npm test && nyc report --reporter=text-lcov | coveralls
- npm run semantic-release
branches:
except:
Expand Down
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -4,6 +4,7 @@
[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com/)
[![Coverage Status](https://coveralls.io/repos/github/tusharmath/Multi-threaded-downloader/badge.svg?branch=master)](https://coveralls.io/github/tusharmath/Multi-threaded-downloader?branch=master)

This is a nodejs based module that helps you in performing **resumable**, **multi-threaded** downloads via Http. The module is highly inspired by Speedbit's — [Download Accelerator Plus](http://www.speedbit.com/dap/).

Expand Down Expand Up @@ -58,7 +59,7 @@ All the internal events are exposed as an observable via the `stats` property.

```javascript
const downloader = createDownload({path, url})
downloader.stats.subscribe(x => console.log(x))
downloader.stats.subscribe((x) => console.log(x))
downloader.start()

/*
Expand Down
16 changes: 8 additions & 8 deletions bin/mtd
Expand Up @@ -20,7 +20,7 @@ Usage
mtd --file big_buck_bunny_720p_1mb.mp4.mtd
`).flags

if (!_.some([flags.url, flags.file], x => x)) {
if (!_.some([flags.url, flags.file], (x) => x)) {
process.exit(0)
}

Expand All @@ -34,23 +34,23 @@ const downloads = Rx.Observable.merge(

const progress = downloads
.pluck('message', 'totalBytes')
.filter(x => x > 0)
.filter((x) => x > 0)
.first()
.map(total => new ProgressBar(':bar :percent', {total, complete: '█', incomplete: '░'}))
.tap(x => console.log(`SIZE: ${x.total} bytes`)).share()
.tap((x) => console.log(`SIZE: ${x.total} bytes`)).share()

downloads
.filter(x => x)
.filter(x => x.event === 'DATA')
.filter((x) => x)
.filter((x) => x.event === 'DATA')
.pluck('message')

.map(x => _.sum(_.map(x.offsets, (o, i) => o - x.threads[i][0])) / x.totalBytes)
.map((x) => _.sum(_.map(x.offsets, (o, i) => o - x.threads[i][0])) / x.totalBytes)
.withLatestFrom(progress, (bytes, progress) => ({bytes, progress}))
.subscribe(x => x.progress.update(x.bytes))
.subscribe((x) => x.progress.update(x.bytes))

downloads.last()
.withLatestFrom(progress, (a, b) => b)
.subscribe(x => {
.subscribe((x) => {
x.update(x.total)
console.log('Download Completed!')
})
10 changes: 10 additions & 0 deletions package.json
Expand Up @@ -14,6 +14,7 @@
},
"scripts": {
"test": "./node_modules/ava/cli.js",
"lint": "standard --verbose | snazzy",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
},
"engines": {
Expand All @@ -33,11 +34,15 @@
"license": "MIT",
"devDependencies": {
"ava": "^0.13.0",
"coveralls": "^2.11.8",
"cz-conventional-changelog": "^1.1.5",
"express": "^4.13.1",
"ghooks": "^1.0.3",
"nyc": "^6.0.0",
"semantic-release": "^4.3.5",
"sinon": "^1.17.2",
"snazzy": "^3.0.0",
"standard": "^6.0.8",
"validate-commit-msg": "^2.0.0"
},
"config": {
Expand All @@ -63,5 +68,10 @@
},
"publishConfig": {
"tag": "next"
},
"standard": {
"ignore": [
"test/integration/**"
]
}
}
17 changes: 9 additions & 8 deletions perf/server.js
@@ -1,25 +1,26 @@
'use strict'
var path = require('path')
var express = require('express')
var app = express()
var http = require('http')
var https = require('https')
var fs = require('fs')
var options = {
key: fs.readFileSync(__dirname + '/key.pem'),
cert: fs.readFileSync(__dirname + '/key-cert.pem')
key: fs.readFileSync(path.join(__dirname, '/key.pem')),
cert: fs.readFileSync(path.join(__dirname, '/key-cert.pem'))
}
var httpServer = http.createServer(app)
var httpsServer = https.createServer(options, app)
app.use('/files', express.static(path.join(__dirname, '/files')))

app.use('/files', express.static(__dirname + '/files'))

const startServer = (app, port) => new Promise(i => {
const onClose = () => new Promise(i => server.close(i))
const startServer = (app, port) => new Promise((i) => { // eslint-disable-line
const onClose = () => new Promise((i) => server.close(i)) // eslint-disable-line
const onStart = () => i(onClose)
const server = app.listen(port, onStart)
})

exports.server = port => Promise.all([
exports.server = (port) => Promise.all([
startServer(httpServer, port),
startServer(httpsServer, port + 1)
]).then(x => () => Promise.all(x.map(x => x())))
]).then((x) => () => Promise.all(x.map((x) => x())))

11 changes: 5 additions & 6 deletions perf/utils.js
Expand Up @@ -3,26 +3,25 @@
*/

'use strict'
const Download = require('../index').Download
const crypto = require('crypto')
const fs = require('fs')
const Rx = require('rx')

exports.removeFile = (x) => Rx.Observable.fromCallback(fs.unlink)(x).toPromise()

const createFileDigest = exports.createFileDigest = path => {
exports.createFileDigest = (path) => {
const hash = crypto.createHash('sha1')
return new Promise(resolve => fs
return new Promise((resolve) => fs
.createReadStream(path)
.on('data', x => hash.update(x))
.on('data', (x) => hash.update(x))
.on('end', () => resolve(hash.digest('hex').toUpperCase()))
)
}

exports.fsStat = (x) => Rx.Observable.fromCallback(fs.stat)(x).toPromise()

exports.createTestObserver = stream => {
exports.createTestObserver = (stream) => {
const out = []
stream.subscribe(x => out.push(x))
stream.subscribe((x) => out.push(x))
return out
}
4 changes: 2 additions & 2 deletions src/bufferOffset.js
Expand Up @@ -9,6 +9,6 @@ module.exports = (buffer, offset) => {
offset = 0
}
return buffer
.map(buffer => ({buffer, offset}))
.tap(x => offset += x.buffer.length)
.map((buffer) => ({buffer, offset}))
.tap((x) => (offset += x.buffer.length))
}
2 changes: 1 addition & 1 deletion src/bufferSave.js
Expand Up @@ -8,5 +8,5 @@ const _ = require('lodash')
module.exports = function (ob, fileDescriptor, content) {
return content
.combineLatest(fileDescriptor, (content, fd) => _.assign(content, {fd}))
.flatMap(x => ob.fsWriteBuffer(x).map(x))
.flatMap((x) => ob.fsWriteBuffer(x).map(x))
}
10 changes: 5 additions & 5 deletions src/contentLoad.js
Expand Up @@ -8,16 +8,16 @@ const rangeHeader = require('./rangeHeader')
const _ = require('lodash')

module.exports = (ob, metaStream) => metaStream
.flatMap(meta => meta.threads.map((range, index) => ({range, meta, index})))
.map(x => {
.flatMap((meta) => meta.threads.map((range, index) => ({range, meta, index})))
.map((x) => {
const offset = x.meta.offsets[x.index]
const range = rangeHeader([offset, x.range[1]])
const params = _.omit(x.meta, 'threads', 'offsets')
params.headers = _.assign({}, params.headers, range)
return {params, range: x.range, index: x.index, offset}
})
.flatMap(x => bufferOffset(ob.requestBody(x.params)
.filter(x => x.event === 'data')
.flatMap((x) => bufferOffset(ob.requestBody(x.params)
.filter((x) => x.event === 'data')
.pluck('message'), x.offset)
.map(i => _.assign({}, i, {range: x.range, index: x.index}))
.map((i) => _.assign({}, i, {range: x.range, index: x.index}))
)
4 changes: 2 additions & 2 deletions src/downloadMTD.js
Expand Up @@ -15,10 +15,10 @@ const Immutable = require('immutable')
module.exports = (ob, fd) => {
const offsets = create(Immutable.List([]))
const loadedMETA = metaLoad(fd)
.tap(x => offsets.set(i => i.merge(x.offsets)))
.tap((x) => offsets.set((i) => i.merge(x.offsets)))
const loadedContent = contentLoad(ob, loadedMETA)
const savedContent = bufferSave(ob, fd, loadedContent)
.tap(x => offsets.set(i => i.set(x.index, x.offset)))
.tap((x) => offsets.set((i) => i.set(x.index, x.offset)))
const currentMETA = metaUpdate(loadedMETA, savedContent, offsets.stream)
return metaSave(ob, fd, currentMETA)
}
6 changes: 3 additions & 3 deletions src/initMeta.js
Expand Up @@ -7,8 +7,8 @@ const PROPS = [
]
module.exports = (ob, options) => ob
.requestContentLength(options)
.map(x => {
.map((x) => {
const threads = splitRange(x, options.range)
return _.assign({}, options, {totalBytes: x, threads, offsets: threads.map(x => x[0])})
return _.assign({}, options, {totalBytes: x, threads, offsets: threads.map((x) => x[0])})
})
.map(x => _.pick(x, PROPS))
.map((x) => _.pick(x, PROPS))
2 changes: 1 addition & 1 deletion src/initParams.js
@@ -1,7 +1,7 @@
const _ = require('lodash')
const defaultOptions = {range: 3}

module.exports = options => _.defaults(
module.exports = (options) => _.defaults(
options,
defaultOptions,
{mtdPath: options.path + '.mtd'}
Expand Down
4 changes: 2 additions & 2 deletions src/metaLoad.js
Expand Up @@ -7,8 +7,8 @@ const Rx = require('rx')
const ob = require('./observables')
const u = require('./utils')

module.exports = fileDescriptor => {
const contentLength = fileDescriptor.flatMap(x => ob.fsStat(x)).pluck('size').map(x => x - 512)
module.exports = (fileDescriptor) => {
const contentLength = fileDescriptor.flatMap((x) => ob.fsStat(x)).pluck('size').map((x) => x - 512)
return Rx.Observable.combineLatest(
contentLength,
fileDescriptor,
Expand Down
4 changes: 2 additions & 2 deletions src/metaSave.js
Expand Up @@ -16,7 +16,7 @@ const u = require('./utils')
*/
module.exports = (ob, fileDescriptor, metaJSON) => metaJSON
.combineLatest(fileDescriptor, u.selectAs('json', 'fd'))
.map(x => _.assign(x, {offset: x.json.totalBytes}))
.map((x) => _.assign(x, {offset: x.json.totalBytes}))
.flatMap(ob.fsWriteJSON)
.map(x => JSON.parse(x[1].toString()))
.map((x) => JSON.parse(x[1].toString()))

2 changes: 1 addition & 1 deletion src/metaUpdate.js
Expand Up @@ -8,5 +8,5 @@ const _ = require('lodash')
const u = require('./utils')
module.exports = (baseMeta, bytesSaved, offsets) => bytesSaved
.withLatestFrom(baseMeta, offsets, u.selectAs('content', 'meta', 'offsets'))
.map(x => _.assign({}, x.meta, {offsets: x.offsets.toJS()}))
.map((x) => _.assign({}, x.meta, {offsets: x.offsets.toJS()}))
.distinctUntilChanged()
4 changes: 2 additions & 2 deletions src/mtd.js
Expand Up @@ -37,11 +37,11 @@ class Download {
return downloadMTD(ob, fd)
.tap(this.toStat('DATA'))
.last()
.flatMap(x => ob.fsTruncate(options.mtdPath, x.totalBytes))
.flatMap((x) => ob.fsTruncate(options.mtdPath, x.totalBytes))
.tap(this.toStat('TRUNCATE'))
.flatMap(() => ob.fsRename(options.mtdPath, options.path))
.tap(this.toStat('RENAME'))
.tapOnCompleted(x => this.stats.onCompleted())
.tapOnCompleted((x) => this.stats.onCompleted())
}

stop () {}
Expand Down
4 changes: 2 additions & 2 deletions src/newDownload.js
Expand Up @@ -4,8 +4,8 @@ const _ = require('lodash')
const pathGenerator = require('../src/utils').pathGenerator

module.exports = (createDownload, pFlags) => pFlags
.filter(x => VALID_URL.isUri(x.url))
.flatMap(x => {
.filter((x) => VALID_URL.isUri(x.url))
.flatMap((x) => {
const defaultParams = {path: pathGenerator(x.url)}
_.defaults(x, defaultParams)
const source = createDownload(x)
Expand Down
22 changes: 11 additions & 11 deletions src/observables.js
Expand Up @@ -4,11 +4,11 @@ const fs = require('fs')
const _ = require('lodash')
const u = require('./utils')

const requestBody = params => Rx.Observable.create(observer => request(params)
.on('data', message => observer.onNext({event: 'data', message}))
.on('response', message => observer.onNext({event: 'response', message}))
.on('complete', message => observer.onCompleted({event: 'completed', message}))
.on('error', error => observer.onError(error))
const requestBody = (params) => Rx.Observable.create((observer) => request(params)
.on('data', (message) => observer.onNext({event: 'data', message}))
.on('response', (message) => observer.onNext({event: 'response', message}))
.on('complete', (message) => observer.onCompleted({event: 'completed', message}))
.on('error', (error) => observer.onError(error))
)

const fsOpen = Rx.Observable.fromNodeCallback(fs.open)
Expand All @@ -17,16 +17,16 @@ const fsTruncate = Rx.Observable.fromNodeCallback(fs.truncate)
const fsRename = Rx.Observable.fromNodeCallback(fs.rename)
const fsStat = Rx.Observable.fromNodeCallback(fs.fstat)
const fsRead = Rx.Observable.fromNodeCallback(fs.read)
const fsReadBuffer = x => fsRead(x.fd, x.buffer, 0, x.buffer.length, x.offset)
const fsWriteBuffer = x => fsWrite(x.fd, x.buffer, 0, x.buffer.length, x.offset)
const fsWriteJSON = x => fsWriteBuffer(_.assign({}, x, {buffer: u.toBuffer(x.json)}))
const fsReadJSON = x => fsReadBuffer(x).map(x => JSON.parse(x[1].toString()))
const fsReadBuffer = (x) => fsRead(x.fd, x.buffer, 0, x.buffer.length, x.offset)
const fsWriteBuffer = (x) => fsWrite(x.fd, x.buffer, 0, x.buffer.length, x.offset)
const fsWriteJSON = (x) => fsWriteBuffer(_.assign({}, x, {buffer: u.toBuffer(x.json)}))
const fsReadJSON = (x) => fsReadBuffer(x).map((x) => JSON.parse(x[1].toString()))
const buffer = (size) => Rx.Observable.just(u.createEmptyBuffer(size))
module.exports = {
requestBody,
requestContentLength: x => requestBody(_.assign({}, x, {method: 'HEAD'}))
requestContentLength: (x) => requestBody(_.assign({}, x, {method: 'HEAD'}))
.pluck('message', 'headers', 'content-length')
.map(x => parseInt(x, 10)),
.map((x) => parseInt(x, 10)),
fsOpen,
fsWrite,
fsWriteBuffer,
Expand Down
2 changes: 1 addition & 1 deletion src/rangeHeader.js
Expand Up @@ -4,4 +4,4 @@

'use strict'

module.exports = range => ({range: `bytes=${range[0]}-${range[1]}`})
module.exports = (range) => ({range: `bytes=${range[0]}-${range[1]}`})
4 changes: 2 additions & 2 deletions src/resumeDownload.js
Expand Up @@ -3,8 +3,8 @@ const _ = require('lodash')
const Rx = require('rx')

module.exports = (createDownload, pFlags) => pFlags
.skipWhile(x => x.url)
.flatMap(x => {
.skipWhile((x) => x.url)
.flatMap((x) => {
const defaultParams = {
mtdPath: normalizePath(x.file),
path: normalizePath(x.file.replace('.mtd', ''))
Expand Down
4 changes: 2 additions & 2 deletions src/splitRange.js
Expand Up @@ -7,8 +7,8 @@ const _ = require('lodash')

module.exports = (range, count) => {
const delta = Math.round(range / count)
const start = _.times(count, x => x * delta)
const end = _.times(count, x => (x + 1) * delta - 1)
const start = _.times(count, (x) => x * delta)
const end = _.times(count, (x) => (x + 1) * delta - 1)
end[count - 1] = range
return _.zip(start, end)
}
Expand Down
8 changes: 4 additions & 4 deletions src/utils.js
Expand Up @@ -19,7 +19,7 @@ e.selectAs = function () {
const keys = _.toArray(arguments)
return function () {
const values = _.toArray(arguments)
const merge = (m, k, i) => m[k] = values[i]
const merge = (m, k, i) => (m[k] = values[i])
return _.transform(keys, merge, {})
}
}
Expand All @@ -33,6 +33,6 @@ e.createEmptyBuffer = function (size) {
buffer.fill(' ')
return buffer
}
e.normalizePath = path => PATH.resolve(process.cwd(), path)
e.fileNameGenerator = x => _.last(URL.parse(x).pathname.split('/')) || Date.now()
e.pathGenerator = x => e.normalizePath(e.fileNameGenerator(x))
e.normalizePath = (path) => PATH.resolve(process.cwd(), path)
e.fileNameGenerator = (x) => _.last(URL.parse(x).pathname.split('/')) || Date.now()
e.pathGenerator = (x) => e.normalizePath(e.fileNameGenerator(x))

0 comments on commit 8dcce62

Please sign in to comment.