Skip to content

Commit

Permalink
Merge e48989d into 4d44d6d
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Mar 18, 2018
2 parents 4d44d6d + e48989d commit 72b194d
Show file tree
Hide file tree
Showing 32 changed files with 1,947 additions and 1,144 deletions.
4 changes: 3 additions & 1 deletion package.json
Expand Up @@ -13,6 +13,7 @@
"url": "https://github.com/tusharmath/Multi-threaded-downloader.git"
},
"scripts": {
"prettier": "git ls-files | grep .js$ | xargs prettier --print-width 80 --single-quote --no-semi --no-bracket-spacing --write",
"prepublish": "npm run build",
"build": "babel src --out-dir .dist --source-maps true",
"test": "ava",
Expand Down Expand Up @@ -41,14 +42,15 @@
"ava": "^0.17.0",
"babel-cli": "^6.18.0",
"babel-plugin-transform-es2015-modules-commonjs": "^6.8.0",
"cz-conventional-changelog": "^1.2.0",
"babel-register": "^6.18.0",
"coveralls": "^2.11.15",
"cz-conventional-changelog": "^1.2.0",
"eslint": "^3.10.2",
"express": "^4.13.1",
"ghooks": "^1.3.2",
"jsdoc-to-markdown": "^2.0.1",
"nyc": "^10.0.0",
"prettier": "^1.11.1",
"semantic-release": "^6.3.2",
"sinon": "^2.3.2",
"snazzy": "^5.0.0",
Expand Down
32 changes: 19 additions & 13 deletions perf/TestHelpers.js
Expand Up @@ -16,22 +16,24 @@ import {
} from '../src'
import {demux} from 'muxer'

export const removeFile = (x) => Rx.Observable.fromCallback(fs.unlink)(x).toPromise()
export const removeFile = x =>
Rx.Observable.fromCallback(fs.unlink)(x).toPromise()

export const createFileDigest = (path) => {
export const createFileDigest = path => {
const hash = crypto.createHash('sha1')
return new Promise((resolve) => fs
.createReadStream(path)
.on('data', (x) => hash.update(x))
.on('end', () => resolve(hash.digest('hex').toUpperCase()))
return new Promise(resolve =>
fs
.createReadStream(path)
.on('data', x => hash.update(x))
.on('end', () => resolve(hash.digest('hex').toUpperCase()))
)
}

export const fsStat = (x) => Rx.Observable.fromCallback(fs.stat)(x).toPromise()
export const fsStat = x => Rx.Observable.fromCallback(fs.stat)(x).toPromise()

export const createTestObserver = (stream) => {
export const createTestObserver = stream => {
const out = []
stream.subscribe((x) => out.push(x))
stream.subscribe(x => out.push(x))
return out
}

Expand All @@ -40,7 +42,7 @@ export const createTestObserver = (stream) => {
* @param options
* @returns {Observable}
*/
export const createDownload = (options) => {
export const createDownload = options => {
/**
* Create MTD File
*/
Expand All @@ -50,15 +52,19 @@ export const createDownload = (options) => {
/**
* Download From MTD File
*/
const downloadFromMTDFile$ = createMTDFile$.last()
.map(MTDPath(options.path)).flatMap(DownloadFromMTDFile).share()
const downloadFromMTDFile$ = createMTDFile$
.last()
.map(MTDPath(options.path))
.flatMap(DownloadFromMTDFile)
.share()

const [{fdR$, meta$}] = demux(downloadFromMTDFile$, 'meta$', 'fdR$')

/**
* Finalize Downloaded FILE
*/
const finalizeDownload$ = downloadFromMTDFile$.last()
const finalizeDownload$ = downloadFromMTDFile$
.last()
.withLatestFrom(fdR$, meta$, (_, fd, meta) => ({
fd$: O.just(fd),
meta$: O.just(meta)
Expand Down
26 changes: 15 additions & 11 deletions perf/server.js
Expand Up @@ -13,16 +13,20 @@ var httpServer = http.createServer(app)
var httpsServer = https.createServer(options, app)
app.use('/files', express.static(path.join(__dirname, '/files')))
app.head('/fixed-size', (req, res) => res.status(403).send())
app.get('/fixed-size', (req, res) => res.send('a quick brown fox jumps over the lazy dog'))
app.get('/fixed-size', (req, res) =>
res.send('a quick brown fox jumps over the lazy dog')
)

const startServer = (app, port) => new Promise((i) => { // eslint-disable-line
const onClose = () => new Promise((i) => server.close(i)) // eslint-disable-line
const onStart = () => i(onClose)
const server = app.listen(port, onStart)
})

exports.server = (port) => Promise.all([
startServer(httpServer, port),
startServer(httpsServer, port + 1)
]).then((x) => () => Promise.all(x.map((x) => x())))
const startServer = (app, port) =>
new Promise(resolve => {
// eslint-disable-line
const onClose = () => new Promise(i => server.close(i)) // eslint-disable-line
const onStart = () => resolve(onClose)
const server = app.listen(port, onStart)
})

exports.server = port =>
Promise.all([
startServer(httpServer, port),
startServer(httpsServer, port + 1)
]).then(x => () => Promise.all(x.map(x => x())))
14 changes: 8 additions & 6 deletions src/CreateMTDFile.js
Expand Up @@ -49,11 +49,13 @@ export const CreateMTDFile = R.curry(({FILE, HTTP}, options) => {
/**
* Create a new file with meta info appended at the end
*/
const written$ = FILE.write(CreateWriteBufferAtParams({
FILE,
fd$: fd$,
buffer$: JSToBuffer$(meta$),
position$: size$
}))
const written$ = FILE.write(
CreateWriteBufferAtParams({
FILE,
fd$: fd$,
buffer$: JSToBuffer$(meta$),
position$: size$
})
)
return mux({written$, meta$, remoteFileSize$: size$, fdW$: fd$})
})
117 changes: 63 additions & 54 deletions src/DownloadFromMTDFile.js
Expand Up @@ -38,65 +38,74 @@ import {
* - `fdR$` - File Descriptor in `r+` mode.
* - `meta$` - Download meta information.
*/
export const DownloadFromMTDFile = R.curryN(2, ({FILE, HTTP}, mtdPath, _meta) => {
/**
* Open file to read+append
*/
const fd$ = FILE.open(O.just([mtdPath, 'r+']))
export const DownloadFromMTDFile = R.curryN(
2,
({FILE, HTTP}, mtdPath, _meta) => {
/**
* Open file to read+append
*/
const fd$ = FILE.open(O.just([mtdPath, 'r+']))

/**
* Retrieve File size on disk
*/
const size$ = LocalFileSize$({FILE, fd$})
/**
* Retrieve File size on disk
*/
const size$ = LocalFileSize$({FILE, fd$})

/**
* Retrieve Meta info
*/
const metaPosition$ = MetaPosition$({size$})
const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$})
.map(meta => R.merge(meta, _meta))
/**
* Retrieve Meta info
*/
const metaPosition$ = MetaPosition$({size$})
const meta$ = ReadJSON$({FILE, fd$, position$: metaPosition$}).map(meta =>
R.merge(meta, _meta)
)

/**
* Make a HTTP request for each thread
*/
const {response$, buffer$} = demuxFPH(
['buffer$', 'response$'], RequestWithMeta(HTTP, meta$).share()
)
/**
* Make a HTTP request for each thread
*/
const {response$, buffer$} = demuxFPH(
['buffer$', 'response$'],
RequestWithMeta(HTTP, meta$).share()
)

/**
* Select all the responses
*/
const responses$ = RxTakeN(meta$.map(GetThreadCount), response$)
/**
* Select all the responses
*/
const responses$ = RxTakeN(meta$.map(GetThreadCount), response$)

/**
* Create write params and save buffer+offset to disk
*/
const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$})
/**
* Create write params and save buffer+offset to disk
*/
const bufferWritten$ = WriteBuffer({FILE, fd$, buffer$})

/**
* Update META info
*/
const nMeta$ = SetMetaOffsets({meta$, bufferWritten$})
/**
* Update META info
*/
const nMeta$ = SetMetaOffsets({meta$, bufferWritten$})

/**
* Persist META to disk
*/
const metaWritten$ = FILE.write(CreateWriteBufferAtParams({
fd$,
buffer$: JSToBuffer$(RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)),
position$: size$
}))
/**
* Persist META to disk
*/
const metaWritten$ = FILE.write(
CreateWriteBufferAtParams({
fd$,
buffer$: JSToBuffer$(
RxThrottleComplete(meta$.pluck('metaWrite'), nMeta$)
),
position$: size$
})
)

/**
* Create sink$
*/
return mux({
metaWritten$,
response$,
responses$,
localFileSize$: size$,
fdR$: fd$,
metaPosition$,
meta$: O.merge(nMeta$, meta$)
})
})
/**
* Create sink$
*/
return mux({
metaWritten$,
response$,
responses$,
localFileSize$: size$,
fdR$: fd$,
metaPosition$,
meta$: O.merge(nMeta$, meta$)
})
}
)
22 changes: 12 additions & 10 deletions src/IO.js
Expand Up @@ -7,10 +7,7 @@ import R from 'ramda'
import {Request} from './Request'

export const fromCB = R.compose(R.apply, O.fromNodeCallback)
export const toOB = cb => R.compose(
Rx.shareReplay(1),
Rx.flatMap(fromCB(cb))
)
export const toOB = cb => R.compose(Rx.shareReplay(1), Rx.flatMap(fromCB(cb)))

/**
* Provides wrappers over the async utils inside the
Expand All @@ -19,7 +16,7 @@ export const toOB = cb => R.compose(
* and returns the result of function call as another stream.
* @namespace FILE
*/
export const FILE = R.curry((fs) => {
export const FILE = R.curry(fs => {
return {
/**
* @function
Expand Down Expand Up @@ -88,14 +85,19 @@ export const FILE = R.curry((fs) => {
/**
* @namespace HTTP
*/
export const HTTP = R.curry((_request) => {
export const HTTP = R.curry(_request => {
const request = Request(_request)
const requestHead = (params) => {
const requestHead = params => {
const [{response$}] = demux(request(params), 'response$')
return response$.first().tap(x => x.destroy()).share()
return response$
.first()
.tap(x => x.destroy())
.share()
}

const select = R.curry((event, request$) => request$.filter(x => x.event === event).pluck('message'))
const select = R.curry((event, request$) =>
request$.filter(x => x.event === event).pluck('message')
)
return {
requestHead,
select,
Expand All @@ -110,7 +112,7 @@ export const HTTP = R.curry((_request) => {
}
})

export const BAR = R.curry((ProgressBar) => {
export const BAR = R.curry(ProgressBar => {
const bar = new ProgressBar(':bar :percent ', {
total: 1000,
complete: '█',
Expand Down
15 changes: 9 additions & 6 deletions src/Request.js
Expand Up @@ -8,14 +8,17 @@ import {Observable as O} from 'rx'
import {mux} from 'muxer'
import R from 'ramda'

export const ev = R.curry(($, event) => $.filter(R.whereEq({event})).pluck('message'))
export const ev = R.curry(($, event) =>
$.filter(R.whereEq({event})).pluck('message')
)

export const RequestParams = R.curry((request, params) => {
return O.create((observer) => request(params)
.on('data', (message) => observer.onNext({event: 'data', message}))
.on('response', (message) => observer.onNext({event: 'response', message}))
.on('complete', () => observer.onCompleted())
.on('error', (error) => observer.onError(error))
return O.create(observer =>
request(params)
.on('data', message => observer.onNext({event: 'data', message}))
.on('response', message => observer.onNext({event: 'response', message}))
.on('complete', () => observer.onCompleted())
.on('error', error => observer.onError(error))
)
})

Expand Down
6 changes: 4 additions & 2 deletions src/RxFP.js
Expand Up @@ -20,8 +20,10 @@ export const shareReplay = R.curry((count, $) => $.shareReplay(count))
export const repeat = R.curry((value, count) => O.repeat(value, count))
export const trace = R.curry((msg, $) => $.tap(x => console.log(msg, x)))
export const tap = R.curry((func, $) => $.tap(func))
export const share = ($) => $.share()
export const share = $ => $.share()
export const partition = R.curry((func, $) => $.partition(func))
export const first = $ => $.first()
export const subscribe = R.curry((observer, $) => $.subscribe(observer))
export const sample = R.curry((a$$, b$) => b$.withLatestFrom(...a$$).map(R.tail))
export const sample = R.curry((a$$, b$) =>
b$.withLatestFrom(...a$$).map(R.tail)
)

0 comments on commit 72b194d

Please sign in to comment.