Skip to content

Commit

Permalink
Merge 5d985a3 into ac7af28
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Jun 27, 2016
2 parents ac7af28 + 5d985a3 commit 8e66caf
Show file tree
Hide file tree
Showing 40 changed files with 1,204 additions and 451 deletions.
8 changes: 8 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"extends": [
"standard"
],
"env": {
"node": true
}
}
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ before_install:
before_script:
- npm prune
- npm run lint
- npm run coverage
after_success:
- npm run semantic-release
branches:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/)
[![semantic-release](https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg)](https://github.com/semantic-release/semantic-release)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg)](http://standardjs.com/)
[![Coverage Status](https://coveralls.io/repos/github/tusharmath/Multi-threaded-downloader/badge.svg)](https://coveralls.io/github/tusharmath/Multi-threaded-downloader)

This is a nodejs based module that helps you in performing **resumable**, **multi-threaded** downloads via Http. The module is highly inspired by Speedbit's — [Download Accelerator Plus](http://www.speedbit.com/dap/).

Expand Down
11 changes: 7 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"build": "babel src --out-dir .dist",
"test": "ava",
"lint": "standard --verbose | snazzy",
"coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
},
"engines": {
Expand All @@ -26,27 +27,29 @@
"dependencies": {
"graceful-fs": "^4.1.3",
"humanize-plus": "^1.8.1",
"immutable": "^3.7.5",
"lodash": "^4.0.0",
"meow": "^3.7.0",
"muxer": "^1.0.1",
"progress": "^1.1.8",
"reactive-storage": "^3.0.0",
"ramda": "^0.21.0",
"request": "^2.60.0",
"rx": "^4.0.7",
"valid-url": "^1.0.9"
},
"license": "MIT",
"devDependencies": {
"eslint": "^2.11.1",
"ava": "^0.15.0",
"babel-cli": "^6.9.0",
"babel-plugin-transform-es2015-modules-commonjs": "^6.8.0",
"babel-register": "^6.9.0",
"coveralls": "^2.11.9",
"cz-conventional-changelog": "^1.1.5",
"eslint": "^2.11.1",
"express": "^4.13.1",
"ghooks": "^1.0.3",
"nyc": "^6.6.1",
"semantic-release": "^4.3.5",
"sinon": "^1.17.2",
"sinon": "^1.17.4",
"snazzy": "^4.0.0",
"standard": "^7.0.1",
"validate-commit-msg": "^2.0.0"
Expand Down
File renamed without changes.
28 changes: 28 additions & 0 deletions src/Request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Created by tushar.mathur on 18/06/16.
*/

'use strict'

import {Observable as O} from 'rx'
import {mux} from 'muxer'
import R from 'ramda'

export const ev = R.curry(($, event) => $.filter(R.whereEq({event})).pluck('message'))

export const RequestParams = R.curry((request, params) => {
return O.create((observer) => request(params)
.on('data', (message) => observer.onNext({event: 'data', message}))
.on('response', (message) => observer.onNext({event: 'response', message}))
.on('complete', () => observer.onCompleted())
.on('error', (error) => observer.onError(error))
)
})

export const Request = R.curry((request, params) => {
const Response$ = ev(RequestParams(request, params))
return mux({
response$: Response$('response'),
data$: Response$('data')
})
})
23 changes: 23 additions & 0 deletions src/RxFP.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Created by tushar.mathur on 10/06/16.
*/

'use strict'

import R from 'ramda'
import {Observable as O} from 'rx'
export const map = R.curry((func, $) => $.map(func))
export const flatMap = R.curry((func, $) => $.flatMap(func))
export const withLatestFrom = R.curry((list, $) => $.withLatestFrom(...list))
export const zip = R.curry((list, $) => $.zip(...list))
export const zipWith = R.curry((func, list, $) => $.zip(...list, func))
export const filter = R.curry((func, $) => $.filter(func))
export const distinctUntilChanged = $ => $.distinctUntilChanged()
export const pluck = R.curry((path, $) => $.pluck(path))
export const scan = R.curry((func, $) => $.scan(func))
export const scanWith = R.curry((func, m, $) => $.scan(func, m))
export const shareReplay = R.curry((count, $) => $.shareReplay(count))
export const repeat = R.curry((value, count) => O.repeat(value, count))
export const trace = R.curry((msg, $) => $.tap(x => console.log(msg, x)))
export const tap = R.curry((func, $) => $.tap(func))
export const share = ($) => $.share()
65 changes: 36 additions & 29 deletions src/Transformers.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,40 @@
import request from 'request'
import Rx from 'rx'
import fs from 'graceful-fs'
import _ from 'lodash'
import * as u from './Utils'
'use strict'

export const requestBody = (params) => Rx.Observable.create((observer) => request(params)
.on('data', (message) => observer.onNext({event: 'data', message}))
.on('response', (message) => observer.onNext({event: 'response', message}))
.on('complete', (message) => observer.onCompleted({event: 'completed', message}))
.on('error', (error) => observer.onError(error))
)
import {Observable as O} from 'rx'
import {demux} from 'muxer'
import R from 'ramda'
import {Request} from './Request'

export const requestHead = (params) => requestBody(params)
.first()
.pluck('message')
.tap((x) => x.destroy())
export const fromCB = R.compose(R.apply, O.fromNodeCallback)

export const requestContentLength = (params) => requestHead(params)
.pluck('headers', 'content-length')
.map((x) => parseInt(x, 10))
export const FILE = R.curry((fs) => {
return [{
// New Methods
open: signal$ => signal$.flatMap(fromCB(fs.open)).shareReplay(1),
fstat: signal$ => signal$.flatMap(fromCB(fs.fstat)).shareReplay(1),
read: signal$ => signal$.flatMap(fromCB(fs.read)).shareReplay(1),
write: signal$ => signal$.flatMap(fromCB(fs.write)).shareReplay(1),
close: signal$ => signal$.flatMap(fromCB(fs.close)).shareReplay(1),
truncate: signal$ => signal$.flatMap(fromCB(fs.truncate)).shareReplay(1),
rename: signal$ => signal$.flatMap(fromCB(fs.rename)).shareReplay(1)
}]
})

export const fsOpen = Rx.Observable.fromNodeCallback(fs.open)
export const fsWrite = Rx.Observable.fromNodeCallback(fs.write)
export const fsTruncate = Rx.Observable.fromNodeCallback(fs.truncate)
export const fsRename = Rx.Observable.fromNodeCallback(fs.rename)
export const fsStat = Rx.Observable.fromNodeCallback(fs.fstat)
export const fsRead = Rx.Observable.fromNodeCallback(fs.read)
export const fsReadBuffer = (x) => fsRead(x.fd, x.buffer, 0, x.buffer.length, x.offset)
export const fsWriteBuffer = (x) => fsWrite(x.fd, x.buffer, 0, x.buffer.length, x.offset)
export const fsWriteJSON = (x) => fsWriteBuffer(_.assign({}, x, {buffer: u.toBuffer(x.json)}))
export const fsReadJSON = (x) => fsReadBuffer(x).map((x) => JSON.parse(x[1].toString()))
export const buffer = (size) => Rx.Observable.just(u.createEmptyBuffer(size))
export const HTTP = R.curry((_request) => {
const request = Request(_request)
const requestHead = (params) => {
const [{response$}] = demux(request(params), 'response$')
return response$.first().tap(x => x.destroy()).share()
}

const select = R.curry((event, request$) => request$.filter(x => x.event === event).pluck('message'))
const executor = (signal$) => {
const [{destroy$}] = demux(signal$, 'destroy$')
destroy$.subscribe(request => request.destroy())
}
return [{
requestHead,
select,
request
}, executor]
})
Loading

0 comments on commit 8e66caf

Please sign in to comment.