Commit

Merge 83593e2 into ac7af28
tusharmath committed Jun 29, 2016
2 parents ac7af28 + 83593e2 commit f1b1b39
Showing 45 changed files with 1,291 additions and 564 deletions.
8 changes: 8 additions & 0 deletions .eslintrc
@@ -0,0 +1,8 @@
{
"extends": [
"standard"
],
"env": {
"node": true
}
}
1 change: 1 addition & 0 deletions .travis.yml
@@ -12,6 +12,7 @@ before_install:
before_script:
- npm prune
- npm run lint
- npm run coverage
after_success:
- npm run semantic-release
branches:
1 change: 1 addition & 0 deletions README.md
@@ -6,6 +6,7 @@
[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com/)
[![Coverage Status](https://coveralls.io/repos/github/tusharmath/Multi-threaded-downloader/badge.svg)](https://coveralls.io/github/tusharmath/Multi-threaded-downloader)

This is a Node.js module that performs **resumable**, **multi-threaded** downloads over HTTP. It is heavily inspired by Speedbit's [Download Accelerator Plus](http://www.speedbit.com/dap/).

14 changes: 8 additions & 6 deletions package.json
@@ -17,36 +17,38 @@
"build": "babel src --out-dir .dist",
"test": "ava",
"lint": "standard --verbose | snazzy",
"coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
},
"engines": {
"node": ">= 5.0.0"
},
"main": ".dist/index.js",
"main": ".dist/Main.js",
"dependencies": {
"graceful-fs": "^4.1.3",
"humanize-plus": "^1.8.1",
"immutable": "^3.7.5",
"lodash": "^4.0.0",
"meow": "^3.7.0",
"muxer": "^1.0.1",
"progress": "^1.1.8",
"reactive-storage": "^3.0.0",
"ramda": "^0.21.0",
"request": "^2.60.0",
"rx": "^4.0.7",
"valid-url": "^1.0.9"
},
"license": "MIT",
"devDependencies": {
"eslint": "^2.11.1",
"ava": "^0.15.0",
"babel-cli": "^6.9.0",
"babel-plugin-transform-es2015-modules-commonjs": "^6.8.0",
"babel-register": "^6.9.0",
"coveralls": "^2.11.9",
"cz-conventional-changelog": "^1.1.5",
"eslint": "^2.11.1",
"express": "^4.13.1",
"ghooks": "^1.0.3",
"nyc": "^6.6.1",
"semantic-release": "^4.3.5",
"sinon": "^1.17.2",
"sinon": "^1.17.4",
"snazzy": "^4.0.0",
"standard": "^7.0.1",
"validate-commit-msg": "^2.0.0"
File renamed without changes.
41 changes: 41 additions & 0 deletions src/CreateMTDFile.js
@@ -0,0 +1,41 @@
/**
* Created by tushar.mathur on 29/06/16.
*/

'use strict'
import {mux} from 'muxer'
import {Observable as O} from 'rx'
import {
RemoteFileSize$,
CreateMeta$,
CreateWriteBufferAtParams,
JSToBuffer$
} from './Utils'

export const CreateMTDFile = ({FILE, HTTP, options}) => {
/**
* Create a new file
*/
const fd$ = FILE.open(O.just([options.mtdPath, 'w']))

/**
* Retrieve file size on remote server
*/
const size$ = RemoteFileSize$({HTTP, options})

/**
* Create initial meta data
*/
const meta$ = CreateMeta$({options, size$})

/**
* Create a new file with meta info appended at the end
*/
const written$ = FILE.write(CreateWriteBufferAtParams({
FILE,
fd$: fd$,
buffer$: JSToBuffer$(meta$),
position$: size$
}))
return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$})
}
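
A minimal consumption sketch for CreateMTDFile, mirroring how src/Main.js (later in this diff) wires it up. The `url` option name is an assumption; only `mtdPath` is referenced in this file.

import fs from 'graceful-fs'
import request from 'request'
import {demux} from 'muxer'
import {FILE, HTTP} from './IO'
import {CreateMTDFile} from './CreateMTDFile'

const create$ = CreateMTDFile({
  FILE: FILE(fs),
  HTTP: HTTP(request),
  // `url` is an assumed option name; only `mtdPath` appears in CreateMTDFile.js
  options: {url: 'http://example.com/file.zip', mtdPath: 'file.zip.mtd'}
})
// The muxed output is demuxed by key, exactly as Main.js does with fdW$
const [{written$}] = demux(create$, 'written$')
written$.subscribe(() => console.log('.mtd file initialized'))
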
74 changes: 74 additions & 0 deletions src/DownloadFromMTDFile.js
@@ -0,0 +1,74 @@
/**
* Created by tushar.mathur on 29/06/16.
*/

'use strict'

import {mux} from 'muxer'
import {Observable as O} from 'rx'
import {
CreateWriteBufferAtParams,
JSToBuffer$,
LocalFileSize$,
MetaPosition$,
ReadJSON$,
demuxFPH,
RequestWithMeta,
WriteBuffer,
SetMetaOffsets,
RxThrottleComplete
} from './Utils'

export const DownloadFromMTDFile = ({FILE, HTTP, mtdPath}) => {
/**
* Open file to read+append
*/
const fd$ = FILE.open(O.just([mtdPath, 'r+']))

/**
* Retrieve File size on disk
*/
const size$ = LocalFileSize$({FILE, fd$})

/**
* Retrieve Meta info
*/
const metaPosition$ = MetaPosition$({size$})
const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$})

/**
* Make a HTTP request for each thread
*/
const {response$, buffer$} = demuxFPH(
['buffer$', 'response$'], RequestWithMeta(HTTP, meta$)
)

/**
* Create write params and save buffer+offset to disk
*/
const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$})

/**
* Update META info
*/
const nMeta$ = SetMetaOffsets({meta$, bufferWritten$})

/**
* Persist META to disk
*/
const metaWritten$ = FILE.write(CreateWriteBufferAtParams({
fd$,
buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)),
position$: size$
}))

/**
* Create sink$
*/
return mux({
metaWritten$, response$,
localFileSize$: size$,
fdR$: fd$, metaPosition$,
meta$: O.merge(nMeta$, meta$)
})
}
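
A minimal sketch of resuming a download from an existing .mtd file; FILE and HTTP are the wrappers from src/IO.js, and the logging is purely illustrative.

import fs from 'graceful-fs'
import request from 'request'
import {demux} from 'muxer'
import {FILE, HTTP} from './IO'
import {DownloadFromMTDFile} from './DownloadFromMTDFile'

const download$ = DownloadFromMTDFile({
  FILE: FILE(fs),
  HTTP: HTTP(request),
  mtdPath: 'file.zip.mtd'
})
// response$ emits one HTTP response per download thread
const [{response$, metaWritten$}] = demux(download$, 'response$', 'metaWritten$')
response$.subscribe((res) => console.log('thread responded:', res.statusCode))
metaWritten$.subscribe(() => {}) // keep the meta-persistence stream active
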
40 changes: 40 additions & 0 deletions src/IO.js
@@ -0,0 +1,40 @@
'use strict'

import {Observable as O} from 'rx'
import * as Rx from './RxFP'
import {demux} from 'muxer'
import R from 'ramda'
import {Request} from './Request'

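// fromCB lifts a Node-style callback API (e.g. fs.open) into a function that
// takes an array of arguments and returns an Observable of the callback result.
// toOB does the same for an Observable of argument arrays, replaying the latest
// result to every subscriber via shareReplay(1).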
export const fromCB = R.compose(R.apply, O.fromNodeCallback)
export const toOB = cb => R.compose(
Rx.shareReplay(1),
Rx.flatMap(fromCB(cb))
)
export const FILE = R.curry((fs) => {
return {
// New Methods
open: toOB(fs.open),
fstat: toOB(fs.fstat),
read: toOB(fs.read),
write: toOB(fs.write),
close: toOB(fs.close),
truncate: toOB(fs.truncate),
rename: toOB(fs.rename)
}
})

export const HTTP = R.curry((_request) => {
const request = Request(_request)
const requestHead = (params) => {
const [{response$}] = demux(request(params), 'response$')
return response$.first().tap(x => x.destroy()).share()
}

const select = R.curry((event, request$) => request$.filter(x => x.event === event).pluck('message'))
return {
requestHead,
select,
request
}
})
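
A small sketch of the FILE wrapper in isolation; the path and flags below are illustrative.

import fs from 'graceful-fs'
import {Observable as O} from 'rx'
import {FILE} from './IO'

const file = FILE(fs)
// Every method takes an Observable of argument arrays and yields the results
const fd$ = file.open(O.just(['download.mtd', 'r']))
const stat$ = file.fstat(fd$.map((fd) => [fd]))
stat$.subscribe((stat) => console.log('size on disk:', stat.size))
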
58 changes: 58 additions & 0 deletions src/Main.js
@@ -0,0 +1,58 @@
/**
* Created by tusharmathur on 5/15/15.
*/
'use strict'
import request from 'request'
import fs from 'graceful-fs'
import {Observable as O} from 'rx'
import R from 'ramda'
import * as U from './Utils'
import {CreateMTDFile} from './CreateMTDFile'
import {DownloadFromMTDFile} from './DownloadFromMTDFile'
import * as T from './IO'
import {mux, demux} from 'muxer'

export const UTILS = U
export const createDownload = (_options) => {
const HTTP = T.HTTP(request)
const FILE = T.FILE(fs)
const options = U.MergeDefaultOptions(_options)

/**
* Create MTD File
*/
const createMTDFile$ = CreateMTDFile({FILE, HTTP, options}).share()
const [{fdW$}] = demux(createMTDFile$, 'fdW$')

/**
* Download From MTD File
*/
const downloadFromMTDFile$ = createMTDFile$.last()
.map({HTTP, FILE, mtdPath: options.mtdPath})
.flatMap(DownloadFromMTDFile)
.share()

const [{fdR$, meta$, response$}] = demux(downloadFromMTDFile$, 'meta$', 'fdR$', 'response$')

/**
* Finalize Downloaded FILE
*/
const finalizeDownload$ = downloadFromMTDFile$.last()
.withLatestFrom(fdR$, meta$, (_, fd, meta) => ({
FILE,
fd$: O.just(fd),
meta$: O.just(meta)
}))
.flatMap(U.FinalizeDownload)
.share()
.last()

/**
* Close File Descriptors
*/
const fd$ = finalizeDownload$.withLatestFrom(fdW$, fdR$)
.map(R.tail)
.flatMap(R.map(R.of))
const closed$ = FILE.close(fd$)
return [mux({response$, meta$, closed$}), {FILE, HTTP, UTILS}]
}
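
A minimal usage sketch of createDownload. Only `mtdPath` is visible as an option anywhere in this diff, so `url` and `path` below are assumed option names.

import {demux} from 'muxer'
import {createDownload} from './Main'

const [download$] = createDownload({
  url: 'http://example.com/file.zip', // assumed option name
  path: 'file.zip'                    // assumed option name
})
const [{meta$, closed$}] = demux(download$, 'meta$', 'closed$')
meta$.subscribe(() => console.log('meta updated'))
closed$.subscribe(() => console.log('file descriptors closed; download finalized'))
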
13 changes: 0 additions & 13 deletions src/NewDownload.js

This file was deleted.

28 changes: 28 additions & 0 deletions src/Request.js
@@ -0,0 +1,28 @@
/**
* Created by tushar.mathur on 18/06/16.
*/

'use strict'

import {Observable as O} from 'rx'
import {mux} from 'muxer'
import R from 'ramda'

export const ev = R.curry(($, event) => $.filter(R.whereEq({event})).pluck('message'))

export const RequestParams = R.curry((request, params) => {
return O.create((observer) => request(params)
.on('data', (message) => observer.onNext({event: 'data', message}))
.on('response', (message) => observer.onNext({event: 'response', message}))
.on('complete', () => observer.onCompleted())
.on('error', (error) => observer.onError(error))
)
})

export const Request = R.curry((request, params) => {
const Response$ = ev(RequestParams(request, params))
return mux({
response$: Response$('response'),
data$: Response$('data')
})
})
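
A short sketch of Request used directly; the URL is illustrative, and within the module it is consumed through the HTTP wrapper in src/IO.js.

import request from 'request'
import {demux} from 'muxer'
import {Request} from './Request'

const request$ = Request(request, {url: 'http://example.com/file.zip'})
const [{data$}] = demux(request$, 'data$')
data$.subscribe((chunk) => console.log('received', chunk.length, 'bytes'))
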
15 changes: 0 additions & 15 deletions src/ResumeDownload.js

This file was deleted.

23 changes: 23 additions & 0 deletions src/RxFP.js
@@ -0,0 +1,23 @@
/**
* Created by tushar.mathur on 10/06/16.
*/

'use strict'

import R from 'ramda'
import {Observable as O} from 'rx'
export const map = R.curry((func, $) => $.map(func))
export const flatMap = R.curry((func, $) => $.flatMap(func))
export const withLatestFrom = R.curry((list, $) => $.withLatestFrom(...list))
export const zip = R.curry((list, $) => $.zip(...list))
export const zipWith = R.curry((func, list, $) => $.zip(...list, func))
export const filter = R.curry((func, $) => $.filter(func))
export const distinctUntilChanged = $ => $.distinctUntilChanged()
export const pluck = R.curry((path, $) => $.pluck(path))
export const scan = R.curry((func, $) => $.scan(func))
export const scanWith = R.curry((func, m, $) => $.scan(func, m))
export const shareReplay = R.curry((count, $) => $.shareReplay(count))
export const repeat = R.curry((value, count) => O.repeat(value, count))
export const trace = R.curry((msg, $) => $.tap(x => console.log(msg, x)))
export const tap = R.curry((func, $) => $.tap(func))
export const share = ($) => $.share()
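
A small sketch of why these operators are curried: they compose point-free with R.compose, the same pattern IO.js uses to build toOB.

import R from 'ramda'
import {Observable as O} from 'rx'
import * as Rx from './RxFP'

// Keep even numbers, then double them: filter runs first (compose is right-to-left)
const doubleEvens = R.compose(
  Rx.map((x) => x * 2),
  Rx.filter((x) => x % 2 === 0)
)
doubleEvens(O.fromArray([1, 2, 3, 4])).subscribe((x) => console.log(x)) // 4, 8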
